import spaces # <--- CRITICAL: MUST BE THE FIRST IMPORT import os import time import threading import json import traceback import logging import string import random import shutil import cv2 import math import numpy as np import srt from flask import Flask, jsonify, request, send_from_directory, send_file # ====================================================== # đ ZEROGPU CONFIGURATION # ====================================================== @spaces.GPU def gpu_warmup(): import torch print(f"â ZeroGPU Warmup: CUDA Available: {torch.cuda.is_available()}") return True # ====================================================== # đž PERSISTENT STORAGE CONFIGURATION # ====================================================== # Checks for Hugging Face Persistent Storage if os.path.exists('/data'): BASE_STORAGE_PATH = '/data' print("â Using Persistent Storage at /data (Files saved for days/weeks)") else: BASE_STORAGE_PATH = '.' print("â ī¸ Using Ephemeral/Local Storage (Files lost on restart)") BASE_USER_DIR = os.path.join(BASE_STORAGE_PATH, "userdata") SAVED_COMICS_DIR = os.path.join(BASE_STORAGE_PATH, "saved_comics") os.makedirs(BASE_USER_DIR, exist_ok=True) os.makedirs(SAVED_COMICS_DIR, exist_ok=True) # ====================================================== # đ§ą DATA CLASSES # ====================================================== def bubble(dialog="", bubble_offset_x=50, bubble_offset_y=20, lip_x=-1, lip_y=-1, emotion='normal', type='speech'): return { 'dialog': dialog, 'bubble_offset_x': int(bubble_offset_x), 'bubble_offset_y': int(bubble_offset_y), 'lip_x': int(lip_x), 'lip_y': int(lip_y), 'emotion': emotion, 'type': type, 'tail_pos': '50%', 'classes': f'speech-bubble {type} tail-bottom' } def panel(image=""): return {'image': image} class Page: def __init__(self, panels, bubbles): self.panels = panels self.bubbles = bubbles # ====================================================== # đ§ APP CONFIG # ====================================================== logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) def generate_save_code(length=8): chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=length)) if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)): return code # ====================================================== # đ§ GLOBAL GPU FUNCTIONS # ====================================================== @spaces.GPU(duration=300) def generate_comic_gpu(video_path, user_dir, frames_dir, metadata_path, target_pages): print(f"đ GPU Task Started: {video_path} | Pages: {target_pages}") import cv2 import srt import numpy as np from backend.keyframes.keyframes import black_bar_crop from backend.simple_color_enhancer import SimpleColorEnhancer from backend.quality_color_enhancer import QualityColorEnhancer from backend.subtitles.subs_real import get_real_subtitles from backend.ai_bubble_placement import ai_bubble_placer from backend.ai_enhanced_core import face_detector cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("Cannot open video") fps = cap.get(cv2.CAP_PROP_FPS) or 25 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps cap.release() user_srt = os.path.join(user_dir, 'subs.srt') try: get_real_subtitles(video_path) if os.path.exists('test1.srt'): shutil.move('test1.srt', user_srt) except: with open(user_srt, 'w') as f: f.write("1\n00:00:01,000 --> 00:00:04,000\n...\n") with open(user_srt, 'r', encoding='utf-8') as f: try: all_subs = list(srt.parse(f.read())) except: all_subs = [] valid_subs = [s for s in all_subs if s.content.strip()] raw_moments = [{'text': s.content, 'start': s.start.total_seconds(), 'end': s.end.total_seconds()} for s in valid_subs] if target_pages <= 0: target_pages = 1 panels_per_page = 4 total_panels_needed = target_pages * panels_per_page selected_moments = [] if not raw_moments: times = np.linspace(1, duration-1, total_panels_needed) for t in times: selected_moments.append({'text': '', 'start': t, 'end': t+1}) elif len(raw_moments) <= total_panels_needed: selected_moments = raw_moments else: indices = np.linspace(0, len(raw_moments) - 1, total_panels_needed, dtype=int) selected_moments = [raw_moments[i] for i in indices] frame_metadata = {} cap = cv2.VideoCapture(video_path) count = 0 frame_files_ordered = [] for i, moment in enumerate(selected_moments): mid = (moment['start'] + moment['end']) / 2 if mid > duration: mid = duration - 1 cap.set(cv2.CAP_PROP_POS_FRAMES, int(mid * fps)) ret, frame = cap.read() if ret: fname = f"frame_{count:04d}.png" p = os.path.join(frames_dir, fname) cv2.imwrite(p, frame) os.sync() frame_metadata[fname] = {'dialogue': moment['text'], 'time': mid} frame_files_ordered.append(fname) count += 1 cap.release() with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) try: black_bar_crop() except: pass se = SimpleColorEnhancer() qe = QualityColorEnhancer() for f in frame_files_ordered: p = os.path.join(frames_dir, f) try: se.enhance_single(p, p) except: pass try: qe.enhance_single(p, p) except: pass bubbles_list = [] for f in frame_files_ordered: p = os.path.join(frames_dir, f) dialogue = frame_metadata.get(f, {}).get('dialogue', '') b_type = 'speech' if '(' in dialogue and ')' in dialogue: b_type = 'narration' elif '!' in dialogue and dialogue.isupper(): b_type = 'reaction' elif '?' in dialogue: b_type = 'speech' try: faces = face_detector.detect_faces(p) lip = face_detector.get_lip_position(p, faces[0]) if faces else (-1, -1) bx, by = ai_bubble_placer.place_bubble_ai(p, lip) b = bubble(dialog=dialogue, bubble_offset_x=bx, bubble_offset_y=by, lip_x=lip[0], lip_y=lip[1], type=b_type) bubbles_list.append(b) except: bubbles_list.append(bubble(dialog=dialogue, type=b_type)) pages = [] for i in range(target_pages): start_idx = i * 4 end_idx = start_idx + 4 p_frames = frame_files_ordered[start_idx:end_idx] p_bubbles = bubbles_list[start_idx:end_idx] if p_frames: pg_panels = [panel(image=f) for f in p_frames] pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) result = [] for pg in pages: p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] b_data = [b if isinstance(b, dict) else b.__dict__ for b in pg.bubbles] result.append({'panels': p_data, 'bubbles': b_data}) return result @spaces.GPU def regen_frame_gpu(video_path, frames_dir, metadata_path, fname, direction): import cv2 import json from backend.simple_color_enhancer import SimpleColorEnhancer if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) if fname not in meta: return {"success": False, "message": "Frame not found"} t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 offset = (1.0/fps) * (1 if direction == 'forward' else -1) new_t = max(0, t + offset) cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) ret, frame = cap.read() cap.release() if ret: p = os.path.join(frames_dir, fname) cv2.imwrite(p, frame) os.sync() try: SimpleColorEnhancer().enhance_single(p, p) except: pass if isinstance(meta[fname], dict): meta[fname]['time'] = new_t else: meta[fname] = new_t with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Adjusted to {new_t:.2f}s"} return {"success": False, "message": "End of video"} @spaces.GPU def get_frame_at_ts_gpu(video_path, frames_dir, metadata_path, fname, ts): import cv2 import json from backend.simple_color_enhancer import SimpleColorEnhancer cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) ret, frame = cap.read() cap.release() if ret: p = os.path.join(frames_dir, fname) cv2.imwrite(p, frame) os.sync() try: SimpleColorEnhancer().enhance_single(p, p) except: pass if os.path.exists(metadata_path): with open(metadata_path, 'r') as f: meta = json.load(f) if fname in meta: if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) else: meta[fname] = float(ts) with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Jumped to {ts}s"} return {"success": False, "message": "Invalid timestamp"} # ====================================================== # đģ BACKEND CLASS # ====================================================== class EnhancedComicGenerator: def __init__(self, sid): self.sid = sid self.user_dir = os.path.join(BASE_USER_DIR, sid) self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') self.frames_dir = os.path.join(self.user_dir, 'frames') self.output_dir = os.path.join(self.user_dir, 'output') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') def cleanup(self): if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) def run(self, target_pages): try: self.write_status("Waiting for GPU...", 5) data = generate_comic_gpu(self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages)) with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: json.dump(data, f, indent=2) self.write_status("Complete!", 100) except Exception as e: traceback.print_exc() self.write_status(f"Error: {str(e)}", -1) def write_status(self, msg, prog): with open(os.path.join(self.output_dir, 'status.json'), 'w') as f: json.dump({'message': msg, 'progress': prog}, f) # ====================================================== # đ ROUTES & FULL UI # ====================================================== INDEX_HTML = '''
')
def load_comic(code):
code = code.upper()
save_dir = os.path.join(SAVED_COMICS_DIR, code)
state_file = os.path.join(save_dir, 'comic_state.json')
if not os.path.exists(state_file): return jsonify({'success': False, 'message': 'Save code not found'})
try:
with open(state_file, 'r') as f: save_data = json.load(f)
original_sid = save_data.get('originalSid')
saved_frames_dir = os.path.join(save_dir, 'frames')
if original_sid and os.path.exists(saved_frames_dir):
user_frames_dir = os.path.join(BASE_USER_DIR, original_sid, 'frames')
os.makedirs(user_frames_dir, exist_ok=True)
for fname in os.listdir(saved_frames_dir):
src = os.path.join(saved_frames_dir, fname)
dst = os.path.join(user_frames_dir, fname)
if not os.path.exists(dst): shutil.copy2(src, dst)
return jsonify({ 'success': True, 'pages': save_data.get('pages', []), 'originalSid': original_sid, 'savedAt': save_data.get('savedAt') })
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'message': str(e)})
if __name__ == '__main__':
try: gpu_warmup()
except: pass
app.run(host='0.0.0.0', port=7860)