"""Demo deployment backend: GitHub OAuth, repo selection, .env upload and deploy logs.

All state is kept in process-local dictionaries, so it is lost on restart and
is not shared between worker processes.
"""

import asyncio
import os
import shutil
import subprocess
import tempfile
import uuid
from urllib.parse import urlencode

import aiofiles
import requests
from fastapi import FastAPI, Request, HTTPException, UploadFile, File, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse, PlainTextResponse
from pydantic import BaseModel

app = FastAPI()

deployments = {}

# GitHub OAuth config (replace with your own client_id and client_secret)
GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID", "your_github_client_id")
GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET", "your_github_client_secret")
GITHUB_OAUTH_REDIRECT = os.getenv(
    "GITHUB_OAUTH_REDIRECT", "https://dragxd-host.hf.space/github/callback"
)

# Seconds to wait for GitHub before giving up, so a stalled connection cannot
# block a worker thread indefinitely.
GITHUB_REQUEST_TIMEOUT = 10

# In-memory user token storage (for demo; use DB in production)
user_tokens = {}
# In-memory user selected repo storage (for demo)
user_selected_repo = {}
# In-memory .env storage: {(user_id, repo_full_name): env_content}
env_files = {}
# In-memory log storage: {deploy_id: [log lines]}
deploy_logs = {}
# In-memory deploy status: {deploy_id: "deploying"|"success"|"failed"}
deploy_status = {}

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class DeployRequest(BaseModel):
    """Request body for ``POST /deploy``."""

    github_url: str
    user_id: str


class RepoSelectRequest(BaseModel):
    """Request body for ``POST /repo/select``."""

    user_id: str
    repo_full_name: str


async def _run_and_log(cmd, log):
    """Run *cmd*, append each line of its combined output to *log*, return the exit code."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        # Tool output is not guaranteed to be UTF-8; never let a stray byte
        # abort the whole deployment.
        log.append(line.decode(errors="replace").rstrip())
    return await proc.wait()


async def simulate_deployment(deploy_id, user_id, repo_full_name):
    """Clone the repo, install its requirements and record progress.

    Output is collected in ``deploy_logs[deploy_id]`` and the final outcome in
    ``deploy_status[deploy_id]`` ("success" or "failed"). The temporary
    checkout is always removed afterwards.
    """
    log = deploy_logs[deploy_id] = []
    deploy_status[deploy_id] = "deploying"
    temp_dir = tempfile.mkdtemp()
    try:
        # 1. Clone the repo
        clone_cmd = ["git", "clone", f"https://github.com/{repo_full_name}.git", temp_dir]
        exit_code = await _run_and_log(clone_cmd, log)
        if exit_code != 0:
            raise RuntimeError(f"git clone exited with code {exit_code}")

        # 2. Install dependencies (if requirements.txt exists)
        # NOTE(review): this installs third-party packages into the server's
        # own environment; consider an isolated venv/container.
        req_path = os.path.join(temp_dir, "requirements.txt")
        if os.path.exists(req_path):
            exit_code = await _run_and_log(["pip", "install", "-r", req_path], log)
            if exit_code != 0:
                raise RuntimeError(f"pip install exited with code {exit_code}")

        # 3. Set environment variables from .env (if uploaded)
        env_content = env_files.get((user_id, repo_full_name))
        if env_content:
            env_path = os.path.join(temp_dir, ".env")
            with open(env_path, "w", encoding="utf-8") as f:
                f.write(env_content)
            log.append(".env file written.")

        # 4. (Optional) Run build or start script if present
        # For demo, just list files; the listing is informational only.
        await _run_and_log(["ls", "-l", temp_dir], log)

        deploy_status[deploy_id] = "success"
    except Exception as e:
        # Top-level boundary of the background task: record the failure so
        # pollers see it instead of the task dying silently.
        log.append(f"Error: {e}")
        deploy_status[deploy_id] = "failed"
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


@app.post("/deploy")
async def deploy(req: DeployRequest, background_tasks: BackgroundTasks):
    """Schedule a deployment of the user's selected repo and return its id."""
    user_id = req.user_id
    repo_full_name = user_selected_repo.get(user_id)
    if not repo_full_name:
        return {"error": "No repo selected for user."}
    deploy_id = str(uuid.uuid4())
    # Register the deployment before the background task starts so a client
    # polling immediately sees "deploying" rather than "unknown".
    deploy_logs[deploy_id] = []
    deploy_status[deploy_id] = "deploying"
    background_tasks.add_task(simulate_deployment, deploy_id, user_id, repo_full_name)
    return {"deploy_id": deploy_id}


