Spaces:
Sleeping
Sleeping
import logging
import logging.config
import os
import time
from collections import defaultdict
from datetime import datetime

import cv2
import numpy as np
import supervision as sv
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse, Response
from ultralytics import YOLO
# Ensure the logs directory exists before the file log handler opens it.
# exist_ok=True avoids the check-then-create race of the previous
# `if not os.path.exists(...)` pattern.
os.makedirs("logs", exist_ok=True)

app = FastAPI()

# Load the exported ONNX model. The task is given explicitly since ONNX
# weights don't let ultralytics infer it automatically.
onnx_model = YOLO("models/best-data-v5.onnx", task="detect")
# Define the logging configuration: root, uvicorn access logs and the
# ultralytics logger all go both to stdout and to a per-day file under logs/.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        # Both formatters are currently identical; kept separate so the
        # access-log format can diverge later without touching "default".
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
        "access": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
        "file": {
            "formatter": "default",
            "class": "logging.FileHandler",
            # The file name is fixed at import time, so a long-running
            # process keeps writing to the start date's file (no midnight
            # rollover — a TimedRotatingFileHandler would change that).
            "filename": f"logs/{datetime.now().strftime('%Y-%m-%d')}.log",
            "mode": "a",
        },
        "access": {
            "formatter": "access",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # Root logger: console + daily file.
        "": {
            "handlers": ["default", "file"],
            "level": "INFO",
        },
        # Uvicorn per-request access log, detached from the root logger.
        "uvicorn.access": {
            "handlers": ["access", "file"],
            "level": "INFO",
            "propagate": False,
        },
        # Ultralytics (YOLO) internal logging, also detached.
        "ultralytics": {
            "handlers": ["default", "file"],
            "level": "INFO",
            "propagate": False,
        },
    }
}
# Apply the logging configuration.
# NOTE(review): this call needs `import logging.config`; `import logging`
# alone does not guarantee the submodule is bound — confirm the imports.
logging.config.dictConfig(LOGGING_CONFIG)
| # def parse_detection(detections): | |
| # parsed_rows = [] | |
| # for i in range(len(detections.xyxy)): | |
| # x_min = float(detections.xyxy[i][0]) | |
| # y_min = float(detections.xyxy[i][1]) | |
| # x_max = float(detections.xyxy[i][2]) | |
| # y_max = float(detections.xyxy[i][3]) | |
| # width = int(x_max - x_min) | |
| # height = int(y_max - y_min) | |
| # row = { | |
| # "top": int(y_min), | |
| # "left": int(x_min), | |
| # "width": width, | |
| # "height": height, | |
| # "class_id": "" | |
| # if detections.class_id is None | |
| # else int(detections.class_id[i]), | |
| # "confidence": "" | |
| # if detections.confidence is None | |
| # else float(detections.confidence[i]), | |
| # "tracker_id": "" | |
| # if detections.tracker_id is None | |
| # else int(detections.tracker_id[i]), | |
| # } | |
| # if hasattr(detections, "data"): | |
| # for key, value in detections.data.items(): | |
| # row[key] = ( | |
| # str(value[i]) | |
| # if hasattr(value, "__getitem__") and value.ndim != 0 | |
| # else str(value) | |
| # ) | |
| # parsed_rows.append(row) | |
| # return parsed_rows | |
| # # Run inference | |
| # def callback(image_slice: np.ndarray) -> sv.Detections: | |
| # # logging.info("Running callback for image slice") | |
| # results = onnx_model(image_slice)[0] | |
| # return sv.Detections.from_ultralytics(results) | |
| # def infer(image): | |
| # start_time = time.time() | |
| # image_arr = np.frombuffer(image, np.uint8) | |
| # image = cv2.imdecode(image_arr, cv2.IMREAD_COLOR) | |
| # image = cv2.resize(image, (1920, 1920)) | |
| # results = onnx_model(image)[0] | |
| # # detections = sv.Detections.from_ultralytics(results) | |
| # slicer = sv.InferenceSlicer(callback=callback, slice_wh=(640, 640)) | |
| # detections = slicer(image=image) | |
| # logging.info("Completed slicing and detection") | |
| # parsed_rows = parse_detection(detections) | |
| # # Count the occurrences of each class | |
| # class_counts = defaultdict(int) | |
| # for detection in parsed_rows: | |
| # class_name = detection.get("class_name", "Unknown") | |
| # class_counts[class_name] += 1 | |
| # summary_info = ", ".join( | |
| # [f"{count} {class_name}" for class_name, count in class_counts.items()] | |
| # ) | |
| # logging.info(f"Summary info: {summary_info}") | |
| # logging.info(f"Run time: {time.time() - start_time:.2f} seconds") | |
| # # label_annotator = sv.LabelAnnotator(text_color=sv.Color.BLACK) | |
| # bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4) | |
| # annotated_image = image.copy() | |
| # annotated_image = bounding_box_annotator.annotate(scene=annotated_image, detections=detections) | |
| # # annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections) | |
| # # logging.info("Annotated image") | |
| # return annotated_image, parsed_rows | |
def parse_detection(detections, scale_x=1.0, scale_y=1.0):
    """Convert an ``sv.Detections`` object into a list of plain-dict rows.

    Box coordinates are divided by ``scale_x`` / ``scale_y`` to map
    detections made on a resized image back to the original resolution.
    The defaults of 1.0 leave the coordinates unchanged (backward-compatible
    generalization: callers may omit the scales).

    Parameters
    ----------
    detections :
        Detections with an ``xyxy`` box array and optional ``class_id``,
        ``confidence``, ``tracker_id`` and per-detection ``data`` mapping.
    scale_x, scale_y : float
        Resize factors (resized size / original size) per axis.

    Returns
    -------
    list[dict]
        One row per detection with integer ``top``/``left``/``width``/
        ``height``, plus ``class_id``/``confidence``/``tracker_id`` (empty
        string when the field is absent) and any extra ``data`` entries
        (e.g. ``class_name``) stringified.
    """
    parsed_rows = []
    for i in range(len(detections.xyxy)):
        # Rescale the box corners back to original-image coordinates.
        x_min = float(detections.xyxy[i][0]) / scale_x
        y_min = float(detections.xyxy[i][1]) / scale_y
        x_max = float(detections.xyxy[i][2]) / scale_x
        y_max = float(detections.xyxy[i][3]) / scale_y
        row = {
            "top": int(y_min),
            "left": int(x_min),
            "width": int(x_max - x_min),
            "height": int(y_max - y_min),
            "class_id": ""
            if detections.class_id is None
            else int(detections.class_id[i]),
            "confidence": ""
            if detections.confidence is None
            else float(detections.confidence[i]),
            "tracker_id": ""
            if detections.tracker_id is None
            else int(detections.tracker_id[i]),
        }
        # Copy any extra per-detection metadata (e.g. class_name) as strings.
        if hasattr(detections, "data"):
            for key, value in detections.data.items():
                # getattr(..., "ndim", 1) tolerates plain lists, which have
                # __getitem__ but no ndim (the old code raised AttributeError).
                row[key] = (
                    str(value[i])
                    if hasattr(value, "__getitem__")
                    and getattr(value, "ndim", 1) != 0
                    else str(value)
                )
        parsed_rows.append(row)
    return parsed_rows
