host-tg / bot.py
dragxd's picture
Update bot.py
85d1bc3 verified
import requests
from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes, ConversationHandler
import os
import time
import threading
import asyncio
import io
# Replace with your actual Hugging Face Space backend URL
BACKEND_URL = "https://dragxd-host.hf.space"
SELECT_REPO, UPLOAD_ENV = range(2)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Send /connect to link your GitHub account.")
async def connect(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = str(update.effective_user.id)
resp = requests.get(f"{BACKEND_URL}/github/login", params={"user_id": user_id})
if resp.status_code == 200:
auth_url = resp.json()["auth_url"]
await update.message.reply_text(f"Connect your GitHub: {auth_url}\nAfter connecting, send /repos to choose a repo.")
else:
await update.message.reply_text("Failed to get GitHub auth URL.")
async def repos(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = str(update.effective_user.id)
resp = requests.get(f"{BACKEND_URL}/github/repos", params={"user_id": user_id})
if resp.status_code == 200:
repos = resp.json()
if not repos:
await update.message.reply_text("No repos found.")
return ConversationHandler.END
repo_names = [r["full_name"] for r in repos]
markup = ReplyKeyboardMarkup([[name] for name in repo_names], one_time_keyboard=True)
await update.message.reply_text("Select a repo:", reply_markup=markup)
return SELECT_REPO
else:
await update.message.reply_text("Failed to fetch repos. Make sure you are connected with GitHub.")
return ConversationHandler.END
async def select_repo(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = str(update.effective_user.id)
repo_full_name = update.message.text
resp = requests.post(f"{BACKEND_URL}/repo/select", json={"user_id": user_id, "repo_full_name": repo_full_name})
if resp.status_code == 200:
await update.message.reply_text(f"Repo selected: {repo_full_name}\nNow upload your .env file.", reply_markup=ReplyKeyboardRemove())
return UPLOAD_ENV
else:
await update.message.reply_text("Failed to select repo.", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
async def upload_env(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = str(update.effective_user.id)
if update.message.document:
file = await update.message.document.get_file()
file_bytes = await file.download_as_bytearray()
files = {"file": (update.message.document.file_name, file_bytes)}
data = {"user_id": user_id}
resp = requests.post(f"{BACKEND_URL}/env/upload", data=data, files=files)
if resp.status_code == 200:
await update.message.reply_text(".env uploaded! Now you can /deploy your project.")
else:
await update.message.reply_text("Failed to upload .env.")
else:
await update.message.reply_text("Please upload your .env file as a document.")
return ConversationHandler.END
async def deploy(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = str(update.effective_user.id)
# Trigger deployment
resp = requests.post(f"{BACKEND_URL}/deploy", json={"user_id": user_id, "github_url": ""})
if resp.status_code != 200 or "deploy_id" not in resp.json():
await update.message.reply_text("Failed to start deployment. Make sure you have selected a repo and uploaded your .env file.")
return
deploy_id = resp.json()["deploy_id"]
await update.message.reply_text(f"Deployment started! ID: {deploy_id}\nStreaming logs:")
async def poll_logs():
last_line = 0
finished = False
while not finished:
try:
logs_resp = requests.get(f"{BACKEND_URL}/logs/{deploy_id}", params={"last_line": last_line})
if logs_resp.status_code == 200:
data = logs_resp.json()
logs = data.get("logs", [])
status = data.get("status", "unknown")
if logs:
for log in logs:
await update.message.reply_text(log)
last_line = data.get("next_line", last_line)
if status != "deploying":
finished = True
await update.message.reply_text(f"Deployment finished with status: {status}")
# Download logs.txt and send as file
logs_file_resp = requests.get(f"{BACKEND_URL}/logs/{deploy_id}/download")
if logs_file_resp.status_code == 200:
log_bytes = io.BytesIO(logs_file_resp.content)
log_bytes.name = "logs.txt"
await update.message.reply_document(document=InputFile(log_bytes))
break
await asyncio.sleep(2)
except Exception as e:
await update.message.reply_text(f"Error streaming logs: {e}")
break
context.application.create_task(poll_logs())
def main():
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN") # Set your bot token as env variable
app = ApplicationBuilder().token(TOKEN).build()
conv_handler = ConversationHandler(
entry_points=[CommandHandler("repos", repos)],
states={
SELECT_REPO: [MessageHandler(filters.TEXT & ~filters.COMMAND, select_repo)],
UPLOAD_ENV: [MessageHandler(filters.Document.ALL, upload_env)]
},
fallbacks=[]
)
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("connect", connect))
app.add_handler(CommandHandler("deploy", deploy))
app.add_handler(conv_handler)
app.run_polling()
if __name__ == "__main__":
main()