# host / app.py
# dragxd: "Update app.py" (commit 1274ac5, verified)
from fastapi import FastAPI, Request, HTTPException, UploadFile, File, Form, BackgroundTasks
from pydantic import BaseModel
import uuid
from fastapi.responses import RedirectResponse, PlainTextResponse
from fastapi.middleware.cors import CORSMiddleware
import os
import requests
from urllib.parse import urlencode
import aiofiles
import asyncio
import subprocess
import tempfile
import shutil
# FastAPI application instance; every route in this file is registered on it.
app = FastAPI()
# Looked up by GET /status/{deploy_id}.
# NOTE(review): nothing visible in this file ever writes to this dict — confirm
# whether it is populated elsewhere or is a leftover.
deployments = {}
# GitHub OAuth config, read from the environment. The fallback strings are
# placeholders only; set real values via GITHUB_CLIENT_ID / GITHUB_CLIENT_SECRET.
GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID", "your_github_client_id")
GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET", "your_github_client_secret")
GITHUB_OAUTH_REDIRECT = os.getenv("GITHUB_OAUTH_REDIRECT", "https://dragxd-host.hf.space/github/callback")
# In-memory user token storage: {user_id: GitHub access token}.
# Demo only — contents are lost on restart; use a DB in production.
user_tokens = {}
# In-memory selected-repo storage: {user_id: repo_full_name} (demo only).
user_selected_repo = {}
# In-memory .env storage: {(user_id, repo_full_name): env_content}
env_files = {}
# In-memory log storage: {deploy_id: [log lines]}
deploy_logs = {}
# In-memory deploy status: {deploy_id: "deploying"|"success"|"failed"}
deploy_status = {}
# CORS: accept any origin, method and header.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm that credentialed cross-origin requests are really needed.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class DeployRequest(BaseModel):
    """JSON body accepted by ``POST /deploy``."""

    # Repository URL supplied by the client. The /deploy handler in this file
    # does not read it; the repo comes from the user's stored selection.
    github_url: str
    # Identifier of the user requesting the deployment.
    user_id: str
class RepoSelectRequest(BaseModel):
    """JSON body accepted by ``POST /repo/select``."""

    # Identifier of the user making the selection.
    user_id: str
    # Repository name stored for the user; later interpolated into
    # https://github.com/<repo_full_name>.git when deploying.
    repo_full_name: str
