# In the name of God, the Most Gracious, the Most Merciful
from fastmcp import FastMCP
import asyncio
import json
import os
import time
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from dotenv import load_dotenv
import nltk
import sys
from modal_tool import train_with_modal, app, upload_to_hf_from_volume, evaluate_model
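
# Note: train_with_modal, upload_to_hf_from_volume, and evaluate_model are assumed to be
# Modal functions defined in the local modal_tool module, and app the Modal App object;
# they are invoked below with .remote() inside app.run(), so the heavy work runs on Modal
# rather than in this process.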

# Load environment variables from a local .env file before reading any keys
load_dotenv()

mcp = FastMCP(name="FistalMCP")

groq = os.getenv("GROQ_API_KEY")
hf = os.getenv("HF_TOKEN")
if not groq:
    print("GROQ_API_KEY missing!", file=sys.stderr)
if not hf:
    print("HF_TOKEN missing!", file=sys.stderr)

gk1 = os.environ.get("GOOGLE_API_KEY_1")
gk2 = os.environ.get("GOOGLE_API_KEY_2")
gk3 = os.environ.get("GOOGLE_API_KEY_3")
GOOGLE_API_KEYS = [k for k in [gk1, gk2, gk3] if k]
if not GOOGLE_API_KEYS:
    print("No Google API keys found!", file=sys.stderr)

async def genBatch(topic: str, samples_per_batch: int, batch_num: int, api_key: str, task_type: str) -> list:
    """Generate one batch of samples using a single API key."""
    if not api_key or api_key == "YOUR_API_KEY":
        return []
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        temperature=0.7,
        google_api_key=api_key
    )
    prompt_template = """
You are an expert dataset generator.
Generate authentic, high-quality data on the topic: {topic} for task type: {task_type} using your knowledge.
Generate exactly {num} concise, varied, and high-quality samples.
Return a JSON list of objects, each with keys: instruction, input, and output.
Do not add extra text, markdown, or code fences.
RESPONSE:
"""
    promptJSON = ChatPromptTemplate.from_template(prompt_template)
    chain = promptJSON | llm
    try:
        user_input = {
            "topic": topic,
            "num": samples_per_batch,
            "task_type": task_type
        }
        response = await asyncio.to_thread(chain.invoke, user_input)
        content = response.content.strip()
        # Strip any markdown code fences the model may have added despite the prompt
        if content.startswith("```json"):
            content = content[7:]
        if content.startswith("```"):
            content = content[3:]
        if content.endswith("```"):
            content = content[:-3]
        content = content.strip()
        data = json.loads(content)
        return data if isinstance(data, list) else [data]
    except json.JSONDecodeError as e:
        print(f"JSON decode error in batch {batch_num}: {e}", file=sys.stderr)
        return []
    except Exception as e:
        print(f"Error in batch {batch_num}: {e}", file=sys.stderr)
        return []
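
# If the model follows the prompt above, each batch parses into a list of objects shaped
# like this (illustrative example, not generated output):
#   {"instruction": "Explain X in one sentence.", "input": "", "output": "X is ..."}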

@mcp.tool()
async def generate_json_data(topic: str, task_type: str, num_samples: int = 1000) -> str:
    """
    Generate a training dataset with instruction, input, and output fields.
    Uses parallel batching for efficiency. Can generate up to 2000 samples.

    Args:
        topic: The topic or theme for the dataset
        task_type: The task type the dataset targets (e.g. text-generation)
        num_samples: Number of training examples to generate (recommended: 100-2000)

    Returns:
        JSON string with status, topic, total_samples, and data array
    """
    topic = str(topic).strip() if topic else ""
    task_type = str(task_type).strip() if task_type else "text-generation"
    try:
        num_samples = int(num_samples)
    except (ValueError, TypeError):
        num_samples = 100
    if not topic:
        return json.dumps({
            "status": "error",
            "message": "Topic cannot be empty"
        })
    if num_samples <= 0 or num_samples > 2000:
        # Clamp out-of-range requests into the supported 50-2000 window
        num_samples = min(max(50, num_samples), 2000)
    valid_keys = [k for k in GOOGLE_API_KEYS if k and k.strip() and k != "YOUR_API_KEY"]
    if not valid_keys:
        return json.dumps({
            "status": "error",
            "message": "No valid Google API keys configured"
        })
    start_time = time.time()
    samples_per_batch = 50
    total_batches = (num_samples + samples_per_batch - 1) // samples_per_batch
    try:
        tasks = []
        for batch_num in range(total_batches):
            # Rotate through the available keys so batches are spread across quotas
            api_key = valid_keys[batch_num % len(valid_keys)]
            task = genBatch(
                topic=topic,
                samples_per_batch=samples_per_batch,
                batch_num=batch_num + 1,
                api_key=api_key,
                task_type=task_type
            )
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        all_samples = []
        for batch_result in results:
            if isinstance(batch_result, Exception):
                continue
            if isinstance(batch_result, list):
                all_samples.extend(batch_result)
        all_samples = all_samples[:num_samples]
        end_time = time.time()
        gen_time = end_time - start_time
        return json.dumps({
            "status": "success",
            "topic": topic,
            "task_type": task_type,
            "total_samples": len(all_samples),
            "requested_samples": num_samples,
            "total_batches": total_batches,
            "generation_time_seconds": round(gen_time, 1),
            "generation_time_minutes": round(gen_time / 60, 2),
            "samples_per_second": round(len(all_samples) / gen_time, 2) if gen_time > 0 else 0,
            "data": all_samples
        })
    except Exception as e:
        return json.dumps({
            "status": "error",
            "message": f"Error generating dataset: {str(e)}"
        })
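
# The returned JSON string has the shape built above, e.g. (illustrative values):
#   {"status": "success", "topic": "...", "task_type": "...", "total_samples": 200,
#    "requested_samples": 200, "total_batches": 4, "generation_time_seconds": 42.0,
#    "generation_time_minutes": 0.7, "samples_per_second": 4.76, "data": [...]}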

@mcp.tool()
async def format_json(raw_data) -> str:
    """
    Convert a raw dataset to ChatML format for training.

    Args:
        raw_data: List or JSON string of samples with instruction/input/output

    Returns:
        JSON string with status, num_samples, and formatted data
    """
    try:
        # Accept a list, a JSON string, or a dict wrapping the samples under "data"
        if isinstance(raw_data, list):
            data = raw_data
        elif isinstance(raw_data, str):
            parsed = json.loads(raw_data)
            if isinstance(parsed, dict) and "data" in parsed:
                data = parsed["data"]
            else:
                data = parsed
        elif isinstance(raw_data, dict) and "data" in raw_data:
            data = raw_data["data"]
        else:
            return json.dumps({
                "status": "error",
                "message": f"Unexpected input type: {type(raw_data).__name__}"
            })
        if not isinstance(data, list):
            return json.dumps({
                "status": "error",
                "message": "Data must be a list of samples"
            })
        # Convert to ChatML format
        converted = []
        for item in data:
            if not isinstance(item, dict):
                continue
            if 'instruction' not in item or 'output' not in item:
                continue
            user_msg = str(item['instruction'])
            if item.get('input'):
                user_msg += f"\n\n{item['input']}"
            converted.append({
                "messages": [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": user_msg},
                    {"role": "assistant", "content": str(item['output'])}
                ]
            })
        if not converted:
            return json.dumps({
                "status": "error",
                "message": "No valid samples to format"
            })
        return json.dumps({
            "status": "success",
            "num_samples": len(converted),
            "data": converted,
            "message": f"Formatted {len(converted)} samples"
        }, ensure_ascii=False)
    except Exception as e:
        import traceback
        return json.dumps({
            "status": "error",
            "message": f"Formatting failed: {str(e)}",
            "traceback": traceback.format_exc()
        })
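
# Each converted record follows the ChatML-style layout built above, e.g. (illustrative
# contents):
#   {"messages": [
#       {"role": "system", "content": "You are a helpful assistant."},
#       {"role": "user", "content": "Summarize ..."},
#       {"role": "assistant", "content": "..."}]}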

@mcp.tool()
async def finetune_model(formatted_data, model_name: str, topic: str, task_type: str) -> str:
    """
    Fine-tune a model on a Modal GPU.

    Args:
        formatted_data: List or JSON string with formatted training samples
        model_name: Base model to fine-tune (must be one of the supported models)
        topic: Dataset topic, used to name the Hugging Face repository
        task_type: Task type, used to name the Hugging Face repository

    Returns:
        JSON string with status, repo_id, model_url
    """
    model_name = str(model_name).strip()
    models = [
        "unsloth/Llama-3.2-1B-Instruct-bnb-4bit",
        "unsloth/Phi-3-mini-4k-instruct",
        "unsloth/Phi-3-medium-4k-instruct",
        "unsloth/Llama-3.2-3B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-3B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-1.5B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-0.5B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-Coder-3B-Instruct-bnb-4bit",
        "unsloth/gemma-2-2b-it-bnb-4bit",
        "unsloth/SmolLM2-1.7B-Instruct-bnb-4bit",
        "unsloth/Phi-3.5-mini-instruct-bnb-4bit",
        "unsloth/Granite-3.0-2b-instruct-bnb-4bit",
        "unsloth/granite-4.0-h-1b-bnb-4bit"
    ]
    if model_name not in models:
        return json.dumps({
            "status": "error",
            "message": f"Model not supported. Choose from: {', '.join(models[:3])}..."
        })
    try:
        # Accept a list, a JSON string, or a dict wrapping the samples under "data"
        if isinstance(formatted_data, list):
            training_data = formatted_data
        elif isinstance(formatted_data, str):
            parsed = json.loads(formatted_data)
            if isinstance(parsed, dict) and "data" in parsed:
                training_data = parsed["data"]
            else:
                training_data = parsed
        elif isinstance(formatted_data, dict) and "data" in formatted_data:
            training_data = formatted_data["data"]
        else:
            return json.dumps({
                "status": "error",
                "message": f"Unexpected input type: {type(formatted_data).__name__}"
            })
        if not isinstance(training_data, list) or not training_data:
            return json.dumps({
                "status": "error",
                "message": "No training samples provided"
            })
        # Serialize the samples as JSONL for the Modal training job
        jsonl_content = "\n".join([json.dumps(s, ensure_ascii=False) for s in training_data])
        with app.run():
            result = train_with_modal.remote(jsonl_content, model_name)
        if result["status"] != "success":
            return json.dumps({
                "status": "error",
                "message": "Training failed"
            })
        # Ask a Groq-hosted LLM for a short repository name derived from topic and task type
        repoTemp = """
Generate a short repository name for an unsloth finetuned model based on {topic} and {task_type}.
Use '_' instead of spaces. Only return the name without quotations.
"""
        repoPrompt = ChatPromptTemplate.from_template(repoTemp)
        llm = ChatGroq(
            model="llama-3.1-8b-instant",
            temperature=0.4,
            api_key=groq
        )
        chain = repoPrompt | llm
        inp = {
            "topic": topic,
            "task_type": task_type
        }
        repoName = await asyncio.to_thread(chain.invoke, inp)
        repoName = repoName.content.strip()
        with app.run():
            hf_result = upload_to_hf_from_volume.remote(
                result["volume_path"],
                result["timestamp"],
                repoName
            )
        return json.dumps({
            "status": "success",
            "repo_id": str(hf_result["repo_id"]),
            "model_url": str(hf_result["model_url"]),
            "model_path": str(hf_result["repo_id"]),
            "num_samples": len(training_data),
            "message": f"Model available at {hf_result['model_url']}"
        })
    except Exception as e:
        import traceback
        return json.dumps({
            "status": "error",
            "message": f"Training failed: {str(e)}",
            "traceback": traceback.format_exc()
        })
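
# Typical tool sequence (sketch of the intended flow, assuming an MCP client drives it):
#   generate_json_data -> format_json -> finetune_model -> llm_as_judge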

@mcp.tool()
async def llm_as_judge(repo_id: str, topic: str, task_type: str) -> dict:
    """Use an LLM to judge model quality based on topic and task type."""
    import evaluate
    eval_llm = ChatGroq(
        model="llama-3.1-8b-instant",
        temperature=0.2,
        api_key=groq
    )
    test_prompt_text = f"""Generate 3 test cases for evaluating a model fine-tuned strictly based on **{topic} for {task_type}**.
Return ONLY a JSON array with this exact format, no other text:
[{{"input": "test question 1", "expected_output": "expected answer 1"}}, {{"input": "test question 2", "expected_output": "expected answer 2"}}, {{"input": "test question 3", "expected_output": "expected answer 3"}}]"""
    try:
        text_responses = await eval_llm.ainvoke(test_prompt_text)
        response = text_responses.content.strip()
        response = response.replace("```json", "").replace("```", "").strip()
        import re
        # Extract the first JSON array in case the model added surrounding text
        match = re.search(r'\[.*\]', response, re.DOTALL)
        if match:
            response = match.group(0)
        test_cases = json.loads(response)[:3]
        test_inputs = [case['input'] for case in test_cases]
        # Run the fine-tuned model on the test inputs via Modal
        with app.run():
            ft_output = evaluate_model.remote(repo_id, test_inputs)
        outputs = []
        for i, case in enumerate(test_cases):
            outputs.append(
                {
                    "input": case['input'],
                    "expected_output": case['expected_output'],
                    "model_output": ft_output[i]
                }
            )
        # METRICS
        bleu = evaluate.load("bleu")
        rouge = evaluate.load("rouge")
        predictions = [output['model_output'] for output in outputs]
        references = [[output['expected_output']] for output in outputs]
        bleu_score = bleu.compute(predictions=predictions, references=references)
        rouge_score = rouge.compute(predictions=predictions, references=references)
        additional_metrics = {}
        if task_type.lower() in ["classification", "question-answering"]:
            accuracy_metric = evaluate.load("accuracy")
            f1_metric = evaluate.load("f1")
            # Exact-match against the reference, expressed as binary labels
            predictions_binary = [1 if pred.strip().lower() == ref[0].strip().lower() else 0
                                  for pred, ref in zip(predictions, references)]
            references_binary = [1] * len(predictions_binary)
            accuracy_score = accuracy_metric.compute(predictions=predictions_binary, references=references_binary)
            f1_score = f1_metric.compute(predictions=predictions_binary, references=references_binary, average="binary")
            additional_metrics["accuracy"] = accuracy_score["accuracy"]
            additional_metrics["f1_score"] = f1_score["f1"]
        eval_prompt_text = f"""You are evaluating a model fine-tuned using Unsloth on the topic "{topic}" for {task_type} tasks.
**Your Task:** Provide an accurate, positive markdown evaluation report focusing on the model's strengths and capabilities, based on your judgement and the metrics.
**Test Results:**
Test Cases:
{json.dumps(test_cases, indent=2)}
Model Outputs:
{json.dumps(outputs, indent=2)}
**Metrics**
- BLEU Score: {bleu_score['bleu']:.4f}
- ROUGE-L Score: {rouge_score['rougeL']:.4f}
{f"- Accuracy: {additional_metrics.get('accuracy', 0):.4f}" if task_type.lower() in ["classification", "question-answering"] else ""}
{f"- F1 Score: {additional_metrics.get('f1_score', 0):.4f}" if task_type.lower() in ["classification", "question-answering"] else ""}
**Report Structure:**
## Evaluation Report
### Performance Overview
Create a comparison table with columns: Test Input | Expected Output | Model Output | Assessment
### Metrics
- Explain each evaluated metric and categorize the performance against an average threshold
- Use percentages and numerical figures to support your report
### Key Strengths and Weaknesses
- Accuracy and relevance
- Response coherence
- Task-specific capabilities
- Language quality
### Conclusion
Summarize the model's overall performance and recommended use cases.
Now write the complete evaluation report following this structure. Be enthusiastic and highlight strengths!"""
        eval_response = await eval_llm.ainvoke(eval_prompt_text)
        return {
            "status": "success",
            "report": str(eval_response.content),
            "test_cases": test_cases,
            "model_outputs": outputs
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e),
            "error_type": type(e).__name__
        }
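
# On success the judge returns a dict with "status", "report" (markdown text), "test_cases",
# and "model_outputs"; on failure it returns "status", "message", and "error_type" (see the
# return statements above).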

if __name__ == "__main__":
    mcp.run()