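# Video-interview analysis Space (Gradio app).
#
# Pipeline: a recorded or uploaded video is sampled frame by frame for facial emotions
# (dima806/facial_emotions_image_detection), head-pose "distraction" (MediaPipe FaceMesh),
# and Good/Bad body language (a pickled classifier over MediaPipe Holistic landmarks);
# the audio track is scored for speech emotion via the Hugging Face Inference API and
# transcribed with faster-whisper, and each transcript segment's text is scored with
# SamLowe/roberta-base-go_emotions. Aggregated results are POSTed to a companion Flask
# backend hosted on Spaces.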
import math
import os
import time
from io import BytesIO

import cv2
import gradio as gr
import joblib
import mediapipe as mp
import moviepy as mpe  # MoviePy >= 2.0 exposes VideoFileClip at the package top level; 1.x required moviepy.editor
import numpy as np
import pandas as pd
import requests
from faster_whisper import WhisperModel
from PIL import Image
from pydub import AudioSegment
from transformers import pipeline
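# A likely requirements.txt for this Space, inferred from the imports above
# (the exact package list is an assumption, not taken from the repo):
#   gradio, opencv-python-headless, Pillow, requests, transformers, torch,
#   pydub, faster-whisper, joblib, mediapipe, numpy, pandas, moviepy, scikit-learn
# pydub and MoviePy also expect an ffmpeg binary (on Spaces, typically via packages.txt).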
theme = gr.themes.Base(
    primary_hue="cyan",
    secondary_hue="blue",
    neutral_hue="slate",
)
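# Models and external services are loaded once at startup.
# body_language.pkl is a pickled classifier (predict/predict_proba over MediaPipe Holistic
# landmarks) that must ship alongside this file; HF_API_KEY is expected to be provided as
# an environment variable / Space secret for the Inference API.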
model = WhisperModel("small", device="cpu", compute_type="int8")

body_lang_model = joblib.load('body_language.pkl')

mp_holistic = mp.solutions.holistic
holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)

mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)

API_KEY = os.getenv('HF_API_KEY')

pipe1 = pipeline("image-classification", model="dima806/facial_emotions_image_detection")
pipe2 = pipeline("text-classification", model="SamLowe/roberta-base-go_emotions")

AUDIO_API_URL = "https://api-inference.huggingface.co/models/ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition"
headers = {"Authorization": f"Bearer {API_KEY}"}
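# extract_frames samples the video at interval = fps // 2 frames (two frames per second
# of the 60 fps re-encode) and, for each sampled frame:
#   * estimates head pose from six FaceMesh landmarks to flag "distracted" frames,
#   * classifies body language (Good/Bad) with the pickled model on Holistic landmarks,
#   * runs the facial-emotion image classifier.
# It returns the per-video emotion averages, the per-frame emotion scores, the averaged
# Good/Bad probabilities, and the fraction of distracted frames.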
def extract_frames(video_path):
    # Re-encode the upload to a local mp4 at a known frame rate before sampling.
    clip = mpe.VideoFileClip(video_path)
    clip.write_videofile('mp4file.mp4', fps=60)

    cap = cv2.VideoCapture('mp4file.mp4')
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    interval = int(fps / 2)
    print(interval, total_frames)
    result = []
    distract_count = 0
    total_count = 0
    output_list = []

    for i in range(0, total_frames, interval):
        total_count += 1
        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = cap.read()
        if not ret:
            continue

        # Head-pose estimation on the mirrored frame.
        image = cv2.cvtColor(cv2.flip(frame, 1), cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = face_mesh.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        img_h, img_w, img_c = image.shape
        face_3d = []
        face_2d = []
        flag = False
        # Use six reference landmarks (eye corners, nose tip, mouth corners, chin).
        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                for idx, lm in enumerate(face_landmarks.landmark):
                    if idx in (33, 263, 1, 61, 291, 199):
                        if idx == 1:
                            nose_2d = (lm.x * img_w, lm.y * img_h)
                            nose_3d = (lm.x * img_w, lm.y * img_h, lm.z * 3000)
                        x, y = int(lm.x * img_w), int(lm.y * img_h)
                        face_2d.append([x, y])
                        face_3d.append([x, y, lm.z])

                face_2d = np.array(face_2d, dtype=np.float64)
                face_3d = np.array(face_3d, dtype=np.float64)

                # Approximate camera intrinsics, then solve for the head rotation.
                focal_length = 1 * img_w
                cam_matrix = np.array([[focal_length, 0, img_h / 2],
                                       [0, focal_length, img_w / 2],
                                       [0, 0, 1]])
                dist_matrix = np.zeros((4, 1), dtype=np.float64)
                success, rot_vec, trans_vec = cv2.solvePnP(face_3d, face_2d, cam_matrix, dist_matrix)
                rmat, jac = cv2.Rodrigues(rot_vec)
                angles, mtxR, mtxQ, Qx, Qy, Qz = cv2.RQDecomp3x3(rmat)
                x = angles[0] * 360
                y = angles[1] * 360
                z = angles[2] * 360

                # Looking too far up/down/left/right counts as a distracted frame.
                flag = abs(x) > 7 or abs(y) > 7

        if flag:
            distract_count += 1
        # Body-language classification from Holistic pose + face landmarks.
        image2 = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results2 = holistic.process(image2)
        if results2.pose_landmarks and results2.face_landmarks:
            pose = results2.pose_landmarks.landmark
            pose_row = list(np.array([[lm.x, lm.y, lm.z, lm.visibility] for lm in pose]).flatten())
            face = results2.face_landmarks.landmark
            face_row = list(np.array([[lm.x, lm.y, lm.z, lm.visibility] for lm in face]).flatten())
            X = pd.DataFrame([pose_row + face_row])
            body_language_prob = body_lang_model.predict_proba(X)[0]
            # Cast to plain floats so the results stay JSON-serializable later on.
            output_list.append({cls: float(prob) for cls, prob in zip(body_lang_model.classes_, body_language_prob)})

        # Per-frame facial emotion scores.
        pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        temp = {ele["label"]: ele["score"] for ele in pipe1(pil_image)}
        result.append(temp)
    distraction_rate = distract_count / total_count

    # Average the Good/Bad body-language probabilities over the sampled frames.
    total_bad_prob = 0
    total_good_prob = 0
    for output_dict in output_list:
        total_bad_prob += output_dict['Bad']
        total_good_prob += output_dict['Good']
    num_frames = len(output_list)
    final_output = {'Bad': total_bad_prob / num_frames, 'Good': total_good_prob / num_frames}

    cap.release()

    # Average the per-frame facial-emotion scores and keep the non-zero entries.
    video_emotion_totals = {}
    emotion_totals = {
        'admiration': 0.0, 'amusement': 0.0, 'angry': 0.0, 'annoyance': 0.0, 'approval': 0.0,
        'caring': 0.0, 'confusion': 0.0, 'curiosity': 0.0, 'desire': 0.0, 'disappointment': 0.0,
        'disapproval': 0.0, 'disgust': 0.0, 'embarrassment': 0.0, 'excitement': 0.0, 'fear': 0.0,
        'gratitude': 0.0, 'grief': 0.0, 'happy': 0.0, 'love': 0.0, 'nervousness': 0.0,
        'optimism': 0.0, 'pride': 0.0, 'realization': 0.0, 'relief': 0.0, 'remorse': 0.0,
        'sad': 0.0, 'surprise': 0.0, 'neutral': 0.0,
    }
    counter = 0
    for ele in result:
        for emotion in ele.keys():
            emotion_totals[emotion] += ele.get(emotion)
        counter += 1
    for emotion in emotion_totals:
        emotion_totals[emotion] /= counter
        if emotion_totals[emotion] > 0.0:
            video_emotion_totals[emotion] = emotion_totals[emotion]

    return video_emotion_totals, result, final_output, distraction_rate
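# Text-emotion scores from the go_emotions classifier. With default pipeline settings
# only the top-scoring label is returned per call; in recent transformers versions,
# top_k=None would return every label.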
def analyze_sentiment(text):
    response = pipe2(text)
    sentiment_results = {}
    for ele in response:
        sentiment_results[ele["label"]] = ele["score"]
    return sentiment_results
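# Main handler, wired to the Gradio video input below. It:
#   1. fetches the current user id from the Flask backend,
#   2. runs the per-frame analysis above,
#   3. sends the extracted audio to the Inference API for speech emotion,
#      retrying every 30 s while the hosted model is still loading,
#   4. transcribes with faster-whisper and scores each segment's text emotions,
#      mapping the segment time span back onto the sampled frame emotions,
#   5. POSTs the aggregated results back to the backend.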
def video_to_audio(input_video):
    temp = requests.get('https://parthcodes-test-flask-deploy.hf.space/useridping')
    user_id = temp.json().get('current')
    print(user_id)

    video_emotion_totals, frames_sentiments, body_language, distraction_rate = extract_frames(input_video)
    print("Total Video Emotions ... Done")
    print("Video Frame Sentiment ... Done")
    print("Body Language ... Done")
    print("Distraction Rate ... Done")

    # Extract the audio track as WAV bytes; one copy goes to the Inference API,
    # the other is kept for local transcription.
    audio = AudioSegment.from_file(input_video)
    audio_binary = audio.export(format="wav").read()
    audio_bytesio2 = BytesIO(audio_binary)
    # The hosted model returns a dict (e.g. an "is loading" error) until it is ready,
    # and a list of {label, score} entries once inference succeeds.
    flag = False
    while not flag:
        audio_bytesio = BytesIO(audio_binary)
        response = requests.post(AUDIO_API_URL, headers=headers, data=audio_bytesio)
        if isinstance(response.json(), dict):
            print(response.json())
            time.sleep(30)
            print("Retrying for Speech Emotions")
        else:
            flag = True

    formatted_response = {}
    for ele in response.json():
        formatted_response[ele["label"]] = ele["score"]
    print("Speech Sentiments ... Done")
    segments, info = model.transcribe(audio_bytesio2, beam_size=5)

    transcript = ''
    video_sentiment_final = []
    final_output = []
    for segment in segments:
        transcript = transcript + segment.text + " "
        transcript_segment_sentiment = analyze_sentiment(segment.text)

        # Map this speech segment's time span onto the per-frame facial emotions.
        # NOTE: frames_sentiments holds two samples per second, so indexing it by
        # whole seconds uses the frame taken at roughly i/2 seconds.
        emotion_totals = {
            'admiration': 0.0, 'amusement': 0.0, 'angry': 0.0, 'annoyance': 0.0, 'approval': 0.0,
            'caring': 0.0, 'confusion': 0.0, 'curiosity': 0.0, 'desire': 0.0, 'disappointment': 0.0,
            'disapproval': 0.0, 'disgust': 0.0, 'embarrassment': 0.0, 'excitement': 0.0, 'fear': 0.0,
            'gratitude': 0.0, 'grief': 0.0, 'happy': 0.0, 'love': 0.0, 'nervousness': 0.0,
            'optimism': 0.0, 'pride': 0.0, 'realization': 0.0, 'relief': 0.0, 'remorse': 0.0,
            'sad': 0.0, 'surprise': 0.0, 'neutral': 0.0,
        }
        counter = 0
        for i in range(math.ceil(segment.start), math.floor(segment.end)):
            for emotion in frames_sentiments[i].keys():
                emotion_totals[emotion] += frames_sentiments[i].get(emotion)
            counter += 1
        if counter:  # guard against sub-second segments
            for emotion in emotion_totals:
                emotion_totals[emotion] /= counter

        video_sentiment_final.append(emotion_totals)
        video_segment_sentiment = {key: value for key, value in emotion_totals.items() if value != 0.0}
        segment_finals = {segment.id: (segment.text, segment.start, segment.end,
                                       transcript_segment_sentiment, video_segment_sentiment)}
        final_output.append(segment_finals)
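    # NOTE: final_output (the per-segment breakdown built above) is not included in
    # the payload sent to the backend below.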
    total_transcript_sentiment = {key: value for key, value in analyze_sentiment(transcript).items() if value >= 0.01}
    print("Full Transcript Sentiments ... Done")

    # Average the segment-level facial-emotion maps over all segments.
    emotion_finals = {
        'admiration': 0.0, 'amusement': 0.0, 'angry': 0.0, 'annoyance': 0.0, 'approval': 0.0,
        'caring': 0.0, 'confusion': 0.0, 'curiosity': 0.0, 'desire': 0.0, 'disappointment': 0.0,
        'disapproval': 0.0, 'disgust': 0.0, 'embarrassment': 0.0, 'excitement': 0.0, 'fear': 0.0,
        'gratitude': 0.0, 'grief': 0.0, 'happy': 0.0, 'love': 0.0, 'nervousness': 0.0,
        'optimism': 0.0, 'pride': 0.0, 'realization': 0.0, 'relief': 0.0, 'remorse': 0.0,
        'sad': 0.0, 'surprise': 0.0, 'neutral': 0.0,
    }
    for i in range(len(video_sentiment_final)):
        for emotion in video_sentiment_final[i].keys():
            emotion_finals[emotion] += video_sentiment_final[i].get(emotion)
    for emotion in emotion_finals:
        emotion_finals[emotion] /= len(video_sentiment_final)
    emotion_finals = {key: value for key, value in emotion_finals.items() if value != 0.0}
    print("Video Frame (Mapping & AVG.) ... Done")

    print("\nProcessing Completed!!\n")
    payload = {
        'from': 'gradio',
        'user_id': user_id,
        'total_video_emotions': video_emotion_totals,
        'emotions_final': emotion_finals,
        'body_language': body_language,
        'distraction_rate': distraction_rate,
        'formatted_response': formatted_response,
        'total_transcript_sentiment': total_transcript_sentiment
    }
    print(payload)
    response = requests.post('https://parthcodes-test-flask-deploy.hf.space/interview', json=payload)
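# Gradio UI: a single video input (webcam or upload). Analysis runs when a webcam
# recording stops or a file finishes uploading; results are pushed to the backend
# rather than rendered in the interface.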
with gr.Blocks(theme=theme, css=".gradio-container { background: rgba(0, 0, 0, 0.4) !important; box-shadow: 0 8px 32px 0 rgba( 31, 38, 135, 0.37 ) !important; backdrop-filter: blur( 10px ) !important; -webkit-backdrop-filter: blur( 10px ) !important; border-radius: 12px !important;}") as Video:
    input_video = gr.Video(sources=["webcam", "upload"], format='mp4')
    input_video.stop_recording(fn=video_to_audio, inputs=input_video)
    input_video.upload(fn=video_to_audio, inputs=input_video)

Video.launch()