import os

# SECURITY: the bot token must never be committed to source control. It is
# read from the TELEGRAM_BOT_TOKEN environment variable instead; an unset
# variable yields an empty string, which main() rejects with a clear error.
# The token that was previously hard-coded on this line should be treated as
# compromised and revoked/regenerated via @BotFather.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
| |
|
| |
|
| |
|
| | import os |
| | from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup |
| | from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters |
| | from telegram.ext import ContextTypes |
| | from main import song_cover_pipeline |
| | from webui import download_online_model |
| |
|
| |
|
| | |
# Absolute path of the folder that contains this script.
BASE_DIR = os.path.split(os.path.abspath(__file__))[0]

# 'song_output' folder next to this script; created at import time if it
# does not exist yet.
output_dir = os.path.join(BASE_DIR, 'song_output')
if not os.path.isdir(output_dir):
    os.makedirs(output_dir, exist_ok=True)
| |
|
| | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with a welcome message and the main menu as inline buttons.

    Each button is placed on its own row and carries one of the callback
    values 'generate', 'download_model' or 'help'.
    """
    menu_entries = (
        ("Generate Song", 'generate'),
        ("Download Model", 'download_model'),
        ("Help", 'help'),
    )
    # One single-button row per menu entry.
    rows = [
        [InlineKeyboardButton(label, callback_data=action)]
        for label, action in menu_entries
    ]
    await update.message.reply_text(
        'Welcome to AICoverGen! Choose an option below:',
        reply_markup=InlineKeyboardMarkup(rows),
    )
| |
|
| | |
async def button(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """React to a press on one of the inline menu buttons.

    'generate' and 'download_model' store the chosen mode in
    ``context.user_data['mode']`` and replace the menu message with a prompt
    describing the expected input. 'help' replaces the menu message with
    usage instructions. Any other callback value is acknowledged and ignored.
    """
    pressed = update.callback_query
    # Acknowledge the callback before doing anything else.
    await pressed.answer()

    choice = pressed.data

    # Prompt shown after each mode-selecting button.
    mode_prompts = {
        'generate': "Please send the model name, YouTube link, and pitch (e.g., '<model_name> <link> <pitch>')\nNote: pitch 1 for female and pitch -1 for male.",
        'download_model': "Please send the model name and URL in the format '<model_name> <url>'.",
    }

    if choice in mode_prompts:
        # Remember the mode so the next text message is routed accordingly.
        context.user_data['mode'] = choice
        await pressed.edit_message_text(text=mode_prompts[choice])
        return

    if choice != 'help':
        return

    help_text = (
        "To generate a song, follow these steps:\n"
        "1. Click 'Generate Song'.\n"
        "2. Send a message in the format '<model_name> <link> <pitch>' (e.g., 'model1 https://youtube.com/abc 2').\n"
        "3. Wait for the bot to process and return the generated song.\n"
        "Pitch: Use 1 for female voice, -1 for male voice.\n\n"
        "To download a model:\n"
        "1. Click 'Download Model'.\n"
        "2. Send a message in the format '<model_name> <url>' to specify the model name and the download URL."
    )
    await pressed.edit_message_text(text=help_text)
| |
|
| | |
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Route a plain text message according to the mode stored in user_data.

    'generate' is forwarded to ``generate_song`` and 'download_model' to
    ``download_model``. With any other (or no) mode the user is asked to
    pick a menu option first.
    """
    selected = context.user_data.get('mode')

    # Guard: nothing has been chosen from the menu yet.
    if selected not in ('generate', 'download_model'):
        await update.message.reply_text("Please choose an option first by clicking 'Generate Song' or 'Download Model'.")
        return

    if selected == 'generate':
        await generate_song(update, context)
    else:
        await download_model(update, context)
