# my-telegram-bot / app.py
# Commit 1fc2290 (verified) by akdNIKY: "Update app.py" - 7.98 kB
import asyncio
import json
import os
import re
import subprocess
import sys
import threading
from io import BytesIO

from flask import Flask, request, jsonify
from pydub import AudioSegment
from telegram import Update, InputFile, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ParseMode
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    CallbackQueryHandler,
    filters,
    ConversationHandler,
    ContextTypes
)
# --- Runtime configuration, read once at import time ---

# Bot API token. Nothing below can work without it, so abort immediately.
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
if not TOKEN:
    print("Error: TELEGRAM_BOT_TOKEN environment variable not set.", file=sys.stderr)
    sys.exit(1)

BOT_USERNAME = os.getenv("TELEGRAM_BOT_USERNAME", "Voice2mp3_RoBot")

# int() raises ValueError on an empty or non-numeric value; treat that as
# "unset" (0) instead of crashing the whole import with a traceback.
_raw_admin_id = os.getenv("TELEGRAM_ADMIN_ID", "0")
try:
    ADMIN_ID = int(_raw_admin_id)
except ValueError:
    print(
        f"Warning: TELEGRAM_ADMIN_ID={_raw_admin_id!r} is not an integer; treating it as unset.",
        file=sys.stderr,
    )
    ADMIN_ID = 0
if ADMIN_ID == 0:
    print("Warning: TELEGRAM_ADMIN_ID environment variable not set or set to 0. Admin features might not work.", file=sys.stderr)

# Scratch directories for media files, and the JSON file holding the
# required-channel list (relative to the current working directory).
DOWNLOAD_DIR = "/tmp/downloads"
OUTPUT_DIR = "/tmp/outputs"
CHANNELS_FILE = "channels.json"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
def load_required_channels():
    """Load the required-channel list from ``CHANNELS_FILE``.

    Returns:
        The decoded JSON list, or an empty list when the file is missing,
        unreadable, not valid UTF-8/JSON, or does not contain a JSON array.
        Every failure except a missing file is reported on stderr.
    """
    # Open directly instead of checking os.path.exists() first: the file can
    # disappear (or be a directory) between the check and the open.
    try:
        with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
            channels = json.load(f)
    except FileNotFoundError:
        return []
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {CHANNELS_FILE}. Starting with empty list.", file=sys.stderr)
        return []
    except (OSError, UnicodeDecodeError) as exc:
        print(f"Error reading {CHANNELS_FILE}: {exc}. Starting with empty list.", file=sys.stderr)
        return []

    # Callers are handed a list in every fallback path above, so keep that
    # contract even if the file holds some other JSON value.
    if not isinstance(channels, list):
        print(f"Unexpected content in {CHANNELS_FILE} (expected a JSON list). Starting with empty list.", file=sys.stderr)
        return []
    return channels
def save_required_channels(channels):
    """Persist ``channels`` to ``CHANNELS_FILE`` as indented UTF-8 JSON.

    Args:
        channels: JSON-serializable value (the required-channel list).

    Raises:
        TypeError: if ``channels`` is not JSON-serializable. The existing
            file is left untouched in that case.
        OSError: if the file cannot be written or replaced.
    """
    # Serialize before touching the disk so a serialization error cannot
    # truncate the current channel list.
    payload = json.dumps(channels, indent=4, ensure_ascii=False)

    # Write to a sibling temp file and swap it in, so readers never observe a
    # half-written file.
    tmp_path = f"{CHANNELS_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        f.write(payload)
    os.replace(tmp_path, CHANNELS_FILE)
# Required-channel list, read from CHANNELS_FILE once when the module is imported.
REQUIRED_CHANNELS = load_required_channels()
# Conversation state identifiers, numbered 0..9 in the order listed.
(
    LANGUAGE_SELECTION,
    MAIN_MENU,
    CONVERT_AUDIO,
    CUT_AUDIO_FILE,
    CUT_AUDIO_RANGE,
    VIDEO_CONVERSION_MODE,
    WAITING_FOR_MEMBERSHIP,
    ADMIN_MENU,
    ADD_CHANNEL,
    LIST_REMOVE_CHANNELS,
) = range(10)
# Message templates. get_message() indexes this as MESSAGES[lang][key], falls
# back to the 'fa' entry, and fills the template with str.format(**kwargs).
# NOTE(review): the dictionary contents are elided in this listing.
MESSAGES = {
# ... [keep the MESSAGES dictionary unchanged] ...
}
def get_message(context, key, **kwargs):
    """Return the message ``key`` in the user's language, formatted with ``kwargs``.

    The language is read from ``context.user_data['language']`` and defaults
    to ``'fa'``. If the language is unknown, or has no entry for ``key``, the
    ``'fa'`` template is used instead.

    Args:
        context: object exposing a ``user_data`` mapping.
        key: message identifier inside the per-language catalog.
        **kwargs: values substituted into the template via ``str.format``.

    Raises:
        KeyError: if ``key`` exists neither in the selected language nor in
            ``'fa'``, or if the template references a missing format field.
    """
    lang = context.user_data.get('language', 'fa')
    catalog = MESSAGES.get(lang)
    # Resolve the 'fa' fallback lazily: looking it up unconditionally raises
    # KeyError for keys that exist only in the selected language.
    if catalog is None or key not in catalog:
        catalog = MESSAGES['fa']
    return catalog[key].format(**kwargs)
