|
from fastapi import FastAPI, Request, HTTPException, UploadFile, File, Form, BackgroundTasks |
|
from pydantic import BaseModel |
|
import uuid |
|
from fastapi.responses import RedirectResponse, PlainTextResponse |
|
from fastapi.middleware.cors import CORSMiddleware |
|
import os |
|
import requests |
|
from urllib.parse import urlencode |
|
import aiofiles |
|
import asyncio |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
|
|
app = FastAPI() |
|
deployments = {} |
|
|
|
|
|
GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID", "your_github_client_id") |
|
GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET", "your_github_client_secret") |
|
GITHUB_OAUTH_REDIRECT = os.getenv("GITHUB_OAUTH_REDIRECT", "https://dragxd-host.hf.space/github/callback") |
|
|
|
|
|
user_tokens = {} |
|
|
|
|
|
user_selected_repo = {} |
|
|
|
|
|
env_files = {} |
|
|
|
|
|
deploy_logs = {} |
|
|
|
deploy_status = {} |
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=["*"], |
|
allow_credentials=True, |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
class DeployRequest(BaseModel): |
|
github_url: str |
|
user_id: str |
|
|
|
class RepoSelectRequest(BaseModel): |
|
user_id: str |
|
repo_full_name: str |
|
|
|
async def simulate_deployment(deploy_id, user_id, repo_full_name): |
|
deploy_logs[deploy_id] = [] |
|
deploy_status[deploy_id] = "deploying" |
|
temp_dir = tempfile.mkdtemp() |
|
try: |
|
|
|
cmd = ["git", "clone", f"https://github.com/{repo_full_name}.git", temp_dir] |
|
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) |
|
while True: |
|
line = await proc.stdout.readline() |
|
if not line: |
|
break |
|
deploy_logs[deploy_id].append(line.decode().rstrip()) |
|
await proc.wait() |
|
|
|
req_path = f"{temp_dir}/requirements.txt" |
|
if os.path.exists(req_path): |
|
cmd = ["pip", "install", "-r", req_path] |
|
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) |
|
while True: |
|
line = await proc.stdout.readline() |
|
if not line: |
|
break |
|
deploy_logs[deploy_id].append(line.decode().rstrip()) |
|
await proc.wait() |
|
|
|
env_content = env_files.get((user_id, repo_full_name)) |
|
if env_content: |
|
env_path = f"{temp_dir}/.env" |
|
with open(env_path, "w") as f: |
|
f.write(env_content) |
|
deploy_logs[deploy_id].append(".env file written.") |
|
|
|
|
|
cmd = ["ls", "-l", temp_dir] |
|
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) |
|
while True: |
|
line = await proc.stdout.readline() |
|
if not line: |
|
break |
|
deploy_logs[deploy_id].append(line.decode().rstrip()) |
|
await proc.wait() |
|
deploy_status[deploy_id] = "success" |
|
except Exception as e: |
|
deploy_logs[deploy_id].append(f"Error: {e}") |
|
deploy_status[deploy_id] = "failed" |
|
finally: |
|
shutil.rmtree(temp_dir, ignore_errors=True) |
|
|
|
@app.post("/deploy") |
|
async def deploy(req: DeployRequest, background_tasks: BackgroundTasks): |
|
user_id = req.user_id |
|
repo_full_name = user_selected_repo.get(user_id) |
|
if not repo_full_name: |
|
return {"error": "No repo selected for user."} |
|
deploy_id = str(uuid.uuid4()) |
|
background_tasks.add_task(simulate_deployment, deploy_id, user_id, repo_full_name) |
|
return {"deploy_id": deploy_id} |
|
|
|
@app.get("/status/{deploy_id}") |
|
async def status(deploy_id: str): |
|
if deploy_id in deployments: |
|
return deployments[deploy_id] |
|
return {"error": "Not found"} |
|
|
|
@app.get("/github/login") |
|
def github_login(user_id: str): |
|
params = { |
|
"client_id": GITHUB_CLIENT_ID, |
|
"redirect_uri": GITHUB_OAUTH_REDIRECT, |
|
"scope": "repo", |
|
"state": user_id |
|
} |
|
url = f"https://github.com/login/oauth/authorize?{urlencode(params)}" |
|
return {"auth_url": url} |
|
|
|
@app.get("/github/callback") |
|
def github_callback(code: str, state: str): |
|
|
|
token_url = "https://github.com/login/oauth/access_token" |
|
headers = {"Accept": "application/json"} |
|
data = { |
|
"client_id": GITHUB_CLIENT_ID, |
|
"client_secret": GITHUB_CLIENT_SECRET, |
|
"code": code, |
|
"redirect_uri": GITHUB_OAUTH_REDIRECT, |
|
"state": state |
|
} |
|
resp = requests.post(token_url, headers=headers, data=data) |
|
print("GitHub token response:", resp.text) |
|
token = resp.json().get("access_token") |
|
if token: |
|
user_tokens[state] = token |
|
return RedirectResponse(url=f"https://t.me/your_bot_username?start=github_connected") |
|
return {"error": "Failed to get token"} |
|
|
|
@app.get("/github/repos") |
|
def github_repos(user_id: str): |
|
token = user_tokens.get(user_id) |
|
if not token: |
|
raise HTTPException(status_code=401, detail="User not authenticated with GitHub.") |
|
headers = {"Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json"} |
|
resp = requests.get("https://api.github.com/user/repos", headers=headers) |
|
if resp.status_code != 200: |
|
raise HTTPException(status_code=resp.status_code, detail="Failed to fetch repos.") |
|
repos = resp.json() |
|
|
|
return [{"name": r["name"], "full_name": r["full_name"]} for r in repos] |
|
|
|
@app.post("/repo/select") |
|
def select_repo(req: RepoSelectRequest): |
|
user_selected_repo[req.user_id] = req.repo_full_name |
|
return {"message": "Repo selected", "repo_full_name": req.repo_full_name} |
|
|
|
@app.post("/env/upload") |
|
async def upload_env(user_id: str = Form(...), file: UploadFile = File(...)): |
|
content = await file.read() |
|
|
|
repo_full_name = user_selected_repo.get(user_id) |
|
if not repo_full_name: |
|
return {"error": "No repo selected for user."} |
|
env_files[(user_id, repo_full_name)] = content.decode() |
|
return {"message": ".env uploaded", "repo_full_name": repo_full_name} |
|
|
|
@app.get("/logs/{deploy_id}") |
|
def get_logs(deploy_id: str, last_line: int = 0): |
|
logs = deploy_logs.get(deploy_id, []) |
|
status = deploy_status.get(deploy_id, "unknown") |
|
|
|
return { |
|
"logs": logs[last_line:], |
|
"next_line": len(logs), |
|
"status": status |
|
} |
|
|
|
@app.get("/logs/{deploy_id}/download") |
|
def download_logs(deploy_id: str): |
|
logs = deploy_logs.get(deploy_id, []) |
|
return PlainTextResponse('\n'.join(logs), media_type='text/plain') |