Spaces:
Paused
Paused
| from vitpose import VitPose | |
| import requests | |
| import os | |
| from config import API_URL,API_KEY | |
| from fastapi import UploadFile | |
| import logging | |
| import cv2 | |
| import numpy as np | |
| from dataclasses import dataclass | |
| from typing import Optional, Tuple, Dict, List | |
| import time | |
| import json | |
| from fastapi.responses import JSONResponse | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
# ---------------------------------------------------------------------------
# Jump analysis tuning
# ---------------------------------------------------------------------------
# Relative ankle rise (fraction of the ground-level y) above which the
# athlete is considered airborne.
JUMP_THRESHOLD_PERCENT = 0.05
# Number of most recent samples averaged when smoothing ankle/head positions.
SMOOTHING_WINDOW = 5
# Passed through to the metrics overlay; presumably positions the metric
# boxes relative to the shoulders -- TODO confirm against draw_metrics_overlay.
HORIZONTAL_OFFSET_FACTOR = 0.75
# Number of smoothed head samples kept for the vertical-velocity estimate.
VELOCITY_WINDOW = 3
# Passed through to the metrics overlay; presumably a pixel offset below the
# feet -- TODO confirm against draw_metrics_overlay.
METRICS_BELOW_FEET_OFFSET = 20

# ---------------------------------------------------------------------------
# Drawing colors (OpenCV channel order: blue, green, red)
# ---------------------------------------------------------------------------
BLUE = (255, 0, 0)
GREEN = (0, 255, 0)
YELLOW = (0, 255, 255)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
GRAY = (128, 128, 128)
LIGHT_GRAY = (200, 200, 200)

# Name -> color lookup handed to the overlay drawing helpers.
COLORS = dict(
    blue=BLUE,
    green=GREEN,
    yellow=YELLOW,
    white=WHITE,
    black=BLACK,
    gray=GRAY,
    light_gray=LIGHT_GRAY,
)

# ---------------------------------------------------------------------------
# Pose layout
# ---------------------------------------------------------------------------
# Keypoint name -> row index in the per-person keypoint array.
KEYPOINT_INDICES = {
    'L_Ankle': 15,
    'L_Ear': 3,
    'L_Elbow': 7,
    'L_Eye': 1,
    'L_Hip': 11,
    'L_Knee': 13,
    'L_Shoulder': 5,
    'L_Wrist': 9,
    'Nose': 0,
    'R_Ankle': 16,
    'R_Ear': 4,
    'R_Elbow': 8,
    'R_Eye': 2,
    'R_Hip': 12,
    'R_Knee': 14,
    'R_Shoulder': 6,
    'R_Wrist': 10,
}

# Pairs of keypoint names joined by a line when the skeleton is drawn.
SKELETON_CONNECTIONS = [
    # head
    ("Nose", "L_Eye"),
    ("Nose", "R_Eye"),
    ("L_Eye", "L_Ear"),
    ("R_Eye", "R_Ear"),
    # neck / shoulders
    ("Nose", "L_Shoulder"),
    ("Nose", "R_Shoulder"),
    ("L_Shoulder", "R_Shoulder"),
    # arms
    ("L_Shoulder", "L_Elbow"),
    ("R_Shoulder", "R_Elbow"),
    ("L_Elbow", "L_Wrist"),
    ("R_Elbow", "R_Wrist"),
    # torso
    ("L_Shoulder", "L_Hip"),
    ("R_Shoulder", "R_Hip"),
    ("L_Hip", "R_Hip"),
    # legs
    ("L_Hip", "L_Knee"),
    ("R_Hip", "R_Knee"),
    ("L_Knee", "L_Ankle"),
    ("R_Knee", "R_Ankle"),
]
@dataclass
class JumpMetrics:
    """Mutable state and running results for the jump analysis of one video.

    Declared as a dataclass so that instances get a keyword constructor,
    a readable ``repr`` and value equality; ``JumpMetrics()`` still yields
    the same defaults as before.
    """

    # Largest head rise of the current/last jump, in meters.
    max_jump_height: float = 0.0
    # Latest vertical velocity estimate, in meters per second.
    velocity_vertical: float = 0.0
    # Highest power value seen over the whole video (never reset per jump).
    peak_power_sayer: float = 0.0
    # Highest power value of the current jump (reset at every take-off).
    jump_peak_power: float = 0.0
    # Number of completed jumps so far.
    repetition_count: int = 0
    # Smoothed ankle y (pixels) captured on the first valid frame.
    ground_level: Optional[float] = None
    # Smoothed head y (pixels) at the moment the jump was detected.
    takeoff_head_y: Optional[float] = None
    # Smallest head y (pixels) reached in the current jump, i.e. highest point.
    max_head_height_px: Optional[float] = None
    # True while the athlete is considered airborne.
    jump_started: bool = False
@dataclass
class OverlayConfig:
    """Styling parameters for the on-video metrics overlay.

    Declared as a dataclass so individual settings can be overridden through
    the constructor (e.g. ``OverlayConfig(alpha=0.5)``); ``OverlayConfig()``
    keeps the defaults below.
    """

    # Overlay opacity -- presumably the blend weight of the overlay layer;
    # TODO confirm against the blending code.
    alpha: float = 0.7
    # OpenCV font face used for every text element.
    font: int = cv2.FONT_HERSHEY_SIMPLEX
    # Font scale of a metric's title line.
    font_scale_title_metric: float = 0.5
    # Font scale of a metric's value line.
    font_scale_value: float = 0.7
    # Font scale of the main title at the top of the frame.
    font_scale_title_main: float = 1.2
    # Stroke thickness for metric title/value text.
    font_thickness_metric: int = 1
    # Stroke thickness for the main title text.
    font_thickness_title_main: int = 1
    # Vertical space reserved for the metric title line, in pixels.
    line_height_title_metric: int = int(20 * 1.2)
    # Vertical space reserved for the metric value line, in pixels.
    line_height_value: int = int(25 * 1.2)
    # Padding above and below the text inside a metric box, in pixels.
    padding_vertical: int = int(15 * 1.2)
    # Padding left and right of the text inside a metric box, in pixels.
    padding_horizontal: int = int(15 * 1.2)
    # Metric box border thickness -- used outside the code visible here.
    border_thickness: int = 1
    # Metric box corner radius -- used outside the code visible here.
    corner_radius: int = 10
    # Horizontal gap between metric boxes -- used outside the code visible here.
    spacing_horizontal: int = 30
    # Baseline y of the main title, in pixels from the top of the frame.
    title_y_offset: int = 50
    # Vertical offset for the "salto alto" metric -- used outside the code
    # visible here.
    metrics_y_offset_alto: int = 80
