#!/usr/bin/env python3
# FILE: app.py
# Description: Image-to-Video generation server with Gradio UI and FastAPI for Hugging Face Spaces
# Version: 1.2.8
# Timestamp: 2025-07-01 20:41 CDT
# Author: Grok 3, built by xAI (based on GhostAI's ghostpack_gradio_f1.py)
# NOTE: Optimized for Hugging Face Spaces with H200 GPU, 25 min/day render time
# Loads models from Hugging Face Hub to avoid HDD costs
# Uses /data for persistent storage, /tmp for temporary files
# API key authentication for /generate endpoint (off-site use)
# Base64-encoded video responses
# Gradio UI matches original ghostpack_gradio_f1.py
# Idle until triggered by API or Gradio
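# API surface (handlers below): GET /health, GET /test, GET /status/{job_id},
# POST /stop/{job_id}, GET /videos, POST /generate. All routes except /health
# and /test require the X-API-Key header to match the API_KEY Spaces secret.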
import os
import sys
import time
import json
import argparse
import importlib.util
import subprocess
import traceback
import torch
import einops
import numpy as np
from PIL import Image
import io
import gradio as gr
import asyncio
import queue
from threading import Thread
import re
import logging
import base64
import socket
import requests
import shutil
import uuid
import uvicorn
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Depends, Security, status
from fastapi.security import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from diffusers import AutoencoderKLHunyuanVideo
from transformers import (
LlamaModel, CLIPTextModel, LlamaTokenizerFast, CLIPTokenizer,
SiglipImageProcessor, SiglipVisionModel
)
from diffusers_helper.hunyuan import (
encode_prompt_conds, vae_decode, vae_encode, vae_decode_fake
)
from diffusers_helper.utils import (
save_bcthw_as_mp4, crop_or_pad_yield_mask, soft_append_bcthw
)
from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked
from diffusers_helper.memory import (
gpu, get_cuda_free_memory_gb, move_model_to_device_with_memory_preservation,
offload_model_from_device_for_memory_preservation, fake_diffusers_current_device,
DynamicSwapInstaller, unload_complete_models, load_model_as_complete
)
from diffusers_helper.clip_vision import hf_clip_vision_encode
from diffusers_helper.bucket_tools import find_nearest_bucket
from diffusers_helper.thread_utils import AsyncStream, async_run
from diffusers_helper.gradio.progress_bar import make_progress_bar_css, make_progress_bar_html
from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan
# Optional: Colorama for colored console output
try:
from colorama import init, Fore, Style
init(autoreset=True)
COLORAMA_AVAILABLE = True
def red(s): return Fore.RED + s + Style.RESET_ALL
def green(s): return Fore.GREEN + s + Style.RESET_ALL
def yellow(s): return Fore.YELLOW + s + Style.RESET_ALL
def reset_all(s): return Style.RESET_ALL + s
except ImportError:
COLORAMA_AVAILABLE = False
def red(s): return s
def green(s): return s
def yellow(s): return s
def reset_all(s): return s
# Set up logging
logging.basicConfig(
filename="/data/ghostpack.log",
level=logging.DEBUG,
format="%(asctime)s %(levelname)s:%(message)s",
)
logger = logging.getLogger(__name__)
logger.info("Starting GhostPack F1 Pro")
print(f"{green('Using /data/video_info.json for metadata')}")
VERSION = "1.2.8"
START_TIME = time.time()  # Process start, used by /test for uptime reporting
HF_TOKEN = os.getenv('HF_TOKEN')  # Set in Spaces secrets; None falls back to anonymous Hub access
API_KEY_NAME = "X-API-Key"
API_KEY = os.getenv('API_KEY', 'your-secret-key') # Set in Spaces secrets
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
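# With auto_error=False, FastAPI passes None to verify_api_key when the header is
# missing instead of rejecting the request itself, so a missing key still fails
# the explicit comparison below with a 401.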
# Global job registry
active_jobs = {} # {job_id: AsyncStream}
job_status = {} # {job_id: {"status": str, "progress": float, "render_time": float}}
# CLI
parser = argparse.ArgumentParser(description="GhostPack F1 Pro")
parser.add_argument("--share", action="store_true", help="Share Gradio UI publicly")
parser.add_argument("--server", type=str, default="0.0.0.0", help="Server host")
parser.add_argument("--port", type=int, default=7860, help="FastAPI port")
parser.add_argument("--gradio", action="store_true", help="Enable Gradio UI")
parser.add_argument("--inbrowser", action="store_true", help="Open in browser")
parser.add_argument("--cli", action="store_true", help="Show CLI help")
args = parser.parse_args()
# Global state
render_on_off = True
BASE = os.path.abspath(os.path.dirname(__file__))
os.environ["HF_HOME"] = "/tmp/hf_cache" # Cache models in /tmp
# Check if ports are available
def is_port_in_use(port):
    # Probe loopback rather than the 0.0.0.0 wildcard address, which is not a valid connect target everywhere
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(('127.0.0.1', port)) == 0
if args.cli:
print(f"{green('πŸ‘» GhostPack F1 Pro CLI')}")
print("python app.py # Launch API")
print("python app.py --gradio # Launch API + Gradio UI")
print("python app.py --cli # Show help")
sys.exit(0)
# Paths
DATA_DIR = "/data"
TMP_DIR = "/tmp/ghostpack"
VIDEO_OUTPUT_DIR = "/tmp/ghostpack/vid"
VIDEO_IMG_DIR = "/tmp/ghostpack/img"
VIDEO_TMP_DIR = "/tmp/ghostpack/tmp_vid"
VIDEO_INFO_FILE = "/data/video_info.json"
PROMPT_LOG_FILE = "/data/prompts.txt"
SAVED_PROMPTS_FILE = "/data/saved_prompts.json"
INSTALL_LOG_FILE = "/data/install_logs.txt"
LAST_CLEANUP_FILE = "/data/last_cleanup.txt"
# Initialize directories
for d in (DATA_DIR, TMP_DIR, VIDEO_OUTPUT_DIR, VIDEO_IMG_DIR, VIDEO_TMP_DIR):
if not os.path.exists(d):
try:
os.makedirs(d, exist_ok=True)
os.chmod(d, 0o775)
logger.debug(f"Created {d}")
except Exception as e:
logger.error(f"Failed to create {d}: {e}")
print(f"{red(f'Error: Failed to create {d}: {e}')}")
sys.exit(1)
# Initialize files
for f in (VIDEO_INFO_FILE, SAVED_PROMPTS_FILE, PROMPT_LOG_FILE, INSTALL_LOG_FILE, LAST_CLEANUP_FILE):
if not os.path.exists(f):
try:
if f == LAST_CLEANUP_FILE:
with open(f, "w") as fd:
fd.write(str(time.time()))
elif f in (VIDEO_INFO_FILE, SAVED_PROMPTS_FILE):
with open(f, "w") as fd:
json.dump([], fd)
else:
open(f, "w").close()
os.chmod(f, 0o664)
logger.debug(f"Created {f}")
except Exception as e:
logger.error(f"Failed to create/chmod {f}: {e}")
print(f"{red(f'Error: Failed to create/chmod {f}: {e}')}")
sys.exit(1)
# Clear VIDEO_INFO_FILE on startup
try:
with open(VIDEO_INFO_FILE, "w") as f:
json.dump([], f)
os.chmod(VIDEO_INFO_FILE, 0o664)
logger.debug(f"Cleared {VIDEO_INFO_FILE}")
except Exception as e:
logger.error(f"Failed to clear {VIDEO_INFO_FILE}: {e}")
print(f"{red(f'Error: Failed to clear {VIDEO_INFO_FILE}: {e}')}")
sys.exit(1)
# Queue clearing utility
def clear_queue(q):
    # Handles both stdlib queues (get_nowait) and FramePack's FIFOQueue (top/pop)
    try:
        while True:
            if hasattr(q, "get_nowait"):
                q.get_nowait()
            elif hasattr(q, "top") and q.top() is not None:
                q.pop()
            else:
                break
    except queue.Empty:
        pass
# Prompt utilities
def get_last_prompts():
    try:
        with open(SAVED_PROMPTS_FILE) as f:
            return json.load(f)[-5:][::-1]
    except Exception as e:
        logger.error(f"Failed to load prompts from {SAVED_PROMPTS_FILE}: {e}")
        print(f"{red(f'Error: Failed to load prompts: {e}')}")
        return []
def save_prompt_fn(prompt, n_p):
if not prompt:
return f"{red('❌ No prompt')}"
try:
data = json.load(open(SAVED_PROMPTS_FILE))
entry = {"prompt": prompt, "negative": n_p}
if entry not in data:
data.append(entry)
with open(SAVED_PROMPTS_FILE, "w") as f:
json.dump(data, f, indent=2)
os.chmod(SAVED_PROMPTS_FILE, 0o664)
return f"{green('βœ… Saved')}"
except Exception as e:
logger.error(f"Failed to save prompt to {SAVED_PROMPTS_FILE}: {e}")
print(f"{red(f'Error: Failed to save prompt: {e}')}")
return f"{red('❌ Save failed')}"
def load_prompt_fn(idx):
lst = get_last_prompts()
return lst[idx]["prompt"] if idx < len(lst) else ""
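# saved_prompts.json is a flat list of {"prompt": ..., "negative": ...} objects;
# get_last_prompts() returns the newest five, most recent first, which the
# "Load ... Recent" buttons in the UI index into.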
# Cleanup utilities
def clear_temp_videos():
try:
for f in os.listdir(VIDEO_TMP_DIR):
os.remove(os.path.join(VIDEO_TMP_DIR, f))
return f"{green('βœ… Temp cleared')}"
except Exception as e:
logger.error(f"Failed to clear temp videos in {VIDEO_TMP_DIR}: {e}")
print(f"{red(f'Error: Failed to clear temp videos: {e}')}")
return f"{red('❌ Clear failed')}"
def clear_old_files():
cutoff = time.time() - 7 * 24 * 3600
c = 0
try:
for d in (VIDEO_TMP_DIR, VIDEO_IMG_DIR, VIDEO_OUTPUT_DIR):
for f in os.listdir(d):
p = os.path.join(d, f)
if os.path.isfile(p) and os.path.getmtime(p) < cutoff:
os.remove(p)
c += 1
with open(LAST_CLEANUP_FILE, "w") as f:
f.write(str(time.time()))
os.chmod(LAST_CLEANUP_FILE, 0o664)
return f"{green(f'βœ… {c} old files removed')}"
except Exception as e:
logger.error(f"Failed to clear old files: {e}")
print(f"{red(f'Error: Failed to clear old files: {e}')}")
return f"{red('❌ Clear failed')}"
def clear_images():
try:
for f in os.listdir(VIDEO_IMG_DIR):
os.remove(os.path.join(VIDEO_IMG_DIR, f))
return f"{green('βœ… Images cleared')}"
except Exception as e:
logger.error(f"Failed to clear images in {VIDEO_IMG_DIR}: {e}")
print(f"{red(f'Error: Failed to clear images: {e}')}")
return f"{red('❌ Clear failed')}"
def clear_videos():
try:
for f in os.listdir(VIDEO_OUTPUT_DIR):
os.remove(os.path.join(VIDEO_OUTPUT_DIR, f))
return f"{green('βœ… Videos cleared')}"
except Exception as e:
logger.error(f"Failed to clear videos in {VIDEO_OUTPUT_DIR}: {e}")
print(f"{red(f'Error: Failed to clear videos: {e}')}")
return f"{red('❌ Clear failed')}"
def check_and_run_weekly_cleanup():
try:
with open(LAST_CLEANUP_FILE, "r") as f:
last_cleanup = float(f.read().strip())
except (FileNotFoundError, ValueError):
last_cleanup = 0
if time.time() - last_cleanup > 7 * 24 * 3600:
return clear_old_files()
return ""
# Video metadata utilities
def save_video_info(prompt, n_p, filename, seed, secs, additional_info, completed=False):
if not completed:
return
try:
video_info = json.load(open(VIDEO_INFO_FILE))
except (FileNotFoundError, json.JSONDecodeError):
video_info = []
entry = {
"prompt": prompt or "",
"negative_prompt": n_p or "",
"filename": filename,
"location": os.path.join(VIDEO_OUTPUT_DIR, filename),
"seed": seed,
"duration_secs": secs,
"timestamp": time.strftime("%Y%m%d_%H%M%S"),
"completed": completed,
"additional_info": additional_info or {},
}
video_info.append(entry)
try:
with open(VIDEO_INFO_FILE, "w") as f:
json.dump(video_info, f, indent=2)
os.chmod(VIDEO_INFO_FILE, 0o664)
logger.debug(f"Saved video info to {VIDEO_INFO_FILE}")
except Exception as e:
logger.error(f"Failed to save video info to {VIDEO_INFO_FILE}: {e}")
print(f"{red(f'Error: Failed to save video info to {VIDEO_INFO_FILE}: {e}')}")
raise
# Gallery helpers
def list_images():
return sorted(
[os.path.join(VIDEO_IMG_DIR, f) for f in os.listdir(VIDEO_IMG_DIR) if f.lower().endswith((".png", ".jpg"))],
key=os.path.getmtime,
)
def list_videos():
return sorted(
[os.path.join(VIDEO_OUTPUT_DIR, f) for f in os.listdir(VIDEO_OUTPUT_DIR) if f.lower().endswith(".mp4")],
key=os.path.getmtime,
)
def load_image(sel):
imgs = list_images()
if sel in [os.path.basename(p) for p in imgs]:
pth = imgs[[os.path.basename(p) for p in imgs].index(sel)]
return gr.update(value=pth), gr.update(value=os.path.basename(pth))
return gr.update(), gr.update()
def load_video(sel):
vids = list_videos()
if sel in [os.path.basename(p) for p in vids]:
pth = vids[[os.path.basename(p) for p in vids].index(sel)]
return gr.update(value=pth), gr.update(value=os.path.basename(pth))
return gr.update(), gr.update()
def next_image_and_load(sel):
imgs = list_images()
if not imgs:
return gr.update(), gr.update()
names = [os.path.basename(i) for i in imgs]
idx = (names.index(sel) + 1) % len(names) if sel in names else 0
pth = imgs[idx]
return gr.update(value=pth), gr.update(value=os.path.basename(pth))
def next_video_and_load(sel):
vids = list_videos()
if not vids:
return gr.update(), gr.update()
names = [os.path.basename(v) for v in vids]
idx = (names.index(sel) + 1) % len(names) if sel in names else 0
pth = vids[idx]
return gr.update(value=pth), gr.update(value=os.path.basename(pth))
def gallery_image_select(evt: gr.SelectData):
imgs = list_images()
if evt.index is not None and evt.index < len(imgs):
pth = imgs[evt.index]
return gr.update(value=pth), gr.update(value=os.path.basename(pth))
return gr.update(), gr.update()
def gallery_video_select(evt: gr.SelectData):
vids = list_videos()
if evt.index is not None and evt.index < len(vids):
pth = vids[evt.index]
return gr.update(value=pth), gr.update(value=os.path.basename(pth))
return gr.update(), gr.update()
# Install status
def check_mod(n):
return importlib.util.find_spec(n) is not None
def status_xformers():
print(f"{green('βœ… Xformers is installed!')}" if check_mod("xformers") else f"{red('❌ Xformers is not installed!')}")
return f"{green('βœ… xformers')}" if check_mod("xformers") else f"{red('❌ xformers')}"
def status_sage():
print(f"{green('βœ… Sage Attn is installed!')}" if check_mod("sageattention") else f"{red('❌ Sage Attn is not installed!')}")
return f"{green('βœ… sage-attn')}" if check_mod("sageattention") else f"{red('❌ sage-attn')}"
def status_flash():
print(f"{yellow('⚠️ Flash Attn is not installed, performance may be reduced!')}" if not check_mod("flash_attn") else f"{green('βœ… Flash Attn is installed!')}")
return f"{yellow('⚠️ flash-attn')}" if not check_mod("flash_attn") else f"{green('βœ… flash-attn')}"
def status_colorama():
return f"{green('βœ… colorama')}" if COLORAMA_AVAILABLE else f"{red('❌ colorama')}"
def install_pkg(pkg, warn=None):
if warn:
print(f"{yellow(warn)}")
time.sleep(1)
try:
out = subprocess.check_output(
[sys.executable, "-m", "pip", "install", pkg], stderr=subprocess.STDOUT, text=True
)
res = f"{green(f'βœ… {pkg}')}\n{out}\n"
except subprocess.CalledProcessError as e:
res = f"{red(f'❌ {pkg}')}\n{e.output}\n"
with open(INSTALL_LOG_FILE, "a") as f:
f.write(f"[{pkg}] {res}")
return res
install_xformers = lambda: install_pkg("xformers")
install_sage_attn = lambda: install_pkg("sageattention")  # PyPI name; the importable module is 'sageattention'
install_flash_attn = lambda: install_pkg("flash-attn", "⚠️ long compile, optional for performance")
install_colorama = lambda: install_pkg("colorama")
refresh_logs = lambda: open(INSTALL_LOG_FILE).read()
clear_logs = lambda: open(INSTALL_LOG_FILE, "w").close() or f"{green('βœ… Logs cleared')}"
# Model load
free_mem = get_cuda_free_memory_gb(gpu)
hv = free_mem > 60
logger.info(f"VRAM available: {free_mem:.2f} GB, High VRAM mode: {hv}")
print(f"{yellow(f'VRAM available: {free_mem:.2f} GB, High VRAM mode: {hv}')}")
try:
print(f"{yellow('Loading models...')}")
text_encoder = LlamaModel.from_pretrained(
"hunyuanvideo-community/HunyuanVideo", subfolder="text_encoder", torch_dtype=torch.float16, token=HF_TOKEN, cache_dir="/tmp/hf_cache"
).cpu().eval()
text_encoder_2 = CLIPTextModel.from_pretrained(
"hunyuanvideo-community/HunyuanVideo", subfolder="text_encoder_2", torch_dtype=torch.float16, token=HF_TOKEN, cache_dir="/tmp/hf_cache"
).cpu().eval()
tokenizer = LlamaTokenizerFast.from_pretrained(
"hunyuanvideo-community/HunyuanVideo", subfolder="tokenizer", token=HF_TOKEN, cache_dir="/tmp/hf_cache"
)
tokenizer_2 = CLIPTokenizer.from_pretrained(
"hunyuanvideo-community/HunyuanVideo", subfolder="tokenizer_2", token=HF_TOKEN, cache_dir="/tmp/hf_cache"
)
vae = AutoencoderKLHunyuanVideo.from_pretrained(
"hunyuanvideo-community/HunyuanVideo", subfolder="vae", torch_dtype=torch.float16, token=HF_TOKEN, cache_dir="/tmp/hf_cache"
).cpu().eval()
feature_extractor = SiglipImageProcessor.from_pretrained(
"lllyasviel/flux_redux_bfl", subfolder="feature_extractor", token=HF_TOKEN, cache_dir="/tmp/hf_cache"
)
image_encoder = SiglipVisionModel.from_pretrained(
"lllyasviel/flux_redux_bfl", subfolder="image_encoder", torch_dtype=torch.float16, token=HF_TOKEN, cache_dir="/tmp/hf_cache"
).cpu().eval()
transformer = HunyuanVideoTransformer3DModelPacked.from_pretrained(
"lllyasviel/FramePack_F1_I2V_HY_20250503", torch_dtype=torch.bfloat16, token=HF_TOKEN, cache_dir="/tmp/hf_cache"
).cpu().eval()
logger.info("Models loaded successfully")
print(f"{green('Models loaded successfully')}")
except Exception as e:
logger.error(f"Failed to load models: {e}", exc_info=True)
print(f"{red(f'Error: Failed to load models: {e}')}")
raise
if not hv:
vae.enable_slicing()
vae.enable_tiling()
transformer.high_quality_fp32_output_for_inference = True
transformer.to(dtype=torch.bfloat16)
for m in (vae, image_encoder, text_encoder, text_encoder_2):
m.to(dtype=torch.float16)
for m in (vae, image_encoder, text_encoder, text_encoder_2, transformer):
m.requires_grad_(False)
if not hv:
DynamicSwapInstaller.install_model(transformer, device=gpu)
DynamicSwapInstaller.install_model(text_encoder, device=gpu)
else:
for m in (vae, image_encoder, text_encoder, text_encoder_2, transformer):
m.to(gpu)
logger.debug("Models configured and moved to device")
print(f"{green('Models configured and moved to device')}")
# FastAPI Setup
app = FastAPI(title="GhostPack F1 Pro API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
async def verify_api_key(api_key: str = Security(api_key_header)):
if api_key != API_KEY:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key"
)
return api_key
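# A minimal sketch of authenticating a request, assuming the Space is reachable
# at http://localhost:7860 and API_KEY matches the server secret:
#   curl -H "X-API-Key: $API_KEY" http://localhost:7860/videos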
class GenerateRequest(BaseModel):
prompt: str
negative_prompt: str
seed: int
video_length: float
latent_window: int
steps: int
cfg: float
distilled_cfg: float
cfg_rescale: float
gpu_keep: float
crf: int
use_teacache: bool
camera_action: str
disable_prompt_mods: bool
link_steps_window: bool
@app.get("/health")
async def health_check():
try:
return JSONResponse(content={"status": "healthy"})
except Exception as e:
logger.error(f"Health check failed: {e}", exc_info=True)
return JSONResponse(content={"error": str(e), "status": "error"}, status_code=500)
@app.get("/test")
async def test_server():
try:
report = {
"server_status": {
"version": VERSION,
"host": args.server,
"port": args.port,
"uptime": time.time() - time.time() if job_status else 0,
"active_jobs": len(active_jobs),
"api_status": "running",
},
"system": {
"vram_total": free_mem,
"vram_free": get_cuda_free_memory_gb(gpu),
"high_vram_mode": hv,
"cuda_available": torch.cuda.is_available(),
"cuda_device": torch.cuda.get_device_name(gpu) if torch.cuda.is_available() else "N/A",
},
"models": {
"text_encoder": text_encoder is not None,
"text_encoder_2": text_encoder_2 is not None,
"vae": vae is not None,
"image_encoder": image_encoder is not None,
"transformer": transformer is not None,
"tokenizer": tokenizer is not None,
"tokenizer_2": tokenizer_2 is not None,
"feature_extractor": feature_extractor is not None,
},
"paths": {
"base": BASE,
"images": VIDEO_IMG_DIR,
"videos": VIDEO_OUTPUT_DIR,
"temp": VIDEO_TMP_DIR,
"data": DATA_DIR,
"prompt_log": PROMPT_LOG_FILE,
"saved_prompts": SAVED_PROMPTS_FILE,
"install_log": INSTALL_LOG_FILE,
"video_info": VIDEO_INFO_FILE,
},
"file_system": {
"images_writable": os.access(VIDEO_IMG_DIR, os.W_OK),
"videos_writable": os.access(VIDEO_OUTPUT_DIR, os.W_OK),
"temp_writable": os.access(VIDEO_TMP_DIR, os.W_OK),
"data_writable": os.access(DATA_DIR, os.W_OK),
},
"dependencies": {
"xformers": status_xformers(),
"sage_attn": status_sage(),
"flash_attn": status_flash(),
"colorama": status_colorama(),
},
"health_check": {"status": "pass", "details": ""}
}
try:
dummy_img = np.zeros((64, 64, 3), dtype=np.uint8)
img_pt = (torch.from_numpy(dummy_img).float() / 127.5 - 1).permute(2, 0, 1)[None, :, None]
if not hv:
load_model_as_complete(vae, gpu)
_ = vae_encode(img_pt, vae)
report["health_check"]["status"] = "pass"
except Exception as e:
report["health_check"]["status"] = "fail"
report["health_check"]["details"] = str(e)
logger.error(f"Health check failed: {e}", exc_info=True)
logger.info("Test endpoint accessed successfully")
print(f"{green(f'Test endpoint accessed: API running on {args.server}:{args.port}')}")
return JSONResponse(content=report)
except Exception as e:
logger.error(f"Test endpoint error: {e}", exc_info=True)
print(f"{red(f'Test endpoint error: {e}')}")
return JSONResponse(
content={"error": str(e), "status": "fail"},
status_code=500
)
@app.get("/status/{job_id}")
async def get_status(job_id: str, api_key: str = Depends(verify_api_key)):
try:
        st = job_status.get(job_id, {"status": "not_found", "progress": 0.0, "render_time": 0})  # avoid shadowing fastapi.status
        return JSONResponse(
            content={
                "job_id": job_id,
                "render_status": st["status"],
                "render_progress": st["progress"],
                "render_time": st["render_time"],
"active_jobs": len(active_jobs),
"api_status": "running",
}
)
except Exception as e:
logger.error(f"Status check failed for job {job_id}: {e}", exc_info=True)
return JSONResponse(
content={"error": str(e), "job_id": job_id, "status": "error"},
status_code=500
)
@app.post("/stop/{job_id}")
async def stop_render(job_id: str, api_key: str = Depends(verify_api_key)):
if job_id not in active_jobs:
logger.info(f"No active job {job_id} to stop")
print(f"{yellow(f'No active job {job_id} to stop')}")
return JSONResponse(content={"message": f"No active job {job_id}"})
    stream = active_jobs[job_id]
    stream.input_queue.push("end")  # AsyncStream exposes queues rather than a stop(); signal the worker loop
    active_jobs.pop(job_id, None)
    job_status[job_id]["status"] = "stopped"
    job_status[job_id]["progress"] = 0.0
logger.info(f"Stopped job {job_id}")
print(f"{yellow(f'Stopped job {job_id}')}")
return JSONResponse(content={"message": f"Job {job_id} stopped"})
@app.get("/videos")
async def get_videos(api_key: str = Depends(verify_api_key)):
try:
videos = [f for f in os.listdir(VIDEO_OUTPUT_DIR) if f.lower().endswith(".mp4")]
return JSONResponse(content={"status": "success", "videos": videos})
except Exception as e:
logger.error(f"Failed to list videos: {e}", exc_info=True)
return JSONResponse(content={"error": str(e), "status": "error"}, status_code=500)
@app.post("/generate")
async def generate_video(
image_file: UploadFile = File(...),
prompt: str = Form(""),
negative_prompt: str = Form(""),
seed: int = Form(31337),
video_length: float = Form(8.0),
latent_window: int = Form(3),
steps: int = Form(12),
cfg: float = Form(1.0),
distilled_cfg: float = Form(7.0),
cfg_rescale: float = Form(0.5),
gpu_keep: float = Form(6.0),
crf: int = Form(20),
use_teacache: bool = Form(True),
camera_action: str = Form("Static Camera"),
disable_prompt_mods: bool = Form(False),
link_steps_window: bool = Form(True),
api_key: str = Depends(verify_api_key)
):
params = {
"prompt": prompt,
"negative_prompt": negative_prompt,
"seed": seed,
"video_length": video_length,
"latent_window": latent_window,
"steps": steps,
"cfg": cfg,
"distilled_cfg": distilled_cfg,
"cfg_rescale": cfg_rescale,
"gpu_keep": gpu_keep,
"crf": crf,
"use_teacache": use_teacache,
"camera_action": camera_action,
"disable_prompt_mods": disable_prompt_mods,
"link_steps_window": link_steps_window
}
logger.info(f"Received /generate request with parameters: {json.dumps(params, indent=2)}")
print(f"{green(f'API: Received /generate request with parameters: {json.dumps(params, indent=2)}')}")
if not render_on_off:
logger.info("Render disabled by client")
print(f"{red('API: Render disabled by client')}")
return JSONResponse(content={"status": "render_disabled", "error": "Rendering disabled"}, status_code=403)
jid = str(uuid.uuid4())
logger.info(f"Starting job {jid} with prompt: {prompt}")
print(f"{green(f'API: Starting job ID: {jid}')}")
stream = AsyncStream()
active_jobs[jid] = stream
job_status[jid] = {"status": "rendering", "progress": 0.0, "render_time": 0}
try:
logger.debug("Processing uploaded image file")
print(f"{yellow('API: Processing uploaded image file')}")
img_data = await image_file.read()
if not img_data:
logger.error("Empty image file")
print(f"{red('API: Empty image file')}")
raise HTTPException(status_code=400, detail="Empty image file")
try:
img = Image.open(io.BytesIO(img_data)).convert('RGB')
img_np = np.array(img)
if img_np.shape[0] < 64 or img_np.shape[1] < 64:
logger.error("Image dimensions too small")
print(f"{red('API: Image dimensions too small (minimum 64x64)')}")
raise HTTPException(status_code=400, detail="Image dimensions must be at least 64x64")
except Exception as e:
logger.error(f"Invalid image: {str(e)}")
print(f"{red(f'API: Invalid image: {str(e)}')}")
raise HTTPException(status_code=400, detail=f"Invalid image: {str(e)}")
if get_cuda_free_memory_gb(gpu) < 2:
logger.error("Insufficient VRAM for processing")
print(f"{red('API: Insufficient VRAM (<2GB). Lower gpu_keep or latent_window.')}")
raise HTTPException(status_code=500, detail="Low VRAM (<2GB). Lower 'gpu_keep' or 'latent_window'.")
logger.info(f"Passing to worker: seed={seed}, video_length={video_length}, latent_window={latent_window}, steps={steps}, cfg={cfg}, distilled_cfg={distilled_cfg}")
print(f"{yellow(f'API: Passing to worker: seed={seed}, video_length={video_length}, latent_window={latent_window}, steps={steps}, cfg={cfg}, distilled_cfg={distilled_cfg}')}")
final_video_path = worker(
img_np=img_np,
prompt=prompt,
negative_prompt=negative_prompt,
seed=seed,
secs=video_length,
win=latent_window,
stp=steps,
cfg=cfg,
gsc=distilled_cfg,
rsc=cfg_rescale,
keep=gpu_keep,
tea=use_teacache,
crf=crf,
camera_action=camera_action,
disable_prompt_mods=disable_prompt_mods,
link_steps_window=link_steps_window,
stream=stream,
jid=jid
)
if final_video_path is None:
logger.error("Render stopped or failed")
print(f"{red('API: Render stopped or failed')}")
raise HTTPException(status_code=500, detail="Render stopped or failed")
final_filename = os.path.basename(final_video_path)
with open(final_video_path, "rb") as f:
video_data = base64.b64encode(f.read()).decode("utf-8")
save_video_info(
prompt=prompt,
n_p=negative_prompt,
filename=final_filename,
seed=seed,
secs=video_length,
additional_info={"camera_action": camera_action, "job_id": jid},
completed=True
)
response_info = {
"status": "success",
"job_id": jid,
"video_data": video_data,
"metadata": {
"prompt": prompt,
"negative_prompt": negative_prompt,
"seed": seed,
"duration_secs": video_length,
"timestamp": time.strftime("%Y%m%d_%H%M%S"),
"render_time_secs": job_status[jid]["render_time"],
"camera_action": camera_action,
"latent_window": latent_window,
"steps": steps,
"cfg": cfg,
"distilled_cfg": distilled_cfg,
"cfg_rescale": cfg_rescale,
"gpu_keep": gpu_keep,
"crf": crf,
"use_teacache": use_teacache,
"disable_prompt_mods": disable_prompt_mods,
"link_steps_window": link_steps_window
}
}
logger.info(f"Video generated: {final_video_path}")
print(f"{green(f'API: Video generated: {final_video_path}')}")
return JSONResponse(content=response_info)
except Exception as e:
logger.error(f"Generate failed: {e}", exc_info=True)
print(f"{red(f'API: Error during /generate: {str(e)}')}")
job_status[jid]["status"] = "error"
job_status[jid]["progress"] = 0.0
stream.output_queue.push(("end", str(e)))
return JSONResponse(
content={"error": str(e), "job_id": jid, "status": "error"},
status_code=500
)
finally:
active_jobs.pop(jid, None)
clear_queue(stream.input_queue)
clear_queue(stream.output_queue)
if job_status.get(jid, {}).get("status") not in ["complete", "error", "stopped"]:
job_status[jid]["status"] = "complete"
torch.cuda.empty_cache()
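# A minimal off-site client sketch for /generate, kept inert in a string literal so
# importing this module does nothing. SPACE_URL, API_KEY, and input.png are
# assumptions; the form field names and the base64 "video_data" response field match
# the endpoint above.
"""
import base64
import requests

SPACE_URL = "http://localhost:7860"  # assumption: wherever this app is served
API_KEY = "your-secret-key"          # must match the server's API_KEY secret

with open("input.png", "rb") as f:
    resp = requests.post(
        f"{SPACE_URL}/generate",
        headers={"X-API-Key": API_KEY},
        files={"image_file": ("input.png", f, "image/png")},
        data={"prompt": "Smooth animation: a character waves, static camera, silent",
              "video_length": 5.0, "seed": 31337},
        timeout=3600,  # rendering is synchronous and can take minutes
    )
resp.raise_for_status()
payload = resp.json()
with open("output.mp4", "wb") as out:
    out.write(base64.b64decode(payload["video_data"]))
print("saved output.mp4, job", payload["job_id"])
"""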
@torch.no_grad()
def worker(img_np, prompt, negative_prompt, seed, secs, win, stp, cfg, gsc, rsc, keep, tea, crf, camera_action, disable_prompt_mods, link_steps_window, stream, jid):
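    """Render one video job synchronously; returns the final mp4 path, or None on stop/error.

    Short parameter names mirror the /generate form fields: secs=video_length,
    win=latent_window, stp=steps, gsc=distilled_cfg, rsc=cfg_rescale,
    keep=gpu_keep, tea=use_teacache.
    """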
start_time = time.time()
job_status[jid] = {"status": "rendering", "progress": 0.0, "render_time": 0}
max_sections = 100
logger.info(f"Worker started for job {jid} with secs={secs}, win={win}, cfg={cfg}, distilled_cfg={gsc}")
print(f"{green(f'API: Starting video generation, job ID: {jid}, secs={secs}, win={win}, cfg={cfg}, distilled_cfg={gsc}')}")
try:
if img_np.shape[0] < 64 or img_np.shape[1] < 64:
raise ValueError("Image dimensions too small (minimum 64x64)")
if secs > 10:
logger.warning("Video length > 10s capped at 10s")
print(f"{yellow('API: Video length > 10s capped at 10s')}")
secs = min(secs, 10)
if win > 10:
logger.warning("Latent window > 10 capped at 10")
print(f"{yellow('API: Latent window > 10 capped at 10')}")
win = min(win, 10)
if get_cuda_free_memory_gb(gpu) < 2:
raise ValueError("Low VRAM (<2GB). Lower 'gpu_keep' or 'latent_window'.")
        # FramePack's FIFOQueue exposes top()/pop() rather than the stdlib queue API;
        # top() returns None when empty, so this check never blocks
        if stream.input_queue.top() == "end":
            stream.output_queue.push(("end", "Job stopped by client"))
            job_status[jid]["status"] = "stopped"
            return None
if not disable_prompt_mods:
if "stop" not in prompt.lower() and secs > 3:
prompt += " The subject stops moving after 3 seconds."
if "smooth" not in prompt.lower():
prompt = f"Smooth animation: {prompt}"
if "silent" not in prompt.lower():
prompt += ", silent"
prompt = update_prompt(prompt, camera_action)
if len(prompt.split()) > 50:
logger.warning("Complex prompt may slow rendering")
print(f"{yellow('API: Warning: Complex prompt may slow rendering')}")
try:
with open(PROMPT_LOG_FILE, "a") as f:
f.write(f"{jid}\t{prompt}\t{negative_prompt}\n")
os.chmod(PROMPT_LOG_FILE, 0o664)
except Exception as e:
logger.error(f"Failed to write to {PROMPT_LOG_FILE}: {e}")
print(f"{red(f'API: Failed to write prompt log: {e}')}")
raise
stream.output_queue.push(('progress', (None, "", make_progress_bar_html(0, "Start"))))
if not hv:
unload_complete_models(text_encoder, text_encoder_2, image_encoder, vae, transformer)
fake_diffusers_current_device(text_encoder, gpu)
load_model_as_complete(text_encoder_2, gpu)
lv, cp = encode_prompt_conds(prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2)
if cfg == 1:
lv_n = torch.zeros_like(lv)
cp_n = torch.zeros_like(cp)
else:
lv_n, cp_n = encode_prompt_conds(negative_prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2)
lv, m = crop_or_pad_yield_mask(lv, 512)
lv_n, m_n = crop_or_pad_yield_mask(lv_n, 512)
lv, cp, lv_n, cp_n = [x.to(torch.bfloat16) for x in (lv, cp, lv_n, cp_n)]
logger.debug(f"Prompt embeddings: lv={lv.shape}, cp={cp.shape}, lv_n={lv_n.shape}, cp_n={cp_n.shape}")
torch.cuda.empty_cache()
        H, W, _ = img_np.shape
        # Snap to the nearest training bucket so H and W stay VAE-compatible (divisible by 8)
        h, w = find_nearest_bucket(H, W, resolution=640)
        if (h, w) != (H, W):
            img_np = np.array(Image.fromarray(img_np).resize((w, h), Image.LANCZOS))
img_filename = f"{jid}.png"
try:
Image.fromarray(img_np).save(os.path.join(VIDEO_IMG_DIR, img_filename))
os.chmod(os.path.join(VIDEO_IMG_DIR, img_filename), 0o664)
except Exception as e:
logger.error(f"Failed to save image {img_filename}: {e}")
print(f"{red(f'API: Failed to save image: {e}')}")
raise
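        # Map uint8 HWC pixels to [-1, 1] and add batch and time axes: (B, C, T, H, W) with T=1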
img_pt = (torch.from_numpy(img_np).float() / 127.5 - 1).permute(2, 0, 1)[None, :, None]
logger.debug(f"Image tensor shape: {img_pt.shape}")
if not hv:
load_model_as_complete(vae, gpu)
start_lat = vae_encode(img_pt, vae)
logger.debug(f"VAE encoded latent shape: {start_lat.shape}")
if not hv:
load_model_as_complete(image_encoder, gpu)
img_emb = hf_clip_vision_encode(img_np, feature_extractor, image_encoder).last_hidden_state.to(torch.bfloat16)
logger.debug(f"Image embedding shape: {img_emb.shape}")
torch.cuda.empty_cache()
gen = torch.Generator("cpu").manual_seed(seed)
sections = max(round((secs * 30) / (win * 4)), 1)
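        # Worked example: secs=8, win=3 → round((8 * 30) / (3 * 4)) = 20 sections,
        # each denoising win * 4 - 3 = 9 new latent frames per sampling call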
if sections > max_sections:
logger.error(f"Too many sections ({sections}) for job {jid}")
print(f"{red(f'API: Too many sections ({sections}) for job {jid}')}")
raise ValueError(f"Too many sections ({sections})")
logger.info(f"Job {jid} sections: {sections}, pad_seq: {[3] + [2] * (sections - 3) + [1, 0] if sections > 4 else list(reversed(range(sections)))}")
hist_lat = torch.zeros((1, 16, 1 + 2 + 16, h // 8, w // 8), dtype=torch.float16).cpu()
hist_px = None
total = 0
pad_seq = [3] + [2] * (sections - 3) + [1, 0] if sections > 4 else list(reversed(range(sections)))
section_count = 0
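        # e.g. sections=20 → pad_seq = [3] + [2]*17 + [1, 0]; pad == 0 marks the final section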
for pad in pad_seq:
section_count += 1
if section_count > max_sections:
logger.error(f"Max sections ({max_sections}) exceeded for job {jid}")
print(f"{red(f'API: Max sections ({max_sections}) exceeded for job {jid}')}")
raise ValueError(f"Max sections ({max_sections}) exceeded")
last = pad == 0
logger.info(f"Job {jid} processing pad: {pad}, last: {last}")
def cb(d):
if job_status[jid]["status"] == "complete":
return
pv = vae_decode_fake(d["denoised"])
pv = (pv * 255).cpu().numpy().clip(0, 255).astype(np.uint8)
pv = einops.rearrange(pv, "b c t h w -> (b h) (t w) c")
cur = d["i"] + 1
job_status[jid]["progress"] = (cur / stp) * 100
progress_message = f"API: Job {jid} Progress {cur}/{stp} ({job_status[jid]['progress']:.1f}%)"
logger.info(progress_message)
print(yellow(progress_message))
stream.output_queue.push(('progress', (pv, f"{cur}/{stp}", make_progress_bar_html(int(100 * cur / stp), f"{cur}/{stp}"))))
                # Non-blocking stop check; FIFOQueue.top() returns None when empty
                if stream.input_queue.top() == "end":
                    stream.output_queue.push(("end", "Job stopped by client"))
                    raise KeyboardInterrupt
idx = torch.arange(0, sum([1, pad * win, win, 1, 2, 16]))[None].to(device=gpu)
a, b, c, d, e, f = idx.split([1, pad * win, win, 1, 2, 16], 1)
clean_idx = torch.cat([a, d], 1)
pre = start_lat.to(hist_lat)
post, two, four = hist_lat[:, :, :1 + 2 + 16].split([1, 2, 16], 2)
clean = torch.cat([pre, post], 2)
if not hv:
unload_complete_models()
move_model_to_device_with_memory_preservation(transformer, gpu, keep)
transformer.initialize_teacache(tea, stp)
new_lat = sample_hunyuan(
transformer=transformer, sampler="unipc", width=w, height=h, frames=win * 4 - 3,
real_guidance_scale=cfg, distilled_guidance_scale=gsc, guidance_rescale=rsc,
num_inference_steps=stp, generator=gen,
prompt_embeds=lv, prompt_embeds_mask=m, prompt_poolers=cp,
negative_prompt_embeds=lv_n, negative_prompt_embeds_mask=m_n, negative_prompt_poolers=cp_n,
device=gpu, dtype=torch.bfloat16, image_embeddings=img_emb,
latent_indices=c, clean_latents=clean, clean_latent_indices=clean_idx,
clean_latents_2x=two, clean_latent_2x_indices=e,
clean_latents_4x=four, clean_latent_4x_indices=f, callback=cb
)
if last:
new_lat = torch.cat([start_lat.to(new_lat), new_lat], 2)
total += new_lat.shape[2]
hist_lat = torch.cat([new_lat.to(hist_lat), hist_lat], 2)
if not hv:
offload_model_from_device_for_memory_preservation(transformer, gpu, 8)
load_model_as_complete(vae, gpu)
real = hist_lat[:, :, :total]
if hist_px is None:
hist_px = vae_decode(real, vae).cpu()
else:
overlap = win * 4 - 3
curr = vae_decode(real[:, :, :win * 2], vae).cpu()
hist_px = soft_append_bcthw(curr, hist_px, overlap)
if not hv:
unload_complete_models()
tmp_path = os.path.join(VIDEO_TMP_DIR, f"{jid}_{total}.mp4")
save_bcthw_as_mp4(hist_px, tmp_path, fps=30, crf=crf)
os.chmod(tmp_path, 0o664)
stream.output_queue.push(('file', tmp_path))
if last:
fin_path = os.path.join(VIDEO_OUTPUT_DIR, f"{jid}_{total}.mp4")
try:
os.replace(tmp_path, fin_path)
os.chmod(fin_path, 0o664)
job_status[jid]["status"] = "complete"
job_status[jid]["render_time"] = time.time() - start_time
                        # Leave the queues intact here: the Gradio reader still needs to
                        # consume the 'complete' message; consumers clear their own queues
                        stream.output_queue.push(('complete', fin_path))
logger.info(f"Final video saved: {fin_path}, render time: {job_status[jid]['render_time']:.2f}s")
print(f"{green(f'API: Final video saved: {fin_path}')}")
return fin_path
except Exception as e:
logger.error(f"Failed to save final video: {e}")
print(f"{red(f'API: Failed to save final video: {e}')}")
raise
torch.cuda.empty_cache()
except Exception as e:
logger.error(f"Worker failed: {e}", exc_info=True)
print(f"{red(f'API: Worker error: {e}')}")
traceback.print_exc()
job_status[jid]["status"] = "error"
stream.output_queue.push(("end", str(e)))
return None
    finally:
        active_jobs.pop(jid, None)
        # Queues are deliberately not cleared here; doing so races the Gradio reader
        # and can drop the terminal 'complete'/'end' message. The API path clears
        # its own queues in generate_video's finally block.
        if job_status.get(jid, {}).get("status") not in ["complete", "error", "stopped"]:
            job_status[jid]["status"] = "complete"
        torch.cuda.empty_cache()
@torch.no_grad()
def process(img, prm, npr, sd, sec, win, stp, cfg, gsc, rsc, kee, tea, crf, disable_prompt_mods, link_steps_window):
if img is None:
raise gr.Error("Upload an image")
yield None, None, "", "", gr.update(interactive=False), gr.update(interactive=True)
stream = AsyncStream()
jid = str(uuid.uuid4())
    # worker() expects camera_action before disable_prompt_mods; the UI already bakes
    # the camera phrase into the prompt via update_prompt, so pass the neutral default
    async_run(worker, img, prm, npr, sd, sec, win, stp, cfg, gsc, rsc, kee, tea, crf, "Static Camera", disable_prompt_mods, link_steps_window, stream, jid)
out, log = None, ""
try:
        while True:
            flag, data = stream.output_queue.next()
            if flag == "file":
                out = data
                yield out, gr.update(), log, gr.update(), gr.update(interactive=False), gr.update(interactive=True)
            if flag == "progress":
                pv, desc, html = data
                log = desc
                yield gr.update(), gr.update(visible=True, value=pv), desc, html, gr.update(interactive=False), gr.update(interactive=True)
            if flag == "complete":
                yield data, gr.update(visible=False), "Generation complete", "", gr.update(interactive=True), gr.update(interactive=False)
                break
            if flag == "end":
                yield out, gr.update(visible=False), f"Error: {data}", "", gr.update(interactive=True), gr.update(interactive=False)
                break
except Exception as e:
logger.error(f"Process loop failed: {e}")
yield out, gr.update(visible=False), f"Error: {str(e)}", "", gr.update(interactive=True), gr.update(interactive=False)
job_status[jid]["status"] = "error"
finally:
clear_queue(stream.input_queue)
clear_queue(stream.output_queue)
torch.cuda.empty_cache()
def end_process():
    # No module-level stream exists; request a stop on every active job instead
    for s in list(active_jobs.values()):
        s.input_queue.push("end")
    logger.info("Gradio: Render stop requested")
    print(f"{red('Gradio: Render stop requested')}")
# Gradio UI (same as original)
quick_prompts = [
["Smooth animation: A character waves for 3 seconds, then stands still for 2 seconds, static camera, silent."],
["Smooth animation: A character moves for 5 seconds, static camera, silent."]
]
css = make_progress_bar_css() + """
.orange-button{background:#ff6200;color:#fff;border-color:#ff6200;}
.load-button{background:#4CAF50;color:#fff;border-color:#4CAF50;margin-left:10px;}
.big-setting-button{background:#0066cc;color:#fff;border:none;padding:14px 24px;font-size:18px;width:100%;border-radius:6px;margin:8px 0;}
.styled-dropdown{width:250px;padding:5px;border-radius:4px;}
.viewer-column{width:100%;max-width:900px;margin:0 auto;}
.media-preview img,.media-preview video{max-width:100%;height:380px;object-fit:contain;border:1px solid #444;border-radius:6px;}
.media-container{display:flex;gap:20px;align-items:flex-start;}
.control-box{min-width:220px;}
.control-grid{display:grid;grid-template-columns:1fr 1fr;gap:10px;}
.image-gallery{display:grid!important;grid-template-columns:repeat(auto-fit,minmax(300px,1fr))!important;gap:10px;padding:10px!important;overflow-y:auto!important;max-height:360px!important;}
.image-gallery .gallery-item{padding:10px;height:360px!important;width:300px!important;}
.image-gallery img{object-fit:contain;height:360px!important;width:300px!important;}
.video-gallery{display:grid!important;grid-template-columns:repeat(auto-fit,minmax(300px,1fr))!important;gap:10px;padding:10px!important;overflow-y:auto!important;max-height:360px!important;}
.video-gallery .gallery-item{padding:10px;height:360px!important;width:300px!important;}
.video-gallery video{object-fit:contain;height:360px!important;width:300px!important;}
.stop-button {background-color: #ff4d4d !important; color: white !important;}
"""
blk = gr.Blocks(css=css, title="GhostPack F1 Pro").queue()
with blk:
gr.Markdown("# πŸ‘» GhostPack F1 Pro")
with gr.Tabs():
with gr.TabItem("πŸ‘» Generate"):
with gr.Row():
with gr.Column():
img_in = gr.Image(sources="upload", type="numpy", label="Image", height=320)
generate_button = gr.Button("Generate Video", elem_id="generate_button")
stop_button = gr.Button("Stop Generation", elem_id="stop_button", elem_classes="stop-button")
prm = gr.Textbox(
label="Prompt",
value="Smooth animation: A female stands with subtle, sensual micro-movements, breathing gently, slight head tilt, static camera, silent",
elem_id="prompt_input",
)
npr = gr.Textbox(
label="Negative Prompt",
value="low quality, blurry, speaking, talking, moaning, vocalizing, lip movement, mouth animation, sound, dialogue, speech, whispering, shouting, lip sync, facial animation, expressive face, verbal expression, animated mouth",
elem_id="negative_prompt_input",
)
save_msg = gr.Markdown("")
disable_prompt_mods = gr.Checkbox(label="Disable Prompt Modifications", value=False)
link_steps_window = gr.Checkbox(label="Link Steps and Latent Window", value=True)
btn_save = gr.Button("Save Prompt")
btn1, btn2, btn3 = (
gr.Button("Load Most Recent"),
gr.Button("Load 2nd Recent"),
gr.Button("Load 3rd Recent"),
)
ds = gr.Dataset(samples=quick_prompts, label="Quick List", components=[prm])
ds.click(lambda x: x[0], [ds], [prm])
btn_save.click(save_prompt_fn, [prm, npr], [save_msg])
btn1.click(lambda: load_prompt_fn(0), [], [prm])
btn2.click(lambda: load_prompt_fn(1), [], [prm])
btn3.click(lambda: load_prompt_fn(2), [], [prm])
camera_action_input = gr.Dropdown(
choices=[
"Static Camera", "Slight Orbit Left", "Slight Orbit Right",
"Slight Orbit Up", "Slight Orbit Down", "Top-Down View",
"Slight Zoom In", "Slight Zoom Out",
],
label="Camera Action",
value="Static Camera",
elem_id="camera_action_input",
info="Select a camera movement to append to the prompt.",
)
camera_action_input.change(
fn=lambda prompt, camera_action: update_prompt(prompt, camera_action),
inputs=[prm, camera_action_input],
outputs=prm,
)
with gr.Column():
pv = gr.Image(label="Next Latents", height=200, visible=False)
vid = gr.Video(label="Finished", autoplay=True, height=500, loop=True, show_share_button=False)
log_md = gr.Markdown("")
bar = gr.HTML("")
with gr.Column():
se = gr.Number(label="Seed", value=31337, precision=0, elem_id="seed_input")
sec = gr.Slider(label="Video Length (s)", minimum=1, maximum=10, value=8.0, step=0.1, elem_id="video_length_input")
win = gr.Slider(label="Latent Window", minimum=1, maximum=10, value=3, step=1, elem_id="latent_window_input")
stp = gr.Slider(label="Steps", minimum=1, maximum=100, value=12, step=1, elem_id="steps_input")
cfg = gr.Slider(label="CFG", minimum=1, maximum=32, value=1.7, step=0.01, elem_id="cfg_input")
gsc = gr.Slider(label="Distilled CFG", minimum=1, maximum=32, value=4.0, step=0.01, elem_id="distilled_cfg_input")
rsc = gr.Slider(label="CFG Re-Scale", minimum=0, maximum=1, value=0.5, step=0.01, elem_id="cfg_rescale_input")
kee = gr.Slider(label="GPU Keep (GB)", minimum=6, maximum=free_mem, value=6.5, step=0.1, elem_id="gpu_keep_input")
crf = gr.Slider(label="MP4 CRF", minimum=0, maximum=100, value=20, step=1, elem_id="mp4_crf_input")
tea = gr.Checkbox(label="Use TeaCache", value=True, elem_id="use_teacache_input")
generate_button.click(
fn=process,
inputs=[img_in, prm, npr, se, sec, win, stp, cfg, gsc, rsc, kee, tea, crf, disable_prompt_mods, link_steps_window],
outputs=[vid, pv, log_md, bar, generate_button, stop_button],
)
stop_button.click(fn=end_process)
gr.Button("Update Progress").click(fn=lambda: get_progress(), outputs=[log_md, bar])
with gr.TabItem("πŸ–ΌοΈ Image Gallery"):
with gr.Row(elem_classes="media-container"):
with gr.Column(scale=3):
image_preview = gr.Image(
label="Viewer", value=(list_images()[0] if list_images() else None),
interactive=False, elem_classes="media-preview",
)
with gr.Column(elem_classes="control-box"):
image_dropdown = gr.Dropdown(
choices=[os.path.basename(i) for i in list_images()],
value=(os.path.basename(list_images()[0]) if list_images() else None),
label="Select", elem_classes="styled-dropdown",
)
with gr.Row(elem_classes="control-grid"):
load_btn = gr.Button("Load", elem_classes="load-button")
next_btn = gr.Button("Next", elem_classes="load-button")
with gr.Row(elem_classes="control-grid"):
refresh_btn = gr.Button("Refresh")
delete_btn = gr.Button("Delete", elem_classes="orange-button")
image_gallery = gr.Gallery(
value=list_images(), label="Thumbnails", columns=6, height=360,
allow_preview=False, type="filepath", elem_classes="image-gallery",
)
load_btn.click(load_image, [image_dropdown], [image_preview, image_dropdown])
next_btn.click(next_image_and_load, [image_dropdown], [image_preview, image_dropdown])
refresh_btn.click(
lambda: (
gr.update(choices=[os.path.basename(i) for i in list_images()], value=os.path.basename(list_images()[0]) if list_images() else None),
gr.update(value=list_images()[0] if list_images() else None),
gr.update(value=list_images()),
),
[], [image_dropdown, image_preview, image_gallery],
)
delete_btn.click(
lambda sel: (
os.remove(os.path.join(VIDEO_IMG_DIR, sel)) if sel and os.path.exists(os.path.join(VIDEO_IMG_DIR, sel)) else None
) or load_image(""),
[image_dropdown], [image_preview, image_dropdown],
)
image_gallery.select(gallery_image_select, [], [image_preview, image_dropdown])
with gr.TabItem("🎬 Video Gallery"):
with gr.Row(elem_classes="media-container"):
with gr.Column(scale=3):
video_preview = gr.Video(
label="Viewer", value=(list_videos()[0] if list_videos() else None),
autoplay=True, loop=True, interactive=False, elem_classes="media-preview",
)
with gr.Column(elem_classes="control-box"):
video_dropdown = gr.Dropdown(
choices=[os.path.basename(v) for v in list_videos()],
value=(os.path.basename(list_videos()[0]) if list_videos() else None),
label="Select", elem_classes="styled-dropdown",
)
with gr.Row(elem_classes="control-grid"):
load_vbtn = gr.Button("Load", elem_classes="load-button")
next_vbtn = gr.Button("Next", elem_classes="load-button")
with gr.Row(elem_classes="control-grid"):
refresh_v = gr.Button("Refresh")
delete_v = gr.Button("Delete", elem_classes="orange-button")
video_gallery = gr.Gallery(
value=list_videos(), label="Thumbnails", columns=6, height=360,
allow_preview=False, type="filepath", elem_classes="video-gallery",
)
load_vbtn.click(load_video, [video_dropdown], [video_preview, video_dropdown])
next_vbtn.click(next_video_and_load, [video_dropdown], [video_preview, video_dropdown])
refresh_v.click(
lambda: (
gr.update(choices=[os.path.basename(v) for v in list_videos()], value=os.path.basename(list_videos()[0]) if list_videos() else None),
gr.update(value=list_videos()[0] if list_videos() else None),
gr.update(value=list_videos()),
),
[], [video_dropdown, video_preview, video_gallery],
)
delete_v.click(
lambda sel: (
os.remove(os.path.join(VIDEO_OUTPUT_DIR, sel)) if sel and os.path.exists(os.path.join(VIDEO_OUTPUT_DIR, sel)) else None
) or load_video(""),
[video_dropdown], [video_preview, video_dropdown],
)
video_gallery.select(gallery_video_select, [], [video_preview, video_dropdown])
with gr.TabItem("πŸ‘» About"):
gr.Markdown("## GhostPack F1 Pro")
with gr.Row():
with gr.Column():
gr.Markdown("**πŸ› οΈ Description**\nImage-to-Video toolkit powered by HunyuanVideo & FramePack-F1")
with gr.Column():
gr.Markdown(f"**πŸ“¦ Version**\n{VERSION}")
with gr.Column():
gr.Markdown("**✍️ Author**\nGhostAI")
with gr.Column():
gr.Markdown("**πŸ”— Repo**\nhttps://huggingface.co/spaces/ghostai1/GhostPack")
with gr.TabItem("βš™οΈ Settings"):
ct = gr.Button("Clear Temp", elem_classes="big-setting-button")
ctmsg = gr.Markdown("")
co = gr.Button("Clear Old", elem_classes="big-setting-button")
comsg = gr.Markdown("")
ci = gr.Button("Clear Images", elem_classes="big-setting-button")
cimg = gr.Markdown("")
cv = gr.Button("Clear Videos", elem_classes="big-setting-button")
cvid = gr.Markdown("")
ct.click(clear_temp_videos, [], ctmsg)
co.click(clear_old_files, [], comsg)
ci.click(clear_images, [], cimg)
cv.click(clear_videos, [], cvid)
with gr.TabItem("πŸ› οΈ Install"):
xs = gr.Textbox(value=status_xformers(), interactive=False, label="xformers")
bx = gr.Button("Install xformers", elem_classes="big-setting-button")
ss = gr.Textbox(value=status_sage(), interactive=False, label="sage-attn")
bs = gr.Button("Install sage-attn", elem_classes="big-setting-button")
fs = gr.Textbox(value=status_flash(), interactive=False, label="flash-attn")
bf = gr.Button("Install flash-attn", elem_classes="big-setting-button")
cs = gr.Textbox(value=status_colorama(), interactive=False, label="colorama")
bc = gr.Button("Install colorama", elem_classes="big-setting-button")
bx.click(install_xformers, [], xs)
bs.click(install_sage_attn, [], ss)
bf.click(install_flash_attn, [], fs)
bc.click(install_colorama, [], cs)
with gr.TabItem("πŸ“œ Logs"):
logs = gr.Textbox(lines=20, interactive=False, label="Install Logs")
rl = gr.Button("Refresh", elem_classes="big-setting-button")
cl = gr.Button("Clear", elem_classes="big-setting-button")
rl.click(refresh_logs, [], logs)
cl.click(clear_logs, [], logs)
gr.HTML(
"""
<script>
document.querySelectorAll('.video-gallery video').forEach(v => {
v.addEventListener('loadedmetadata', () => {
if (v.duration > 2) v.currentTime = 2;
});
});
</script>
"""
)
def update_prompt(prompt, camera_action):
    # Keys match the Gradio dropdown choices; values are the phrases appended to prompts
    camera_map = {
        "Static Camera": "static camera",
        "Slight Orbit Left": "slight camera orbit left",
        "Slight Orbit Right": "slight camera orbit right",
        "Slight Orbit Up": "slight camera orbit up",
        "Slight Orbit Down": "slight camera orbit down",
        "Top-Down View": "top-down view",
        "Slight Zoom In": "slight camera zoom in",
        "Slight Zoom Out": "slight camera zoom out",
    }
    # Strip any previously appended camera phrase so repeated dropdown changes don't stack;
    # the original appended camera_action.lower(), which never matched this strip list
    for action in camera_map.values():
        prompt = re.sub(rf",\s*{re.escape(action)}\b", "", prompt, flags=re.IGNORECASE).strip()
    if camera_action and camera_action != "None":
        camera_phrase = f", {camera_map.get(camera_action, camera_action.lower())}"
        if len(prompt.split()) + len(camera_phrase.split()) <= 50:
            return prompt + camera_phrase
        logger.warning(f"Prompt exceeds 50 words after adding camera action: {prompt}")
        print(f"{yellow('API: Warning: Prompt exceeds 50 words with camera action')}")
    return prompt
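# For example: update_prompt("Smooth animation: a cat, static camera", "Slight Zoom In")
# strips the stale "static camera" phrase and returns
# "Smooth animation: a cat, slight camera zoom in".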
def get_progress():
    # job_status is keyed by job id (there is no 'latest' key); report the most recent
    # entry and return two values to match the [log_md, bar] outputs wired in the UI
    if not job_status:
        return "Status: idle | Progress: 0.0% | Last Render Time: 0.0s", ""
    latest = list(job_status.values())[-1]
    txt = f"Status: {latest['status']} | Progress: {latest['progress']:.1f}% | Last Render Time: {latest['render_time']:.1f}s"
    return txt, make_progress_bar_html(int(latest['progress']), f"{latest['progress']:.1f}%")
# Check for port conflicts
if is_port_in_use(args.port):
logger.error(f"Port {args.port} is already in use")
print(f"{red(f'Error: Port {args.port} is already in use. Please stop other instances or change ports.')}")
sys.exit(1)
# Run FastAPI and optional Gradio
def run_api():
try:
logger.info(f"Starting FastAPI on {args.server}:{args.port}")
print(f"{green(f'Starting FastAPI on {args.server}:{args.port}')}")
uvicorn.run(app, host=args.server, port=args.port)
except Exception as e:
logger.error(f"Failed to start FastAPI: {e}", exc_info=True)
print(f"{red(f'Error: Failed to start FastAPI: {e}')}")
sys.exit(1)
if __name__ == "__main__":
try:
logger.info(f"Starting GhostPack F1 Pro Server version {VERSION}")
print(f"Starting GhostPack F1 Pro Server version {VERSION}")
api_thread = Thread(target=run_api)
api_thread.daemon = True
api_thread.start()
time.sleep(5)
try:
response = requests.get(f"http://{args.server}:{args.port}/health", timeout=10)
if response.status_code != 200:
raise RuntimeError("FastAPI health check failed")
logger.info("FastAPI health check passed")
print(f"{green('FastAPI health check passed')}")
except Exception as e:
logger.error(f"FastAPI not ready: {e}")
print(f"{red(f'Error: FastAPI not ready: {e}')}")
sys.exit(1)
if args.gradio:
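            # NOTE: FastAPI already binds args.port (default 7860); pass a different
            # --port when enabling the Gradio UI so both servers can bind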
logger.info(f"Starting Gradio UI on {args.server}:7860")
print(f"{green(f'Starting Gradio UI on {args.server}:7860')}")
            server_app, local_url, share_url = blk.launch(
                server_name=args.server,
                server_port=7860,
                share=args.share,
                inbrowser=args.inbrowser,
                prevent_thread_lock=True,
                allowed_paths=["/"]
            )
            if args.share and share_url:
                logger.info(f"Public Gradio URL: {share_url}")
                print(f"{yellow(f'Public Gradio URL: {share_url}')}")
logger.info(f"Gradio UI running on http://{args.server}:7860")
print(f"{green(f'Gradio UI running on http://{args.server}:7860')}")
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down gracefully")
print(f"{green('Shutting down gracefully')}")
sys.exit(0)