"""Gradio front end that submits a Google Drive video to a RunPod endpoint.

The UI takes a Google Drive share link, submits the corresponding direct
download URL to the RunPod ``/run`` route, polls ``/status/<job id>`` until
the job reaches a terminal state, and then shows the links found under
``output["drive_links"]`` in the job result.
"""

import html
import os
import re
import time
from datetime import datetime, timezone

import gradio as gr
import requests

# Constants
# Read eagerly so a missing key fails at startup (KeyError) instead of on the
# first request.
RUNPOD_API_KEY = os.environ["RUNPOD_API_KEY"]
RUNPOD_ENDPOINT = "https://api.runpod.ai/v2/vz476hgvqvzojy"
USER_LOGIN = "Ammariii08"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {RUNPOD_API_KEY}",
}

SUBMIT_TIMEOUT_S = 1800  # timeout for the job submission request
POLL_TIMEOUT_S = 30      # timeout for a single status request
POLL_INTERVAL_S = 5      # pause between two status requests
MAX_POLL_ERRORS = 5      # consecutive failed status requests before giving up

# Terminal job states other than COMPLETED. Polling must stop on all of them,
# otherwise a cancelled or timed-out job would be polled forever.
UNSUCCESSFUL_STATES = frozenset({"FAILED", "CANCELLED", "TIMED_OUT"})

# A Drive file id appears either as a ``/d/<id>`` path segment or as an
# ``id=<id>`` query parameter; the path form is tried first.
_FILE_ID_PATTERNS = (
    re.compile(r"/d/([a-zA-Z0-9_-]+)"),
    re.compile(r"id=([a-zA-Z0-9_-]+)"),
)


def _direct_download_url(fid):
    """Return the Google Drive direct-download URL for file id ``fid``."""
    return f"https://drive.google.com/uc?export=download&id={fid}"


def extract_file_id(url):
    """Extract the Google Drive file id from ``url``.

    Args:
        url: A Drive link containing ``/d/<id>`` or ``id=<id>``.

    Returns:
        The file id, or ``None`` when ``url`` is empty/``None`` or contains
        neither form.
    """
    if not url:
        return None
    for pattern in _FILE_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            return match.group(1)
    return None


def to_direct_download(link):
    """Convert a Drive share link into a direct-download URL.

    Returns:
        The direct-download URL, or ``None`` when no file id can be found
        in ``link``.
    """
    fid = extract_file_id(link)
    return _direct_download_url(fid) if fid else None


def make_api_request(fid):
    """Submit a job for the Drive file ``fid`` to the RunPod ``/run`` route.

    Args:
        fid: Google Drive file id of the video to process.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx HTTP status.
        ValueError: When the response body is not valid JSON.
    """
    payload = {"input": {"video_url": _direct_download_url(fid)}}
    resp = requests.post(
        f"{RUNPOD_ENDPOINT}/run",
        headers=HEADERS,
        json=payload,
        timeout=SUBMIT_TIMEOUT_S,
    )
    resp.raise_for_status()
    return resp.json()


def _fetch_status(job_id):
    """Return the decoded JSON body of ``/status/<job_id>``.

    Raises ``requests.RequestException`` or ``ValueError`` like
    ``make_api_request``.
    """
    resp = requests.get(
        f"{RUNPOD_ENDPOINT}/status/{job_id}",
        headers=HEADERS,
        timeout=POLL_TIMEOUT_S,
    )
    resp.raise_for_status()
    return resp.json()


def _failure(progress, desc, message):
    """Finish the progress bar and build the output tuple for an error."""
    progress(1.0, desc=desc)
    return message, "", "–", ""


def _render_links(drive_links):
    """Render the per-drive links of a finished job.

    Args:
        drive_links: Iterable of dicts read via the ``drive`` and
            ``webViewLink`` keys; entries that are not dicts are skipped.

    Returns:
        ``(links_md, button_md, primary_link)``: the Markdown listing of the
        links, the HTML download anchors, and the first entry's
        ``webViewLink`` (``""`` when there is none).
    """
    items = [item for item in drive_links if isinstance(item, dict)]
    link_parts = []
    button_parts = []
    for item in items:
        drive = str(item.get("drive", "Unknown"))
        web_link = item.get("webViewLink", "") or ""
        dl_link = to_direct_download(web_link) or web_link
        # Two trailing spaces force a Markdown line break before the link.
        link_parts.append(f"**{drive} Drive Link:**  \n`{web_link}`\n\n")
        label = f"Open {html.escape(drive)} Download"
        if dl_link:
            # Values come from the remote job output, so escape them before
            # embedding them in HTML.
            button_parts.append(
                f'<a href="{html.escape(dl_link, quote=True)}" '
                f'target="_blank" rel="noopener">{label}</a> '
            )
        else:
            button_parts.append(f"{label} ")
    primary_link = items[0].get("webViewLink", "") if items else ""
    return "".join(link_parts), "".join(button_parts), primary_link