@dataclass
class FramePosition:
    """Axis-aligned rectangle inside a video frame, in pixels.

    Declared as a dataclass so it can actually be constructed with its four
    fields; without the decorator the annotations below create no attributes
    and ``FramePosition(x, y, width, height)`` raises ``TypeError``.
    """

    # Left edge.
    x: int
    # Top edge.
    y: int
    # Horizontal extent.
    width: int
    # Vertical extent.
    height: int
def process_video(file_name: str,vitpose: VitPose,user_id: str,player_id: str):
    """
    Annotate a video with VitPose and upload the result to the webhook.

    The annotated video is written to ``static/<file_name>`` and then posted,
    together with the user/player identifiers, to the
    ``/excercises/webhooks/video-processed`` endpoint of ``API_URL``.

    Args:
        file_name (str): Path to the input video file
        vitpose (VitPose): VitPose instance for pose estimation
        user_id (str): ID of the user uploading the video
        player_id (str): ID of the player in the video

    Returns:
        None

    Raises:
        FileNotFoundError: If the input video does not exist
        requests.RequestException: If the webhook request fails

    Note:
        Errors raised by the VitPose instance itself are not handled here
        and propagate to the caller.
    """
    video_path = file_name
    # The previous implementation read the whole file and rewrote the same
    # bytes to the same path through an unclosed handle. Only the implicit
    # "file must exist" check was useful, so it is kept explicitly.
    if not os.path.isfile(video_path):
        raise FileNotFoundError(video_path)
    logger.info(f"starting task {video_path}")

    new_file_name = os.path.join("static", video_path)
    logger.info(f"new file name {new_file_name}")
    vitpose.output_video_path = new_file_name
    annotated_frames = vitpose.run(video_path)
    vitpose.frames_to_video(annotated_frames)
    logger.info(f"Video processed {video_path}")

    with open(new_file_name, "rb") as f:
        contents = f.read()

    url = API_URL+ "/excercises/webhooks/video-processed"
    logger.info(f"Sending video to {url}")
    files = {"file": (video_path, contents, "video/mp4")}
    logger.info(f"video_path: {video_path}")
    response = requests.post(url, files=files,
                             data={"user_id":user_id,"typeMessage":"video_processed","file_name":video_path,
                                   "player_id":player_id},
                             stream=True,
                             headers={"token":API_KEY})
    logger.info(f"Response: {response.status_code}")
    logger.info(f"Response: {response.text}")
    logger.info(f"Video sent to {url}")
def process_salto_alto(file_name: str,
                       vitpose: VitPose,
                       player_data: dict,
                       exercise_id: str,
                       repetitions) -> dict:
    """
    Analyze a high-jump video and send the resulting metrics to the API.

    The video is run through ``analyze_jump_video`` to obtain per-repetition
    jump height and power, and the results are uploaded with
    ``send_results_api``.

    Args:
        file_name (str): Path to the input video file
        vitpose (VitPose): VitPose instance; its ``pipeline`` is used as the model
        player_data (dict): Dictionary containing player information including:
            - height: Player height in cm (defaults to 168)
            - weight: Player weight in kg (defaults to 64)
            - id: Player identifier (required)
        exercise_id (str): Unique identifier for the exercise
        repetitions (int): Expected number of jump repetitions in the video

    Returns:
        dict: The analysis results that were sent to the API

    Raises:
        ValueError: If the video analysis fails
        KeyError: If ``player_data`` has no ``"id"`` entry
        requests.RequestException: If the API request fails
    """
    print(f"start processing")
    model = vitpose.pipeline

    # Height is expressed in centimeters (it is converted to meters below),
    # so the fallback must be in centimeters too.
    height_cm = float(player_data.get('height', 168))
    body_mass_kg = float(player_data.get('weight', 64))

    # Always derive a path distinct from the input, even when the input does
    # not end in ".mp4", so the writer never overwrites the file being read.
    stem, ext = os.path.splitext(file_name)
    output_video = f"{stem}_analyzed{ext or '.mp4'}"

    results_dict = analyze_jump_video(
        model=model,
        input_video=file_name,
        output_video=output_video,
        player_height=height_cm / 100,  # cm to m
        body_mass_kg=body_mass_kg,
        repetitions=repetitions
    )
    # analyze_jump_video reports failure by returning None.
    if results_dict is None:
        raise ValueError(f"No se pudo analizar el video: {file_name}")

    print(f"results_dict: {results_dict}")
    send_results_api(results_dict,
                     player_data["id"],
                     exercise_id,
                     file_name)
    return results_dict
