# Spaces: Sleeping
# (status text apparently captured from the hosting page together with this file; not program code)
import os | |
import tempfile | |
import requests | |
from mimetypes import guess_extension | |
from PIL import Image | |
from io import BytesIO | |
import subprocess | |
from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Request | |
from typing import List, Optional | |
import asyncio, aiofiles | |
from fastapi.responses import StreamingResponse, FileResponse | |
# FastAPI application instance for this module.
app = FastAPI()
def download_image(image_url: str, timeout: float = 30.0):
    """Download an image and store it in a fresh temporary directory.

    The response body is opened with Pillow to determine the image format
    and pixel size, then written unchanged to ``image<ext>`` inside a
    directory created with ``tempfile.mkdtemp``.  The directory is NOT
    removed here; the caller owns it and is responsible for cleanup.

    Args:
        image_url: URL handed to ``requests.get``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A tuple ``(image_path, (width, height))``.

    Raises:
        requests.RequestException: The request failed, timed out, or
            ``raise_for_status`` rejected the response status.
        ValueError: No file extension could be derived from the detected
            image format.
        OSError: Pillow could not open the payload, or the file could not
            be written.
    """
    # NOTE(review): the URL is fetched as given.  If it comes from an
    # untrusted client this permits requests to internal hosts (SSRF);
    # confirm whether an allow-list is needed.
    response = requests.get(image_url, timeout=timeout)
    response.raise_for_status()
    payload = response.content

    # Only format and size are needed, so release the image handle right away.
    with Image.open(BytesIO(payload)) as image:
        image_format = image.format
        image_size = image.size

    image_extension = (
        guess_extension(f"image/{image_format.lower()}") if image_format else None
    )
    if image_extension is None:
        raise ValueError("Cannot detect image file type.")

    # Create the directory only after the payload has been validated, so a
    # failed download or an unrecognised image leaves nothing behind.
    temp_dir = tempfile.mkdtemp()
    image_path = os.path.join(temp_dir, f"image{image_extension}")
    with open(image_path, "wb") as image_file:
        image_file.write(payload)

    return image_path, image_size
def build_depthflow_command(
    image_path,
    width,
    height,
    destination,
    frame_rate,
    duration,
    quality,
    ssaa,
    raw,
):
    """Return the argv list used to invoke the ``depthflow`` CLI.

    Every numeric option is converted with ``str``.  ``--raw`` is included
    only when *raw* is truthy, and ``--output <destination>`` always comes
    last.
    """
    command = [
        "depthflow",
        "input",
        "-i",
        image_path,
        "main",
        "-f",
        str(frame_rate),
        "-t",
        str(duration),
        "--width",
        str(width),
        "--height",
        str(height),
        "--quality",
        str(quality),
        "--ssaa",
        str(ssaa),
        "--benchmark",
    ]
    if raw:
        command.append("--raw")
    command.extend(["--output", destination])
    return command


def make_effect(
    image_link: str,
    filename: str,
    frame_rate: int,
    duration: int,
    quality: int,
    ssaa: float,
    raw: bool,
):
    """Download an image and run ``depthflow`` on it, writing under /tmp/Video.

    Args:
        image_link: URL of the source image, passed to ``download_image``.
        filename: Output file name, resolved relative to ``/tmp/Video``.
        frame_rate: Value for depthflow's ``-f`` option.
        duration: Value for depthflow's ``-t`` option.
        quality: Value for depthflow's ``--quality`` option.
        ssaa: Value for depthflow's ``--ssaa`` option.
        raw: When true, ``--raw`` is added to the command.

    Returns:
        The path of the output file passed to depthflow via ``--output``.

    Raises:
        ValueError: *filename* resolves to a location outside ``/tmp/Video``
            (for example ``../x`` or an absolute path) or is empty.
        subprocess.CalledProcessError: depthflow exited with a non-zero status.
        Exception: Whatever ``download_image`` raises is propagated.
    """
    # Validate before doing any work: the name may come from a client, and an
    # unchecked join would let "../" or an absolute path escape the directory.
    output_root = os.path.normpath("/tmp/Video")
    destination = os.path.normpath(os.path.join(output_root, filename))
    if not destination.startswith(output_root + os.sep):
        raise ValueError(f"Invalid output filename: {filename!r}")

    # Download the image and get its dimensions
    image_path, (width, height) = download_image(image_url=image_link)
    print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100)

    try:
        # Create the destination directory if it doesn't exist
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        command = build_depthflow_command(
            image_path,
            width,
            height,
            destination,
            frame_rate,
            duration,
            quality,
            ssaa,
            raw,
        )
        # Argument list (no shell), so values are never shell-interpreted.
        subprocess.run(command, check=True)
    finally:
        # The downloaded image is only an input for this call; remove it and
        # its directory.  os.rmdir only succeeds on an empty directory, so
        # nothing else can be deleted by accident.
        for cleanup, target in (
            (os.remove, image_path),
            (os.rmdir, os.path.dirname(image_path)),
        ):
            try:
                cleanup(target)
            except OSError:
                pass

    return destination