@app.get("/status/{deploy_id}")
async def status(deploy_id: str):
    """Return the stored record or current status for *deploy_id*."""
    if deploy_id in deployments:
        return deployments[deploy_id]
    # Nothing in this module writes to `deployments`; deployments started via
    # /deploy are tracked in `deploy_status`, so consult it as well.
    if deploy_id in deploy_status:
        return {"deploy_id": deploy_id, "status": deploy_status[deploy_id]}
    return {"error": "Not found"}


@app.get("/github/login")
def github_login(user_id: str):
    """Build the GitHub OAuth authorization URL for *user_id*."""
    # NOTE(review): the OAuth `state` is the raw user_id, which is guessable
    # and therefore gives no CSRF protection; consider a random nonce mapped
    # to the user instead.
    params = {
        "client_id": GITHUB_CLIENT_ID,
        "redirect_uri": GITHUB_OAUTH_REDIRECT,
        "scope": "repo",
        "state": user_id,
    }
    url = f"https://github.com/login/oauth/authorize?{urlencode(params)}"
    return {"auth_url": url}


@app.get("/github/callback")
def github_callback(code: str, state: str):
    """Exchange the OAuth *code* for an access token and store it under *state*."""
    # state is user_id
    token_url = "https://github.com/login/oauth/access_token"
    headers = {"Accept": "application/json"}
    data = {
        "client_id": GITHUB_CLIENT_ID,
        "client_secret": GITHUB_CLIENT_SECRET,
        "code": code,
        "redirect_uri": GITHUB_OAUTH_REDIRECT,
        "state": state,
    }
    try:
        resp = requests.post(
            token_url, headers=headers, data=data, timeout=GITHUB_REQUEST_TIMEOUT
        )
        # Print the status only: the response body contains the access token
        # and must not end up in the logs.
        print("GitHub token response status:", resp.status_code)
        token = resp.json().get("access_token")
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body: same outcome as a missing token.
        token = None
    if token:
        user_tokens[state] = token
        return RedirectResponse(url="https://t.me/your_bot_username?start=github_connected")
    return {"error": "Failed to get token"}


@app.get("/github/repos")
def github_repos(user_id: str):
    """List the name and full_name of each repo returned for the user's token.

    Raises:
        HTTPException: 401 if no token is stored for *user_id*, 502 if GitHub
            cannot be reached, or GitHub's own status code on a non-200 reply.
    """
    token = user_tokens.get(user_id)
    if not token:
        raise HTTPException(status_code=401, detail="User not authenticated with GitHub.")
    headers = {"Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json"}
    try:
        resp = requests.get(
            "https://api.github.com/user/repos",
            headers=headers,
            timeout=GITHUB_REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail="Failed to fetch repos.") from exc
    if resp.status_code != 200:
        raise HTTPException(status_code=resp.status_code, detail="Failed to fetch repos.")
    repos = resp.json()
    # Return only repo name and full_name for selection
    return [{"name": r["name"], "full_name": r["full_name"]} for r in repos]


@app.post("/repo/select")
def select_repo(req: RepoSelectRequest):
    """Remember which repo the user wants to deploy."""
    user_selected_repo[req.user_id] = req.repo_full_name
    return {"message": "Repo selected", "repo_full_name": req.repo_full_name}


@app.post("/env/upload")
async def upload_env(user_id: str = Form(...), file: UploadFile = File(...)):
    """Store an uploaded .env file for the user's currently selected repo.

    Raises:
        HTTPException: 400 if the uploaded file is not valid UTF-8 text.
    """
    content = await file.read()
    # Find selected repo for user
    repo_full_name = user_selected_repo.get(user_id)
    if not repo_full_name:
        return {"error": "No repo selected for user."}
    try:
        env_text = content.decode()
    except UnicodeDecodeError as exc:
        raise HTTPException(status_code=400, detail=".env file must be UTF-8 text.") from exc
    env_files[(user_id, repo_full_name)] = env_text
    return {"message": ".env uploaded", "repo_full_name": repo_full_name}


@app.get("/logs/{deploy_id}")
def get_logs(deploy_id: str, last_line: int = 0):
    """Return log lines from index *last_line* onward plus the deploy status."""
    logs = deploy_logs.get(deploy_id, [])
    status = deploy_status.get(deploy_id, "unknown")
    # A negative offset would slice from the end of the list; treat it as 0.
    start = max(last_line, 0)
    # Return new log lines since last_line
    return {
        "logs": logs[start:],
        "next_line": len(logs),
        "status": status,
    }


@app.get("/logs/{deploy_id}/download")
def download_logs(deploy_id: str):
    """Return the full log for *deploy_id* as plain text (empty if unknown)."""
    logs = deploy_logs.get(deploy_id, [])
    return PlainTextResponse('\n'.join(logs), media_type='text/plain')