host-tg / bot.py
dragxd's picture
Update bot.py
e6d0867 verified
raw
history blame
5.71 kB
import asyncio
import os
import threading
import time

import requests
from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes, ConversationHandler
# Base URL of the Hugging Face Space backend that every handler talks to.
# Replace with the URL of your own Space.
BACKEND_URL = "https://dragxd-host.hf.space"

# ConversationHandler state identifiers for the /repos flow.
SELECT_REPO = 0
UPLOAD_ENV = 1
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start by telling the user how to begin linking GitHub.

    The reply goes through ``update.effective_message`` instead of
    ``update.message``: the latter is ``None`` when the update carries an
    edited message, which would make the handler crash with AttributeError.
    """
    await update.effective_message.reply_text(
        "Send /connect to link your GitHub account."
    )
async def connect(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /connect: ask the backend for a GitHub auth URL and send it.

    Calls ``GET {BACKEND_URL}/github/login`` with the Telegram user id and
    replies with the ``auth_url`` field of the JSON response. Any failure
    (network error, timeout, non-200 status, non-JSON body, missing
    ``auth_url``) results in the generic failure message instead of an
    unhandled exception.
    """
    message = update.effective_message  # also set for edited messages
    user_id = str(update.effective_user.id)

    def fetch_login():
        # Blocking HTTP call; always bounded by a timeout so it cannot hang.
        return requests.get(
            f"{BACKEND_URL}/github/login",
            params={"user_id": user_id},
            timeout=30,
        )

    # Run the blocking request in the default executor so the bot's event
    # loop keeps serving other updates while we wait for the backend.
    loop = asyncio.get_running_loop()
    try:
        resp = await loop.run_in_executor(None, fetch_login)
    except requests.RequestException:
        resp = None

    auth_url = None
    if resp is not None and resp.status_code == 200:
        try:
            payload = resp.json()
        except ValueError:  # body was not valid JSON
            payload = None
        if isinstance(payload, dict):
            auth_url = payload.get("auth_url")

    if auth_url:
        await message.reply_text(
            f"Connect your GitHub: {auth_url}\nAfter connecting, send /repos to choose a repo."
        )
    else:
        await message.reply_text("Failed to get GitHub auth URL.")
