Spaces:
Paused
Paused
import os | |
import sys | |
import re | |
import subprocess | |
import json | |
import threading | |
from io import BytesIO | |
from flask import Flask, request, jsonify | |
from telegram import Update, InputFile, InlineKeyboardButton, InlineKeyboardMarkup | |
from telegram.ext import ( | |
Application, | |
CommandHandler, | |
MessageHandler, | |
CallbackQueryHandler, | |
filters, | |
ConversationHandler, | |
ContextTypes | |
) | |
from telegram.constants import ParseMode | |
from pydub import AudioSegment | |
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") | |
if not TOKEN: | |
print("Error: TELEGRAM_BOT_TOKEN environment variable not set.", file=sys.stderr) | |
sys.exit(1) | |
BOT_USERNAME = os.getenv("TELEGRAM_BOT_USERNAME", "Voice2mp3_RoBot") | |
ADMIN_ID = int(os.getenv("TELEGRAM_ADMIN_ID", "0")) | |
if ADMIN_ID == 0: | |
print("Warning: TELEGRAM_ADMIN_ID environment variable not set or set to 0. Admin features might not work.", file=sys.stderr) | |
DOWNLOAD_DIR = "/tmp/downloads" | |
OUTPUT_DIR = "/tmp/outputs" | |
CHANNELS_FILE = "channels.json" | |
os.makedirs(DOWNLOAD_DIR, exist_ok=True) | |
os.makedirs(OUTPUT_DIR, exist_ok=True) | |
def load_required_channels(): | |
if os.path.exists(CHANNELS_FILE): | |
try: | |
with open(CHANNELS_FILE, 'r', encoding='utf-8') as f: | |
return json.load(f) | |
except json.JSONDecodeError: | |
print(f"Error decoding JSON from {CHANNELS_FILE}. Starting with empty list.", file=sys.stderr) | |
return [] | |
return [] | |
def save_required_channels(channels): | |
with open(CHANNELS_FILE, 'w', encoding='utf-8') as f: | |
json.dump(channels, f, indent=4, ensure_ascii=False) | |
REQUIRED_CHANNELS = load_required_channels() | |
LANGUAGE_SELECTION, MAIN_MENU, CONVERT_AUDIO, CUT_AUDIO_FILE, CUT_AUDIO_RANGE, \ | |
VIDEO_CONVERSION_MODE, WAITING_FOR_MEMBERSHIP, \ | |
ADMIN_MENU, ADD_CHANNEL, LIST_REMOVE_CHANNELS = range(10) | |
MESSAGES = { | |
# ... [keep the MESSAGES dictionary unchanged] ... | |
} | |
def get_message(context, key, **kwargs): | |
lang = context.user_data.get('language', 'fa') | |
message_template = MESSAGES[lang].get(key, MESSAGES['fa'][key]) | |
return message_template.format(**kwargs) | |
def parse_time_to_ms(time_str): | |
match = re.match(r'^(\d{2})\.(\d{2})$', time_str) | |
if not match: | |
raise ValueError("Invalid time format") | |
minutes = int(match.group(1)) | |
seconds = int(match.group(2)) | |
if seconds >= 60: | |
raise ValueError("Seconds must be between 00 and 59") | |
return (minutes * 60 + seconds) * 1000 | |
async def check_user_membership(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def show_membership_required_message(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def process_feature_or_check_membership(update: Update, context: ContextTypes.DEFAULT_TYPE, feature_func, *args, **kwargs): | |
# ... [keep this function unchanged] ... | |
async def check_membership_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def set_language(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def show_main_menu(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def change_format_selected(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def cut_audio_selected(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def video_conversion_selected(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def handle_audio(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def handle_cut_audio_file(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def handle_cut_audio_range(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def handle_video_conversion(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def admin_link_command(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def admin_add_channel_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def admin_handle_add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def admin_list_channels(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
async def admin_handle_remove_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# ... [keep this function unchanged] ... | |
app = Flask(__name__) | |
# مدیریت ایمن Application در محیط چندنخی | |
_app_lock = threading.Lock() | |
_application_instance = None | |
def get_telegram_application(): | |
global _application_instance | |
with _app_lock: | |
if _application_instance is None: | |
print("Initializing new Telegram Application instance...") | |
_app = Application.builder().token(TOKEN).build() | |
conv_handler = ConversationHandler( | |
entry_points=[ | |
CommandHandler("start", start), | |
CommandHandler("link", admin_link_command) | |
], | |
states={ | |
# ... [keep the states unchanged] ... | |
}, | |
fallbacks=[CommandHandler("cancel", cancel), CommandHandler("start", start)], | |
allow_reentry=True | |
) | |
_app.add_handler(conv_handler) | |
_app.add_error_handler(error_handler) | |
_application_instance = _app | |
print("Telegram Application initialized.") | |
return _application_instance | |
def index(): | |
return jsonify({"status": "ok", "message": "Telegram bot is running."}) | |
def webhook(): | |
application = get_telegram_application() | |
json_data = request.get_json(force=True) | |
update = Update.de_json(json_data, application.bot) | |
application.update_queue.put(update) | |
return "ok" | |
def set_webhook_route(): | |
webhook_url = os.getenv("WEBHOOK_URL") | |
if not webhook_url: | |
return jsonify({"status": "error", "message": "WEBHOOK_URL environment variable not set."}), 500 | |
if not webhook_url.endswith("/webhook"): | |
webhook_url = f"{webhook_url.rstrip('/')}/webhook" | |
try: | |
application = get_telegram_application() | |
application.bot.set_webhook(url=webhook_url) | |
return jsonify({"status": "success", "message": f"Webhook set to {webhook_url}"}) | |
except Exception as e: | |
print(f"Failed to set webhook: {e}", file=sys.stderr) | |
return jsonify({"status": "error", "message": f"Failed to set webhook: {e}"}), 500 | |
def run_bot(): | |
"""Run the bot in polling mode for development""" | |
print("Starting bot in polling mode...") | |
application = get_telegram_application() | |
application.run_polling() | |
if __name__ == "__main__": | |
# Run Flask app and bot in polling mode for local development | |
import threading | |
threading.Thread(target=run_bot, daemon=True).start() | |
app.run(host='0.0.0.0', port=int(os.environ.get("PORT", 7860))) |