Clémentine
buttons on row
a092a3a
raw
history blame
6.85 kB
import gradio as gr
import time
from apscheduler.schedulers.background import BackgroundScheduler
import threading
import globals
from utils.io import initialize_models_providers_file, save_results, load_results, load_models_providers, get_results_table
from utils.jobs import run_single_job, launch_jobs, update_job_statuses
from typing import List, Optional
def status_monitor() -> None:
"""Background thread to monitor job statuses."""
while True:
update_job_statuses()
time.sleep(240) # Check every 30 seconds
def daily_checkpoint() -> None:
"""Daily checkpoint - save current state."""
print("Daily checkpoint - saving current state")
save_results()
# Create Gradio interface
def create_app() -> gr.Blocks:
with gr.Blocks(title="Inference Provider Testing Dashboard") as demo:
gr.Markdown("# Inference Provider Testing Dashboard")
gr.Markdown("Launch and monitor evaluation jobs for multiple models and providers.")
# All action buttons in one row
with gr.Row():
init_btn = gr.Button("Fetch and Initialize Models/Providers", variant="secondary")
launch_btn = gr.Button("Launch All Jobs", variant="primary")
relaunch_all_btn = gr.Button("Relaunch All", variant="primary")
relaunch_failed_btn = gr.Button("Relaunch Failed", variant="stop")
refresh_btn = gr.Button("Refresh Results", variant="secondary")
output = gr.Textbox(label="Status", interactive=False)
with gr.Row():
with gr.Column():
gr.Markdown("## Job Results")
results_table = gr.Dataframe(
value=get_results_table(),
interactive=True,
show_search="search",
show_copy_button=True,
show_fullscreen_button=True,
wrap=True,
static_columns=list(range(7)),
datatype=["str", "str", "str", "str", "str", "str", "html", "str"],
elem_id="results_table"
)
def update_model_choices() -> gr.update:
models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
model_choices = sorted(list(set([mp[0] for mp in models_providers])))
return gr.update(choices=model_choices, value=model_choices[0] if model_choices else None)
def update_provider_choices(model: Optional[str]) -> gr.update:
"""Update provider dropdown based on selected model."""
if not model:
return gr.update(choices=[])
# Get providers for the selected model from the config file
models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
providers = [mp[1] for mp in models_providers if mp[0] == model]
return gr.update(choices=providers, value=providers[0] if providers else None)
# Event handlers
init_btn.click(
fn=initialize_models_providers_file,
outputs=output
)
launch_btn.click(
fn=launch_jobs,
outputs=output
)
def relaunch_all_jobs():
"""Relaunch all existing model-provider combinations from job results."""
if not globals.job_results:
return "No existing jobs to relaunch"
relaunched_count = 0
for key, info in globals.job_results.items():
model = info["model"]
provider = info["provider"]
job_id = run_single_job(model, provider, globals.TASKS)
if job_id != -1:
relaunched_count += 1
time.sleep(2) # Small delay between launches to avoid rate limiting
return f"Relaunched {relaunched_count}/{len(globals.job_results)} jobs"
relaunch_all_btn.click(
fn=relaunch_all_jobs,
outputs=output
)
def relaunch_failed_jobs():
"""Relaunch only failed model-provider combinations from job results."""
if not globals.job_results:
return "No existing jobs to relaunch"
failed_jobs = [(key, info) for key, info in globals.job_results.items()
if info.get("status") in ["ERROR", "FAILED"]]
if not failed_jobs:
return "No failed jobs to relaunch"
relaunched_count = 0
for key, info in failed_jobs:
model = info["model"]
provider = info["provider"]
job_id = run_single_job(model, provider, globals.TASKS)
if job_id != -1:
relaunched_count += 1
time.sleep(2) # Small delay between launches to avoid rate limiting
return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"
relaunch_failed_btn.click(
fn=relaunch_failed_jobs,
outputs=output
)
refresh_btn.click(
fn=get_results_table,
outputs=results_table
)
# Handle dataframe cell selection for relaunch
def handle_table_select(evt: gr.SelectData):
"""Handle when a cell in the results table is clicked."""
print(f"[Relaunch] Cell selected - Row: {evt.index[0]}, Col: {evt.index[1]}, Value: {evt.value}")
# If we selected a "rerun" cell, we relaunch a job
if evt.index[1] == 7:
# Get the full row data from the dataframe
df = get_results_table()
row_data = df.data.iloc[evt.index[0]]
model = row_data['Model']
provider = row_data['Provider']
print(f"[Relaunch] Relaunching job - Model: {model}, Provider: {provider}")
run_single_job(model, provider, globals.TASKS)
# Then update the table
return get_results_table()
results_table.select(
fn=handle_table_select,
inputs=[],
outputs=results_table
)
return demo
if __name__ == "__main__":
# Load previous results
load_results()
print("Starting Inference Provider Testing Dashboard")
# Start status monitor thread
monitor_thread = threading.Thread(target=status_monitor, daemon=True)
monitor_thread.start()
print("Job status monitor started")
# Start APScheduler for daily checkpoint
scheduler = BackgroundScheduler()
scheduler.add_job(daily_checkpoint, 'cron', hour=0, minute=0) # Run at midnight
scheduler.start()
print("Daily checkpoint scheduler started (saves at 00:00)")
# Create and launch the Gradio interface
demo = create_app()
demo.launch(server_name="0.0.0.0", server_port=7860)