Spaces:
Sleeping
Sleeping
File size: 3,136 Bytes
21b6b8b e7c1dd3 c540b1a e7c1dd3 21b6b8b c540b1a 21b6b8b c540b1a 21b6b8b c540b1a 21b6b8b c540b1a d56de30 21b6b8b c540b1a e7c1dd3 c540b1a e7c1dd3 611a040 c540b1a d56de30 21b6b8b c540b1a 21b6b8b c540b1a d56de30 21b6b8b 611a040 c540b1a 611a040 21b6b8b e7c1dd3 d56de30 e7c1dd3 c540b1a 611a040 e7c1dd3 611a040 e7c1dd3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
import asyncio
import json
import logging
import os
import threading

import google.generativeai as genai
from flask import Flask, request
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters
# ==== CONFIG ====
# Credentials are read from the environment (e.g. Space/repository secrets)
# instead of being committed to source control.
# SECURITY: the token and API key that were previously hard-coded in this file
# have been published and must be revoked and re-issued.
def _require_env(name):
    """Return the non-empty value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or blank, so a misconfigured
            deployment fails at startup with a clear message.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


TELEGRAM_TOKEN = _require_env("TELEGRAM_TOKEN")
GEMINI_API_KEY = _require_env("GEMINI_API_KEY")
# ==== LOGGING ====
# Root logger emits timestamped INFO-level records; this module logs under its own name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# ==== GEMINI AI SETUP ====
# Configure the Gemini client once and keep a single shared model handle.
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-1.5-flash")

# ==== FLASK APP ====
# WSGI application that serves the status pages and the webhook endpoints.
app = Flask(__name__)

# ==== TELEGRAM SETUP ====
# A standalone Bot plus the Application that owns the handler dispatch.
bot = Bot(TELEGRAM_TOKEN)
application = (
    Application.builder()
    .token(TELEGRAM_TOKEN)
    .build()
)
# ==== TELEGRAM BOT HANDLERS ====
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the /start command with a greeting.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # CommandHandler also fires for edited messages, where ``update.message``
    # is None; ``effective_message`` covers both new and edited messages.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("👋 Hi! I am Sumit, your AI buddy. How can I help you today?")
# Telegram rejects text messages longer than this many characters.
_TELEGRAM_MAX_MESSAGE_LENGTH = 4096


async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer a plain-text message with a Gemini-generated reply.

    Generation failures are logged and reported to the user with a generic
    error text; failures while sending the reply are logged and swallowed so
    one bad update cannot break the dispatcher.

    Args:
        update: Incoming Telegram update carrying the user's text.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # ``update.message`` is None for edited messages and channel posts, which
    # the TEXT filter also lets through; ``effective_message`` handles them.
    message = update.effective_message
    if message is None or not message.text:
        return

    try:
        # generate_content() is a blocking network call; run it in the default
        # executor so the event loop stays free for other updates.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, model.generate_content, message.text
        )
        reply = response.text or "⚠️ I couldn't generate a reply."
    except Exception:
        logger.exception("Gemini error")
        reply = "❌ Something went wrong while generating response."

    try:
        # Long model outputs are sent as consecutive messages to stay within
        # Telegram's per-message length limit.
        for offset in range(0, len(reply), _TELEGRAM_MAX_MESSAGE_LENGTH):
            chunk = reply[offset:offset + _TELEGRAM_MAX_MESSAGE_LENGTH]
            await message.reply_text(chunk)
    except Exception:
        logger.exception("Telegram reply error")
# Register handlers in order: /start gets the greeting, every other
# non-command text message is forwarded to the Gemini chat handler.
for _handler in (
    CommandHandler("start", start),
    MessageHandler(filters.TEXT & ~filters.COMMAND, chat),
):
    application.add_handler(_handler)
# ==== FLASK ROUTES ====
@app.route("/")
def home():
    """Landing page: returns a fixed banner confirming the service is up."""
    banner = "✅ Telegram AI Chatbot is running with webhooks!"
    return banner
@app.route("/health")
def health():
    """Health probe: returns a constant status mapping (Flask serializes it as JSON)."""
    return dict(status="healthy")
# ==== ASYNC BRIDGE ====
# Flask views are synchronous while python-telegram-bot is asyncio-based.
# Every Telegram coroutine (initialization, update processing, set_webhook)
# is run on ONE long-lived event loop in a daemon thread, so the bot's HTTP
# connections are never touched from a loop that has already been closed.
_TELEGRAM_CALL_TIMEOUT = 30  # seconds to wait for blocking Telegram calls
_telegram_loop = asyncio.new_event_loop()
_telegram_thread = None
_telegram_ready = False
_telegram_lock = threading.Lock()


def _submit(coro):
    """Schedule ``coro`` on the Telegram loop; return its concurrent future."""
    return asyncio.run_coroutine_threadsafe(coro, _telegram_loop)


def _ensure_telegram_ready():
    """Start the background loop and initialize ``application`` once.

    Safe to call from any request thread; a failed initialization is retried
    on the next call. Raises whatever ``application.initialize()`` raises, or
    a timeout error if it does not finish in ``_TELEGRAM_CALL_TIMEOUT`` seconds.
    """
    global _telegram_thread, _telegram_ready
    if _telegram_ready:
        return
    with _telegram_lock:
        if _telegram_ready:
            return
        if _telegram_thread is None:
            _telegram_thread = threading.Thread(
                target=_telegram_loop.run_forever,
                name="telegram-loop",
                daemon=True,
            )
            _telegram_thread.start()
        _submit(application.initialize()).result(timeout=_TELEGRAM_CALL_TIMEOUT)
        _telegram_ready = True


def _log_update_failure(future):
    """Done-callback: log an exception raised while processing an update."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        logger.error("Update processing failed", exc_info=error)