def parse_time_to_ms(time_str):
    """Convert a ``MM.SS`` timestamp into milliseconds.

    Args:
        time_str: exactly two digits of minutes, a dot, and two digits of
            seconds, e.g. ``"01.30"``.

    Returns:
        The offset in milliseconds as an int.

    Raises:
        ValueError: if the text is not in ``MM.SS`` form or seconds >= 60.
    """
    # fullmatch, not match(...$): "$" also matches just before a trailing
    # newline, which would let "01.30\n" through.
    match = re.fullmatch(r'(\d{2})\.(\d{2})', time_str)
    if match is None:
        raise ValueError("Invalid time format")
    minutes, seconds = (int(part) for part in match.groups())
    if seconds >= 60:
        raise ValueError("Seconds must be between 00 and 59")
    return (minutes * 60 + seconds) * 1000
async def check_user_membership(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing, so behavior is unverified here.
    # Presumably checks the user against REQUIRED_CHANNELS - confirm in the full source.
    # ... [keep this function unchanged] ...
async def show_membership_required_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably tells the user
    # which channels must be joined - confirm in the full source.
    # ... [keep this function unchanged] ...
async def process_feature_or_check_membership(update: Update, context: ContextTypes.DEFAULT_TYPE, feature_func, *args, **kwargs):
    # Takes a callable `feature_func` plus extra positional/keyword arguments.
    # NOTE(review): body elided in this listing. Presumably runs feature_func
    # only when the membership check passes - confirm in the full source.
    # ... [keep this function unchanged] ...
async def check_membership_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably a callback-query
    # handler that re-checks membership - confirm in the full source.
    # ... [keep this function unchanged] ...
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # Handler for the /start command: get_telegram_application() registers it
    # both as a conversation entry point and as a fallback.
    # NOTE(review): body elided in this listing, so behavior is unverified here.
    # ... [keep this function unchanged] ...
async def set_language(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably stores the chosen
    # language in context.user_data['language'] (the key get_message() reads) - confirm.
    # ... [keep this function unchanged] ...
async def show_main_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably renders the main
    # menu - confirm in the full source.
    # ... [keep this function unchanged] ...
async def change_format_selected(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably handles the
    # "change format" menu choice - confirm in the full source.
    # ... [keep this function unchanged] ...
async def cut_audio_selected(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably handles the
    # "cut audio" menu choice - confirm in the full source.
    # ... [keep this function unchanged] ...
async def video_conversion_selected(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably handles the
    # "video conversion" menu choice - confirm in the full source.
    # ... [keep this function unchanged] ...
async def handle_audio(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably processes an
    # incoming audio message - confirm in the full source.
    # ... [keep this function unchanged] ...
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably processes an
    # incoming voice message - confirm in the full source.
    # ... [keep this function unchanged] ...
async def handle_cut_audio_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably receives the file
    # to be cut - confirm in the full source.
    # ... [keep this function unchanged] ...
async def handle_cut_audio_range(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably parses the cut
    # range (parse_time_to_ms() accepts MM.SS values) - confirm in the full source.
    # ... [keep this function unchanged] ...
async def handle_video_conversion(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably converts an
    # incoming video - confirm in the full source.
    # ... [keep this function unchanged] ...
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # Handler for the /cancel command: get_telegram_application() registers it
    # as a conversation fallback.
    # NOTE(review): body elided in this listing, so behavior is unverified here.
    # ... [keep this function unchanged] ...
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # Registered through Application.add_error_handler() in
    # get_telegram_application().
    # NOTE(review): body elided in this listing, so behavior is unverified here.
    # ... [keep this function unchanged] ...
async def admin_link_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # Handler for the /link command: get_telegram_application() registers it
    # as a conversation entry point.
    # NOTE(review): body elided in this listing. Presumably restricted to
    # ADMIN_ID - confirm in the full source.
    # ... [keep this function unchanged] ...
async def admin_add_channel_prompt(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably asks the admin for
    # a channel to add - confirm in the full source.
    # ... [keep this function unchanged] ...
async def admin_handle_add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably adds a channel and
    # persists it with save_required_channels() - confirm in the full source.
    # ... [keep this function unchanged] ...
async def admin_list_channels(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably lists the entries
    # of REQUIRED_CHANNELS for the admin - confirm in the full source.
    # ... [keep this function unchanged] ...
async def admin_handle_remove_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # NOTE(review): body elided in this listing. Presumably removes a channel
    # and persists the change - confirm in the full source.
    # ... [keep this function unchanged] ...
# Flask application object; the HTTP routes below are attached to it.
app = Flask(__name__)
# Thread-safe management of the shared Application in a multi-threaded environment.
# _app_lock is held by get_telegram_application() while it creates the instance;
# _application_instance stays None until that first call.
_app_lock = threading.Lock()
_application_instance = None
def get_telegram_application():
    """Return the shared telegram ``Application``, building it on first use.

    The whole check-and-create sequence runs under ``_app_lock``. On the
    first call the Application is built from ``TOKEN``, gets the conversation
    handler and the error handler attached, and is cached in
    ``_application_instance``; later calls return the cached object.

    This only constructs the object: nothing here initializes or starts it.
    """
    global _application_instance
    with _app_lock:
        # Fast exit once the instance exists.
        if _application_instance is not None:
            return _application_instance

        print("Initializing new Telegram Application instance...")
        telegram_app = Application.builder().token(TOKEN).build()

        conversation = ConversationHandler(
            entry_points=[
                CommandHandler("start", start),
                CommandHandler("link", admin_link_command),
            ],
            states={
                # ... [keep the states unchanged] ...
            },
            fallbacks=[
                CommandHandler("cancel", cancel),
                CommandHandler("start", start),
            ],
            allow_reentry=True,
        )
        telegram_app.add_handler(conversation)
        telegram_app.add_error_handler(error_handler)

        _application_instance = telegram_app
        print("Telegram Application initialized.")
        return _application_instance
@app.route("/")
def index():
    """Liveness endpoint: reply with a fixed JSON status document."""
    payload = {"status": "ok", "message": "Telegram bot is running."}
    return jsonify(payload)
# Serializes webhook processing. Each request drives the one shared
# Application through initialize -> process_update -> shutdown on its own
# event loop; overlapping runs would shut the HTTP client down underneath
# each other.
_webhook_lock = threading.Lock()


@app.route("/webhook", methods=["POST"])
def webhook():
    """Receive one Telegram update over HTTP and run it through the handlers.

    The request body is decoded as JSON (regardless of Content-Type), turned
    into an ``Update`` and processed synchronously before replying.

    Returns:
        The plain-text body ``"ok"``.
    """
    application = get_telegram_application()
    json_data = request.get_json(force=True)
    update = Update.de_json(json_data, application.bot)
    if update is None:
        # Nothing to dispatch (e.g. an empty JSON object); still acknowledge.
        return "ok"

    async def _process():
        # Application.update_queue is an asyncio.Queue, so calling put()
        # without awaiting it enqueues nothing, and no consumer is running in
        # this process anyway. Initialize the application and hand the update
        # to the handlers directly instead.
        async with application:
            await application.process_update(update)

    with _webhook_lock:
        asyncio.run(_process())
    return "ok"
@app.route("/set_webhook", methods=["GET"])
def set_webhook_route():
    """Register this service's ``/webhook`` URL with Telegram.

    The base URL comes from the ``WEBHOOK_URL`` environment variable;
    ``/webhook`` is appended unless the value already ends with it.

    Returns:
        A JSON status document; HTTP 500 when ``WEBHOOK_URL`` is unset or the
        Telegram call fails.
    """
    webhook_url = os.getenv("WEBHOOK_URL")
    if not webhook_url:
        return jsonify({"status": "error", "message": "WEBHOOK_URL environment variable not set."}), 500
    if not webhook_url.endswith("/webhook"):
        webhook_url = f"{webhook_url.rstrip('/')}/webhook"
    try:
        application = get_telegram_application()

        async def _register():
            # Bot.set_webhook is a coroutine: calling it without awaiting it
            # sends nothing, yet the route would still report success.
            # Entering the bot's async context initializes it for this event
            # loop and shuts it down again afterwards.
            async with application.bot as bot:
                await bot.set_webhook(url=webhook_url)

        asyncio.run(_register())
        return jsonify({"status": "success", "message": f"Webhook set to {webhook_url}"})
    except Exception as e:
        # Route boundary: report any failure to the caller instead of a bare 500.
        print(f"Failed to set webhook: {e}", file=sys.stderr)
        return jsonify({"status": "error", "message": f"Failed to set webhook: {e}"}), 500
def run_bot():
    """Run the bot in polling mode for development.

    Safe to call from a worker thread (which is how the ``__main__`` block
    starts it) as well as from the main thread. Blocks until polling stops.
    """
    print("Starting bot in polling mode...")
    application = get_telegram_application()
    if threading.current_thread() is threading.main_thread():
        application.run_polling()
        return

    # Worker threads have no default event loop, and signal handlers can only
    # be installed from the main thread, so provide a loop and disable the
    # stop signals.
    asyncio.set_event_loop(asyncio.new_event_loop())
    application.run_polling(stop_signals=None)
if __name__ == "__main__":
    # Local development: poll Telegram on a daemon thread while Flask serves
    # HTTP from the main thread. threading is already imported at file level.
    polling_thread = threading.Thread(target=run_bot, daemon=True)
    polling_thread.start()
    listen_port = int(os.environ.get("PORT", 7860))
    app.run(host='0.0.0.0', port=listen_port)