| # Tone Classification System | |
| # This implementation combines text and acoustic features to detect emotions, | |
| # including sarcasm and figures of speech | |
| # Part 1: Install required packages with improved error handling | |
| import sys | |
| import os | |
| # Function to install packages with error handling | |
| def install_packages(): | |
| packages = [ | |
| "hf_xet","transformers", "pytorch-lightning", "datasets", | |
| "numpy", "pandas", "matplotlib", "seaborn", | |
| "librosa", "opensmile", "torch", "torchaudio", | |
| "accelerate", "nltk", "scikit-learn" | |
| ] | |
| for package in packages: | |
| try: | |
| print(f"Installing {package}...") | |
| import subprocess | |
| # Install a package quietly; check=True makes pip failures raise so the | |
| # except block below can actually report them | |
| subprocess.run([sys.executable, '-m', 'pip', 'install', package, '-q'], check=True) | |
| print(f"Successfully installed {package}") | |
| except Exception as e: | |
| print(f"Error installing {package}: {e}") | |
| print("Package installation completed!") | |
| install_packages() | |
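| # Optional sanity check (hedged sketch, not called by default): confirm that the core | |
| # packages actually import after installation. importlib is standard library; package | |
| # names that differ from their pip names (scikit-learn -> sklearn) are listed explicitly. | |
| def verify_imports(): | |
| import importlib | |
| for module_name in ["transformers", "torch", "torchaudio", "librosa", "sklearn", "nltk"]: | |
| try: | |
| importlib.import_module(module_name) | |
| print(f"Verified import: {module_name}") | |
| except ImportError as e: | |
| print(f"Missing module: {module_name} ({e})") | |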
| # Part 2: Import libraries with error handling | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import confusion_matrix, classification_report | |
| from torch.utils.data import Dataset, DataLoader | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import torch.optim as optim | |
| # Check for CUDA availability | |
| DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f"Using device: {DEVICE}") | |
| # Try to import libraries that might cause issues with specific error handling | |
| try: | |
| import torchaudio | |
| print("Successfully imported torchaudio") | |
| except Exception as e: | |
| print(f"Error importing torchaudio: {e}") | |
| print("Some audio functionality may be limited") | |
| try: | |
| import librosa | |
| print("Successfully imported librosa") | |
| except Exception as e: | |
| print(f"Error importing librosa: {e}") | |
| print("Audio processing capabilities will be limited") | |
| try: | |
| import opensmile | |
| print("Successfully imported opensmile") | |
| except Exception as e: | |
| print(f"Error importing opensmile: {e}") | |
| print("Will use fallback feature extraction methods") | |
| # Part 3: Define constants | |
| EMOTIONS = ["neutral", "happy", "sad", "angry", "fearful", "disgust", "surprised", "sarcastic"] | |
| MODEL_CACHE_DIR = "./model_cache" | |
| # Create cache directory if it doesn't exist | |
| os.makedirs(MODEL_CACHE_DIR, exist_ok=True) | |
| print(f"Using model cache directory: {MODEL_CACHE_DIR}") | |
| # Part 4: Model Loading with Error Handling and Cache | |
| def load_model_with_cache(model_class, model_name, cache_subdir=""): | |
| """Load a model with proper error handling and caching""" | |
| cache_path = os.path.join(MODEL_CACHE_DIR, cache_subdir) | |
| os.makedirs(cache_path, exist_ok=True) | |
| print(f"Loading model: {model_name}") | |
| try: | |
| model = model_class.from_pretrained( | |
| model_name, | |
| cache_dir=cache_path, | |
| # huggingface_hub caches repos under "models--{org}--{name}", so check that layout | |
| local_files_only=os.path.exists(os.path.join(cache_path, "models--" + model_name.replace('/', '--'))) | |
| ) | |
| print(f"Successfully loaded model: {model_name}") | |
| return model | |
| except KeyboardInterrupt: | |
| print("\nModel download interrupted. Try again or download manually.") | |
| return None | |
| except Exception as e: | |
| print(f"Error loading model {model_name}: {e}") | |
| print("Will try to continue with limited functionality.") | |
| return None | |
| # Part 5: Modified Whisper Transcriber with Error Handling | |
| class WhisperTranscriber: | |
| def __init__(self, model_size="tiny"): # Changed from base to tiny for faster loading | |
| from transformers import WhisperProcessor, WhisperForConditionalGeneration | |
| print("Initializing Whisper transcriber...") | |
| try: | |
| self.processor = load_model_with_cache( | |
| WhisperProcessor, | |
| f"openai/whisper-{model_size}", | |
| "whisper" | |
| ) | |
| self.model = load_model_with_cache( | |
| WhisperForConditionalGeneration, | |
| f"openai/whisper-{model_size}", | |
| "whisper" | |
| ) | |
| if self.model is not None: | |
| self.model = self.model.to(DEVICE) | |
| print("Whisper model loaded successfully and moved to device") | |
| else: | |
| print("Failed to load Whisper model") | |
| except Exception as e: | |
| print(f"Error initializing Whisper: {e}") | |
| self.processor = None | |
| self.model = None | |
| def transcribe(self, audio_path): | |
| if self.processor is None or self.model is None: | |
| print("Whisper not properly initialized. Cannot transcribe.") | |
| return "Error: Transcription failed." | |
| try: | |
| # Load audio | |
| waveform, sample_rate = librosa.load(audio_path, sr=16000) | |
| # Process audio | |
| input_features = self.processor(waveform, sampling_rate=16000, return_tensors="pt").input_features.to(DEVICE) | |
| # Generate transcription | |
| with torch.no_grad(): | |
| predicted_ids = self.model.generate(input_features, max_length=100) | |
| # Decode the transcription | |
| transcription = self.processor.batch_decode(predicted_ids, skip_special_tokens=True)[0] | |
| return transcription | |
| except Exception as e: | |
| print(f"Error in transcription: {e}") | |
| return "Error: Transcription failed." | |
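| # Example usage (hedged sketch): a quick standalone check of the transcriber. The path | |
| # "sample.wav" is illustrative only; any mono speech recording should work, since | |
| # librosa resamples it to 16 kHz before it reaches Whisper. | |
| # transcriber = WhisperTranscriber(model_size="tiny") | |
| # print(transcriber.transcribe("sample.wav")) | |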
| # Part 6: Text-based Emotion Analysis with Fallback Options | |
| # Improved Text-based Emotion Analysis | |
| class TextEmotionClassifier: | |
| def __init__(self): | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| print("Initializing text emotion classifier...") | |
| # Primary emotion model | |
| self.emotion_model_name = "j-hartmann/emotion-english-distilroberta-base" | |
| self.tokenizer = load_model_with_cache( | |
| AutoTokenizer, | |
| self.emotion_model_name, | |
| "text_emotion" | |
| ) | |
| self.model = load_model_with_cache( | |
| AutoModelForSequenceClassification, | |
| self.emotion_model_name, | |
| "text_emotion" | |
| ) | |
| if self.model is not None: | |
| self.model = self.model.to(DEVICE) | |
| # Sentiment model for sarcasm detection | |
| self.sentiment_model_name = "cardiffnlp/twitter-roberta-base-sentiment" | |
| self.sarcasm_tokenizer = load_model_with_cache( | |
| AutoTokenizer, | |
| self.sentiment_model_name, | |
| "sentiment" | |
| ) | |
| self.sarcasm_model = load_model_with_cache( | |
| AutoModelForSequenceClassification, | |
| self.sentiment_model_name, | |
| "sentiment" | |
| ) | |
| if self.sarcasm_model is not None: | |
| self.sarcasm_model = self.sarcasm_model.to(DEVICE) | |
| # Enhanced keyword-based analyzer as fallback and enhancement | |
| self.keyword_analyzer = EnhancedKeywordEmotionAnalyzer() | |
| def predict_emotion(self, text): | |
| if self.tokenizer is None or self.model is None: | |
| print("Text emotion model not properly initialized.") | |
| # Use keyword-based analysis as primary method in this case | |
| return self.keyword_analyzer.analyze(text) | |
| try: | |
| # Get model predictions | |
| inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(DEVICE) | |
| with torch.no_grad(): | |
| outputs = self.model(**inputs) | |
| # Get probabilities from model (in the model's own label order) | |
| raw_probs = F.softmax(outputs.logits, dim=1).cpu().numpy()[0] | |
| # Re-map the model's labels onto the EMOTIONS order: the transformer has its own | |
| # label set (e.g. "joy"/"sadness"/"anger") and no "sarcastic" class, so adding its | |
| # raw output vector to the 8-way keyword vector would otherwise misalign or fail | |
| label_map = {"joy": "happy", "sadness": "sad", "anger": "angry", "fear": "fearful", | |
| "disgust": "disgust", "surprise": "surprised", "neutral": "neutral"} | |
| model_probs = np.zeros(len(EMOTIONS)) | |
| for idx, label in self.model.config.id2label.items(): | |
| emotion = label_map.get(label.lower()) | |
| if emotion in EMOTIONS: | |
| model_probs[EMOTIONS.index(emotion)] = raw_probs[idx] | |
| # Get keyword-based analysis (already in EMOTIONS order) | |
| keyword_probs = self.keyword_analyzer.analyze(text) | |
| # Combine both methods with weighting | |
| # If text contains strong emotional keywords, give more weight to keyword analysis | |
| keyword_strength = self.keyword_analyzer.get_keyword_strength(text) | |
| # Adaptive weighting based on keyword strength | |
| keyword_weight = min(0.6, keyword_strength * 0.1) # Cap at 0.6 | |
| model_weight = 1.0 - keyword_weight | |
| # Combine predictions (both vectors now have length len(EMOTIONS)) | |
| combined_probs = (model_weight * model_probs) + (keyword_weight * keyword_probs) | |
| # Normalize to ensure sum is 1 | |
| combined_probs = combined_probs / np.sum(combined_probs) | |
| return combined_probs | |
| except Exception as e: | |
| print(f"Error in text emotion prediction: {e}") | |
| # Fallback to keyword analysis | |
| return self.keyword_analyzer.analyze(text) | |
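| # Worked example of the adaptive weighting above (illustrative numbers): if | |
| # get_keyword_strength() returns 4.0, then keyword_weight = min(0.6, 4.0 * 0.1) = 0.4 | |
| # and model_weight = 0.6, so combined_probs = 0.6 * model_probs + 0.4 * keyword_probs, | |
| # renormalized to sum to 1. Text with no emotional keywords falls back almost entirely | |
| # to the transformer output (keyword_weight close to 0). | |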
| def detect_sarcasm(self, text): | |
| if self.sarcasm_tokenizer is None or self.sarcasm_model is None: | |
| print("Sarcasm model not properly initialized.") | |
| # Use keyword-based sarcasm detection as fallback | |
| return self.keyword_analyzer.detect_sarcasm(text) | |
| try: | |
| inputs = self.sarcasm_tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(DEVICE) | |
| with torch.no_grad(): | |
| outputs = self.sarcasm_model(**inputs) | |
| sentiment_probs = F.softmax(outputs.logits, dim=1).cpu().numpy()[0] | |
| # Enhance with keyword-based sarcasm detection | |
| keyword_sarcasm = self.keyword_analyzer.detect_sarcasm(text) | |
| # If keyword analysis strongly suggests sarcasm, blend with model prediction | |
| if keyword_sarcasm[0] > 0.5: # A strong negative score from keywords signals sarcasm | |
| # Give 40% weight to keyword analysis | |
| combined_probs = 0.6 * sentiment_probs + 0.4 * keyword_sarcasm | |
| return combined_probs | |
| return sentiment_probs | |
| except Exception as e: | |
| print(f"Error in sarcasm detection: {e}") | |
| # Fallback to keyword analysis | |
| return self.keyword_analyzer.detect_sarcasm(text) | |
| # Enhanced keyword-based emotion analyzer | |
| class EnhancedKeywordEmotionAnalyzer: | |
| def __init__(self): | |
| # Enhanced emotion keywords with weights | |
| self.emotion_keywords = { | |
| "happy": [ | |
| ("happy", 1.0), ("joy", 1.0), ("delight", 0.9), ("excited", 0.9), | |
| ("glad", 0.8), ("pleased", 0.8), ("cheerful", 0.9), ("smile", 0.7), | |
| ("enjoy", 0.8), ("wonderful", 0.8), ("great", 0.7), ("excellent", 0.8), | |
| ("thrilled", 1.0), ("ecstatic", 1.0), ("content", 0.7), ("satisfied", 0.7), | |
| ("pleasure", 0.8), ("fantastic", 0.9), ("awesome", 0.9), ("love", 0.9), | |
| ("amazing", 0.9), ("perfect", 0.8), ("fun", 0.8), ("delighted", 1.0) | |
| ], | |
| "sad": [ | |
| ("sad", 1.0), ("unhappy", 0.9), ("depressed", 1.0), ("sorrow", 1.0), | |
| ("grief", 1.0), ("tearful", 0.9), ("miserable", 1.0), ("disappointed", 0.8), | |
| ("upset", 0.8), ("down", 0.7), ("heartbroken", 1.0), ("gloomy", 0.9), | |
| ("devastated", 1.0), ("hurt", 0.8), ("blue", 0.7), ("regret", 0.8), | |
| ("dejected", 0.9), ("dismal", 0.9), ("lonely", 0.8), ("terrible", 0.8), | |
| ("hopeless", 0.9), ("lost", 0.7), ("crying", 0.9), ("tragic", 0.9) | |
| ], | |
| "angry": [ | |
| ("angry", 1.0), ("mad", 0.9), ("furious", 1.0), ("annoyed", 0.8), | |
| ("irritated", 0.8), ("enraged", 1.0), ("livid", 1.0), ("outraged", 1.0), | |
| ("frustrated", 0.8), ("infuriated", 1.0), ("pissed", 0.9), ("hate", 0.9), | |
| ("hostile", 0.9), ("bitter", 0.8), ("resentful", 0.8), ("fuming", 0.9), | |
| ("irate", 1.0), ("seething", 1.0), ("cross", 0.7), | |
| ("exasperated", 0.8), ("disgusted", 0.8), ("indignant", 0.9), ("rage", 1.0) | |
| ], | |
| "fearful": [ | |
| ("afraid", 1.0), ("scared", 1.0), ("frightened", 1.0), ("fear", 0.9), | |
| ("terror", 1.0), ("panic", 1.0), ("horrified", 1.0), ("worried", 0.8), | |
| ("anxious", 0.9), ("nervous", 0.8), ("terrified", 1.0), ("dread", 0.9), | |
| ("alarmed", 0.8), ("petrified", 1.0), ("threatened", 0.8), ("intimidated", 0.8), | |
| ("apprehensive", 0.8), ("uneasy", 0.7), ("tense", 0.7), ("stressed", 0.7), | |
| ("spooked", 0.9), ("paranoid", 0.9), ("freaked", 0.9), ("jumpy", 0.8) | |
| ], | |
| "disgust": [ | |
| ("disgust", 1.0), ("gross", 0.9), ("repulsed", 1.0), ("revolted", 1.0), | |
| ("sick", 0.8), ("nauseous", 0.8), ("yuck", 0.9), ("ew", 0.8), | |
| ("nasty", 0.9), ("repugnant", 1.0), ("foul", 0.9), ("appalled", 0.9), | |
| ("sickened", 0.9), ("offended", 0.8), ("distaste", 0.9), ("aversion", 0.9), | |
| ("abhorrent", 1.0), ("odious", 1.0), ("repellent", 1.0), ("objectionable", 0.8), | |
| ("detestable", 1.0), ("loathsome", 1.0), ("vile", 1.0), ("horrid", 0.9) | |
| ], | |
| "surprised": [ | |
| ("surprised", 1.0), ("shocked", 0.9), ("astonished", 1.0), ("amazed", 0.9), | |
| ("startled", 0.9), ("stunned", 0.9), ("speechless", 0.8), ("unexpected", 0.8), | |
| ("wow", 0.8), ("whoa", 0.8), ("unbelievable", 0.8), ("incredible", 0.8), | |
| ("dumbfounded", 1.0), ("flabbergasted", 1.0), ("staggered", 0.9), ("aghast", 0.9), | |
| ("astounded", 1.0), ("taken aback", 0.9), ("disbelief", 0.8), ("bewildered", 0.8), | |
| ("thunderstruck", 1.0), ("wonder", 0.7), ("sudden", 0.6), ("jaw-dropping", 0.9) | |
| ], | |
| "neutral": [ | |
| ("okay", 0.7), ("fine", 0.7), ("alright", 0.7), ("normal", 0.8), | |
| ("calm", 0.8), ("steady", 0.8), ("balanced", 0.8), ("ordinary", 0.8), | |
| ("routine", 0.8), ("regular", 0.8), ("standard", 0.8), ("moderate", 0.8), | |
| ("usual", 0.8), ("typical", 0.8), ("average", 0.8), ("common", 0.8), | |
| ("so-so", 0.7), ("fair", 0.7), ("acceptable", 0.7), ("stable", 0.8), | |
| ("unchanged", 0.8), ("plain", 0.7), ("mild", 0.7), ("middle-of-the-road", 0.8) | |
| ], | |
| "sarcastic": [ | |
| ("yeah right", 1.0), ("sure thing", 0.9), ("oh great", 0.9), ("how wonderful", 0.9), | |
| ("wow", 0.7), ("really", 0.7), ("obviously", 0.8), ("definitely", 0.7), | |
| ("of course", 0.7), ("totally", 0.7), ("exactly", 0.7), ("perfect", 0.7), | |
| ("brilliant", 0.8), ("genius", 0.8), ("whatever", 0.8), ("right", 0.7), | |
| ("nice job", 0.8), ("good one", 0.8), ("bravo", 0.8), ("slow clap", 1.0), | |
| ("im shocked", 0.9), ("never would have guessed", 0.9), ("shocking", 0.7), ("unbelievable", 0.7) | |
| ] | |
| } | |
| # Sarcasm indicators | |
| self.sarcasm_indicators = [ | |
| "yeah right", "sure thing", "oh great", "riiiight", "suuure", | |
| "*slow clap*", "/s", "wow just wow", "you don't say", "no kidding", | |
| "what a surprise", "shocker", "congratulations", "well done", "genius", | |
| "oh wow", "oh really", "totally", "absolutely", "clearly", "obviously", | |
| "genius idea", "brilliant plan", "fantastic job", "amazing work" | |
| ] | |
| # Negation words | |
| self.negations = [ | |
| "not", "no", "never", "none", "nothing", "neither", "nor", "nowhere", | |
| "hardly", "scarcely", "barely", "doesn't", "isn't", "wasn't", "shouldn't", | |
| "wouldn't", "couldn't", "won't", "can't", "don't", "didn't", "haven't" | |
| ] | |
| # Intensifiers | |
| self.intensifiers = [ | |
| "very", "really", "extremely", "absolutely", "completely", "totally", | |
| "utterly", "quite", "particularly", "especially", "remarkably", "truly", | |
| "so", "too", "such", "incredibly", "exceedingly", "extraordinarily" | |
| ] | |
| # Compile patterns for more efficient matching | |
| import re | |
| self.emotion_patterns = {} | |
| for emotion, keywords in self.emotion_keywords.items(): | |
| self.emotion_patterns[emotion] = [ | |
| (re.compile(r'\b' + re.escape(word) + r'\b', re.IGNORECASE), weight) | |
| for word, weight in keywords | |
| ] | |
| self.negation_pattern = re.compile(r'\b(' + '|'.join(re.escape(n) for n in self.negations) + r')\s+(\w+)', re.IGNORECASE) | |
| self.intensifier_pattern = re.compile(r'\b(' + '|'.join(re.escape(i) for i in self.intensifiers) + r')\s+(\w+)', re.IGNORECASE) | |
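| # Illustration of the compiled patterns (comments only): the per-emotion patterns match | |
| # whole words, so "I'm happy" matches the "happy" keyword but "happiness" does not; | |
| # negation_pattern captures pairs such as ("not", "happy") from "I'm not happy"; | |
| # intensifier_pattern captures pairs such as ("very", "sad") from "feeling very sad". | |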
| def analyze(self, text): | |
| """ | |
| Analyze text for emotions using enhanced keyword matching | |
| Returns numpy array of emotion probabilities | |
| """ | |
| # Initialize scores | |
| emotion_scores = {emotion: 0.0 for emotion in EMOTIONS} | |
| # Set base score for neutral | |
| emotion_scores["neutral"] = 1.0 | |
| # Convert to lowercase for case-insensitive matching | |
| text_lower = text.lower() | |
| # Process each emotion | |
| for emotion, patterns in self.emotion_patterns.items(): | |
| for pattern, weight in patterns: | |
| matches = pattern.findall(text_lower) | |
| if matches: | |
| # Add score based on number of matches and their weights | |
| emotion_scores[emotion] += len(matches) * weight | |
| # Process negations - look for "not happy" patterns | |
| negation_matches = self.negation_pattern.finditer(text_lower) | |
| for match in negation_matches: | |
| negation, word = match.groups() | |
| # Check if the negated word is in any emotion keywords | |
| for emotion, keywords in self.emotion_keywords.items(): | |
| if any(word == kw[0] for kw in keywords): | |
| # Reduce score for this emotion and slightly increase opposite emotions | |
| emotion_scores[emotion] -= 0.7 | |
| # Increase opposite emotions (e.g., if "not happy", increase "sad") | |
| if emotion == "happy": | |
| emotion_scores["sad"] += 0.3 | |
| elif emotion == "sad": | |
| emotion_scores["happy"] += 0.3 | |
| # Process intensifiers - "very happy" should increase score | |
| intensifier_matches = self.intensifier_pattern.finditer(text_lower) | |
| for match in intensifier_matches: | |
| intensifier, word = match.groups() | |
| # Check if the intensified word is in any emotion keywords | |
| for emotion, keywords in self.emotion_keywords.items(): | |
| if any(word == kw[0] for kw in keywords): | |
| # Increase score for this emotion | |
| emotion_scores[emotion] += 0.5 | |
| # Ensure no negative scores | |
| for emotion in emotion_scores: | |
| emotion_scores[emotion] = max(0, emotion_scores[emotion]) | |
| # Normalize to probabilities | |
| total = sum(emotion_scores.values()) | |
| if total > 0: | |
| probs = {emotion: score/total for emotion, score in emotion_scores.items()} | |
| else: | |
| # If no emotions detected, default to neutral | |
| probs = {emotion: 0.0 for emotion in EMOTIONS} | |
| probs["neutral"] = 1.0 | |
| # Convert to numpy array in the same order as EMOTIONS | |
| return np.array([probs[emotion] for emotion in EMOTIONS]) | |
| def detect_sarcasm(self, text): | |
| """ | |
| Detect sarcasm in text. | |
| Returns a [negative, neutral, positive] probability array (same layout as the | |
| sentiment model); a strongly negative score paired with ostensibly positive | |
| wording is what the pipeline treats as sarcasm. | |
| """ | |
| text_lower = text.lower() | |
| sarcasm_score = 0.0 | |
| # Check for direct sarcasm indicators | |
| for indicator in self.sarcasm_indicators: | |
| if indicator in text_lower: | |
| sarcasm_score += 0.3 | |
| # Check for common sarcasm patterns | |
| positive_words = [kw[0] for kw in self.emotion_keywords["happy"]] | |
| has_positive = any(word in text_lower for word in positive_words) | |
| negative_context = any(neg in text_lower for neg in ["terrible", "awful", "horrible", "fail", "disaster", "mess"]) | |
| # Positive words in negative context suggests sarcasm | |
| if has_positive and negative_context: | |
| sarcasm_score += 0.4 | |
| # Check for excessive punctuation which might indicate sarcasm | |
| if "!!!" in text or "?!" in text: | |
| sarcasm_score += 0.2 | |
| # Cap the score | |
| sarcasm_score = min(1.0, sarcasm_score) | |
| # If sarcasm detected, return a sentiment array biased toward the negative class, | |
| # matching the [negative, neutral, positive] layout the pipeline checks (argmax == 0) | |
| if sarcasm_score > 0.3: | |
| return np.array([0.8, 0.1, 0.1]) # Strong negative signal flags likely sarcasm | |
| else: | |
| # Return balanced array (no strong indication of sarcasm) | |
| return np.array([0.33, 0.34, 0.33]) | |
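| # Worked example (illustrative): "Oh great, another outage. Fantastic work." matches the | |
| # indicator "oh great" (+0.3) but no negative-context word, so sarcasm_score stays at 0.3 | |
| # and the balanced array is returned; appending "what a disaster" adds the positive-word- | |
| # in-negative-context bonus (+0.4), pushing the score to 0.7 and triggering the sarcasm signal. | |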
| def get_keyword_strength(self, text): | |
| """ | |
| Measure the strength of emotional keywords in the text | |
| Returns a value between 0 and 10 | |
| """ | |
| text_lower = text.lower() | |
| total_matches = 0 | |
| weighted_matches = 0 | |
| # Count all matches across all emotions with their weights | |
| for emotion, patterns in self.emotion_patterns.items(): | |
| for pattern, weight in patterns: | |
| matches = pattern.findall(text_lower) | |
| total_matches += len(matches) | |
| weighted_matches += len(matches) * weight | |
| # Calculate strength score on a scale of 0-10 | |
| if total_matches > 0: | |
| avg_weight = weighted_matches / total_matches | |
| # Scale based on number of matches and their average weight | |
| strength = min(10, (total_matches * avg_weight) / 2) | |
| return strength | |
| else: | |
| return 0.0 | |
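| # Worked example (illustrative): for "I am thrilled and delighted", the "thrilled" and | |
| # "delighted" patterns (weight 1.0 each) match once apiece, so total_matches = 2, | |
| # avg_weight = 1.0 and strength = min(10, (2 * 1.0) / 2) = 1.0; longer texts packed with | |
| # strong keywords saturate at the cap of 10. | |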
| # Part 7: Acoustic Feature Extraction with Fallback | |
| class AcousticFeatureExtractor: | |
| def __init__(self): | |
| self.use_opensmile = True | |
| try: | |
| import opensmile | |
| # Initialize OpenSMILE with the eGeMAPS feature set instead of ComParE_2016 | |
| # eGeMAPS is specifically designed for voice analysis and emotion recognition | |
| self.smile = opensmile.Smile( | |
| feature_set=opensmile.FeatureSet.eGeMAPSv02, | |
| feature_level=opensmile.FeatureLevel.Functionals, | |
| ) | |
| print("OpenSMILE feature extractor initialized successfully with eGeMAPS") | |
| except Exception as e: | |
| print(f"Failed to initialize OpenSMILE: {e}") | |
| print("Using librosa for feature extraction instead.") | |
| self.use_opensmile = False | |
| def extract_features(self, audio_path): | |
| try: | |
| if self.use_opensmile: | |
| # Use OpenSMILE for feature extraction | |
| features = self.smile.process_file(audio_path) | |
| return features.values | |
| else: | |
| # Fallback to improved librosa feature extraction | |
| return self._extract_librosa_features(audio_path) | |
| except Exception as e: | |
| print(f"Error in acoustic feature extraction: {e}") | |
| print("Using dummy features as fallback") | |
| # Return dummy features in case of error | |
| return np.zeros(88) # eGeMAPS dimension | |
| def _extract_librosa_features(self, audio_path): | |
| """Improved librosa feature extraction focusing on emotion-relevant features""" | |
| try: | |
| # Load audio | |
| y, sr = librosa.load(audio_path, sr=22050) | |
| # Extract features specifically relevant to emotion detection | |
| # 1. Pitch features (fundamental frequency) | |
| pitches, magnitudes = librosa.piptrack(y=y, sr=sr) | |
| pitch_mean = np.mean(pitches[magnitudes > np.median(magnitudes)]) | |
| pitch_std = np.std(pitches[magnitudes > np.median(magnitudes)]) | |
| # 2. Energy/intensity features | |
| rms = librosa.feature.rms(y=y)[0] | |
| energy_mean = np.mean(rms) | |
| energy_std = np.std(rms) | |
| # 3. Tempo and rhythm features (recent librosa versions return tempo as a 1-element array) | |
| tempo, _ = librosa.beat.beat_track(y=y, sr=sr) | |
| tempo = float(np.atleast_1d(tempo)[0]) | |
| # 4. Spectral features | |
| spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)[0] | |
| spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0] | |
| spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0] | |
| # 5. Voice quality features | |
| zero_crossing_rate = librosa.feature.zero_crossing_rate(y)[0] | |
| # Compute statistics for each feature | |
| features = [] | |
| for feature in [spectral_centroid, spectral_bandwidth, spectral_rolloff, zero_crossing_rate]: | |
| features.extend([np.mean(feature), np.std(feature), np.min(feature), np.max(feature)]) | |
| # Add pitch and energy features | |
| features.extend([pitch_mean, pitch_std, energy_mean, energy_std, tempo]) | |
| # Add MFCCs (critical for speech emotion) | |
| mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13) | |
| for mfcc in mfccs: | |
| features.extend([np.mean(mfcc), np.std(mfcc)]) | |
| # Convert to numpy array | |
| features = np.array(features) | |
| # Handle NaN values | |
| features = np.nan_to_num(features) | |
| # Pad or truncate to match eGeMAPS dimension (88) | |
| if len(features) < 88: | |
| features = np.pad(features, (0, 88 - len(features))) | |
| else: | |
| features = features[:88] | |
| return features | |
| except Exception as e: | |
| print(f"Error in librosa feature extraction: {e}") | |
| return np.zeros(88) # Same dimension as eGeMAPS | |
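| # Feature-count note for the librosa fallback above: 4 spectral/voice-quality series x 4 | |
| # statistics = 16 values, plus 5 prosodic values (pitch mean/std, energy mean/std, tempo) | |
| # and 13 MFCCs x 2 statistics = 26, i.e. 47 values in total, which are then zero-padded | |
| # to the 88-dimensional eGeMAPS layout expected downstream. | |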
| # Part 8: Acoustic Emotion Classifier | |
| class AcousticEmotionClassifier(nn.Module): | |
| def __init__(self, input_dim, hidden_dim=128, num_classes=len(EMOTIONS)): | |
| super().__init__() | |
| # Normalize input features | |
| self.batch_norm = nn.BatchNorm1d(input_dim) | |
| # Feature extraction layers | |
| self.feature_extractor = nn.Sequential( | |
| nn.Linear(input_dim, hidden_dim * 2), | |
| nn.ReLU(), | |
| nn.Dropout(0.3), | |
| nn.Linear(hidden_dim * 2, hidden_dim), | |
| nn.ReLU(), | |
| nn.Dropout(0.3) | |
| ) | |
| # Emotion classification head | |
| self.classifier = nn.Sequential( | |
| nn.Linear(hidden_dim, hidden_dim // 2), | |
| nn.ReLU(), | |
| nn.Dropout(0.2), | |
| nn.Linear(hidden_dim // 2, num_classes) | |
| ) | |
| # Initialize weights properly | |
| self._init_weights() | |
| def _init_weights(self): | |
| """Initialize weights with Xavier initialization""" | |
| for m in self.modules(): | |
| if isinstance(m, nn.Linear): | |
| nn.init.xavier_uniform_(m.weight) | |
| if m.bias is not None: | |
| nn.init.zeros_(m.bias) | |
| def forward(self, x): | |
| # Handle different input shapes | |
| if len(x.shape) == 1: | |
| x = x.unsqueeze(0) # Add batch dimension | |
| # Normalize features | |
| x = self.batch_norm(x) | |
| # Extract features | |
| features = self.feature_extractor(x) | |
| # Classify emotions | |
| output = self.classifier(features) | |
| return output | |
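| # Minimal training sketch (hedged): the classifier above is defined but never trained in | |
| # this script. Assuming a feature matrix `features` (N x input_dim, float) and integer | |
| # `labels` indexing into EMOTIONS have been prepared elsewhere, a basic loop could look | |
| # like this; the hyperparameters are illustrative, not tuned. | |
| def train_acoustic_classifier(features, labels, input_dim, epochs=20, lr=1e-3, batch_size=32): | |
| from torch.utils.data import TensorDataset | |
| model = AcousticEmotionClassifier(input_dim).to(DEVICE) | |
| dataset = TensorDataset(torch.tensor(features, dtype=torch.float32), | |
| torch.tensor(labels, dtype=torch.long)) | |
| loader = DataLoader(dataset, batch_size=batch_size, shuffle=True) | |
| criterion = nn.CrossEntropyLoss() | |
| optimizer = optim.Adam(model.parameters(), lr=lr) | |
| model.train() | |
| for epoch in range(epochs): | |
| total_loss = 0.0 | |
| for batch_x, batch_y in loader: | |
| batch_x, batch_y = batch_x.to(DEVICE), batch_y.to(DEVICE) | |
| optimizer.zero_grad() | |
| loss = criterion(model(batch_x), batch_y) | |
| loss.backward() | |
| optimizer.step() | |
| total_loss += loss.item() | |
| print(f"Epoch {epoch + 1}/{epochs} - loss: {total_loss / max(1, len(loader)):.4f}") | |
| return model | |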
| class PretrainedAudioClassifier: | |
| """A rule-based classifier for audio emotion detection until proper training""" | |
| def __init__(self): | |
| # Define acoustic feature thresholds for emotions based on research | |
| # These are simplified heuristics based on acoustic phonetics research | |
| self.feature_thresholds = { | |
| "happy": { | |
| "pitch_mean": (220, 400), # Higher pitch for happiness | |
| "energy_mean": (0.6, 1.0), # Higher energy | |
| "speech_rate": (0.8, 1.0) # Faster speech rate | |
| }, | |
| "sad": { | |
| "pitch_mean": (100, 220), # Lower pitch for sadness | |
| "energy_mean": (0.1, 0.5), # Lower energy | |
| "speech_rate": (0.3, 0.7) # Slower speech rate | |
| }, | |
| "angry": { | |
| "pitch_mean": (250, 400), # Higher pitch for anger | |
| "energy_mean": (0.7, 1.0), # Higher energy | |
| "speech_rate": (0.7, 1.0) # Faster speech rate | |
| }, | |
| "fearful": { | |
| "pitch_mean": (200, 350), # Higher pitch | |
| "energy_mean": (0.4, 0.8), # Medium energy | |
| "speech_rate": (0.6, 0.9) # Medium-fast speech rate | |
| }, | |
| "neutral": { | |
| "pitch_mean": (180, 240), # Medium pitch | |
| "energy_mean": (0.3, 0.6), # Medium energy | |
| "speech_rate": (0.4, 0.7) # Medium speech rate | |
| } | |
| } | |
| def extract_key_features(self, audio_path): | |
| """Extract key acoustic features for rule-based classification""" | |
| try: | |
| y, sr = librosa.load(audio_path, sr=22050) | |
| # Extract pitch | |
| pitches, magnitudes = librosa.piptrack(y=y, sr=sr) | |
| pitch_mean = np.mean(pitches[magnitudes > 0.1]) if np.any(magnitudes > 0.1) else 200 | |
| # Normalize pitch to 0-1 range (assuming human pitch range 80-400 Hz) | |
| pitch_mean_norm = (pitch_mean - 80) / (400 - 80) | |
| pitch_mean_norm = max(0, min(1, pitch_mean_norm)) | |
| # Extract energy | |
| rms = librosa.feature.rms(y=y)[0] | |
| energy_mean = np.mean(rms) | |
| # Normalize energy | |
| energy_mean_norm = energy_mean / 0.1 # Assuming 0.1 is a reasonable max RMS | |
| energy_mean_norm = max(0, min(1, energy_mean_norm)) | |
| # Estimate speech rate from onsets | |
| onset_env = librosa.onset.onset_strength(y=y, sr=sr) | |
| onsets = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr) | |
| if len(onsets) > 1: | |
| speech_rate = len(onsets) / (len(y) / sr) # Onsets per second | |
| speech_rate_norm = min(1.0, speech_rate / 5.0) # Normalize, assuming 5 onsets/sec is fast | |
| else: | |
| speech_rate_norm = 0.5 # Default to medium if can't detect | |
| return { | |
| "pitch_mean": pitch_mean_norm, | |
| "energy_mean": energy_mean_norm, | |
| "speech_rate": speech_rate_norm | |
| } | |
| except Exception as e: | |
| print(f"Error extracting key features: {e}") | |
| return { | |
| "pitch_mean": 0.5, # Default to medium values | |
| "energy_mean": 0.5, | |
| "speech_rate": 0.5 | |
| } | |
| def predict(self, audio_path): | |
| """Predict emotion based on acoustic features""" | |
| # Extract key features | |
| features = self.extract_key_features(audio_path) | |
| # Calculate match scores for each emotion | |
| emotion_scores = {} | |
| for emotion, thresholds in self.feature_thresholds.items(): | |
| score = 0 | |
| for feature, (min_val, max_val) in thresholds.items(): | |
| # Normalize threshold to 0-1 range | |
| min_norm = (min_val - 80) / (400 - 80) if feature == "pitch_mean" else min_val | |
| max_norm = (max_val - 80) / (400 - 80) if feature == "pitch_mean" else max_val | |
| # Check if feature is in the emotion's range | |
| if min_norm <= features[feature] <= max_norm: | |
| # Higher score if closer to the middle of the range | |
| middle = (min_norm + max_norm) / 2 | |
| distance = abs(features[feature] - middle) / ((max_norm - min_norm) / 2) | |
| feature_score = 1 - distance | |
| score += feature_score | |
| else: | |
| # Penalty for being outside the range | |
| score -= 0.5 | |
| emotion_scores[emotion] = max(0, score) | |
| # Add small values for other emotions not in our basic set | |
| for emotion in EMOTIONS: | |
| if emotion not in emotion_scores: | |
| emotion_scores[emotion] = 0.1 | |
| # Normalize scores to probabilities | |
| total = sum(emotion_scores.values()) | |
| if total > 0: | |
| probs = {emotion: score/total for emotion, score in emotion_scores.items()} | |
| else: | |
| # Default to neutral if all scores are 0 | |
| probs = {emotion: 0.1 for emotion in EMOTIONS} | |
| probs["neutral"] = 0.5 | |
| # Convert to array in the same order as EMOTIONS | |
| return np.array([probs[emotion] for emotion in EMOTIONS]) | |
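| # Worked example (illustrative): normalized features {pitch_mean: 0.55, energy_mean: 0.75, | |
| # speech_rate: 0.85} fall inside the "happy" and "angry" ranges but outside all of the | |
| # "sad" ranges, so happy and angry accumulate positive scores, sad bottoms out at 0, the | |
| # emotions without thresholds get the 0.1 floor, and normalizing these scores yields the | |
| # returned probability vector. | |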
| # Part 9: Improved Fusion Model for combining text and acoustic predictions | |
| class AdaptiveModalityFusionModel(nn.Module): | |
| def __init__(self, text_dim, acoustic_dim, hidden_dim=128, num_classes=len(EMOTIONS)): | |
| super().__init__() | |
| # Confidence estimators for each modality | |
| self.text_confidence = nn.Sequential( | |
| nn.Linear(text_dim, hidden_dim), | |
| nn.ReLU(), | |
| nn.Linear(hidden_dim, 1), | |
| nn.Sigmoid() | |
| ) | |
| self.acoustic_confidence = nn.Sequential( | |
| nn.Linear(acoustic_dim, hidden_dim), | |
| nn.ReLU(), | |
| nn.Linear(hidden_dim, 1), | |
| nn.Sigmoid() | |
| ) | |
| # Feature transformation | |
| self.text_transform = nn.Linear(text_dim, hidden_dim) | |
| self.acoustic_transform = nn.Linear(acoustic_dim, hidden_dim) | |
| # Final classifier | |
| self.classifier = nn.Sequential( | |
| nn.Linear(hidden_dim, num_classes), | |
| nn.Softmax(dim=1) | |
| ) | |
| # Initialize weights | |
| self._init_weights() | |
| def _init_weights(self): | |
| for m in self.modules(): | |
| if isinstance(m, nn.Linear): | |
| nn.init.xavier_uniform_(m.weight) | |
| if m.bias is not None: | |
| nn.init.zeros_(m.bias) | |
| def forward(self, text_features, acoustic_features): | |
| # Estimate confidence for each modality | |
| text_conf = self.text_confidence(text_features) | |
| acoustic_conf = self.acoustic_confidence(acoustic_features) | |
| # Normalize confidences to sum to 1 | |
| total_conf = text_conf + acoustic_conf | |
| text_weight = text_conf / total_conf | |
| acoustic_weight = acoustic_conf / total_conf | |
| # Transform features | |
| text_transformed = self.text_transform(text_features) | |
| acoustic_transformed = self.acoustic_transform(acoustic_features) | |
| # Weighted combination | |
| combined = text_weight * text_transformed + acoustic_weight * acoustic_transformed | |
| # Classification | |
| output = self.classifier(combined) | |
| return output | |
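| # Usage sketch (hedged): this fusion model is defined but not wired into the pipeline | |
| # below (use_adaptive_fusion stays False). A minimal forward pass, with dimensions that | |
| # are assumptions rather than requirements, might look like: | |
| # fusion = AdaptiveModalityFusionModel(text_dim=len(EMOTIONS), acoustic_dim=88).to(DEVICE) | |
| # text_vec = torch.rand(1, len(EMOTIONS), device=DEVICE) | |
| # acoustic_vec = torch.rand(1, 88, device=DEVICE) | |
| # fused = fusion(text_vec, acoustic_vec) # shape (1, len(EMOTIONS)), already softmaxed | |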
| # Part 10: Simple Rule-based Fallback Classifier | |
| class RuleBasedClassifier: | |
| """A simple rule-based classifier for fallback when models fail""" | |
| def predict(self, text): | |
| """Predict emotion based on simple word matching""" | |
| text = text.lower() | |
| # Simple emotion keywords | |
| emotion_keywords = { | |
| "happy": ["happy", "joy", "delight", "excited", "glad", "pleased", "cheerful", "smile"], | |
| "sad": ["sad", "unhappy", "depressed", "sorrow", "grief", "tearful", "miserable"], | |
| "angry": ["angry", "mad", "furious", "annoyed", "irritated", "enraged", "livid"], | |
| "fearful": ["afraid", "scared", "frightened", "fear", "terror", "panic", "horrified"], | |
| "disgust": ["disgust", "gross", "repulsed", "revolted", "sick", "nauseous"], | |
| "surprised": ["surprised", "shocked", "astonished", "amazed", "startled"], | |
| "sarcastic": ["yeah right", "sure thing", "oh great", "wow", "really", "obviously"] | |
| } | |
| # Count matches for each emotion | |
| emotion_scores = {emotion: 0 for emotion in EMOTIONS} | |
| emotion_scores["neutral"] = 1 # Default to neutral | |
| for emotion, keywords in emotion_keywords.items(): | |
| for keyword in keywords: | |
| if keyword in text: | |
| emotion_scores[emotion] += 1 | |
| # Return the emotion with highest score | |
| max_emotion = max(emotion_scores, key=emotion_scores.get) | |
| # Convert to probabilities | |
| total = sum(emotion_scores.values()) | |
| probs = {emotion: score/total for emotion, score in emotion_scores.items()} | |
| return max_emotion, probs | |
| # Part 11: Complete Emotion Recognition Pipeline with Comprehensive Error Handling | |
| class EmotionRecognitionPipeline: | |
| def __init__(self, acoustic_model_path=None, fusion_model_path=None): | |
| try: | |
| print("Initializing Improved Emotion Recognition Pipeline...") | |
| # Initialize transcriber | |
| self.transcriber = WhisperTranscriber() | |
| # Initialize text classifier | |
| self.text_classifier = TextEmotionClassifier() | |
| # Initialize feature extractor with improved features | |
| self.feature_extractor = AcousticFeatureExtractor() | |
| # Initialize rule-based audio classifier as fallback | |
| self.rule_based_audio = PretrainedAudioClassifier() | |
| # Initialize simple rule-based fallback | |
| self.rule_based = RuleBasedClassifier() | |
| # Define simple fusion strategy | |
| self.use_adaptive_fusion = False | |
| print("Improved Emotion Recognition Pipeline initialized successfully") | |
| except Exception as e: | |
| print(f"Error initializing pipeline: {e}") | |
| print("Some functionality may be limited") | |
| def predict(self, audio_path): | |
| results = { | |
| "transcription": "", | |
| "text_emotions": {emotion: 0.0 for emotion in EMOTIONS}, | |
| "acoustic_emotions": {emotion: 0.0 for emotion in EMOTIONS}, | |
| "final_emotions": {emotion: 0.0 for emotion in EMOTIONS}, | |
| "predicted_emotion": "neutral", | |
| "is_sarcastic": False, | |
| "errors": [] | |
| } | |
| # Step 1: Transcribe audio | |
| try: | |
| transcription = self.transcriber.transcribe(audio_path) | |
| results["transcription"] = transcription | |
| print(f"Transcription: {transcription}") | |
| except Exception as e: | |
| error_msg = f"Failed to transcribe audio: {e}" | |
| print(error_msg) | |
| results["errors"].append(error_msg) | |
| results["transcription"] = "Error: Could not transcribe audio" | |
| # Step 2: Analyze text emotions | |
| try: | |
| if results["transcription"].startswith("Error:"): | |
| # Skip text analysis if transcription failed | |
| text_emotions = np.ones(len(EMOTIONS)) / len(EMOTIONS) # Equal probabilities | |
| sarcasm_indicators = np.array([0.33, 0.33, 0.33]) | |
| # Try rule-based as fallback | |
| rule_emotion, rule_probs = self.rule_based.predict(results["transcription"]) | |
| results["text_emotions"] = rule_probs | |
| else: | |
| text_emotions = self.text_classifier.predict_emotion(results["transcription"]) | |
| sarcasm_indicators = self.text_classifier.detect_sarcasm(results["transcription"]) | |
| # Format text emotions result | |
| results["text_emotions"] = {EMOTIONS[i]: float(text_emotions[i]) | |
| for i in range(min(len(text_emotions), len(EMOTIONS)))} | |
| print(f"Text-based emotions: {results['text_emotions']}") | |
| except Exception as e: | |
| error_msg = f"Failed to analyze text emotions: {e}" | |
| print(error_msg) | |
| results["errors"].append(error_msg) | |
| # Use equal probabilities as fallback and a neutral sarcasm signal so the | |
| # final-emotion step below never sees an undefined sarcasm_indicators | |
| results["text_emotions"] = {emotion: 1.0/len(EMOTIONS) for emotion in EMOTIONS} | |
| sarcasm_indicators = np.array([0.33, 0.34, 0.33]) | |
| # Step 3: Use rule-based audio classifier instead of the untrained model | |
| try: | |
| # Get predictions from rule-based classifier | |
| audio_probs = self.rule_based_audio.predict(audio_path) | |
| # Format acoustic emotions result | |
| results["acoustic_emotions"] = {EMOTIONS[i]: float(audio_probs[i]) | |
| for i in range(min(len(audio_probs), len(EMOTIONS)))} | |
| print(f"Acoustic-based emotions: {results['acoustic_emotions']}") | |
| except Exception as e: | |
| error_msg = f"Failed to predict acoustic emotions: {e}" | |
| print(error_msg) | |
| results["errors"].append(error_msg) | |
| # Use equal probabilities as fallback | |
| results["acoustic_emotions"] = {emotion: 1.0/len(EMOTIONS) for emotion in EMOTIONS} | |
| audio_probs = np.ones(len(EMOTIONS)) / len(EMOTIONS) | |
| # Step 4: Improved fusion strategy - text-biased weighted average | |
| try: | |
| # Convert dictionaries to arrays | |
| text_array = np.array(list(results["text_emotions"].values())) | |
| audio_array = np.array(list(results["acoustic_emotions"].values())) | |
| # Calculate confidence scores: a more peaked distribution spreads further from the | |
| # uniform baseline, so its standard deviation serves as a simple certainty proxy | |
| text_confidence = float(np.std(text_array)) | |
| audio_confidence = float(np.std(audio_array)) | |
| # Bias toward text model since it's working better | |
| text_confidence *= 1.5 # Increase text confidence | |
| # Normalize confidences (epsilon guards against both distributions being uniform) | |
| total_confidence = text_confidence + audio_confidence + 1e-8 | |
| text_weight = text_confidence / total_confidence | |
| audio_weight = audio_confidence / total_confidence | |
| # Weighted average | |
| final_probs = (text_weight * text_array) + (audio_weight * audio_array) | |
| # Format final emotions | |
| results["final_emotions"] = {EMOTIONS[i]: float(final_probs[i]) | |
| for i in range(len(EMOTIONS))} | |
| print(f"Fusion weights: Text={text_weight:.2f}, Audio={audio_weight:.2f}") | |
| except Exception as e: | |
| error_msg = f"Failed to fuse predictions: {e}" | |
| print(error_msg) | |
| results["errors"].append(error_msg) | |
| # Fallback to text-only predictions since they're more reliable | |
| results["final_emotions"] = results["text_emotions"] | |
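| # Worked example of the fusion weighting (illustrative): with a text std of 0.25 and an | |
| # acoustic std of 0.10, text_confidence = 0.25 * 1.5 = 0.375 and audio_confidence = 0.10, | |
| # giving text_weight of roughly 0.79 and audio_weight of roughly 0.21, so the more | |
| # decisive text distribution dominates the fused probabilities. | |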
| # Get predicted emotion | |
| try: | |
| emotion_values = list(results["final_emotions"].values()) | |
| emotion_idx = np.argmax(emotion_values) | |
| predicted_emotion = EMOTIONS[emotion_idx] | |
| results["predicted_emotion"] = predicted_emotion | |
| # Check for sarcasm | |
| is_sarcastic = False | |
| if hasattr(sarcasm_indicators, "__len__") and len(sarcasm_indicators) > 0: | |
| if predicted_emotion in ["happy", "neutral"] and np.argmax(sarcasm_indicators) == 0: | |
| is_sarcastic = True | |
| results["predicted_emotion"] = "sarcastic" | |
| results["is_sarcastic"] = is_sarcastic | |
| except Exception as e: | |
| error_msg = f"Failed to determine final emotion: {e}" | |
| print(error_msg) | |
| results["errors"].append(error_msg) | |
| results["predicted_emotion"] = "neutral" # Default fallback | |
| return results | |
| # Part 12: Example on sample audio (with better error handling) | |
| def demo_on_sample_audio(pipeline, audio_path): | |
| if not os.path.exists(audio_path): | |
| print(f"Error: Audio file not found at {audio_path}") | |
| return | |
| print(f"Analyzing audio file: {audio_path}") | |
| try: | |
| # Predict emotion from audio | |
| result = pipeline.predict(audio_path) | |
| # Print results | |
| print("\n===== EMOTION ANALYSIS RESULTS =====") | |
| print(f"Transcription: {result['transcription']}") | |
| print(f"\nPredicted Emotion: {result['predicted_emotion'].upper()}") | |
| print(f"Is Sarcastic: {'Yes' if result['is_sarcastic'] else 'No'}") | |
| print("\nText-based Emotions:") | |
| for emotion, score in result['text_emotions'].items(): | |
| print(f" {emotion}: {score:.4f}") | |
| print("\nAcoustic-based Emotions:") | |
| for emotion, score in result['acoustic_emotions'].items(): | |
| print(f" {emotion}: {score:.4f}") | |
| print("\nFinal Fusion Emotions:") | |
| for emotion, score in result['final_emotions'].items(): | |
| print(f" {emotion}: {score:.4f}") | |
| if 'errors' in result and result['errors']: | |
| print("\nErrors encountered:") | |
| for error in result['errors']: | |
| print(f" - {error}") | |
| # Plot results for visualization | |
| try: | |
| emotions = list(result['text_emotions'].keys()) | |
| text_scores = list(result['text_emotions'].values()) | |
| acoustic_scores = list(result['acoustic_emotions'].values()) | |
| final_scores = list(result['final_emotions'].values()) | |
| plt.figure(figsize=(12, 6)) | |
| x = np.arange(len(emotions)) | |
| width = 0.25 | |
| plt.bar(x - width, text_scores, width, label='Text') | |
| plt.bar(x, acoustic_scores, width, label='Acoustic') | |
| plt.bar(x + width, final_scores, width, label='Final') | |
| plt.xlabel('Emotions') | |
| plt.ylabel('Probability') | |
| plt.title('Emotion Prediction Results') | |
| plt.xticks(x, emotions, rotation=45) | |
| plt.legend() | |
| plt.tight_layout() | |
| plt.show() | |
| except Exception as e: | |
| print(f"Error creating visualization: {e}") | |
| except Exception as e: | |
| print(f"Error in demo: {e}") | |
| # Part 13: Simplified dataset loading for RAVDESS dataset | |
| def load_ravdess_sample(): | |
| """ | |
| Download a small sample from RAVDESS dataset for testing | |
| """ | |
| # Create directory for sample data | |
| sample_dir = "./sample_data" | |
| os.makedirs(sample_dir, exist_ok=True) | |
| # Try to download a sample file | |
| try: | |
| import urllib.request | |
| # Example file from the RAVDESS dataset; the 03-01-01-... filename encodes a neutral | |
| # utterance, and since Zenodo packages the recordings in zip archives this direct | |
| # URL may fail, in which case the error handler below returns None | |
| url = "https://zenodo.org/record/1188976/files/Audio_Speech_Actors_01-24/Actor_01/03-01-01-01-01-01-01.wav" | |
| sample_path = os.path.join(sample_dir, "sample_neutral.wav") | |
| if not os.path.exists(sample_path): | |
| print(f"Downloading sample audio file from RAVDESS dataset...") | |
| urllib.request.urlretrieve(url, sample_path) | |
| print(f"Downloaded sample to {sample_path}") | |
| else: | |
| print(f"Sample file already exists at {sample_path}") | |
| return sample_path | |
| except Exception as e: | |
| print(f"Error downloading RAVDESS sample: {e}") | |
| return None | |
| # Part 14: Simplified main function with proper error handling | |
| def main(): | |
| print("Starting Tone Classification System...") | |
| try: | |
| # Create the pipeline | |
| pipeline = EmotionRecognitionPipeline() | |
| # Try to load a sample file | |
| sample_audio_path = load_ravdess_sample() | |
| if sample_audio_path and os.path.exists(sample_audio_path): | |
| demo_on_sample_audio(pipeline, sample_audio_path) | |
| else: | |
| print("\nNo sample audio file available.") | |
| print("To use the system, provide an audio file path when calling the demo_on_sample_audio function:") | |
| print("\ndemo_on_sample_audio(pipeline, '/path/to/your/audio.wav')") | |
| except Exception as e: | |
| print(f"Error in main execution: {e}") | |
| print("\nTroubleshooting tips:") | |
| print("1. Check if your audio file exists and is in a supported format (WAV recommended)") | |
| print("2. Ensure you have sufficient memory for model loading") | |
| print("3. Try with a smaller model size in WhisperTranscriber (tiny instead of base)") | |
| print("4. Make sure you have stable internet connection for model downloading") | |
| # The __main__ guard is deferred until after upload_and_analyze() is defined below, | |
| # so running this file directly does not execute the demo twice | |
| def upload_and_analyze(): | |
| from IPython.display import display | |
| import ipywidgets as widgets | |
| # Create upload widget | |
| upload_widget = widgets.FileUpload( | |
| accept='.wav, .mp3', | |
| multiple=False, | |
| description='Upload Audio File', | |
| button_style='primary' | |
| ) | |
| display(upload_widget) | |
| # Create button to trigger analysis | |
| analyze_button = widgets.Button(description='Analyze Audio') | |
| display(analyze_button) | |
| # Create output area for results | |
| output = widgets.Output() | |
| display(output) | |
| def on_analyze_click(b): | |
| with output: | |
| output.clear_output() | |
| if not upload_widget.value: | |
| print("Please upload an audio file first.") | |
| return | |
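| # Note (assumption): the dict-style access below follows the ipywidgets 7.x FileUpload | |
| # API; in ipywidgets 8.x, upload_widget.value is a tuple of dicts with "name" and | |
| # "content" keys, so this block would need adjusting there. | |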
| # Get the uploaded file | |
| file_data = next(iter(upload_widget.value.values())) | |
| file_name = next(iter(upload_widget.value.keys())) | |
| # Save to temp file | |
| temp_file = f"./temp_{file_name}" | |
| with open(temp_file, 'wb') as f: | |
| f.write(file_data['content']) | |
| print(f"Analyzing uploaded file: {file_name}") | |
| # Create pipeline and analyze | |
| pipeline = EmotionRecognitionPipeline() | |
| demo_on_sample_audio(pipeline, temp_file) | |
| analyze_button.on_click(on_analyze_click) | |
| # Then modify the if __name__ == "__main__": section | |
| if __name__ == "__main__": | |
| try: | |
| import ipywidgets | |
| # If ipywidgets is available, we're in a notebook | |
| print("Running in notebook mode - use the upload widget below:") | |
| upload_and_analyze() | |
| except ImportError: | |
| # Otherwise, run the standard main function | |
| main() | |
| # Gradio front-end; os, numpy, torch and matplotlib are already imported above | |
| import gradio as gr | |
| # Use the existing EmotionRecognitionPipeline class from your code | |
| def analyze_audio(audio_path): | |
| """ | |
| Analyze an audio file and return the emotion recognition results | |
| """ | |
| if audio_path is None: | |
| return "Please provide an audio file.", None, None | |
| try: | |
| # Create the pipeline | |
| pipeline = EmotionRecognitionPipeline() | |
| # Predict emotion from audio | |
| result = pipeline.predict(audio_path) | |
| # Format the results for display | |
| transcription = result['transcription'] | |
| predicted_emotion = result['predicted_emotion'].upper() | |
| is_sarcastic = 'Yes' if result['is_sarcastic'] else 'No' | |
| # Create text summary | |
| summary = f"Transcription: {transcription}\n\n" | |
| summary += f"Predicted Emotion: {predicted_emotion}\n" | |
| summary += f"Is Sarcastic: {is_sarcastic}\n\n" | |
| summary += "Text-based Emotions:\n" | |
| for emotion, score in result['text_emotions'].items(): | |
| summary += f" {emotion}: {score:.4f}\n" | |
| summary += "\nAcoustic-based Emotions:\n" | |
| for emotion, score in result['acoustic_emotions'].items(): | |
| summary += f" {emotion}: {score:.4f}\n" | |
| summary += "\nFinal Fusion Emotions:\n" | |
| for emotion, score in result['final_emotions'].items(): | |
| summary += f" {emotion}: {score:.4f}\n" | |
| if 'errors' in result and result['errors']: | |
| summary += "\nErrors encountered:\n" | |
| for error in result['errors']: | |
| summary += f" - {error}\n" | |
| # Create visualization | |
| fig = create_emotion_plot(result) | |
| return summary, fig, result['predicted_emotion'] | |
| except Exception as e: | |
| return f"Error analyzing audio: {str(e)}", None, "error" | |
| def create_emotion_plot(result): | |
| """ | |
| Create a visualization of the emotion recognition results | |
| """ | |
| emotions = list(result['text_emotions'].keys()) | |
| text_scores = list(result['text_emotions'].values()) | |
| acoustic_scores = list(result['acoustic_emotions'].values()) | |
| final_scores = list(result['final_emotions'].values()) | |
| fig = plt.figure(figsize=(10, 6)) | |
| x = np.arange(len(emotions)) | |
| width = 0.25 | |
| plt.bar(x - width, text_scores, width, label='Text') | |
| plt.bar(x, acoustic_scores, width, label='Acoustic') | |
| plt.bar(x + width, final_scores, width, label='Final') | |
| plt.xlabel('Emotions') | |
| plt.ylabel('Probability') | |
| plt.title('Emotion Recognition Results') | |
| plt.xticks(x, emotions, rotation=45) | |
| plt.legend() | |
| plt.tight_layout() | |
| return fig | |
| # Create the Gradio interface with tabs for microphone and file upload | |
| def create_gradio_interface(): | |
| with gr.Blocks(title="Tone Classification System") as demo: | |
| gr.Markdown("# Tone Classification System") | |
| gr.Markdown("This system analyzes audio to detect emotions, including sarcasm and figures of speech.") | |
| with gr.Tabs(): | |
| with gr.TabItem("Microphone Input"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| audio_input = gr.Audio( | |
| sources=["microphone"], | |
| type="filepath", | |
| label="Record your voice" | |
| ) | |
| analyze_btn = gr.Button("Analyze Recording", variant="primary") | |
| with gr.Column(): | |
| result_text = gr.Textbox(label="Analysis Results", lines=15) | |
| emotion_plot = gr.Plot(label="Emotion Probabilities") | |
| emotion_label = gr.Label(label="Detected Emotion") | |
| analyze_btn.click( | |
| fn=analyze_audio, | |
| inputs=audio_input, | |
| outputs=[result_text, emotion_plot, emotion_label] | |
| ) | |
| with gr.TabItem("File Upload"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| file_input = gr.Audio( | |
| sources=["upload"], | |
| type="filepath", | |
| label="Upload audio file (.wav, .mp3)" | |
| ) | |
| file_analyze_btn = gr.Button("Analyze File", variant="primary") | |
| with gr.Column(): | |
| file_result_text = gr.Textbox(label="Analysis Results", lines=15) | |
| file_emotion_plot = gr.Plot(label="Emotion Probabilities") | |
| file_emotion_label = gr.Label(label="Detected Emotion") | |
| file_analyze_btn.click( | |
| fn=analyze_audio, | |
| inputs=file_input, | |
| outputs=[file_result_text, file_emotion_plot, file_emotion_label] | |
| ) | |
| gr.Markdown("## How to Use") | |
| gr.Markdown(""" | |
| 1. **Microphone Input**: Record your voice and click 'Analyze Recording' | |
| 2. **File Upload**: Upload an audio file (.wav or .mp3) and click 'Analyze File' | |
| The system will transcribe the speech, analyze emotions from both text and acoustic features, | |
| and display the results with a visualization of emotion probabilities. | |
| """) | |
| gr.Markdown("## About") | |
| gr.Markdown(""" | |
| This tone classification system combines text and acoustic features to detect emotions in speech. | |
| It uses a multi-modal approach with: | |
| - Speech-to-text transcription | |
| - Text-based emotion analysis | |
| - Acoustic feature extraction | |
| - Fusion of both modalities for final prediction | |
| The system can detect: neutral, happy, sad, angry, fearful, disgust, surprised, and sarcastic tones. | |
| """) | |
| return demo | |
| # Main function to launch the Gradio interface; note that this redefines the earlier | |
| # CLI main(), so running the file now launches the web UI | |
| def main(): | |
| demo = create_gradio_interface() | |
| demo.launch() | |
| if __name__ == "__main__": | |
| main() | |