@app.route(f"/{TELEGRAM_TOKEN}", methods=["POST"])
def webhook():
    """Handle incoming updates from Telegram.

    Parses the JSON body into an ``Update`` and hands it to the application
    on the background loop without waiting for the handlers to finish.

    Returns:
        "OK" once the update is queued; ("Bad Request", 400) when the body is
        not a JSON object; ("Error", 500) on any other failure.
    """
    try:
        _ensure_telegram_ready()
        payload = request.get_json(force=True, silent=True)
        if not isinstance(payload, dict):
            return "Bad Request", 400
        # Bind the update to the application's own, initialized bot so that
        # handlers and reply shortcuts use the same bot instance.
        update = Update.de_json(payload, application.bot)
        # Process update asynchronously; failures are reported via callback.
        _submit(application.process_update(update)).add_done_callback(
            _log_update_failure
        )
        return "OK"
    except Exception:
        logger.exception("Webhook error")
        return "Error", 500


@app.route("/set_webhook", methods=["GET"])
def set_webhook():
    """Set up the webhook (call this once after deployment).

    The public host is taken from the ``SPACE_HOST`` environment variable and
    falls back to the placeholder host name when it is not set. The bot token
    is part of the webhook path, so it is masked in every response.

    Returns:
        A confirmation string on success, or an error string with status 500.
    """
    host = os.environ.get("SPACE_HOST", "your-space-name-your-username.hf.space")
    webhook_url = f"https://{host}/{TELEGRAM_TOKEN}"
    masked_url = f"https://{host}/<token>"
    try:
        _ensure_telegram_ready()
        success = _submit(
            application.bot.set_webhook(url=webhook_url)
        ).result(timeout=_TELEGRAM_CALL_TIMEOUT)
    except Exception:
        # Details go to the server log only; this endpoint is unauthenticated.
        logger.exception("Error setting webhook")
        return "Error: could not set webhook (see server logs)", 500
    if success:
        return f"Webhook set to {masked_url}"
    return "Failed to set webhook", 500


# ==== MAIN ====
if __name__ == "__main__":
    # Initialize the application on the shared background loop
    _ensure_telegram_ready()
    # Run Flask app
    app.run(host="0.0.0.0", port=7860, debug=False)