bot / app.py
shashwatIDR's picture
Update app.py
e7c1dd3 verified
raw
history blame
3.14 kB
import asyncio
import json
import logging
import os
import threading

import google.generativeai as genai
from flask import Flask, request
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters
# ==== CONFIG ====
# Secrets are read from the process environment instead of being committed to
# the repository. Any credential that was ever stored in this file's history
# must be treated as leaked: revoke it (BotFather / Google AI Studio) and
# issue a new one.
# A missing variable yields an empty string; surrounding whitespace (e.g. a
# trailing newline from a secrets file) is stripped.
# NOTE(review): the Telegram client is expected to reject an empty token at
# construction time, which makes a missing TELEGRAM_TOKEN fail fast - confirm.
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN", "").strip()
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "").strip()
# ==== LOGGING ====
# Configure the root logger: INFO and above, each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-scoped logger used by the handlers and routes in this file.
logger = logging.getLogger(name=__name__)
# ==== GEMINI AI SETUP ====
# Register the API key with the SDK, then create the model object that the
# chat handler calls for every incoming text message.
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel(model_name="gemini-1.5-flash")

# ==== FLASK APP ====
# WSGI application that exposes the status pages and the webhook endpoints.
app = Flask(import_name=__name__)

# ==== TELEGRAM SETUP ====
# Stand-alone Bot client plus the Application that dispatches updates to the
# registered handlers; both are built from the same token.
bot = Bot(token=TELEGRAM_TOKEN)
application = (
    Application.builder()
    .token(TELEGRAM_TOKEN)
    .build()
)
# ==== TELEGRAM BOT HANDLERS ====
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Greet the user in response to the /start command.

    Args:
        update: The incoming Telegram update that carried the command.
        context: Handler context supplied by the dispatcher (unused).
    """
    # `update.message` is None when the command arrives as an edited message,
    # which previously raised AttributeError; `effective_message` covers the
    # other message-like update kinds as well.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("👋 Hi! I am Sumit, your AI buddy. How can I help you today?")
# Maximum text length accepted for a single Telegram message; longer replies
# have to be split across several messages.
_TELEGRAM_TEXT_LIMIT = 4096


async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer a plain-text message with a Gemini-generated reply.

    Args:
        update: The incoming Telegram update holding the user's text.
        context: Handler context supplied by the dispatcher (unused).

    Failures are logged and never propagate: a generation error is turned
    into an apology message, and a delivery error is only logged.
    """
    # `update.message` is None for edited messages; `effective_message`
    # resolves whichever message-like object the update actually carries.
    message = update.effective_message
    if message is None or not message.text:
        return

    try:
        # generate_content() is a blocking network call. Running it in the
        # default executor keeps the event loop free for other updates.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, model.generate_content, message.text
        )
        generated = response.text
        reply = generated if generated else "⚠️ I couldn't generate a reply."
    except Exception as e:
        logger.exception("Gemini error: %s", e)
        reply = "❌ Something went wrong while generating response."

    try:
        # Send over-long replies as consecutive chunks instead of letting
        # Telegram reject the whole message.
        for offset in range(0, len(reply), _TELEGRAM_TEXT_LIMIT):
            await message.reply_text(reply[offset:offset + _TELEGRAM_TEXT_LIMIT])
    except Exception as e:
        logger.exception("Telegram reply error: %s", e)
# Register the handlers: /start gets the greeting, every other non-command
# text message goes to the Gemini chat handler.
_start_handler = CommandHandler("start", start)
_text_handler = MessageHandler(filters.TEXT & ~filters.COMMAND, chat)
application.add_handler(_start_handler)
application.add_handler(_text_handler)
# ==== FLASK ROUTES ====
@app.route("/")
def home():
    """Landing page: a plain-text banner confirming the service is up."""
    banner = "✅ Telegram AI Chatbot is running with webhooks!"
    return banner
@app.route("/health")
def health():
    """Health-check endpoint; responds with a small status mapping."""
    return dict(status="healthy")
