""" Vidsimplify Streamlit Application Supports two modes: 1. DIRECT MODE (for Hugging Face) - directly executes generation code 2. API MODE (for local development) - uses the FastAPI server Set DIRECT_MODE=true in environment to use direct execution. """ import streamlit as st import os import time import base64 import uuid import subprocess import asyncio import threading import shutil import logging from pathlib import Path from datetime import datetime # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables from .env file from dotenv import load_dotenv load_dotenv() # Check if we're in direct mode (for Hugging Face deployment) DIRECT_MODE = os.getenv("DIRECT_MODE", "true").lower() == "true" # ============================================================================ # Background Cleanup Task (runs every 30 minutes) # ============================================================================ def cleanup_old_files(): """ Background task that runs every 10 minutes to delete files older than 10 minutes. This prevents running out of the 1GB storage limit on Hugging Face Spaces. """ import time while True: try: time.sleep(600) # Wait 10 minutes (600 seconds) cutoff_time = time.time() - 600 # 10 minutes ago deleted_count = 0 freed_mb = 0 logger.info("๐งน Starting periodic cleanup (files older than 10 minutes)...") # Clean up old video files media_videos = Path("media/videos") if media_videos.exists(): for scene_dir in media_videos.iterdir(): if scene_dir.is_dir() and scene_dir.name.startswith("scene_"): # Check if directory is older than 10 minutes if scene_dir.stat().st_mtime < cutoff_time: dir_size = sum(f.stat().st_size for f in scene_dir.rglob('*') if f.is_file()) shutil.rmtree(scene_dir) deleted_count += 1 freed_mb += dir_size / (1024 * 1024) logger.info(f" ๐๏ธ Deleted old scene: {scene_dir.name}") # Clean up old voiceover files (keep newest 50, delete older ones) for vo_dir in [Path("media/voiceover/edge_tts"), Path("media/voiceover/elevenlabs"), Path("media/voiceover/gtts")]: if vo_dir.exists(): voice_files = sorted(vo_dir.glob("*.mp3"), key=lambda x: x.stat().st_mtime, reverse=True) # Keep newest 50, delete the rest for old_file in voice_files[50:]: try: file_size = old_file.stat().st_size old_file.unlink() freed_mb += file_size / (1024 * 1024) logger.info(f" ๐๏ธ Deleted old voiceover: {old_file.name}") except: pass # Log disk usage total, used, free = shutil.disk_usage("/") free_mb = free // (1024 * 1024) logger.info(f"โ Cleanup complete: Deleted {deleted_count} scenes, freed {freed_mb:.1f}MB") logger.info(f"๐พ Disk space: {free_mb}MB free of {total // (1024 * 1024)}MB") except Exception as e: logger.error(f"โ Cleanup error: {e}") # Start cleanup thread in DIRECT_MODE (Hugging Face) if DIRECT_MODE: cleanup_thread = threading.Thread(target=cleanup_old_files, daemon=True) cleanup_thread.start() logger.info("๐งน Background cleanup task started (runs every 10 minutes)") # Page config st.set_page_config( page_title="Vidsimplify - AI Video Generator", page_icon="๐ฌ", layout="wide", initial_sidebar_state="expanded" ) # Custom CSS - Premium Techy Design st.markdown(""" """, unsafe_allow_html=True) # ============================================================================ # Direct Mode Functions (No API needed) # ============================================================================ def generate_video_direct(input_type: str, input_data: str, quality: str, category: str, progress_callback=None): """ Generate video directly without API. Used for Hugging Face deployment. """ from manimator.api.animation_generation import generate_animation_response from manimator.utils.code_fixer import CodeFixer job_id = str(uuid.uuid4()) base_dir = Path(__file__).parent # Quality flags for Manim quality_flags = { "low": "-pql", "medium": "-pqm", "high": "-pqh", "ultra": "-pqk" } quality_dirs = { "low": "480p15", "medium": "720p30", "high": "1080p60", "ultra": "2160p60" } try: # Stage 1: Generate code if progress_callback: progress_callback("generating_code", 20, "Generating Manim code with AI...") code = generate_animation_response( input_data=input_data, input_type=input_type, category=category, job_id=job_id ) # Save code to file scene_name = f"Scene_{uuid.uuid4().hex[:8]}" code_file = base_dir / f"scene_{job_id}.py" # Replace class name in code import re code = re.sub(r'class\s+\w+\s*\(\s*VoiceoverScene\s*\)', f'class {scene_name}(VoiceoverScene)', code) with open(code_file, 'w') as f: f.write(code) if progress_callback: progress_callback("code_generated", 40, "Code generated successfully!") # Stage 2: Render video with retry loop fixer = CodeFixer() max_retries = 3 video_path = None # Ensure media directories exist media_dir = base_dir / "media" (media_dir / "voiceover" / "elevenlabs").mkdir(parents=True, exist_ok=True) (media_dir / "voiceover" / "gtts").mkdir(parents=True, exist_ok=True) (media_dir / "voiceover" / "edge_tts").mkdir(parents=True, exist_ok=True) (media_dir / "videos").mkdir(parents=True, exist_ok=True) for attempt in range(max_retries): try: if progress_callback: progress_callback("rendering", 50 + (attempt * 10), f"Rendering video (attempt {attempt + 1}/{max_retries})...") # Run Manim cmd = [ "manim", quality_flags[quality], "--media_dir", str(media_dir), str(code_file), scene_name ] env = os.environ.copy() env["MEDIA_DIR"] = str(media_dir.resolve()) result = subprocess.run( cmd, capture_output=True, text=True, cwd=str(base_dir), env=env, timeout=300 # 5 minute timeout ) if result.returncode != 0: error_msg = result.stderr[-500:] if result.stderr else "Unknown render error" raise Exception(f"Manim rendering failed: {error_msg}") # Find video file video_dir = media_dir / "videos" / code_file.stem / quality_dirs[quality] video_files = list(video_dir.glob("*.mp4")) if not video_files: raise Exception(f"No video file found in {video_dir}") video_path = video_files[0] break # Success! except Exception as e: if attempt < max_retries - 1: # Try to fix code if progress_callback: progress_callback("fixing", 50 + (attempt * 10), "Fixing rendering error...") with open(code_file, 'r') as f: current_code = f.read() fixed_code = fixer.fix_runtime_error(current_code, str(e)) with open(code_file, 'w') as f: f.write(fixed_code) else: raise e # Cleanup code file try: code_file.unlink() except: pass if progress_callback: progress_callback("completed", 100, "Video generation completed!") return video_path except Exception as e: if progress_callback: progress_callback("failed", 0, f"Error: {str(e)}") raise e def generate_via_api(api_url: str, input_type: str, input_data: str, quality: str, category: str, progress_callback=None): """ Generate video via API (for local development). """ import requests payload = { "input_type": input_type, "input_data": input_data, "quality": quality, "category": category } response = requests.post(f"{api_url}/api/videos", json=payload, timeout=30) if response.status_code != 200: raise Exception(f"Failed to start job: {response.text}") job_data = response.json() job_id = job_data["job_id"] # Poll for status while True: status_response = requests.get(f"{api_url}/api/jobs/{job_id}", timeout=10) if status_response.status_code == 200: status_data = status_response.json() status = status_data["status"] progress = status_data.get("progress", {}) percentage = progress.get("percentage", 0) message = progress.get("message", "Processing...") if progress_callback: progress_callback(status, percentage, message) if status == "completed": return f"{api_url}/api/videos/{job_id}" elif status == "failed": raise Exception(status_data.get("error", "Unknown error")) time.sleep(2) # ============================================================================ # Sidebar # ============================================================================ with st.sidebar: # Logo and branding st.markdown("""
Precision Animations
Transform your idea into precision animation.
Transform text, blogs, and documents into beautiful animated videos