async def _run_step(deploy_id, cmd):
    """Run *cmd* and stream its combined stdout/stderr into the deployment log.

    Args:
        deploy_id: Key into ``deploy_logs`` that receives the output lines.
        cmd: Argument list passed to ``asyncio.create_subprocess_exec``.

    Raises:
        RuntimeError: If the command exits with a non-zero status.
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    log = deploy_logs[deploy_id]
    async for raw_line in proc.stdout:
        # Tool output is not guaranteed to be valid UTF-8; never let a stray
        # byte abort the whole deployment.
        log.append(raw_line.decode(errors="replace").rstrip())
    exit_code = await proc.wait()
    if exit_code != 0:
        command = " ".join(cmd)
        raise RuntimeError(f"command failed with exit code {exit_code}: {command}")


async def simulate_deployment(deploy_id, user_id, repo_full_name):
    """Clone a GitHub repo into a temp dir, install its requirements, and log each step.

    Progress lines are appended to ``deploy_logs[deploy_id]`` and the final
    outcome is stored in ``deploy_status[deploy_id]`` ("success" or "failed").
    The temporary checkout is always removed afterwards.

    Args:
        deploy_id: Identifier under which logs and status are recorded.
        user_id: Used together with ``repo_full_name`` to look up an uploaded .env.
        repo_full_name: Repository name, interpolated into
            ``https://github.com/<repo_full_name>.git``.
    """
    deploy_logs[deploy_id] = []
    deploy_status[deploy_id] = "deploying"
    temp_dir = tempfile.mkdtemp()
    try:
        # 1. Clone the repo. A non-zero exit (missing repo, no access, ...)
        #    now fails the deployment instead of being reported as success.
        await _run_step(
            deploy_id,
            ["git", "clone", f"https://github.com/{repo_full_name}.git", temp_dir],
        )

        # 2. Install dependencies (if requirements.txt exists).
        # NOTE(review): this installs third-party packages into whatever
        # environment "pip" resolves to for this process — consider isolating it.
        req_path = f"{temp_dir}/requirements.txt"
        if os.path.exists(req_path):
            await _run_step(deploy_id, ["pip", "install", "-r", req_path])

        # 3. Set environment variables from .env (if uploaded).
        env_content = env_files.get((user_id, repo_full_name))
        if env_content:
            env_path = f"{temp_dir}/.env"
            with open(env_path, "w", encoding="utf-8") as f:
                f.write(env_content)
            deploy_logs[deploy_id].append(".env file written.")

        # 4. (Optional) Run build or start script if present.
        #    For demo, just list files.
        await _run_step(deploy_id, ["ls", "-l", temp_dir])

        deploy_status[deploy_id] = "success"
    except Exception as e:
        # Top-level boundary of the background task: record the failure in the
        # log rather than letting it disappear.
        deploy_logs[deploy_id].append(f"Error: {e}")
        deploy_status[deploy_id] = "failed"
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
@app.post("/deploy")
async def deploy(req: DeployRequest, background_tasks: BackgroundTasks):
    """Schedule a deployment of the repo the user previously selected.

    Args:
        req: Request body; only ``user_id`` is used here (``github_url`` is
            accepted but not read).
        background_tasks: FastAPI hook used to run the deployment in the background.

    Returns:
        ``{"deploy_id": <uuid4 string>}`` on success, or
        ``{"error": "No repo selected for user."}`` when the user has no
        stored repo selection.
    """
    repo_full_name = user_selected_repo.get(req.user_id)
    if not repo_full_name:
        return {"error": "No repo selected for user."}

    deploy_id = str(uuid.uuid4())
    # Register the deployment before responding: the background task may not
    # have started yet when the client first polls, and without this the
    # log endpoint would report status "unknown" for a valid deploy_id.
    deploy_logs[deploy_id] = []
    deploy_status[deploy_id] = "deploying"
    background_tasks.add_task(simulate_deployment, deploy_id, req.user_id, repo_full_name)
    return {"deploy_id": deploy_id}
@app.get("/status/{deploy_id}")
async def status(deploy_id: str):
    """Return the recorded state of a deployment.

    Args:
        deploy_id: Identifier returned by ``POST /deploy``.

    Returns:
        The ``deployments`` entry when one exists; otherwise
        ``{"deploy_id": ..., "status": ...}`` built from ``deploy_status``;
        otherwise ``{"error": "Not found"}``.
    """
    if deploy_id in deployments:
        return deployments[deploy_id]
    # Nothing in this file writes to ``deployments``; the deployment task
    # records its state in ``deploy_status``, so consult that as well instead
    # of always answering "Not found".
    if deploy_id in deploy_status:
        return {"deploy_id": deploy_id, "status": deploy_status[deploy_id]}
    return {"error": "Not found"}
@app.get("/github/login")
def github_login(user_id: str):
    """Build the GitHub OAuth authorization URL for a user.

    Args:
        user_id: Sent to GitHub as the OAuth ``state`` value; the callback
            uses it to key the stored token.

    Returns:
        ``{"auth_url": <GitHub authorize URL with query string>}``.
    """
    # NOTE(review): ``state`` carries the raw user_id rather than an
    # unguessable nonce — confirm this is acceptable for CSRF protection.
    query = urlencode(
        {
            "client_id": GITHUB_CLIENT_ID,
            "redirect_uri": GITHUB_OAUTH_REDIRECT,
            "scope": "repo",
            "state": user_id,
        }
    )
    return {"auth_url": "https://github.com/login/oauth/authorize?" + query}
@app.get("/github/callback")
def github_callback(code: str, state: str):
    """Exchange the OAuth ``code`` for an access token and store it for the user.

    Args:
        code: Temporary authorization code supplied by GitHub.
        state: The value sent as ``state`` by ``/github/login`` (the user_id).

    Returns:
        A redirect to the Telegram bot link when a token was obtained,
        otherwise ``{"error": "Failed to get token"}``.
    """
    # state is user_id
    payload = {
        "client_id": GITHUB_CLIENT_ID,
        "client_secret": GITHUB_CLIENT_SECRET,
        "code": code,
        "redirect_uri": GITHUB_OAUTH_REDIRECT,
        "state": state,
    }
    try:
        resp = requests.post(
            "https://github.com/login/oauth/access_token",
            headers={"Accept": "application/json"},
            data=payload,
            timeout=10,  # never let a stalled upstream hang the worker forever
        )
        # Log only the status code: the response body contains the access
        # token and must not end up in stdout/log files.
        print("GitHub token response status:", resp.status_code)
        token = resp.json().get("access_token")
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body: treat it as "no token".
        token = None

    if token:
        user_tokens[state] = token
        return RedirectResponse(url="https://t.me/your_bot_username?start=github_connected")
    return {"error": "Failed to get token"}
@app.get("/github/repos")
def github_repos(user_id: str):
    """List the repositories visible to the user's stored GitHub token.

    Args:
        user_id: Key into ``user_tokens``.

    Returns:
        A list of ``{"name": ..., "full_name": ...}`` dicts, one per repo.

    Raises:
        HTTPException: 401 when no token is stored for the user; GitHub's
            status code when it answers with anything other than 200; 502
            when GitHub cannot be reached.
    """
    token = user_tokens.get(user_id)
    if not token:
        raise HTTPException(status_code=401, detail="User not authenticated with GitHub.")

    headers = {"Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json"}
    repos = []
    # The endpoint is paginated; request the largest page size and follow the
    # "next" links so users with many repos see all of them.
    next_url = "https://api.github.com/user/repos?per_page=100"
    while next_url:
        try:
            resp = requests.get(next_url, headers=headers, timeout=10)
        except requests.RequestException as exc:
            raise HTTPException(status_code=502, detail="Failed to fetch repos.") from exc
        if resp.status_code != 200:
            raise HTTPException(status_code=resp.status_code, detail="Failed to fetch repos.")
        repos.extend(resp.json())
        next_url = resp.links.get("next", {}).get("url")

    # Return only repo name and full_name for selection
    return [{"name": r["name"], "full_name": r["full_name"]} for r in repos]
@app.post("/repo/select")
def select_repo(req: RepoSelectRequest):
    """Remember which repository a user wants to deploy.

    Overwrites any previous selection stored for ``req.user_id``.

    Returns:
        A confirmation dict echoing the selected ``repo_full_name``.
    """
    chosen = req.repo_full_name
    user_selected_repo[req.user_id] = chosen
    confirmation = {"message": "Repo selected", "repo_full_name": chosen}
    return confirmation
@app.post("/env/upload")
async def upload_env(user_id: str = Form(...), file: UploadFile = File(...)):
    """Store an uploaded .env file for the user's currently selected repo.

    Args:
        user_id: Form field identifying the user.
        file: The uploaded .env file; must be UTF-8 text.

    Returns:
        ``{"message": ".env uploaded", "repo_full_name": ...}`` on success, or
        an ``{"error": ...}`` dict when no repo is selected or the file is
        not valid UTF-8.
    """
    # Find selected repo for user first, so nothing is read for a request
    # that cannot succeed.
    repo_full_name = user_selected_repo.get(user_id)
    if not repo_full_name:
        return {"error": "No repo selected for user."}

    raw = await file.read()
    try:
        env_text = raw.decode()
    except UnicodeDecodeError:
        # Previously this surfaced as an unhandled exception (HTTP 500).
        return {"error": ".env file must be UTF-8 encoded text."}

    env_files[(user_id, repo_full_name)] = env_text
    return {"message": ".env uploaded", "repo_full_name": repo_full_name}
@app.get("/logs/{deploy_id}")
def get_logs(deploy_id: str, last_line: int = 0):
    """Return the log lines of a deployment from ``last_line`` onwards.

    Args:
        deploy_id: Identifier returned by ``POST /deploy``.
        last_line: Slice start into the stored log; clients pass the
            ``next_line`` value from their previous call.

    Returns:
        A dict with the new ``logs``, the ``next_line`` offset to use on the
        next poll, and the deployment ``status`` ("unknown" when no status
        is recorded for ``deploy_id``).
    """
    all_lines = deploy_logs.get(deploy_id, [])
    payload = {}
    # Only the lines the caller has not seen yet.
    payload["logs"] = all_lines[last_line:]
    payload["next_line"] = len(all_lines)
    payload["status"] = deploy_status.get(deploy_id, "unknown")
    return payload
@app.get("/logs/{deploy_id}/download")
def download_logs(deploy_id: str):
    """Return the full log of a deployment as one plain-text document.

    Lines are joined with newlines; when no log is stored for ``deploy_id``
    the body is empty.
    """
    body = "\n".join(deploy_logs.get(deploy_id, []))
    return PlainTextResponse(body, media_type="text/plain")