async def repos(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /repos: list the user's GitHub repos as a reply keyboard.

    Calls ``GET {BACKEND_URL}/github/repos`` and expects a JSON list of
    objects carrying a ``full_name`` key.

    Returns:
        ``SELECT_REPO`` when a keyboard with at least one repo was shown,
        otherwise ``ConversationHandler.END`` (no repos, or the backend call
        failed in any way).
    """
    message = update.effective_message  # also set for edited messages
    user_id = str(update.effective_user.id)

    def fetch_repos():
        # Blocking HTTP call; always bounded by a timeout so it cannot hang.
        return requests.get(
            f"{BACKEND_URL}/github/repos",
            params={"user_id": user_id},
            timeout=30,
        )

    # Keep the event loop responsive while the blocking request runs.
    loop = asyncio.get_running_loop()
    try:
        resp = await loop.run_in_executor(None, fetch_repos)
    except requests.RequestException:
        resp = None

    repo_list = None
    if resp is not None and resp.status_code == 200:
        try:
            repo_list = resp.json()
        except ValueError:  # body was not valid JSON
            repo_list = None

    if not isinstance(repo_list, list):
        await message.reply_text(
            "Failed to fetch repos. Make sure you are connected with GitHub."
        )
        return ConversationHandler.END

    # Ignore malformed entries rather than failing the whole listing.
    repo_names = [
        entry["full_name"]
        for entry in repo_list
        if isinstance(entry, dict) and entry.get("full_name")
    ]
    if not repo_names:
        await message.reply_text("No repos found.")
        return ConversationHandler.END

    # One repo per keyboard row.
    markup = ReplyKeyboardMarkup(
        [[name] for name in repo_names], one_time_keyboard=True
    )
    await message.reply_text("Select a repo:", reply_markup=markup)
    return SELECT_REPO
async def select_repo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Register the repo name the user sent as their selected repo.

    Posts ``{"user_id", "repo_full_name"}`` to ``{BACKEND_URL}/repo/select``.

    Returns:
        ``UPLOAD_ENV`` when the backend answers 200, otherwise
        ``ConversationHandler.END`` (non-200 status, network error or
        timeout). The reply keyboard is removed in both cases.
    """
    message = update.effective_message  # also set for edited messages
    user_id = str(update.effective_user.id)
    repo_full_name = message.text

    def post_selection():
        # Blocking HTTP call; always bounded by a timeout so it cannot hang.
        return requests.post(
            f"{BACKEND_URL}/repo/select",
            json={"user_id": user_id, "repo_full_name": repo_full_name},
            timeout=30,
        )

    # Keep the event loop responsive while the blocking request runs.
    loop = asyncio.get_running_loop()
    try:
        resp = await loop.run_in_executor(None, post_selection)
        selected = resp.status_code == 200
    except requests.RequestException:
        selected = False

    if not selected:
        await message.reply_text(
            "Failed to select repo.", reply_markup=ReplyKeyboardRemove()
        )
        return ConversationHandler.END

    await message.reply_text(
        f"Repo selected: {repo_full_name}\nNow upload your .env file.",
        reply_markup=ReplyKeyboardRemove(),
    )
    return UPLOAD_ENV
async def upload_env(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Forward the uploaded .env document to the backend.

    Downloads the attached document from Telegram and posts it as multipart
    form data (field ``file``, plus ``user_id``) to
    ``{BACKEND_URL}/env/upload``.

    Returns:
        ``UPLOAD_ENV`` when the message carries no document, so the
        conversation keeps waiting for the file the user was just asked for;
        ``ConversationHandler.END`` once an upload was attempted, whether it
        succeeded or failed.
    """
    message = update.effective_message  # also set for edited messages
    document = message.document
    if not document:
        await message.reply_text("Please upload your .env file as a document.")
        # Stay in this state; ending here would leave the requested upload
        # unhandled.
        return UPLOAD_ENV

    user_id = str(update.effective_user.id)
    tg_file = await document.get_file()
    file_bytes = await tg_file.download_as_bytearray()

    def post_env():
        # Blocking HTTP call; always bounded by a timeout so it cannot hang.
        return requests.post(
            f"{BACKEND_URL}/env/upload",
            data={"user_id": user_id},
            files={"file": (document.file_name, file_bytes)},
            timeout=60,
        )

    # Keep the event loop responsive while the blocking request runs.
    loop = asyncio.get_running_loop()
    try:
        resp = await loop.run_in_executor(None, post_env)
        uploaded = resp.status_code == 200
    except requests.RequestException:
        uploaded = False

    if uploaded:
        await message.reply_text(".env uploaded! Now you can /deploy your project.")
    else:
        await message.reply_text("Failed to upload .env.")
    return ConversationHandler.END
async def deploy(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /deploy: start a deployment and stream its logs to the chat.

    Posts to ``{BACKEND_URL}/deploy``; on success the response JSON must
    contain ``deploy_id``. Log streaming then runs as a background asyncio
    task that polls ``{BACKEND_URL}/logs/{deploy_id}`` every 2 seconds,
    forwards each new log line as a message, and stops when the reported
    ``status`` is no longer ``"deploying"``.

    The poller is a coroutine scheduled on the bot's event loop. Calling
    ``application.create_task`` from a separate thread does not work because
    that thread has no running event loop.
    """
    message = update.effective_message  # also set for edited messages
    user_id = str(update.effective_user.id)
    loop = asyncio.get_running_loop()

    def post_deploy():
        # Blocking HTTP call; always bounded by a timeout so it cannot hang.
        return requests.post(
            f"{BACKEND_URL}/deploy",
            json={"user_id": user_id, "github_url": ""},
            timeout=30,
        )

    # Trigger deployment
    payload = None
    try:
        resp = await loop.run_in_executor(None, post_deploy)
        if resp.status_code == 200:
            payload = resp.json()
    except (requests.RequestException, ValueError):
        # Network failure, timeout, or a body that is not valid JSON.
        payload = None

    if not isinstance(payload, dict) or "deploy_id" not in payload:
        await message.reply_text(
            "Failed to start deployment. Make sure you have selected a repo and uploaded your .env file."
        )
        return

    deploy_id = payload["deploy_id"]
    await message.reply_text(
        f"Deployment started! ID: {deploy_id}\nStreaming logs:"
    )

    def fetch_logs(from_line):
        # Blocking HTTP call executed in a worker thread by the poller.
        return requests.get(
            f"{BACKEND_URL}/logs/{deploy_id}",
            params={"last_line": from_line},
            timeout=30,
        )

    async def poll_logs():
        """Poll the backend for new log lines until the deployment ends."""
        last_line = 0
        failures = 0
        max_failures = 30  # consecutive non-200 polls before giving up
        try:
            while True:
                logs_resp = await loop.run_in_executor(None, fetch_logs, last_line)
                if logs_resp.status_code == 200:
                    failures = 0
                    data = logs_resp.json()
                    for log in data.get("logs") or []:
                        text = str(log)
                        # Telegram rejects empty message text; skip blanks
                        # so one empty line does not abort the stream.
                        if text.strip():
                            # Awaited one by one to keep log order.
                            await message.reply_text(text)
                    last_line = data.get("next_line", last_line)
                    status = data.get("status", "unknown")
                    if status != "deploying":
                        await message.reply_text(
                            f"Deployment finished with status: {status}"
                        )
                        return
                else:
                    failures += 1
                    if failures >= max_failures:
                        # Stop instead of polling a broken endpoint forever.
                        await message.reply_text(
                            f"Error streaming logs: backend returned HTTP {logs_resp.status_code}"
                        )
                        return
                await asyncio.sleep(2)
        except Exception as e:  # task boundary: report instead of dying silently
            await message.reply_text(f"Error streaming logs: {e}")

    # Schedule on the running loop; the handler returns immediately while
    # the poller keeps streaming in the background.
    context.application.create_task(poll_logs())
def main():
    """Build the Telegram application, register all handlers and start polling.

    Reads the bot token from the ``TELEGRAM_BOT_TOKEN`` environment variable.

    Raises:
        RuntimeError: if ``TELEGRAM_BOT_TOKEN`` is unset or empty.
    """
    token = os.environ.get("TELEGRAM_BOT_TOKEN")  # Set your bot token as env variable
    if not token:
        # Fail early with a message that names the missing variable.
        raise RuntimeError("TELEGRAM_BOT_TOKEN environment variable is not set.")

    app = ApplicationBuilder().token(token).build()

    # /repos -> pick a repo -> upload the .env file.
    conv_handler = ConversationHandler(
        entry_points=[CommandHandler("repos", repos)],
        states={
            SELECT_REPO: [MessageHandler(filters.TEXT & ~filters.COMMAND, select_repo)],
            UPLOAD_ENV: [MessageHandler(filters.Document.ALL, upload_env)],
        },
        fallbacks=[],
        # There is no cancel fallback, so let /repos restart the flow for a
        # user who is stuck in the middle of a conversation.
        allow_reentry=True,
    )

    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("connect", connect))
    app.add_handler(CommandHandler("deploy", deploy))
    app.add_handler(conv_handler)
    app.run_polling()


if __name__ == "__main__":
    main()