| |
|
| | |
async def generate_song(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Generate a song cover from the user's message and send it back as audio.

    The message text must consist of exactly three whitespace-separated
    fields, ``<model_name> <link> <pitch>``, where ``pitch`` parses as an
    integer. Invalid input gets a usage hint. On success the produced file is
    sent with ``reply_audio`` and then deleted; on any failure the user gets
    a generic error message.
    """
    import logging  # local import: the module does not import logging at file level

    error_text = "An error occurred while generating the song."

    song_input = update.message.text
    try:
        # Wrong field count and a non-integer pitch both raise ValueError.
        model_name, song_link, pitch_str = song_input.split()
        pitch = int(pitch_str)
    except ValueError:
        await update.message.reply_text("Please send a valid input in the format '<model_name> <link> <pitch>' (e.g., 'model1 https://youtube.com/abc 2').")
        return

    keep_files = False
    is_webui = False

    # NOTE(review): song_cover_pipeline is a synchronous call made from a
    # coroutine, so the event loop is blocked while it runs - consider
    # offloading it to an executor once its thread-safety is confirmed.
    try:
        song_output = song_cover_pipeline(song_link, model_name, pitch, keep_files, is_webui)
    except Exception:
        # Mirror download_model: report the failure to the user instead of
        # letting the handler die silently; keep the traceback in the log.
        logging.getLogger(__name__).exception("Song generation failed for %r", song_link)
        await update.message.reply_text(error_text)
        return

    # Guard against a None/empty result before touching the filesystem.
    if not song_output or not os.path.exists(song_output):
        await update.message.reply_text(error_text)
        return

    # Close the file handle before deleting the file; the original left it
    # open, leaking the descriptor (and breaking os.remove on Windows).
    with open(song_output, 'rb') as audio_file:
        await update.message.reply_audio(audio=audio_file)
    os.remove(song_output)
| |
|
| | |
async def download_model(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download a voice model named in the user's message.

    The message text must consist of exactly two whitespace-separated
    fields, ``<model_name> <url>``. The URL must start with ``http://`` or
    ``https://``. The outcome (success or the error text) is reported back
    to the user.
    """
    model_input = update.message.text
    try:
        # Anything other than exactly two fields raises ValueError.
        model_name, model_url = model_input.split()
    except ValueError:
        await update.message.reply_text("Please send a valid input in the format '<model_name> <url>' (e.g., 'model1 https://model.com/abc').")
        return

    # Require a real scheme separator: a bare "http" prefix check would also
    # accept strings such as "httpfoo" that are not URLs at all.
    if not model_url.startswith(("http://", "https://")):
        await update.message.reply_text("Please send a valid URL.")
        return

    # NOTE(review): model_name comes straight from an untrusted chat message;
    # presumably download_online_model uses it as a directory name - verify
    # that it rejects path separators / '..' before exposing this bot widely.
    # NOTE(review): download_online_model is a synchronous call, so the event
    # loop is blocked for the duration of the download.
    try:
        download_online_model(model_url, model_name)
        await update.message.reply_text(f"Model '{model_name}' downloaded successfully from {model_url}!")
    except Exception as e:
        await update.message.reply_text(f"Failed to download the model. Error: {str(e)}")
| |
|
| | |
def main():
    """Build the Telegram application, register the handlers and start polling.

    The token is taken from the TELEGRAM_BOT_TOKEN environment variable when
    it is set, falling back to the module-level ``TELEGRAM_BOT_TOKEN``
    constant otherwise.

    Raises:
        ValueError: if no non-empty token is available.
    """
    # Read the environment first so the error message below is accurate; the
    # original never consulted the environment variable it told users to set.
    bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") or TELEGRAM_BOT_TOKEN

    if not bot_token:
        raise ValueError("Bot token not found. Set the TELEGRAM_BOT_TOKEN environment variable.")

    application = Application.builder().token(bot_token).build()

    # /start shows the menu, button presses select a mode, and every
    # non-command text message is routed according to that mode.
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CallbackQueryHandler(button))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    application.run_polling()
| |
|
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()