def send_results_api(results_dict: dict,
                     player_id: str,
                     exercise_id: str,
                     video_path: str) -> requests.Response:
    """
    Upload a video and its analysis results to the API webhook endpoint.

    Sends a multipart POST to ``/excercises/webhooks/video-processed-results``
    containing the video file and a form with the player id, the exercise id
    and the JSON-encoded results.

    Args:
        results_dict (dict): Analysis results; must be JSON-serializable
        player_id (str): Unique identifier for the player
        exercise_id (str): Unique identifier for the exercise
        video_path (str): Path to the video file to upload

    Returns:
        requests.Response: HTTP response from the API endpoint

    Raises:
        FileNotFoundError: If the video file doesn't exist
        requests.RequestException: If the API request fails
        TypeError: If results_dict cannot be serialized to JSON
    """
    url = API_URL + "/excercises/webhooks/video-processed-results"
    logger.info(f"Sending video results to {url}")

    # Serialize first so a bad payload fails before the file is opened.
    data = {
        'player_id': player_id,
        'exercise_id': exercise_id,
        'results': json.dumps(results_dict)
    }

    # The request must be issued while the file is still open because the
    # handle itself is passed to requests.
    with open(video_path, 'rb') as video_file:
        files = {
            'file': (os.path.basename(video_path), video_file, 'video/mp4')
        }
        response = requests.post(
            url,
            headers={"token": API_KEY},
            files=files,
            data=data,
            stream=True
        )

    logger.info(f"Response: {response.status_code}")
    logger.info(f"Response: {response.text}")
    return response