async def generate_video(
    image_link: Optional[str] = None,
    filename: str = "output.mp4",
    frame_rate: int = 30,
    duration: int = 3,
    quality: int = 10,
    ssaa: float = 0.75,
    raw: bool = True,
):
    """Run ``make_effect`` for the given image and report the output name.

    Args:
        image_link: URL of the source image; required.
        filename: Name of the output file.
        frame_rate: Forwarded to ``make_effect``.
        duration: Forwarded to ``make_effect``.
        quality: Forwarded to ``make_effect``.
        ssaa: Forwarded to ``make_effect``.
        raw: Forwarded to ``make_effect``.

    Returns:
        ``{"output_file": filename}`` on success.

    Raises:
        HTTPException: Status 400 when *image_link* is missing or when
            ``make_effect`` raises; the detail carries the error text.
    """
    if not image_link:
        raise HTTPException(status_code=400, detail="image_link is required")

    # make_effect is a synchronous function: awaiting its return value would
    # raise TypeError after the work has already been done.  Run it in the
    # default executor so the result can be awaited and the event loop is not
    # blocked by the download and the subprocess.
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(
            None,
            make_effect,
            image_link,
            filename,
            frame_rate,
            duration,
            quality,
            ssaa,
            raw,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {"output_file": filename}
async def download_video(filename: str, request: Request):
    """Serve a file from ``/tmp/Video``, honouring a single HTTP byte range.

    Without a usable ``Range`` header the whole file is returned through
    ``FileResponse``.  With one, the requested slice is streamed with status
    206 together with ``Content-Range`` and ``Content-Length`` headers.

    Args:
        filename: Name of the video, resolved relative to ``/tmp/Video``.
        request: Incoming request; only its ``Range`` header is read.

    Raises:
        HTTPException: 404 when the file does not exist or *filename* points
            outside ``/tmp/Video``; 416 when the requested range starts
            beyond the end of the file.
    """
    video_root = os.path.normpath("/tmp/Video")
    video_path = os.path.normpath(os.path.join(video_root, filename))
    # Lexical containment check: "../" segments or an absolute name must not
    # reach files outside the video directory.
    if not video_path.startswith(video_root + os.sep) or not os.path.isfile(
        video_path
    ):
        raise HTTPException(status_code=404, detail="Video not found")

    video_size = os.path.getsize(video_path)
    range_header = request.headers.get("Range")
    byte_range = parse_range_header(range_header, video_size) if range_header else None
    if byte_range is None:
        # No header, or one that cannot be interpreted: send the whole file.
        return FileResponse(video_path, media_type="video/mp4")

    start, end = byte_range
    if start > end:
        # The range begins at or after the end of the file.
        raise HTTPException(
            status_code=416,
            detail="Requested range not satisfiable",
            headers={"Content-Range": f"bytes */{video_size}"},
        )

    headers = {
        "Content-Range": f"bytes {start}-{end}/{video_size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(end - start + 1),
    }
    content = read_file_range(video_path, start, end)
    return StreamingResponse(
        content, status_code=206, media_type="video/mp4", headers=headers
    )


def parse_range_header(range_header, size):
    """Parse a single-range ``bytes=`` header against a file of *size* bytes.

    Returns:
        ``(start, end)`` as inclusive offsets with ``end`` clamped to
        ``size - 1``, or ``None`` when the header is not a well-formed single
        byte range (wrong unit, several ranges, non-numeric bounds, or last
        position before first).  A result with ``start > end`` means the
        range lies entirely beyond the end of the file.
    """
    unit, equals, spec = range_header.strip().partition("=")
    if not equals or unit.strip().lower() != "bytes":
        return None

    first, dash, last = spec.partition("-")
    first, last = first.strip(), last.strip()
    if not dash or not (first or last):
        return None
    # A comma-separated list leaves a non-numeric tail in ``last`` and is
    # rejected by this check as well.
    if (first and not first.isdecimal()) or (last and not last.isdecimal()):
        return None

    if not first:
        # Suffix form "-N": the final N bytes of the file.
        return max(size - int(last), 0), size - 1

    start = int(first)
    if last and int(last) < start:
        return None
    end = min(int(last), size - 1) if last else size - 1
    return start, end


async def read_file_range(path, start, end, chunk_size=1024 * 1024):
    """Yield the bytes of *path* from offset *start* through *end* inclusive.

    Data is read in pieces of at most *chunk_size* bytes and the final piece
    is trimmed so that no byte past *end* is produced.  Iteration stops early
    if the file ends before *end* is reached.
    """
    remaining = end - start + 1
    async with aiofiles.open(path, "rb") as file:
        await file.seek(start)
        while remaining > 0:
            data = await file.read(min(chunk_size, remaining))
            if not data:
                break
            remaining -= len(data)
            yield data