import os
import json
import uuid
import shutil
from datetime import datetime

from app.logger_config import logger as logging
import gradio as gr

# TMP_DIR = "/tmp/canary_aed_streaming"
TMP_DIR = os.getenv("TMP_DIR", "/tmp/canary_aed_streaming")
ACTIVE_SESSIONS_HASH_FILE = os.path.join(TMP_DIR, "active_session_hash_code.json")

ACTIVE_STREAM_FLAG = "stream_active_"
ACTIVE_TASK_FLAG = "task_active_"
NAME_FOLDER_CHUNKS = "chunks_"


# ---------------------------
# Helpers to manage the JSON registry
# ---------------------------
def _read_session_hash_code():
    """Reads the active-session registry; returns an empty dict if missing or unreadable."""
    if not os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
        return {}
    try:
        with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
            return json.load(f)
    except Exception:
        return {}


def _write_session_hash_code(data):
    """Writes the active-session registry, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(ACTIVE_SESSIONS_HASH_FILE), exist_ok=True)
    with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
        json.dump(data, f, indent=2)


# ---------------------------
# LOAD
# ---------------------------
def on_load(request: gr.Request):
    """Called when a new visitor opens the app."""
    session_hash_code = request.session_hash
    sessions = _read_session_hash_code()
    sessions[session_hash_code] = {
        "session_hash_code": session_hash_code,
        "file": "",
        "start_time": datetime.utcnow().strftime("%H:%M:%S"),
        "status": "active",
    }
    _write_session_hash_code(sessions)
    logging.info(f"[{session_hash_code}] session_hash_code registered (on_load).")
    return session_hash_code, session_hash_code  # can be used as gr.State + display


# ---------------------------
# UNLOAD
# ---------------------------
def on_unload(request: gr.Request):
    """Called when the visitor closes or refreshes the app."""
    session_hash_code = request.session_hash
    sessions = _read_session_hash_code()
    if session_hash_code in sessions:
        sessions.pop(session_hash_code)
        _write_session_hash_code(sessions)
        remove_session_hash_code_data(session_hash_code)
        unregister_session_hash_code_hash(session_hash_code)
        logging.info(f"[{session_hash_code}] session_hash_code removed (on_unload).")
    else:
        logging.info(f"[{session_hash_code}] No active session_hash_code found to remove.")


def ensure_tmp_dir():
    """Ensures the base temporary directory exists."""
    try:
        os.makedirs(TMP_DIR, exist_ok=True)
    except Exception as e:
        logging.error(f"Failed to create tmp directory {TMP_DIR}: {e}")


def reset_all_active_sessions():
    """Removes all temporary session_hash_code files and folders at startup."""
    ensure_tmp_dir()
    try:
        # --- Remove active session_hash_codes file ---
        if os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
            os.remove(ACTIVE_SESSIONS_HASH_FILE)
            logging.info("Active session_hash_codes file reset at startup.")
        else:
            logging.debug("No active session_hash_codes file found to reset.")

        # --- Clean all flag files (stream + transcribe) ---
        for f in os.listdir(TMP_DIR):
            if (
                f.startswith(ACTIVE_TASK_FLAG) or f.startswith(ACTIVE_STREAM_FLAG)
            ) and f.endswith(".txt"):
                path = os.path.join(TMP_DIR, f)
                try:
                    os.remove(path)
                    logging.debug(f"Removed leftover flag file: {f}")
                except Exception as e:
                    logging.warning(f"Failed to remove flag file {f}: {e}")

        # --- Clean chunk directories ---
        for name in os.listdir(TMP_DIR):
            path = os.path.join(TMP_DIR, name)
            if os.path.isdir(path) and name.startswith(NAME_FOLDER_CHUNKS):
                try:
                    shutil.rmtree(path)
                    logging.debug(f"Removed leftover chunk folder: {name}")
                except Exception as e:
                    logging.warning(f"Failed to remove chunk folder {name}: {e}")

        logging.info("Temporary session cleanup completed successfully.")
    except Exception as e:
        logging.error(f"Error resetting active session_hash_codes: {e}")

def remove_session_hash_code_data(session_hash_code: str):
    """Removes all temporary files and data related to a specific session_hash_code."""
    if not session_hash_code:
        logging.warning("remove_session_hash_code_data() called without a valid session_hash_code.")
        return
    try:
        # --- Remove session_hash_code from the active sessions file ---
        if os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
            try:
                with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
                    data = json.load(f)
                if session_hash_code in data:
                    data.pop(session_hash_code)
                    with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
                        json.dump(data, f, indent=2)
                    logging.debug(f"[{session_hash_code}] Removed from {ACTIVE_SESSIONS_HASH_FILE}.")
            except Exception as e:
                logging.warning(f"[{session_hash_code}] Failed to update {ACTIVE_SESSIONS_HASH_FILE}: {e}")

        # --- Remove all temporary files for this session ---
        remove_active_task_flag_file(session_hash_code)
        remove_active_stream_flag_file(session_hash_code)
        remove_chunk_folder(session_hash_code)

        logging.info(f"[{session_hash_code}] session_hash_code fully reset.")
    except Exception as e:
        logging.error(f"[{session_hash_code}] Error during remove_session_hash_code_data: {e}")


def register_session_hash_code(session_hash_code: str, filepath: str):
    """Registers a new session_hash_code."""
    ensure_tmp_dir()
    data = {}
    if os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
        with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
            try:
                data = json.load(f)
            except Exception:
                data = {}
    data[session_hash_code] = {
        "session_hash_code": session_hash_code,
        "file": filepath,
        "start_time": datetime.utcnow().strftime("%H:%M:%S"),
        "status": "active",
    }
    with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
        json.dump(data, f)
    logging.debug(f"[{session_hash_code}] session_hash_code registered in active_sessions.json.")


def unregister_session_hash_code_hash(session_hash_code: str):
    """Removes a session_hash_code from the registry."""
    if not os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
        return
    try:
        with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
            data = json.load(f)
        if session_hash_code in data:
            data.pop(session_hash_code)
            with open(ACTIVE_SESSIONS_HASH_FILE, "w") as f:
                json.dump(data, f)
            logging.debug(f"[{session_hash_code}] session_hash_code unregistered.")
    except Exception as e:
        logging.error(f"[{session_hash_code}] Error unregistering session_hash_code: {e}")


def get_active_session_hashes():
    """Returns active session_hash_codes as a list of rows for the DataFrame."""
    if not os.path.exists(ACTIVE_SESSIONS_HASH_FILE):
        return []
    try:
        with open(ACTIVE_SESSIONS_HASH_FILE, "r") as f:
            data = json.load(f)
        rows = [
            [
                s.get("session_hash_code", ""),
                s.get("file", ""),
                s.get("start_time", ""),
                s.get("status", ""),
            ]
            for s in data.values()
        ]
        return rows
    except Exception as e:
        logging.error(f"Error reading active session_hash_codes: {e}")
        return []


def get_active_task_flag_file(session_hash_code: str):
    return os.path.join(TMP_DIR, f"{ACTIVE_TASK_FLAG}{session_hash_code}.txt")


def get_active_stream_flag_file(session_hash_code: str):
    return os.path.join(TMP_DIR, f"{ACTIVE_STREAM_FLAG}{session_hash_code}.txt")


def remove_active_stream_flag_file(session_hash_code: str):
    fname = get_active_stream_flag_file(session_hash_code)
    if os.path.exists(fname):
        try:
            os.remove(fname)
            logging.debug(f"[{session_hash_code}] Removed file: {fname}")
        except Exception as e:
            logging.warning(f"[{session_hash_code}] Failed to remove file {fname}: {e}")
f"{ACTIVE_TASK_FLAG}{session_hash_code}.txt") if os.path.exists(fname): try: os.remove(fname) logging.debug(f"[{session_hash_code}] Removed file: {fname}") except Exception as e: logging.warning(f"[{session_hash_code}] Failed to remove file {fname}: {e}") def remove_chunk_folder(session_hash_code: str) : # --- Remove chunk folder if exists --- chunk_dir = os.path.join(TMP_DIR, f"chunks_{session_hash_code}") if os.path.isdir(chunk_dir): try: shutil.rmtree(chunk_dir) logging.debug(f"[{session_hash_code}] Removed chunk folder: chunks_{session_hash_code}") except Exception as e: logging.warning(f"[{session_hash_code}] Failed to remove chunk folder: {e}") def get_session_hashe_chunks_dir(session_hash_code: str): return os.path.join(TMP_DIR, f"{NAME_FOLDER_CHUNKS}{session_hash_code}")