# ==== ASYNC BRIDGE ====
# Flask views are synchronous while python-telegram-bot is asyncio-based.
# Every coroutine is therefore submitted to ONE long-lived event loop running
# in a daemon thread. A view has no running loop (so asyncio.create_task()
# raises there), and a fresh asyncio.run() per request would drive the bot's
# HTTP client from a different, soon-closed loop each time.
_bridge_lock = threading.Lock()
_bridge_loop = None
_bridge_ready = False


def _get_loop():
    """Return the shared event loop, starting it and the bot on first use.

    The loop thread and ``application.initialize()`` are set up lazily so
    this also works when the module is imported by a WSGI server and the
    ``__main__`` section never runs.

    Raises:
        Exception: Whatever ``application.initialize()`` raises; the
            initialisation is retried on the next call.
    """
    global _bridge_loop, _bridge_ready
    with _bridge_lock:
        if _bridge_loop is None:
            loop = asyncio.new_event_loop()
            threading.Thread(
                target=loop.run_forever, name="telegram-loop", daemon=True
            ).start()
            _bridge_loop = loop
        if not _bridge_ready:
            asyncio.run_coroutine_threadsafe(
                application.initialize(), _bridge_loop
            ).result()
            _bridge_ready = True
        return _bridge_loop


def _log_update_failure(future):
    """Log an exception that escaped a fire-and-forget update task."""
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        logger.error("Update processing failed: %s", exc, exc_info=exc)


def _public_base_url():
    """Return the externally reachable https base URL, without trailing slash.

    Resolution order: ``WEBHOOK_BASE_URL``, then ``SPACE_HOST``, then the host
    of the current request.
    """
    explicit = os.environ.get("WEBHOOK_BASE_URL", "").strip()
    if explicit:
        return explicit.rstrip("/")
    # NOTE(review): SPACE_HOST is presumably injected by Hugging Face Spaces
    # (e.g. "user-space.hf.space") - confirm for the target deployment.
    space_host = os.environ.get("SPACE_HOST", "").strip()
    if space_host:
        return f"https://{space_host}"
    # Telegram only accepts https webhooks, so force the scheme even if a
    # reverse proxy forwarded the request over plain http.
    return f"https://{request.host}"


@app.route(f"/{TELEGRAM_TOKEN}", methods=["POST"])
def webhook():
    """Handle incoming updates from Telegram.

    Returns:
        "OK" once the update has been queued for processing, a 400 response
        for a body that is not a JSON object, or a 500 response when the
        update could not be parsed or scheduled.
    """
    payload = request.get_json(force=True, silent=True)
    if not isinstance(payload, dict):
        return "Bad Request", 400
    try:
        loop = _get_loop()
        update = Update.de_json(payload, application.bot)
        if update is None:
            return "Bad Request", 400
        # Fire and forget: answer Telegram immediately while the handlers run
        # on the background loop.
        pending = asyncio.run_coroutine_threadsafe(
            application.process_update(update), loop
        )
        pending.add_done_callback(_log_update_failure)
        return "OK"
    except Exception as e:
        logger.exception("Webhook error: %s", e)
        return "Error", 500


@app.route("/set_webhook", methods=["GET"])
def set_webhook():
    """Set up the webhook (call this once after deployment).

    Returns:
        A confirmation message on success, otherwise an error message with
        status 500. The bot token is part of the webhook URL, so neither the
        full URL nor raw exception text is ever sent back to the caller.
    """
    base_url = _public_base_url()
    webhook_url = f"{base_url}/{TELEGRAM_TOKEN}"
    try:
        loop = _get_loop()
        success = asyncio.run_coroutine_threadsafe(
            application.bot.set_webhook(url=webhook_url), loop
        ).result(timeout=30)
    except Exception:
        # Log only the exception type: its text may contain the token-bearing
        # URL, and the log stream may be visible to others.
        logger.error("Error setting webhook (%s)", type(Exception).__name__)
        logger.debug("set_webhook failure details", exc_info=True)
        return "Error setting webhook", 500
    if success:
        return f"Webhook set to {base_url}/<token>"
    return "Failed to set webhook", 500


# ==== MAIN ====
if __name__ == "__main__":
    # Start the background loop and initialise the bot up front so that a bad
    # token or an unreachable API is reported at startup, not on first update.
    _get_loop()
    # Run Flask app
    app.run(host="0.0.0.0", port=7860, debug=False)