"""
MAC OS X INSTALL: pip3 install torch==2.1.1 torchvision torchaudio transformers==4.48.0 accelerate==0.28.0
(You must use these exact versions; higher versions have a numerical-instability bug on MPS chips.)

Interactive model evaluation script for pretraining experiments.
Automatically discovers and loads all models with /hf subdirectories.
"""
| |
|
import os
import glob
from pathlib import Path
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
import warnings

# Silence framework warnings (dtype/device chatter from transformers/torch)
# so the interactive output stays readable.
warnings.filterwarnings("ignore")
| |
|
# Substring filter for model directory names; None loads every discovered model.
MODEL_NAME_FILTER = None


class ModelEvaluator:
    """Discover, load, and interactively compare local HF-exported models.

    A "model" is any directory in the current working directory that
    contains an ``hf/`` subdirectory (a Hugging Face export). Directories
    whose name contains ``chat`` or ``sft`` are prompted with a chat
    message list; all others get plain text-generation prompting.
    """

    def __init__(self):
        # Parallel registries, all keyed by the model's directory name.
        self.models = {}
        self.tokenizers = {}
        self.pipelines = {}
        self.model_names = []

    def discover_models(self):
        """Return model directories in CWD that contain an ``hf`` subdirectory.

        Honors ``MODEL_NAME_FILTER`` (substring match) when set.
        Returns an empty list when nothing matches (falsy, matching the
        previous ``False`` return for callers that only truth-test it).
        """
        print("[scan] Discovering models with /hf subdirectories...")

        hf_dirs = [
            item
            for item in os.listdir('.')
            if os.path.isdir(item)
            and os.path.exists(os.path.join(item, 'hf'))
            and (MODEL_NAME_FILTER is None or MODEL_NAME_FILTER in item)
        ]

        if not hf_dirs:
            print("[error] No models with /hf subdirectories found!")
            return []

        print(f"[ok] Found {len(hf_dirs)} models:")
        for model_dir in hf_dirs:
            print(f"  - {model_dir}")
        return hf_dirs

    def _pick_device(self):
        """Best available device string: CUDA, then Apple MPS, then CPU.

        The previous code assumed MPS whenever CUDA was absent, which
        crashes on CPU-only machines.
        """
        if torch.cuda.is_available():
            return "cuda:0"
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            return "mps"
        return "cpu"

    def load_model(self, model_dir):
        """Load one model + tokenizer from ``<model_dir>/hf`` and build its pipeline.

        Returns True on success, False otherwise (errors are printed, not raised).
        """
        try:
            hf_path = os.path.join(model_dir, 'hf')
            print(f"[load] Loading {model_dir}...")

            tokenizer = AutoTokenizer.from_pretrained(hf_path)
            if tokenizer.pad_token is None:
                # Generation needs a pad token; reuse EOS, the usual convention.
                tokenizer.pad_token = tokenizer.eos_token

            model = AutoModelForCausalLM.from_pretrained(
                hf_path,
                device_map=None,
                torch_dtype=torch.float16,
                trust_remote_code=True,
            )
            model = model.to(torch.float16)
            model.to(self._pick_device())

            # NOTE: both branches previously built byte-identical pipelines;
            # only the log message differed. Build once and log the mode.
            # ``device_map`` is deliberately NOT passed: the model is already
            # placed on its device, and transformers rejects/ignores a
            # device_map combined with an instantiated model.
            pipe = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                torch_dtype=torch.float16,
            )
            if "chat" in model_dir.lower() or "sft" in model_dir.lower():
                print("  [mode] Using conversational prompting for chat model")
            else:
                print("  [mode] Using plain text-generation prompting")

            self.models[model_dir] = model
            self.tokenizers[model_dir] = tokenizer
            self.pipelines[model_dir] = pipe
            self.model_names.append(model_dir)

            print(f"  [ok] {model_dir} loaded successfully")
            return True

        except Exception as e:
            # Best-effort: report and keep loading the remaining models.
            print(f"  [error] Failed to load {model_dir}: {str(e)}")
            return False

    def load_all_models(self):
        """Load every discovered model; return True if at least one loaded."""
        hf_dirs = self.discover_models()
        if not hf_dirs:
            return False

        print("\n[load] Loading models...")
        successful_loads = 0

        for model_dir in hf_dirs:
            if self.load_model(model_dir):
                successful_loads += 1

        print(f"\n[done] Loaded {successful_loads}/{len(hf_dirs)} models successfully")
        return successful_loads > 0

    def generate_response(self, model_name, prompt, max_length=256):
        """Generate a completion for ``prompt`` with the named model.

        ``max_length`` is passed as ``max_new_tokens`` (new tokens, not
        total length). Chat/SFT models receive a single-turn message list;
        other models receive the raw prompt. Returns the generated text,
        or an error string on failure (never raises).
        """
        try:
            pipe = self.pipelines[model_name]

            if "chat" in model_name.lower() or "sft" in model_name.lower():
                chat_input = [{"role": "user", "content": prompt}]
                outputs = pipe(
                    chat_input,
                    max_new_tokens=max_length,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    repetition_penalty=1.1,
                    pad_token_id=self.tokenizers[model_name].eos_token_id,
                )

                if not outputs:
                    return "No response generated"

                conversation = outputs[0]['generated_text']
                if isinstance(conversation, list) and len(conversation) > 1:
                    # Chat pipelines return the whole conversation; pull the
                    # last assistant turn.
                    for message in reversed(conversation):
                        if message.get('role') == 'assistant':
                            return message.get('content', 'No response generated')
                    return conversation[-1].get('content', 'No response generated')
                return str(conversation)

            outputs = pipe(
                prompt,
                max_new_tokens=max_length,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                pad_token_id=self.tokenizers[model_name].eos_token_id,
                return_full_text=False,
            )
            return outputs[0]['generated_text']

        except Exception as e:
            return f"[error] Generation failed: {str(e)}"

    def evaluate_prompt(self, prompt):
        """Run ``prompt`` through every loaded model and print each response."""
        print(f"\n[eval] Evaluating prompt: '{prompt}'")
        print("=" * 80)

        for model_name in self.model_names:
            print(f"\n[model] {model_name}:")
            print("-" * 40)
            print(self.generate_response(model_name, prompt))

        print("\n" + "=" * 80)

    def _print_help(self):
        """Print the command reference (shared by startup and 'help')."""
        print("\n[repl] Interactive Evaluation Mode")
        print("Commands:")
        print("  - Type your prompt to evaluate all models")
        print("  - Type 'quit' or 'exit' to end")
        print("  - Type 'help' for this message")
        print("  - Type 'models' to list loaded models")
        print("  - Type 'clear' to clear screen")
        print("\nNote: Models with 'chat' in their name use conversational prompting,")
        print("      other models use plain text-generation prompting.")

    def interactive_loop(self):
        """Read prompts/commands from stdin until 'quit' or Ctrl-C."""
        self._print_help()

        while True:
            try:
                user_input = input("\n[prompt] Enter prompt (or command): ").strip()

                if not user_input:
                    continue

                command = user_input.lower()
                if command in ('quit', 'exit', 'q'):
                    print("Goodbye!")
                    break
                elif command == 'help':
                    self._print_help()
                elif command == 'models':
                    print(f"\n[models] Loaded models ({len(self.model_names)}):")
                    for i, model_name in enumerate(self.model_names, 1):
                        print(f"  {i}. {model_name}")
                elif command == 'clear':
                    os.system('clear' if os.name == 'posix' else 'cls')
                else:
                    self.evaluate_prompt(user_input)

            except KeyboardInterrupt:
                print("\n\nGoodbye!")
                break
            except Exception as e:
                # Keep the REPL alive on unexpected errors.
                print(f"[error] Error: {str(e)}")
| |
|
def main():
    """Entry point: load every discovered model, then start the interactive REPL.

    Exits early (without raising) when no model could be loaded.
    """
    print("[start] Model Evaluation Script")
    print("=" * 50)

    evaluator = ModelEvaluator()

    if not evaluator.load_all_models():
        print("[error] No models could be loaded. Exiting.")
        return

    evaluator.interactive_loop()


if __name__ == "__main__":
    main()