Spaces:
Sleeping
Sleeping
import logging | |
import json | |
from flask import Flask, request | |
from telegram import Update, Bot | |
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters | |
import google.generativeai as genai | |
import asyncio | |
# ==== CONFIG ==== | |
TELEGRAM_TOKEN = "7745816717:AAGKTpRtuPknjRAIct_2kdoANpJx3ZFztrg" | |
GEMINI_API_KEY = "AIzaSyCq23lcvpPfig6ifq1rmt-z11vKpMvDD4I" | |
# ==== LOGGING ==== | |
logging.basicConfig( | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO | |
) | |
logger = logging.getLogger(__name__) | |
# ==== GEMINI AI SETUP ==== | |
genai.configure(api_key=GEMINI_API_KEY) | |
model = genai.GenerativeModel("gemini-1.5-flash") | |
# ==== FLASK APP ==== | |
app = Flask(__name__) | |
# ==== TELEGRAM SETUP ==== | |
bot = Bot(token=TELEGRAM_TOKEN) | |
application = Application.builder().token(TELEGRAM_TOKEN).build() | |
# ==== TELEGRAM BOT HANDLERS ==== | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
await update.message.reply_text("👋 Hi! I am Sumit, your AI buddy. How can I help you today?") | |
async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
user_message = update.message.text | |
try: | |
response = model.generate_content(user_message) | |
reply = response.text if response.text else "⚠️ I couldn't generate a reply." | |
except Exception as e: | |
logger.error(f"Gemini error: {e}") | |
reply = "❌ Something went wrong while generating response." | |
try: | |
await update.message.reply_text(reply) | |
except Exception as e: | |
logger.error(f"Telegram reply error: {e}") | |
# Add handlers | |
application.add_handler(CommandHandler("start", start)) | |
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat)) | |
# ==== FLASK ROUTES ==== | |
def home(): | |
return "✅ Telegram AI Chatbot is running with webhooks!" | |
def health(): | |
return {"status": "healthy"} | |
def webhook(): | |
"""Handle incoming updates from Telegram""" | |
try: | |
update = Update.de_json(request.get_json(force=True), bot) | |
# Process update asynchronously | |
asyncio.create_task(application.process_update(update)) | |
return "OK" | |
except Exception as e: | |
logger.error(f"Webhook error: {e}") | |
return "Error", 500 | |
def set_webhook(): | |
"""Set up the webhook (call this once after deployment)""" | |
webhook_url = f"https://your-space-name-your-username.hf.space/{TELEGRAM_TOKEN}" | |
try: | |
# This will need to be called manually or you can set it via Telegram API | |
success = asyncio.run(bot.set_webhook(url=webhook_url)) | |
if success: | |
return f"Webhook set to {webhook_url}" | |
else: | |
return "Failed to set webhook" | |
except Exception as e: | |
logger.error(f"Error setting webhook: {e}") | |
return f"Error: {e}" | |
# ==== MAIN ==== | |
if __name__ == "__main__": | |
# Initialize the application | |
asyncio.run(application.initialize()) | |
# Run Flask app | |
app.run(host="0.0.0.0", port=7860, debug=False) |