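"""Helpers for launching lighteval evaluations as Hugging Face Jobs.

Each model/provider pair from the local config gets its own job; statuses
are tracked in globals.job_results and average scores are parsed out of
completed job logs.
"""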
from huggingface_hub import run_job, inspect_job, fetch_job_logs
import os
import time
from datetime import datetime
import globals
from utils.io import save_results, load_models_providers
from typing import Optional


def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract average score from completed job logs.

    Parses the results table and calculates the average of the main metric
    for each task (the metric on the same line as the task name).
    """
    try:
        # Fetch the completed job's logs, which contain the results table
        logs = fetch_job_logs(job_id=job_id)

        scores = []

        for line in logs:
            # Find the results table
            # Look for lines that match the pattern: |task_name|version|metric|value|...|
            # We want to extract the score (value) from lines where the task name is not empty
            if '|' in line:
                parts = [p.strip() for p in line.split('|')]

                # Skip header and separator lines
                # Table format: | Task | Version | Metric | Value | | Stderr |
                if len(parts) == 8:
                    _, task, _, metric, value, _, _, _ = parts

                    # Only keep rows whose task name matches one of the configured tasks
                    if task and task in [t.replace("|", ":") for t in globals.TASKS.split(",")]:
                        # Convert the reported value to a float, skipping rows
                        # that do not hold a numeric score
                        try:
                            score = float(value)
                        except ValueError:
                            continue
                        scores.append(score)
                        print(f"Extracted score {score} for task '{task}' metric '{metric}'")

        # Calculate average of all task scores
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        else:
            print("No scores found in job logs")

        return None

    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        import traceback
        traceback.print_exc()
        return None


def run_single_job(model: str, provider: str, tasks: str) -> Optional[str]:
    """Run a single job for a model-provider combination."""

    if not model or not provider:
        print("Missing model or provider")
        return None

    # Verify the model-provider combination exists in the config
    models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
    if (model, provider) not in models_providers:
        print(f"Error: {model} with {provider} not found in {globals.LOCAL_CONFIG_FILE}")
        return None

    # Check if job is already running
    key = globals.get_model_provider_key(model, provider)
    with globals.results_lock:
        if key in globals.job_results:
            current_status = globals.job_results[key].get("status")
            if current_status == "running":
                print( f"Job for {model} on {provider} is already running. Please wait for it to complete.")
                return -1

    print(f"Starting job for model={model}, provider={provider}")

    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
            "--max-samples", "10"
        ],
        namespace="clefourrier",
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN")
    )

    job_id = job.id

    with globals.results_lock:
        # Move current score to previous score if it exists (relaunching)
        previous_score = None
        if key in globals.job_results and globals.job_results[key].get("current_score") is not None:
            previous_score = globals.job_results[key]["current_score"]

        globals.job_results[key] = {
            "model": model,
            "provider": provider,
            "last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "status": "running",
            "current_score": None,
            "previous_score": previous_score,
            "job_id": job_id
        }

    save_results()
    print(f"Job launched: ID={job_id}, model={model}, provider={provider}")
    return job_id


def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers."""
    models_providers = load_models_providers(config_file)

    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")

    launched_count = 0
    for model, provider in models_providers:
        job_id = run_single_job(model, provider, tasks)
        if job_id is not None:
            launched_count += 1
        # Small delay between launches to avoid rate limiting
        time.sleep(2)

    print(f"Launched {launched_count}/{len(models_providers)} jobs successfully")
    return f"Launched {launched_count} jobs"


def update_job_statuses() -> None:
    """Check and update the status of active jobs."""
    try:
        with globals.results_lock:
            keys = list(globals.job_results.keys())

        for key in keys:
            try:
                with globals.results_lock:
                    job_id = globals.job_results[key]["job_id"]

                job_info = inspect_job(job_id=job_id)
                new_status = job_info.status.stage

                with globals.results_lock:
                    old_status = globals.job_results[key]["status"]

                    if old_status != new_status:
                        globals.job_results[key]["status"] = new_status
                        print(f"Job {job_id} status changed: {old_status} -> {new_status}")

                    # If the job has completed and no score is recorded yet
                    # (whether the status just changed or a previous extraction
                    # failed), try to extract the score from the logs
                    if new_status == "COMPLETED" and globals.job_results[key]["current_score"] is None:
                        score = extract_score_from_job(job_id)
                        if score is not None:
                            globals.job_results[key]["current_score"] = score

            except Exception as e:
                print(f"Error checking job {key}: {str(e)}")

        save_results()

    except Exception as e:
        print(f"Error in update_job_statuses: {str(e)}")