def setup_video_capture(input_video: str, output_video: str) -> Tuple[cv2.VideoCapture, cv2.VideoWriter, int, int]:
    """
    Open the input video for reading and create a matching output writer.

    The writer uses the ``mp4v`` codec and the frame rate and frame size
    reported by the input video.

    Args:
        input_video (str): Path to the input video file
        output_video (str): Path for the output video file

    Returns:
        Tuple[cv2.VideoCapture, cv2.VideoWriter, int, int]: A tuple containing:
            - cap: VideoCapture object for reading input video
            - out: VideoWriter object for writing output video
            - width: Video frame width in pixels
            - height: Video frame height in pixels

    Raises:
        ValueError: If the input video cannot be opened or the output writer
            cannot be created
    """
    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        cap.release()
        raise ValueError("Error al abrir el video")

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    out = cv2.VideoWriter(output_video, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    # VideoWriter does not raise when it cannot open the target; without this
    # check every later write would be silently dropped.
    if not out.isOpened():
        cap.release()
        out.release()
        raise ValueError(f"Error al crear el video de salida: {output_video}")

    return cap, out, width, height
def calibrate_pose_detection(model, cap, player_height: float) -> Tuple[float, int, int]:
    """
    Derive the pixel-to-meter scale and shoulder positions from the first frame.

    The scale is the vertical pixel distance between the nose and the higher
    of the two ankles, divided by the player's known height.

    Args:
        model: Pose model; called as ``model(frame)`` and expected to expose
            ``pose_estimator_config.label2id``
        cap: OpenCV VideoCapture object (one frame is consumed)
        player_height (float): Actual height of the player in meters

    Returns:
        Tuple[float, int, int]: A tuple containing:
            - PX_PER_METER: Conversion factor from pixels to meters
            - initial_left_shoulder_x: X-coordinate of left shoulder in pixels
            - initial_right_shoulder_x: X-coordinate of right shoulder in pixels

    Raises:
        ValueError: If the first frame cannot be read, ``player_height`` is not
            positive, no person is detected, or the measured pixel height is
            not positive
        IndexError: If the model returns fewer keypoints than the label map uses
    """
    ret, frame = cap.read()
    if not ret:
        raise ValueError("Error al leer el video")
    # A non-positive height would produce an infinite or negative scale.
    if not player_height > 0:
        raise ValueError("La altura del jugador debe ser mayor que cero")

    output = model(frame)
    keypoints = output.keypoints_xy.float().cpu().numpy()
    labels = model.pose_estimator_config.label2id
    nose_keypoint = labels["Nose"]
    L_ankle_keypoint = labels["L_Ankle"]
    R_ankle_keypoint = labels["R_Ankle"]
    L_shoulder_keypoint = labels["L_Shoulder"]
    R_shoulder_keypoint = labels["R_Shoulder"]

    calibration_error = "No se pudo calibrar la escala o detectar los hombros en el primer frame."
    if keypoints is None or len(keypoints) == 0 or len(keypoints[0]) == 0:
        raise ValueError(calibration_error)

    kpts_first = keypoints[0]
    # Image y grows downward: the smaller ankle y is the higher ankle.
    ankle_y = min(kpts_first[L_ankle_keypoint][1], kpts_first[R_ankle_keypoint][1])
    initial_person_height_px = float(ankle_y - kpts_first[nose_keypoint][1])
    # Written as "not > 0" so NaN is rejected as well.
    if not initial_person_height_px > 0:
        raise ValueError(calibration_error)

    PX_PER_METER = initial_person_height_px / player_height
    initial_left_shoulder_x = int(kpts_first[L_shoulder_keypoint][0])
    initial_right_shoulder_x = int(kpts_first[R_shoulder_keypoint][0])
    return PX_PER_METER, initial_left_shoulder_x, initial_right_shoulder_x
def process_frame_keypoints(model, frame):
    """
    Run pose estimation on one frame and extract the points the analysis needs.

    The frame is accepted only when the nose, both ankles and both shoulders
    of the first detected person all have strictly positive x and y
    coordinates.

    Args:
        model: Pose model; called as ``model(frame)`` and expected to expose
            ``pose_estimator_config.label2id``
        frame: Input video frame

    Returns:
        Tuple containing:
            - success (bool): True if every required keypoint is usable
            - current_ankle_y: Smaller (i.e. higher on screen) ankle y, or None
            - current_head_y: Nose y, or None
            - keypoints: The full keypoint array from the model, or None

    Note:
        Any exception is caught, printed and reported as a failed frame.
    """
    failure = (False, None, None, None)
    try:
        keypoints = model(frame).keypoints_xy.float().cpu().numpy()
        label_ids = model.pose_estimator_config.label2id
        required = [
            label_ids[name]
            for name in ("Nose", "L_Ankle", "R_Ankle", "L_Shoulder", "R_Shoulder")
        ]

        # Nothing detected at all.
        if (keypoints is None or len(keypoints) == 0
                or len(keypoints[0]) == 0 or not keypoints.size > 0):
            return failure

        person = keypoints[0]
        # The model returned fewer keypoints than the label map refers to.
        if not all(idx < len(person) for idx in required):
            return failure

        nose, left_ankle, right_ankle, left_shoulder, right_shoulder = (
            person[idx] for idx in required
        )
        for point in (nose, left_ankle, right_ankle, left_shoulder, right_shoulder):
            if not (point[0] > 0 and point[1] > 0):
                return failure

        highest_ankle_y = min(left_ankle[1], right_ankle[1])
        return True, highest_ankle_y, nose[1], keypoints
    except Exception as e:
        print(f"Error processing frame: {e}")
        return failure
def detect_jump_events(metrics: JumpMetrics, smoothed_ankle_y: float, smoothed_head_y: float,
                       repetition_data: List[Dict], player_height: float, body_mass_kg: float,
                       repetitions: int) -> bool:
    """
    Track take-off and landing from the ankle position and record finished jumps.

    The first call only stores the ground reference. Afterwards the ankle
    rise relative to that reference is compared with JUMP_THRESHOLD_PERCENT:
    exceeding it starts a jump, falling back to or below it ends the jump and
    appends one entry to ``repetition_data``.

    Args:
        metrics (JumpMetrics): Jump state, updated in place
        smoothed_ankle_y (float): Current smoothed ankle Y-coordinate
        smoothed_head_y (float): Current smoothed head Y-coordinate
        repetition_data (List[Dict]): Receives one dict per completed jump
        player_height (float): Player height in meters
        body_mass_kg (float): Player body mass in kilograms (not used here)
        repetitions (int): Target number of repetitions to detect

    Returns:
        bool: True only on the call that completes the last expected
        repetition, False otherwise
    """
    # First valid sample: remember where the ground and the standing head are.
    if metrics.ground_level is None:
        metrics.ground_level = smoothed_ankle_y
        metrics.takeoff_head_y = smoothed_head_y
        return False

    ground = metrics.ground_level
    if ground > 0:
        rise_ratio = (ground - smoothed_ankle_y) / ground
    else:
        rise_ratio = 0

    if not metrics.jump_started:
        if rise_ratio > JUMP_THRESHOLD_PERCENT:
            # Take-off: reset the per-jump accumulators.
            metrics.jump_started = True
            metrics.takeoff_head_y = smoothed_head_y
            metrics.max_jump_height = 0
            metrics.max_head_height_px = smoothed_head_y
            metrics.jump_peak_power = 0.0
        return False

    # Still airborne (or the ratio is not comparable): nothing to record yet.
    if not (rise_ratio <= JUMP_THRESHOLD_PERCENT):
        return False

    # Landing: store the finished repetition.
    reached_height = calculate_high_jump(player_height, metrics.max_jump_height)
    completed = metrics.repetition_count + 1
    repetition_data.append({
        "repetition": completed,
        "distancia_elevada": round(metrics.max_jump_height, 2),
        "salto_alto": round(reached_height, 2),
        "potencia_sayer": round(metrics.jump_peak_power, 2)
    })
    metrics.repetition_count = completed
    metrics.jump_started = False
    return metrics.repetition_count >= repetitions
def calculate_jump_metrics(metrics: JumpMetrics, smoothed_head_y: float, PX_PER_METER: float,
                           body_mass_kg: float, head_y_buffer: List[float], fps: float):
    """
    Update height and power metrics while a jump is in progress.

    Does nothing unless ``metrics.jump_started`` is set. Otherwise the head
    rise since take-off is converted to meters and used to update the
    per-jump maximum height, the highest head position and the power peaks.

    Args:
        metrics (JumpMetrics): Jump state, updated in place
        smoothed_head_y (float): Current smoothed head Y-coordinate in pixels
        PX_PER_METER (float): Conversion factor from pixels to meters
        body_mass_kg (float): Player body mass in kilograms
        head_y_buffer (List[float]): Accepted but not used by this function
        fps (float): Accepted but not used by this function

    Returns:
        None

    Side Effects:
        - Raises metrics.max_jump_height when the current rise exceeds it
        - Lowers metrics.max_head_height_px to the smallest head y seen
        - Raises metrics.jump_peak_power and metrics.peak_power_sayer when the
          current power estimate exceeds them
    """
    # NOTE(review): the nesting of the three top-level checks below was
    # reconstructed from a flattened copy of the file -- confirm against the
    # original indentation.
    if not metrics.jump_started:
        return
    # Image y grows downward, so take-off y minus current y is the rise.
    relative_jump = (metrics.takeoff_head_y - smoothed_head_y) / PX_PER_METER
    if relative_jump > metrics.max_jump_height:
        metrics.max_jump_height = relative_jump
    if smoothed_head_y < metrics.max_head_height_px:
        metrics.max_head_height_px = smoothed_head_y
    # Truthiness test: skipped only when the rise is exactly zero; negative
    # rises still go through the power formula.
    if relative_jump:
        current_power = calculate_peak_power_sayer(relative_jump, body_mass_kg)
        # Peak for the current jump (reset at take-off elsewhere).
        if current_power > metrics.jump_peak_power:
            metrics.jump_peak_power = current_power
        # Peak across all jumps processed so far.
        if current_power > metrics.peak_power_sayer:
            metrics.peak_power_sayer = current_power
def calculate_velocity(head_y_buffer: List[float], PX_PER_METER: float, fps: float) -> float:
    """
    Estimate vertical velocity from the recent head positions.

    The velocity is the displacement between the oldest and the newest
    sample in the buffer divided by the time between those two samples.

    Args:
        head_y_buffer (List[float]): Recent head Y-coordinates in pixels,
            oldest first
        PX_PER_METER (float): Conversion factor from pixels to meters
        fps (float): Video frame rate in frames per second

    Returns:
        float: Vertical velocity in meters per second (positive = upward
        motion). Returns 0.0 if the buffer holds fewer than VELOCITY_WINDOW
        samples, the scale is missing or zero, or fps is not positive.

    Note:
        - Y-coordinates decrease as objects move upward in image coordinates
        - N consecutive samples span N - 1 frame intervals, so the elapsed
          time is ``(len(head_y_buffer) - 1) / fps``
    """
    if len(head_y_buffer) < VELOCITY_WINDOW or not PX_PER_METER or fps <= 0:
        return 0.0

    # Number of frame intervals between the first and the last sample.
    intervals = len(head_y_buffer) - 1
    if intervals <= 0:
        return 0.0

    delta_y_pixels = head_y_buffer[0] - head_y_buffer[-1]
    delta_y_meters = delta_y_pixels / PX_PER_METER
    delta_t = intervals / fps
    return delta_y_meters / delta_t
def draw_skeleton(frame, keypoints):
    """
    Overlay the detected pose on a frame as joints and bones.

    Only the first person in ``keypoints`` is drawn. Joints are filled GREEN
    circles of radius 5; bones are YELLOW lines of thickness 2 between the
    pairs listed in SKELETON_CONNECTIONS.

    Args:
        frame (numpy.ndarray): Video frame to draw on (modified in-place)
        keypoints: Keypoint array indexed as ``[person][keypoint][x|y]``,
            or None

    Returns:
        None

    Note:
        - Returns immediately for None or empty input
        - A joint or bone is drawn only when all coordinates involved are
          strictly positive
        - Any exception raised while drawing is caught and printed
    """
    has_person = keypoints is not None and len(keypoints) != 0 and len(keypoints[0]) != 0
    if not has_person:
        return

    try:
        person = keypoints[0]
        joint_count = len(person)

        # Joints
        for joint in person:
            if joint[0] > 0 and joint[1] > 0:
                cv2.circle(frame, (int(joint[0]), int(joint[1])), 5, GREEN, -1)

        # Bones
        for start_name, end_name in SKELETON_CONNECTIONS:
            a_idx = KEYPOINT_INDICES[start_name]
            b_idx = KEYPOINT_INDICES[end_name]
            if not (a_idx < joint_count and b_idx < joint_count):
                continue
            a = person[a_idx]
            b = person[b_idx]
            if a[0] > 0 and a[1] > 0 and b[0] > 0 and b[1] > 0:
                cv2.line(frame, (int(a[0]), int(a[1])), (int(b[0]), int(b[1])), YELLOW, 2)
    except Exception as e:
        print(f"Error drawing skeleton: {e}")
def analyze_jump_video(model: VitPose,
                       input_video: str,
                       output_video: str,
                       player_height: float,
                       body_mass_kg: float,
                       repetitions: int) -> dict | None:
    """
    Analyze a jump video, write an annotated copy and collect jump metrics.

    The first frame is used for calibration, then every frame is processed:
    positions are smoothed, jump events are detected, metrics are updated and
    the overlay plus skeleton are drawn onto the output video.

    Args:
        model: Pose model instance
        input_video: Path to input video
        output_video: Path to output video
        player_height: Height of the person in meters
        body_mass_kg: Weight of the person in kg
        repetitions: Expected number of repetitions

    Returns:
        Dictionary with the output video path and one entry per detected
        repetition, or None if any step raised an exception (the error is
        printed).

    Note:
        - Processing stops as soon as the expected number of repetitions has
          been detected; frames after that point are not written.
        - The capture and the writer are always released, including when an
          error occurs.
    """
    cap = None
    out = None
    try:
        # Setup video capture and writer
        cap, out, width, height = setup_video_capture(input_video, output_video)
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Calibrate pose detection (consumes the first frame)
        PX_PER_METER, initial_left_shoulder_x, initial_right_shoulder_x = calibrate_pose_detection(
            model, cap, player_height)

        # Reopen the video so processing starts again from the first frame
        cap.release()
        cap = cv2.VideoCapture(input_video)

        # Initialize tracking variables
        metrics = JumpMetrics()
        repetition_data = []
        head_y_history = []
        ankle_y_history = []
        head_y_buffer = []
        last_detected_ankles_y = None

        # Process each frame
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            annotated_frame = frame.copy()

            # All repetitions already found: pass the frame through untouched
            if metrics.repetition_count >= repetitions:
                out.write(annotated_frame)
                continue

            # Process frame keypoints
            keypoints_valid, current_ankle_y, current_head_y, keypoints = process_frame_keypoints(model, annotated_frame)

            if keypoints_valid:
                last_detected_ankles_y = current_ankle_y

                # Smooth positions with a moving average
                ankle_y_history.append(current_ankle_y)
                if len(ankle_y_history) > SMOOTHING_WINDOW:
                    ankle_y_history.pop(0)
                smoothed_ankle_y = np.mean(ankle_y_history)

                head_y_history.append(current_head_y)
                if len(head_y_history) > SMOOTHING_WINDOW:
                    head_y_history.pop(0)
                smoothed_head_y = np.mean(head_y_history)

                # Calculate velocity
                head_y_buffer.append(smoothed_head_y)
                if len(head_y_buffer) > VELOCITY_WINDOW:
                    head_y_buffer.pop(0)
                metrics.velocity_vertical = calculate_velocity(head_y_buffer, PX_PER_METER, fps)

                # Detect jump events
                should_stop = detect_jump_events(metrics, smoothed_ankle_y, smoothed_head_y,
                                                 repetition_data, player_height, body_mass_kg, repetitions)
                if should_stop:
                    break

                # Calculate jump metrics during jump
                calculate_jump_metrics(metrics, smoothed_head_y, PX_PER_METER, body_mass_kg, head_y_buffer, fps)
            else:
                last_detected_ankles_y = None
                metrics.velocity_vertical = 0.0

            # Draw overlay and skeleton
            high_jump = calculate_high_jump(player_height, metrics.max_jump_height)
            annotated_frame = draw_metrics_overlay(
                frame=annotated_frame,
                max_jump_height=metrics.max_jump_height,
                salto_alto=high_jump,
                velocity_vertical=metrics.velocity_vertical,
                peak_power_sayer=metrics.peak_power_sayer,
                repetition_count=metrics.repetition_count,
                last_detected_ankles_y=last_detected_ankles_y,
                initial_left_shoulder_x=initial_left_shoulder_x,
                initial_right_shoulder_x=initial_right_shoulder_x,
                width=width,
                height=height,
                colors=COLORS,
                metrics_below_feet_offset=METRICS_BELOW_FEET_OFFSET,
                horizontal_offset_factor=HORIZONTAL_OFFSET_FACTOR
            )

            if keypoints_valid and keypoints is not None:
                draw_skeleton(annotated_frame, keypoints)

            out.write(annotated_frame)

        # Prepare results with plain Python numbers so they serialize to JSON
        results_dict = {
            "video_analysis": {
                "output_video": str(output_video),
            },
            "repetition_data": [
                {
                    "repetition": int(rep["repetition"]),
                    "distancia_elevada": float(rep["distancia_elevada"]),
                    "salto_alto": float(rep["salto_alto"]),
                    "potencia_sayer": float(rep["potencia_sayer"])
                } for rep in repetition_data
            ]
        }
        return results_dict
    except Exception as e:
        print(f"Error in analyze_jump_video: {e}")
        return None
    finally:
        # Release on every path; previously an exception leaked both handles
        # and could leave the output file unfinalized.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
def calculate_peak_power_sayer(jump_height_m, body_mass_kg):
    """
    Estimate peak anaerobic power with the Sayers regression equation.

    power = 60.7 * jump height [cm] + 45.3 * body mass [kg] - 2055

    Args:
        jump_height_m: Jump height in meters
        body_mass_kg: Body mass in kg

    Returns:
        Estimated peak power in watts
    """
    height_term = 60.7 * (jump_height_m * 100)  # the equation expects centimeters
    mass_term = 45.3 * body_mass_kg
    return height_term + mass_term - 2055
def calculate_high_jump(player_height:float, max_jump_height:float) -> float:
    """
    Return the total height reached: standing height plus the jump rise.

    Args:
        player_height: Player height in meters
        max_jump_height: Relative jump height in meters

    Returns:
        Sum of both heights in meters
    """
    total_height = player_height + max_jump_height
    return total_height
def draw_rounded_rect(img, pt1, pt2, color, thickness=-1, lineType=cv2.LINE_AA, radius=10):
    """
    Draw a rectangle with rounded corners on an image.

    Each corner is a quarter ellipse whose arc faces outward. A filled
    rectangle (negative thickness) is completed with two overlapping filled
    rectangles; an outlined one is completed with four straight edges.

    Args:
        img (numpy.ndarray): Image to draw on (modified in-place)
        pt1 (tuple): Top-left corner coordinates (x, y)
        pt2 (tuple): Bottom-right corner coordinates (x, y)
        color (tuple): BGR color tuple (B, G, R)
        thickness (int, optional): Line thickness. -1 for filled rectangle. Defaults to -1.
        lineType (int, optional): Type of line drawing. Defaults to cv2.LINE_AA.
        radius (int, optional): Corner radius in pixels. Defaults to 10.

    Returns:
        numpy.ndarray: The modified image with rounded rectangle drawn

    Note:
        - The radius is limited to half of the shorter side
        - If the resulting radius is 0 or less, a regular rectangle is drawn
    """
    x1, y1 = pt1
    x2, y2 = pt2

    # A radius larger than half a side would make the corner arcs overlap.
    radius = min(radius, (x2 - x1) // 2, (y2 - y1) // 2)
    if radius <= 0:
        return cv2.rectangle(img, pt1, pt2, color, thickness, lineType)

    # OpenCV measures ellipse angles clockwise from the +x axis (image y
    # points down), so the arc facing up-left spans 180-270 degrees, etc.
    corner_arcs = (
        ((x1 + radius, y1 + radius), 180),  # top-left
        ((x2 - radius, y1 + radius), 270),  # top-right
        ((x2 - radius, y2 - radius), 0),    # bottom-right
        ((x1 + radius, y2 - radius), 90),   # bottom-left
    )
    for center, start_angle in corner_arcs:
        img = cv2.ellipse(img, center, (radius, radius), 0, start_angle, start_angle + 90,
                          color, thickness, lineType)

    if thickness < 0:
        # Filled: two rectangles cover everything except the corner squares,
        # which the filled quarter ellipses above take care of.
        img = cv2.rectangle(img, (x1, y1 + radius), (x2, y2 - radius), color, thickness, lineType)
        img = cv2.rectangle(img, (x1 + radius, y1), (x2 - radius, y2), color, thickness, lineType)
    else:
        # Outline: only the straight edges between the arcs, so no stray
        # lines cross the interior.
        img = cv2.line(img, (x1 + radius, y1), (x2 - radius, y1), color, thickness, lineType)
        img = cv2.line(img, (x1 + radius, y2), (x2 - radius, y2), color, thickness, lineType)
        img = cv2.line(img, (x1, y1 + radius), (x1, y2 - radius), color, thickness, lineType)
        img = cv2.line(img, (x2, y1 + radius), (x2, y2 - radius), color, thickness, lineType)
    return img
def draw_main_title(overlay, config: OverlayConfig, width: int, colors: Dict):
    """
    Render the heading "Ejercicio de Salto" horizontally centered on the overlay.

    Args:
        overlay (numpy.ndarray): Image overlay to draw on (modified in-place)
        config (OverlayConfig): Supplies the font, the main-title scale and
            thickness, and the title baseline offset
        width (int): Width of the video frame in pixels
        colors (Dict): Color lookup; the "white" entry is used

    Returns:
        None
    """
    heading = "Ejercicio de Salto"
    scale = config.font_scale_title_main
    stroke = config.font_thickness_title_main

    # getTextSize returns ((text_width, text_height), baseline).
    (text_width, _text_height), _baseline = cv2.getTextSize(heading, config.font, scale, stroke)
    origin = ((width - text_width) // 2, config.title_y_offset)

    cv2.putText(overlay, heading, origin, config.font, scale,
                colors["white"], stroke, cv2.LINE_AA)
def calculate_metric_box_size(title: str, value: str, config: OverlayConfig) -> Tuple[int, int]:
    """
    Compute the pixel size of the box that holds one metric's title and value.

    Args:
        title (str): The metric title text (e.g., "SALTO ALTO")
        value (str): The metric value text (e.g., "2.15 m")
        config (OverlayConfig): Font and spacing settings

    Returns:
        Tuple[int, int]: A tuple containing:
            - bg_width: Widest of the two texts plus horizontal padding on
              both sides
            - bg_height: Both line heights plus vertical padding on both sides
    """
    # getTextSize returns ((text_width, text_height), baseline).
    (title_width, _), _ = cv2.getTextSize(
        title, config.font, config.font_scale_title_metric, config.font_thickness_metric)
    (value_width, _), _ = cv2.getTextSize(
        value, config.font, config.font_scale_value, config.font_thickness_metric)

    widest_text = max(title_width, value_width)
    text_block_height = config.line_height_title_metric + config.line_height_value

    bg_width = widest_text + 2 * config.padding_horizontal
    bg_height = text_block_height + 2 * config.padding_vertical
    return bg_width, bg_height
def draw_metric_box(overlay, title: str, value: str, x: int, y: int, bg_width: int, bg_height: int,
                    config: OverlayConfig, colors: Dict):
    """
    Draw one metric box: filled background, border, title line and value line.

    The background is produced by draw_rounded_rect (its return value replaces
    the overlay reference); a white rectangle outline is then drawn over the
    same corner points. Both text lines are centered horizontally inside the box.

    Args:
        overlay: Image to draw on.
        title (str): Text for the upper line, drawn in colors["light_gray"].
        value (str): Text for the lower line, drawn in colors["white"].
        x (int): X-coordinate of the box top-left corner.
        y (int): Y-coordinate of the box top-left corner.
        bg_width (int): Box width in pixels.
        bg_height (int): Box height in pixels.
        config (OverlayConfig): Supplies font, font scales, font_thickness_metric,
            corner_radius, border_thickness, padding_vertical and line heights.
        colors (Dict): Color lookup; reads "gray", "white" and "light_gray".

    Returns:
        The overlay returned by draw_rounded_rect, with border and text drawn on it.
    """
    font = config.font
    thickness = config.font_thickness_metric
    top_left = (x, y)
    bottom_right = (x + bg_width, y + bg_height)

    # Background fill first, then the outline on top of it.
    overlay = draw_rounded_rect(overlay, top_left, bottom_right, colors["gray"],
                                cv2.FILLED, cv2.LINE_AA, config.corner_radius)
    cv2.rectangle(overlay, top_left, bottom_right, colors["white"],
                  config.border_thickness, cv2.LINE_AA)

    # Text baselines; the +2 / +5 are the fixed pixel nudges used for each line.
    title_baseline_y = y + config.padding_vertical + config.line_height_title_metric // 2 + 2
    value_baseline_y = (y + config.padding_vertical + config.line_height_title_metric
                        + config.line_height_value // 2 + 5)

    text_rows = (
        (title, config.font_scale_title_metric, title_baseline_y, "light_gray"),
        (value, config.font_scale_value, value_baseline_y, "white"),
    )
    for text, scale, baseline_y, color_key in text_rows:
        text_width = cv2.getTextSize(text, font, scale, thickness)[0][0]
        text_left = x + (bg_width - text_width) // 2
        cv2.putText(overlay, text, (text_left, baseline_y), font, scale,
                    colors[color_key], thickness, cv2.LINE_AA)

    return overlay
def calculate_positions(width: int, height: int, last_detected_ankles_y: Optional[float],
                        initial_left_shoulder_x: Optional[int], initial_right_shoulder_x: Optional[int],
                        config: OverlayConfig, horizontal_offset_factor: float,
                        metrics_below_feet_offset: int) -> Dict[str, Tuple[int, int]]:
    """
    Work out the top-left corner of every metric box for the current frame.

    Box sizes are estimated with placeholder values ("0.00 m", "0.00 m/s") via
    calculate_metric_box_size.

    Args:
        width (int): Frame width in pixels.
        height (int): Frame height in pixels.
        last_detected_ankles_y (Optional[float]): Ankle Y-coordinate, or None
            when no ankles have been detected.
        initial_left_shoulder_x (Optional[int]): Accepted for interface
            compatibility; not read by this function.
        initial_right_shoulder_x (Optional[int]): Right-shoulder X reference used
            to try a shoulder-relative placement of the "alto" box.
        config (OverlayConfig): Supplies title_y_offset, spacing_horizontal and
            metrics_y_offset_alto (plus whatever calculate_metric_box_size reads).
        horizontal_offset_factor (float): Fraction controlling how far into the
            space right of the shoulder the "alto" box is pushed.
        metrics_below_feet_offset (int): Vertical gap used when placing boxes
            below the ankles.

    Returns:
        Dict[str, Tuple[int, int]]: (x, y) per metric key. "relativo", "alto"
        and "reps" are always present; "velocidad" and "potencia" are added
        only when last_detected_ankles_y is not None.
    """
    left_margin = 20
    ankles_seen = last_detected_ankles_y is not None

    # --- Relative jump box: left edge, just above the ankles when possible ---
    rel_w, rel_h = calculate_metric_box_size("SALTO RELATIVO", "0.00 m", config)
    if not ankles_seen:
        rel_top = height - 150
    else:
        rel_top = int(last_detected_ankles_y - rel_h - 10)
        # Too close to the title area: move the box below the feet instead.
        if rel_top < config.title_y_offset + 50:
            rel_top = int(last_detected_ankles_y + metrics_below_feet_offset)

    # --- High jump box: right edge by default, shoulder-relative if it fits ---
    high_w, _high_h = calculate_metric_box_size("SALTO ALTO", "0.00 m", config)
    high_left = width - high_w - 20
    if initial_right_shoulder_x is not None:
        room_right = width - initial_right_shoulder_x
        candidate = (initial_right_shoulder_x
                     + int(room_right * (1 - horizontal_offset_factor))
                     - high_w)
        clears_left_box = candidate > left_margin + rel_w + config.spacing_horizontal + 10
        fits_in_frame = candidate + high_w < width - 10
        if clears_left_box and fits_in_frame:
            high_left = candidate

    # Insertion order matches the drawing order used by the caller.
    layout = {
        "relativo": (left_margin, rel_top),
        "alto": (high_left, config.metrics_y_offset_alto),
        # Repetitions sit directly under the relative jump box.
        "reps": (left_margin, rel_top + rel_h + 10),
    }

    # --- Velocity / power: horizontally centered, stacked below the feet ---
    if ankles_seen:
        vel_w, vel_h = calculate_metric_box_size("VELOCIDAD VERTICAL", "0.00 m/s", config)
        vel_left = int(width / 2 - vel_w / 2)
        stack_base = int(last_detected_ankles_y + metrics_below_feet_offset + vel_h)
        layout["velocidad"] = (vel_left, stack_base - vel_h)
        layout["potencia"] = (vel_left, stack_base + 5)

    return layout
def draw_metrics_overlay(frame, max_jump_height, salto_alto, velocity_vertical, peak_power_sayer,
                         repetition_count, last_detected_ankles_y, initial_left_shoulder_x,
                         initial_right_shoulder_x, width, height, colors, metrics_below_feet_offset=20,
                         horizontal_offset_factor=0.75):
    """
    Compose the title and all metric boxes onto a copy of the frame and blend it back.

    Args:
        frame: Input frame; it is copied before drawing and used as the second
            operand of the final blend.
        max_jump_height: Value shown in the "SALTO RELATIVO" box (negatives shown as 0).
        salto_alto: Value shown in the "SALTO ALTO" box (negatives shown as 0).
        velocity_vertical: Value shown (as absolute value) in the "VELOCIDAD VERTICAL" box.
        peak_power_sayer: Value shown in the "POTENCIA SAYER" box.
        repetition_count: Value shown in the "REPETICIONES" box.
        last_detected_ankles_y: Ankle Y-coordinate forwarded to calculate_positions.
        initial_left_shoulder_x: Left-shoulder X forwarded to calculate_positions.
        initial_right_shoulder_x: Right-shoulder X forwarded to calculate_positions.
        width: Frame width.
        height: Frame height.
        colors: Dictionary with color values.
        metrics_below_feet_offset: Offset for metrics placed below the feet.
        horizontal_offset_factor: Factor for horizontal offset.

    Returns:
        The result of cv2.addWeighted blending the drawn overlay (weight
        config.alpha) with the original frame (weight 1 - config.alpha).
    """
    config = OverlayConfig()
    overlay = frame.copy()

    draw_main_title(overlay, config, width, colors)

    positions = calculate_positions(width, height, last_detected_ankles_y,
                                    initial_left_shoulder_x, initial_right_shoulder_x,
                                    config, horizontal_offset_factor, metrics_below_feet_offset)

    # (position key, box title, lazy value formatter) in drawing order.
    # Formatters are callables so a value is only formatted when its box is
    # actually going to be drawn.
    box_specs = (
        ("relativo", "SALTO RELATIVO", lambda: f"{max(0, max_jump_height):.2f} m"),
        ("alto", "SALTO ALTO", lambda: f"{max(0, salto_alto):.2f} m"),
        ("reps", "REPETICIONES", lambda: f"{repetition_count}"),
        ("velocidad", "VELOCIDAD VERTICAL", lambda: f"{abs(velocity_vertical):.2f} m/s"),
        ("potencia", "POTENCIA SAYER", lambda: f"{peak_power_sayer:.2f} W"),
    )

    for key, label, format_value in box_specs:
        if key not in positions:
            continue
        text = format_value()
        # Box is sized for the real value text, not the placeholder used for layout.
        box_w, box_h = calculate_metric_box_size(label, text, config)
        left, top = positions[key]
        overlay = draw_metric_box(overlay, label, text, left, top, box_w, box_h, config, colors)

    # Blend the drawn overlay with the untouched frame.
    return cv2.addWeighted(overlay, config.alpha, frame, 1 - config.alpha, 0)