def callback(image_slice: np.ndarray) -> sv.Detections:
    """Run the model on one tile and wrap the result for supervision.

    Invoked by ``sv.InferenceSlicer`` once per 640x640 slice.
    """
    return sv.Detections.from_ultralytics(onnx_model(image_slice)[0])
def infer(image):
    """Decode raw image bytes, run sliced YOLO detection, return results.

    Parameters
    ----------
    image : bytes
        Encoded image bytes (e.g. JPEG/PNG) as received from an upload.

    Returns
    -------
    tuple
        ``(annotated_image, parsed_rows)`` — the annotated frame resized
        back to the original resolution, and the parsed detection rows with
        coordinates mapped to the original size.

    Raises
    ------
    ValueError
        If the bytes cannot be decoded as an image.
    """
    start_time = time.time()

    buffer = np.frombuffer(image, np.uint8)
    decoded = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    if decoded is None:
        # imdecode signals corrupt/unsupported data by returning None
        # instead of raising; fail loudly rather than crash on .shape.
        raise ValueError("Could not decode the supplied image bytes")

    original_height, original_width = decoded.shape[:2]

    # Resize to a fixed square so the 640x640 slicer tiles it evenly.
    target_size = 1920
    resized = cv2.resize(decoded, (target_size, target_size))

    # Scale factors to map detections back to the original resolution.
    scale_x = target_size / original_width
    scale_y = target_size / original_height

    # Sliced inference: `callback` runs the model per tile.
    # NOTE: a redundant full-frame `onnx_model(image)` call whose result was
    # discarded has been removed — only the slicer output was ever used.
    slicer = sv.InferenceSlicer(callback=callback, slice_wh=(640, 640))
    detections = slicer(image=resized)
    logging.info("Completed slicing and detection")

    # Parse detections, rescaling coordinates to the original image size.
    parsed_rows = parse_detection(detections, scale_x, scale_y)

    # Log a per-class count summary, e.g. "3 car, 1 person".
    class_counts = defaultdict(int)
    for row in parsed_rows:
        class_counts[row.get("class_name", "Unknown")] += 1
    summary_info = ", ".join(
        f"{count} {class_name}" for class_name, count in class_counts.items()
    )
    logging.info(f"Summary info: {summary_info}")
    logging.info(f"Run time: {time.time() - start_time:.2f} seconds")

    # Draw boxes on the resized frame, then scale back to original size.
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=4)
    annotated_image = bounding_box_annotator.annotate(
        scene=resized.copy(), detections=detections
    )
    annotated_image = cv2.resize(
        annotated_image, (original_width, original_height)
    )

    return annotated_image, parsed_rows
async def process_image(image: UploadFile = File(...), draw_boxes: bool = False):
    """Handle an uploaded image: run detection, return boxes or JSON rows.

    Parameters
    ----------
    image : UploadFile
        The uploaded image file.
    draw_boxes : bool
        When True, respond with the annotated JPEG instead of JSON.

    Returns
    -------
    Response | JSONResponse
        JPEG bytes of the annotated image, or the parsed detections as JSON.

    NOTE(review): no ``@app.post(...)`` decorator is visible on this handler
    — confirm it is registered as a route elsewhere.
    """
    filename = image.filename
    # Bug fix: the log line previously printed a literal placeholder and
    # never used the `filename` variable assigned above.
    logging.info(f"Received process-image request for file: {filename}")

    image_data = await image.read()
    annotated_image, results = infer(image_data)

    if draw_boxes:
        _, img_encoded = cv2.imencode('.jpg', annotated_image)
        logging.info("Returning annotated image")
        return Response(content=img_encoded.tobytes(), media_type="image/jpeg")

    logging.info("Returning JSON results")
    return JSONResponse(content=results)
def hello_world():
    """Return a static greeting (simple liveness check)."""
    greeting = 'Hello, World!'
    return greeting
if __name__ == "__main__":
    # Development entry point: serve the ASGI app on port 8001 with
    # auto-reload (reload requires the "module:attr" string form).
    # NOTE(review): "main:app" assumes this file is named main.py — confirm.
    uvicorn.run("main:app", port=8001, reload=True)