TgB / main.py
Rooni's picture
Update main.py
0cf65c0 verified
import os
from collections import deque
from flask import Flask, request
import telegram
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters
import schedule
import time
from threading import Thread
from asyncio import run
BOT_TOKEN = os.environ['YOUR_BOT_TOKEN']
YOUR_USER_ID = 855890735 # Замените на ваш ID
CHANNEL_ID = os.environ['YOUR_CHANNEL_ID'] # Замените на ID канала
bot = telegram.Bot(token=BOT_TOKEN)
application = ApplicationBuilder().token(BOT_TOKEN).build()
message_queue = deque()
test_mode = False
# --- Flask app ---
app = Flask(__name__)
@app.route('/', methods=['GET'])
def home():
return "Это API сервер для Telegram бота."
@app.route('/send_message', methods=['POST'])
def send_message():
data = request.get_json()
message = data.get('message')
if message:
message_queue.append(message)
return {'status': 'ok'}
else:
return {'status': 'error', 'message': 'Missing message data'}
# --- Telegram bot handlers ---
async def handle_message(update, context):
message = update.message
if message and message.from_user:
user_id = message.from_user.id
if user_id == YOUR_USER_ID and message.chat.type == 'private':
if not message.text.startswith("/"):
if test_mode:
await forward_message(message.text)
print(f"Сообщение '{message.text}' отправлено напрямую (тестовый режим).")
else:
message_queue.append(message.text)
print(f"Добавлено сообщение '{message.text}' в очередь.")
else:
print(f"Получена команда от владельца: {message.text}")
else:
print(f"Получено сообщение от постороннего пользователя (ID: {user_id}): {message.text}")
await update.message.reply_text("Извини, но я могу общаться только с моим владельцем.")
async def start(update, context):
user_id = update.effective_user.id
if user_id == YOUR_USER_ID:
print("Владелец начал диалог.")
await update.message.reply_text("Привет!")
else:
print(f"Посторонний пользователь (ID: {user_id}) попытался начать диалог.")
await update.message.reply_text("Извини, но я могу общаться только с моим владельцем.")
async def clear_queue(update, context):
global message_queue
if update.effective_user.id == YOUR_USER_ID:
message_queue.clear()
print("Очередь сообщений очищена.")
await update.message.reply_text("Очередь сообщений очищена.")
async def toggle_test_mode(update, context):
global test_mode
if update.effective_user.id == YOUR_USER_ID:
test_mode = not test_mode
status = "включен" if test_mode else "выключен"
print(f"Тестовый режим {status}.")
await update.message.reply_text(f"Тестовый режим {status}.")
async def send_now(update, context):
global message_queue
if update.effective_user.id == YOUR_USER_ID:
if message_queue:
message = message_queue.popleft()
await forward_message(message)
message_queue.append(message) # Переносим в конец очереди
print("Отправлено первое сообщение из очереди.")
await update.message.reply_text("Отправлено первое сообщение из очереди.")
else:
print("Очередь пуста.")
await update.message.reply_text("Очередь пуста.")
async def delete_last_message(update, context):
global message_queue
if update.effective_user.id == YOUR_USER_ID:
if message_queue:
deleted_message = message_queue.pop()
print(f"Удалено сообщение '{deleted_message}' из очереди.")
await update.message.reply_text("Последнее сообщение удалено из очереди.")
else:
print("Очередь пуста, нечего удалять.")
await update.message.reply_text("Очередь пуста, нечего удалять.")
async def forward_message(message):
try:
await bot.send_message(chat_id=CHANNEL_ID, text=message)
print(f"Сообщение '{message}' успешно отправлено в канал.")
except telegram.error.BadRequest as e:
print(f"Ошибка при отправке сообщения: {e}")
# --- Планировщик ---
def send_messages_from_queue():
global message_queue
if message_queue:
message = message_queue.popleft()
run(forward_message(message))
schedule.every().day.at("00:00").do(send_messages_from_queue)
schedule.every().day.at("04:00").do(send_messages_from_queue)
schedule.every().day.at("08:00").do(send_messages_from_queue)
schedule.every().day.at("12:00").do(send_messages_from_queue)
schedule.every().day.at("16:00").do(send_messages_from_queue)
schedule.every().day.at("20:00").do(send_messages_from_queue)
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)
# --- Регистрация обработчиков Telegram ---
start_handler = CommandHandler("start", start)
clear_handler = CommandHandler("clear", clear_queue)
test_handler = CommandHandler("test", toggle_test_mode)
send_now_handler = CommandHandler("send", send_now)
delete_last_handler = CommandHandler("del", delete_last_message)
message_handler = MessageHandler(
filters=filters.TEXT & ~filters.COMMAND, callback=handle_message
)
application.add_handler(start_handler)
application.add_handler(clear_handler)
application.add_handler(test_handler)
application.add_handler(send_now_handler)
application.add_handler(delete_last_handler)
application.add_handler(message_handler)
# --- Обработчик ошибок Telegram ---
async def error_handler(update, context):
"""Обработчик ошибок."""
print(f'Произошла ошибка: {context.error}')
application.add_error_handler(error_handler)
# --- Запуск приложения ---
if __name__ == "__main__":
scheduler_thread = Thread(target=run_scheduler)
scheduler_thread.start()
flask_thread = Thread(target=app.run, kwargs={'host': '0.0.0.0', 'port': 5000})
flask_thread.start()
application.run_polling()