"""
HuggingFace Spaces app.py for IndexTTS2 with Auto-Processing and Combined Audio
"""

import io
import os
import sys
import threading
import time
from contextlib import redirect_stdout
from pathlib import Path

import gradio as gr
import numpy as np
import soundfile as sf
import torch
from huggingface_hub import (
    CommitOperationAdd,
    CommitOperationDelete,
    HfApi,
    hf_hub_download,
    list_repo_files,
)

os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
os.environ["GRADIO_SERVER_PORT"] = "7860"

# Make local packages (e.g. the bundled indextts module) importable.
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

# Shared state for the background auto-processing thread.
auto_process_running = False
auto_process_thread = None
current_status = "Ready"
tts_model = None

# Combined-audio settings: chunks are capped at 30 minutes, with a
# 3-second pause between consecutive clips.
MAX_COMBINED_DURATION = 30 * 60  # seconds
PAUSE_DURATION = 3.0  # seconds
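
# Rough capacity check (hypothetical numbers): if clips averaged ~10 s, a
# 30-minute chunk would hold about 1800 / (10 + 3) ≈ 138 clips, since every
# clip after the first also costs one 3 s pause.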


def download_models():
    """Download models if they don't exist."""
    checkpoints_dir = "./checkpoints"

    if not os.path.exists(checkpoints_dir):
        print("Downloading IndexTTS2 models...")
        try:
            from huggingface_hub import snapshot_download

            snapshot_download(
                repo_id="IndexTeam/IndexTTS-2",
                local_dir=checkpoints_dir,
                allow_patterns=[
                    "*.pth",
                    "*.pt",
                    "*.yaml",
                    "*.model",
                    "*.vocab",
                    "qwen0.6bemo4-merge/**",
                ],
            )
            print("Models downloaded successfully!")
        except Exception as e:
            print(f"Failed to download models: {e}")
            print(
                "Please download models manually from: "
                "https://huggingface.co/IndexTeam/IndexTTS-2"
            )


download_models()


try:
    from indextts.infer_v2 import IndexTTS2

    tts_model = IndexTTS2(
        cfg_path="checkpoints/config.yaml",
        model_dir="checkpoints",
        use_fp16=True,
        use_cuda_kernel=False,
        use_deepspeed=False,
    )
    print("IndexTTS2 model loaded successfully!")
except Exception as e:
    print(f"Error loading IndexTTS2 model: {e}")
    tts_model = None


def add_silence(duration_sec: float, sample_rate: int = 24000) -> np.ndarray:
    """Generate silence of specified duration in seconds."""
    return np.zeros(int(duration_sec * sample_rate), dtype=np.float32)
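
# Illustrative usage (this helper is not called elsewhere in this file, which
# works on int16 data instead; 24000 Hz is the assumed model output rate):
#   gap = add_silence(PAUSE_DURATION)  # 3 s of float32 silence
#   padded = np.concatenate([clip, gap, next_clip])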


def parse_audio_duration_from_log(log_line: str):
    """Parse audio duration from a log line like '>> Generated audio length: 4.89 seconds'."""
    if "Generated audio length:" in log_line:
        try:
            duration_str = (
                log_line.split("Generated audio length:")[1]
                .split("seconds")[0]
                .strip()
            )
            return float(duration_str)
        except Exception:
            return None
    return None
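
# Expected behaviour (format assumed from IndexTTS2's verbose output):
#   parse_audio_duration_from_log(">> Generated audio length: 4.89 seconds")
#   -> 4.89
#   parse_audio_duration_from_log("some unrelated log line")
#   -> None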


def create_combined_audios(audio_files_info):
    """
    Create combined audio file(s) with 3-second pauses,
    without changing pitch, samplerate or bit depth.

    audio_files_info: List[(file_path, duration_in_seconds)]
    """
    # All clips come from the same model, so read the samplerate once.
    first_file = audio_files_info[0][0]
    _, sr = sf.read(first_file, dtype="int16")

    silence_3s = np.zeros(int(sr * PAUSE_DURATION), dtype=np.int16)

    combined_files = []
    current_files = []
    current_duration = 0.0
    combined_index = 1

    def flush_chunk(files, duration, index):
        """Concatenate the chunk's clips, with pauses, into one PCM_16 WAV."""
        # The first chunk of a small batch keeps the plain name; anything
        # else is numbered.
        combined_name = (
            "temp_combined.wav"
            if index == 1 and len(audio_files_info) <= 30
            else f"temp_combined_{index:03d}.wav"
        )
        # 1.5 s of lead-in silence, then the clips separated by 3 s pauses.
        audio_out = [np.zeros(int(sr * 1.5), dtype=np.int16)]
        for i, fp in enumerate(files):
            data, _ = sf.read(fp, dtype="int16")
            audio_out.append(data)
            if i < len(files) - 1:
                audio_out.append(silence_3s)
        sf.write(combined_name, np.concatenate(audio_out), sr, subtype="PCM_16")
        combined_files.append((combined_name, duration))
        print(
            f"Created combined file {index}: "
            f"{int(duration // 60)}:{int(duration % 60):02d}"
        )

    for file_path, duration in audio_files_info:
        # Length of the current chunk if this clip were appended to it.
        new_length = current_duration
        if current_files:
            new_length += PAUSE_DURATION
        new_length += duration

        # Appending would exceed the 30-minute cap: flush the chunk first.
        if new_length > MAX_COMBINED_DURATION and current_files:
            flush_chunk(current_files, current_duration, combined_index)
            combined_index += 1
            current_files = [file_path]
            current_duration = duration
        else:
            current_files.append(file_path)
            if len(current_files) == 1:
                current_duration = duration
            else:
                current_duration += PAUSE_DURATION + duration

    # Flush whatever remains as the final chunk.
    if current_files:
        flush_chunk(current_files, current_duration, combined_index)

    return combined_files
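
# Minimal usage sketch (hypothetical file names and durations):
#   chunks = create_combined_audios([
#       ("temp_a_001.wav", 12.3),
#       ("temp_a_002.wav", 8.7),
#   ])
#   # -> [("temp_combined.wav", 24.0)]: 12.3 + 3.0 (pause) + 8.7; the 1.5 s
#   #    lead-in silence is not counted in the reported duration.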


def auto_process_dataset():
    """
    Auto-process TXT files from Mo2294/rawAffirmation:
    generate audio for each sentence (split by ".-") and upload it to the
    output dataset, create combined audio(s) with 3 s pauses (max 30 min
    each), then move processed TXT files to the /done folder.
    """
    global auto_process_running, current_status, tts_model

    if tts_model is None:
        current_status = "Error: TTS model not loaded"
        return

    try:
        token = os.getenv("HF_TOKEN")
        if not token:
            current_status = "Error: HF_TOKEN not found in environment"
            return

        api = HfApi(token=token)
        input_dataset_id = "Mo2294/rawAffirmation"
        output_dataset_id = "Mo2294/outputAffirmation"

        current_status = "Downloading reference voice Mo.wav..."
        reference_voice_path = hf_hub_download(
            repo_id=output_dataset_id,
            filename="Mo.wav",
            repo_type="dataset",
            token=token,
        )

        current_status = "Scanning for TXT files..."
        try:
            repo_files = list_repo_files(
                repo_id=input_dataset_id, repo_type="dataset", token=token
            )
            txt_files = [
                f
                for f in repo_files
                if f.endswith(".txt") and not f.startswith("done/")
            ]
        except Exception as e:
            current_status = f"Error listing files: {e}"
            return

        if not txt_files:
            current_status = "No TXT files found to process"
            return

        current_status = f"Found {len(txt_files)} TXT files to process"

        for txt_file in txt_files:
            if not auto_process_running:
                current_status = "Processing stopped by user"
                break

            txt_name = Path(txt_file).stem
            current_status = f"Processing: {txt_name}"

            try:
                txt_path = hf_hub_download(
                    repo_id=input_dataset_id,
                    filename=txt_file,
                    repo_type="dataset",
                    token=token,
                )

                with open(txt_path, "r", encoding="utf-8") as f:
                    content = f.read()

                # Split on the ".-" delimiter and strip leftover punctuation.
                raw_sentences = content.split(".-")
                sentences = []
                for s in raw_sentences:
                    cleaned = s.strip()
                    if cleaned:
                        if cleaned.endswith("-") or cleaned.endswith("."):
                            cleaned = cleaned[:-1].rstrip()
                        sentences.append(cleaned)
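                # Example (hypothetical input): "I am calm.- I am focused."
                # splits into ["I am calm", "I am focused"]; the strip above
                # removes any leftover trailing "." or "-" fragments.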

                if not sentences:
                    current_status = f"No sentences found in {txt_name}"
                    continue

                current_status = f"Found {len(sentences)} sentences in {txt_name}"
                print(f"Processing sentences from {txt_name}:")

                temp_files = []
                audio_files_info = []
                commit_operations = []

                for idx, sentence in enumerate(sentences):
                    if not auto_process_running:
                        break

                    current_status = (
                        f"Processing {txt_name}: sentence "
                        f"{idx + 1}/{len(sentences)}"
                    )

                    try:
                        if not sentence:
                            continue

                        # Ensure the sentence ends with terminal punctuation.
                        if sentence[-1] not in ".!?":
                            sentence = sentence + "."

                        print(f"  Sentence {idx + 1}: '{sentence}'")

                        output_filename = f"temp_{txt_name}_{idx + 1:03d}.wav"

                        # Capture the model's verbose stdout so the reported
                        # audio length can be parsed out of it.
                        buf = io.StringIO()
                        with redirect_stdout(buf):
                            tts_model.infer(
                                spk_audio_prompt=reference_voice_path,
                                text=sentence,
                                output_path=output_filename,
                                verbose=True,
                            )

                        output_log = buf.getvalue()
                        duration = None
                        for line in output_log.split("\n"):
                            dur = parse_audio_duration_from_log(line)
                            if dur is not None:
                                duration = dur
                                break

                        if duration is None:
                            # Fall back to measuring the written file directly.
                            audio_data, sr = sf.read(output_filename)
                            duration = len(audio_data) / sr

                        print(f"  Generated audio: {duration:.2f} seconds")

                        audio_files_info.append((output_filename, duration))
                        temp_files.append(output_filename)

                        output_path = (
                            f"Affirmations/{txt_name}/"
                            f"{txt_name}_{idx + 1:03d}.wav"
                        )
                        commit_operations.append(
                            CommitOperationAdd(
                                path_in_repo=output_path,
                                path_or_fileobj=output_filename,
                            )
                        )

                    except Exception as e:
                        current_status = (
                            f"Error generating audio for sentence {idx + 1}: {e}"
                        )
                        print(f"Generation error: {e}")
                        continue

                # Build the combined chunk(s) for this TXT file.
                combined_files = []
                if audio_files_info and auto_process_running:
                    current_status = f"Creating combined audio(s) for {txt_name}..."
                    combined_files = create_combined_audios(audio_files_info)

                    for i, (combined_file, duration) in enumerate(combined_files):
                        if len(combined_files) == 1:
                            combined_path = (
                                f"Affirmations/{txt_name}/"
                                f"{txt_name}_combined.wav"
                            )
                        else:
                            combined_path = (
                                f"Affirmations/{txt_name}/"
                                f"{txt_name}_combined_{i + 1:03d}.wav"
                            )

                        commit_operations.append(
                            CommitOperationAdd(
                                path_in_repo=combined_path,
                                path_or_fileobj=combined_file,
                            )
                        )
                        temp_files.append(combined_file)

                        duration_min = int(duration // 60)
                        duration_sec = int(duration % 60)
                        print(
                            f"  Combined file {i + 1}: "
                            f"{duration_min}:{duration_sec:02d}"
                        )

                if commit_operations and auto_process_running:
                    total_individual = len(audio_files_info)
                    total_combined = len(combined_files)

                    current_status = (
                        f"Uploading {total_individual} individual + "
                        f"{total_combined} combined files for {txt_name}..."
                    )

                    try:
                        # One commit uploads all individual and combined files.
                        api.create_commit(
                            repo_id=output_dataset_id,
                            repo_type="dataset",
                            operations=commit_operations,
                            commit_message=(
                                f"Add audio files for {txt_name} - "
                                f"{total_individual} individual + "
                                f"{total_combined} combined"
                            ),
                            token=token,
                        )
                        current_status = f"Successfully uploaded files for {txt_name}"

                        current_status = f"Moving {txt_name}.txt to /done folder..."

                        with open(txt_path, "rb") as f:
                            file_content = f.read()

                        # The Hub has no rename operation, so "moving" is an
                        # add under done/ plus a delete of the original.
                        move_operations = [
                            CommitOperationAdd(
                                path_in_repo=f"done/{txt_file}",
                                path_or_fileobj=file_content,
                            ),
                            CommitOperationDelete(path_in_repo=txt_file),
                        ]

                        api.create_commit(
                            repo_id=input_dataset_id,
                            repo_type="dataset",
                            operations=move_operations,
                            commit_message=(
                                f"Move {txt_name}.txt to /done after processing"
                            ),
                            token=token,
                        )

                        current_status = (
                            f"✅ Completed {txt_name}: "
                            f"{total_individual} individual + "
                            f"{total_combined} combined audio files"
                        )

                    except Exception as e:
                        current_status = f"Upload/Move error for {txt_name}: {e}"
                        print(f"Error: {e}")

                # Clean up the local temp files for this TXT file.
                for temp_file in temp_files:
                    try:
                        if os.path.exists(temp_file):
                            os.remove(temp_file)
                    except Exception:
                        pass

                time.sleep(2)

            except Exception as e:
                current_status = f"Error processing {txt_name}: {e}"
                print(f"Error: {e}")
                continue

        if auto_process_running:
            current_status = "✅ Auto-processing completed successfully!"
        else:
            current_status = "⏹️ Auto-processing stopped"

    except Exception as e:
        current_status = f"❌ Fatal error: {str(e)}"
        print(f"Fatal error: {e}")
    finally:
        auto_process_running = False
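
# Note: auto_process_dataset runs on a plain background thread (see
# start_auto_process below); the auto_process_running checks are cooperative,
# so a stop request takes effect between sentences or files, not
# mid-generation.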


def start_auto_process():
    """Start the auto-processing thread."""
    global auto_process_running, auto_process_thread

    if auto_process_running:
        return "Auto-processing already running!", current_status

    auto_process_running = True
    auto_process_thread = threading.Thread(target=auto_process_dataset)
    auto_process_thread.start()
    return "✅ Auto-processing started!", "Starting..."


def stop_auto_process():
    """Signal the auto-processing loop to stop."""
    global auto_process_running
    auto_process_running = False
    return "⏹️ Stop signal sent!", current_status


def get_status():
    """Get current processing status."""
    if auto_process_running:
        return current_status + " 🔄"
    return current_status


def manual_generate(text, reference_audio, emotion_audio, emo_alpha, use_emo_text):
    """Manual TTS generation for the Gradio UI."""
    global tts_model

    if tts_model is None:
        return None

    if not reference_audio:
        return None

    try:
        output_path = "manual_output.wav"

        if emotion_audio:
            # Use a separate audio clip as the emotion reference.
            tts_model.infer(
                spk_audio_prompt=reference_audio,
                text=text,
                output_path=output_path,
                emo_audio_prompt=emotion_audio,
                emo_alpha=emo_alpha,
                verbose=False,
            )
        else:
            # Optionally infer the emotion from the text itself.
            tts_model.infer(
                spk_audio_prompt=reference_audio,
                text=text,
                output_path=output_path,
                use_emo_text=use_emo_text,
                emo_alpha=emo_alpha if use_emo_text else 1.0,
                verbose=False,
            )

        audio_data, sample_rate = sf.read(output_path)
        return (sample_rate, audio_data)

    except Exception as e:
        print(f"Generation error: {e}")
        return None
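
# gr.Audio(type="numpy") expects a (sample_rate, samples) tuple, which is why
# manual_generate returns (sample_rate, audio_data) on success and None (an
# empty player) on failure.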


with gr.Blocks(title="IndexTTS2 with Auto-Processing") as demo:
    gr.Markdown("# 🎤 IndexTTS2 Voice Synthesis")
    gr.Markdown(
        "State-of-the-art TTS with auto-processing and combined audio generation"
    )

    with gr.Tab("Manual Processing"):
        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(
                    label="Text to synthesize",
                    placeholder="Enter text here...",
                    lines=3,
                    # Default sample text (Chinese): "Hello everyone, I'm
                    # trying out AI technology right now!"
                    value="大家好,我现在正在体验AI科技!",
                )
                reference_audio = gr.Audio(
                    sources=["upload"],
                    type="filepath",
                    label="Voice reference (required)",
                )
                emotion_audio = gr.Audio(
                    sources=["upload"],
                    type="filepath",
                    label="Emotion reference (optional)",
                )

                with gr.Row():
                    emo_alpha = gr.Slider(
                        minimum=0.0,
                        maximum=1.0,
                        value=0.6,
                        step=0.1,
                        label="Emotion strength",
                    )
                    use_emo_text = gr.Checkbox(
                        label="Use text-based emotion", value=False
                    )

            with gr.Column():
                generate_btn = gr.Button(
                    "🎙️ Generate", variant="primary", size="lg"
                )
                output_audio = gr.Audio(label="Generated audio", type="numpy")

        generate_btn.click(
            manual_generate,
            inputs=[
                text_input,
                reference_audio,
                emotion_audio,
                emo_alpha,
                use_emo_text,
            ],
            outputs=output_audio,
        )

    with gr.Tab("Auto Processing"):
        gr.Markdown("### 🚀 Automatic Dataset Processing with Combined Audio")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(
                    """
                    **Configuration:**
                    - 📁 Input: `Mo2294/rawAffirmation`
                    - 📂 Output: `Mo2294/outputAffirmation`
                    - 🎙️ Voice: `Mo.wav`
                    - ✂️ Delimiter: `.-`
                    - 📝 Structure: `/Affirmations/[name]/`
                    - ⏰ Combined: max 30 min chunks
                    - ⏸️ Pauses: 3 seconds between audios
                    """
                )

            with gr.Column(scale=2):
                status_display = gr.Textbox(
                    label="📊 Processing Status",
                    value=get_status(),
                    interactive=False,
                    lines=3,
                )

        with gr.Row():
            start_btn = gr.Button(
                "▶️ Start Processing", variant="primary", scale=2
            )
            stop_btn = gr.Button("⏹️ Stop", variant="stop", scale=1)
            refresh_btn = gr.Button("🔄 Refresh", scale=1)

        message_display = gr.Textbox(
            label="Message", interactive=False, visible=False
        )

        start_btn.click(
            start_auto_process, outputs=[message_display, status_display]
        )
        stop_btn.click(
            stop_auto_process, outputs=[message_display, status_display]
        )
        refresh_btn.click(get_status, outputs=status_display)

    gr.Markdown(
        """
        ---
        <div align="center">
        <a href="https://github.com/index-tts/index-tts">GitHub</a> |
        <a href="https://arxiv.org/abs/2506.21619">Paper</a> |
        <a href="https://index-tts.github.io/index-tts2.github.io/">Demo</a>
        </div>
        """
    )


if __name__ == "__main__":
    demo.launch()
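
# Gradio reads GRADIO_SERVER_NAME / GRADIO_SERVER_PORT (set at the top of
# this file) when launching, so no explicit server_name/server_port
# arguments are needed here.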