def process_video(drive_link, progress=gr.Progress()):
    """Submit ``drive_link`` for processing and wait for the result.

    Args:
        drive_link: Google Drive link entered by the user.
        progress: Gradio progress tracker, updated while the job runs.

    Returns:
        A 4-tuple ``(status, download_link, details_md, button_md)`` matching
        the four output components wired up in ``create_ui``. On any error
        the tuple is ``(message, "", "–", "")``.
    """
    fid = extract_file_id(drive_link)
    if not fid:
        return _failure(progress, "Invalid URL", "❌ Invalid Drive URL")

    progress(0.05, desc="Initializing…")
    t_start = time.monotonic()
    try:
        job = make_api_request(fid)
    except (requests.RequestException, ValueError) as e:
        return _failure(
            progress, "Submission Failed", f"❌ Submission error: {e}"
        )

    job_id = job.get("id") if isinstance(job, dict) else None
    if not job_id:
        return _failure(
            progress,
            "Submission Failed",
            "❌ Submission error: response contained no job id",
        )

    progress(0.2, desc="Job Submitted")
    t_submitted = time.monotonic()

    prog = 0.2
    poll_errors = 0
    while True:
        try:
            st = _fetch_status(job_id)
        except (requests.RequestException, ValueError) as e:
            # Tolerate short outages, but never loop forever on a dead API.
            poll_errors += 1
            if poll_errors >= MAX_POLL_ERRORS:
                return _failure(
                    progress, "Status Check Failed", f"❌ Status error: {e}"
                )
            time.sleep(POLL_INTERVAL_S)
            continue
        poll_errors = 0

        status = st.get("status", "UNKNOWN") if isinstance(st, dict) else "UNKNOWN"
        if status == "COMPLETED":
            exec_time = time.monotonic() - t_submitted
            out = st.get("output")
            drive_links = out.get("drive_links") if isinstance(out, dict) else None
            if not isinstance(drive_links, list):
                drive_links = []
            break
        if status == "FAILED":
            return _failure(progress, "Job Failed", "❌ Job failed")
        if status in UNSUCCESSFUL_STATES:
            return _failure(
                progress, "Job Failed", f"❌ Job ended with status {status}"
            )

        # Creep towards 90% while waiting; 100% is reserved for completion.
        prog = min(prog + 0.1, 0.9)
        progress(prog, desc=f"Waiting ({status})…")
        time.sleep(POLL_INTERVAL_S)

    progress(1.0, desc="Completed")
    delay = t_submitted - t_start

    # Two trailing spaces before "\n" are Markdown hard line breaks, so each
    # field is rendered on its own line.
    job_md = (
        f"**Job ID:** `{job_id}`  \n"
        f"**Status:** `{status}`\n\n"
        f"**Submission Delay:** `{delay:.1f}s`  \n"
        f"**Execution Time:** `{exec_time:.1f}s`\n\n"
    )

    # Compose all download links, one per drive, ahead of the job details.
    links_md, button_md, primary_dl_link = _render_links(drive_links)
    details_md = links_md + job_md

    # The first link is shown as the main download link.
    return "✅ Completed!", primary_dl_link, details_md, button_md


def create_ui():
    """Build and return the Gradio Blocks application."""
    with gr.Blocks(theme=gr.themes.Soft()) as app:
        gr.Markdown("# AXION 🎾\n## RUNPOD Deployment Testing")

        link_in = gr.Textbox(
            label="Google Drive Link",
            placeholder="https://drive.google.com/file/d/…",
        )
        btn = gr.Button("Process Video")

        status = gr.Textbox(label="Status", interactive=False)
        dl_out = gr.Textbox(label="Download Link", interactive=False)
        details = gr.Markdown(label="Details")
        open_btn = gr.Markdown()  # always present; filled once a job completes

        btn.click(
            fn=process_video,
            inputs=[link_in],
            outputs=[status, dl_out, details, open_btn],
        )
    return app


if __name__ == "__main__":
    create_ui().launch(share=True, server_name="0.0.0.0", server_port=7860)