# Hosting-page status banner captured along with the source (not part of the program): "Spaces: Sleeping".
import os | |
import requests | |
import json | |
import time | |
import subprocess | |
import gradio as gr | |
import uuid | |
from dotenv import load_dotenv | |
from urllib.parse import urlparse | |
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# API key sent as the "x-api-key" header on every request to API_URL.
B_KEY = os.getenv("B_KEY")

# Service endpoints.
API_URL = "https://api.sync.so/v2/generate"
# upload_file() posts multipart form data using the Catbox-style fields
# "reqtype=fileupload" and "fileToUpload", so the Catbox API endpoint is the
# default target. Set the UPLOAD_URL environment variable to point at a
# different host that accepts the same form.
UPLOAD_URL = os.getenv("UPLOAD_URL", "https://catbox.moe/user/api.php")
def get_media_resolution(url):
    """Download the media at *url* and probe its pixel size with ffprobe.

    Args:
        url: URL of a video or image that ffprobe can read once downloaded.

    Returns:
        ``[width, height]`` of the first video stream when ffprobe reports
        both values; ``[1280, 720]`` as a default when probing fails; ``None``
        when the server answers the download with a non-200 status.

    Raises:
        requests.RequestException: If the download itself fails (connection
            error, timeout, ...).
    """
    print(f"\n[DEBUG] Getting resolution for: {url}")

    # Download the file to a temporary location. The timeout keeps an
    # unresponsive host from blocking the caller forever.
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        print(f"[ERROR] Failed to download media. Status code: {response.status_code}")
        return None

    temp_path = f"temp_media_{uuid.uuid4()}"

    # Ask ffprobe for the width/height of the first video stream, as JSON.
    cmd = [
        'ffprobe',
        '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'stream=width,height',
        '-of', 'json',
        temp_path
    ]

    try:
        with open(temp_path, 'wb') as f:
            f.write(response.content)

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            streams = json.loads(result.stdout).get('streams') or []
            if streams:
                width = streams[0].get('width')
                height = streams[0].get('height')
                if width and height:
                    print(f"[DEBUG] Detected resolution: {width}x{height}")
                    return [width, height]
        else:
            # Surface ffprobe's own diagnostics instead of dropping them.
            print(f"[ERROR] ffprobe failed: {result.stderr.strip()}")
    except Exception as e:
        print(f"[ERROR] Failed to get resolution: {str(e)}")
    finally:
        # Always clean up the temp file, whichever path was taken above.
        if os.path.exists(temp_path):
            os.remove(temp_path)

    print("[DEBUG] Failed to detect resolution, using default")
    return [1280, 720]  # Default resolution
def lipsync_api_call(video_url, audio_url):
    """Submit a lipsync generation request and return the decoded JSON reply.

    The output resolution sent with the request is whatever
    ``get_media_resolution(video_url)`` returns.

    Args:
        video_url: URL of the input video.
        audio_url: URL of the input audio.

    Returns:
        The API response body parsed as JSON.

    Raises:
        Exception: Any error raised while posting the request or decoding the
            reply is logged and re-raised unchanged.
    """
    print("\n[DEBUG] Starting lipsync_api_call")
    print(f"[DEBUG] Video URL: {video_url}")
    print(f"[DEBUG] Audio URL: {audio_url}")

    # Probe the input media so the output can request the same resolution.
    output_resolution = get_media_resolution(video_url)

    media_inputs = [
        {"type": "video", "url": video_url},
        {"type": "audio", "url": audio_url},
    ]
    generation_options = {
        "pads": [0, 5, 0, 0],
        "speedup": 1,
        "output_format": "mp4",
        "sync_mode": "bounce",
        "fps": 24,
        "output_resolution": output_resolution,
    }
    payload = {
        "model": "lipsync-1.8.0-beta",
        "input": media_inputs,
        "options": generation_options,
    }
    request_headers = {
        "Content-Type": "application/json",
        "x-api-key": B_KEY,
    }

    print(f"[DEBUG] Request payload: {json.dumps(payload, indent=2)}")

    try:
        reply = requests.post(API_URL, headers=request_headers, data=json.dumps(payload))
        print(f"[DEBUG] API Response status code: {reply.status_code}")
        print(f"[DEBUG] API Response: {reply.text}")
        return reply.json()
    except Exception as error:
        print(f"[ERROR] API call failed: {str(error)}")
        raise
def check_job_status(job_id):
    """Poll the job endpoint until the job finishes, fails, or polling gives up.

    The status is fetched from ``{API_URL}/{job_id}`` up to 3000 times with a
    10-second pause after each unfinished check.

    Args:
        job_id: Identifier of the job to poll.

    Returns:
        The job's ``outputUrl`` value when the status is ``"COMPLETED"``;
        ``None`` when the status is ``"FAILED"`` or ``"CANCELED"``, when a
        status check raises, or when the attempt limit is reached.
    """
    print(f"\n[DEBUG] Checking job status for ID: {job_id}")

    auth_headers = {"x-api-key": B_KEY}
    max_attempts = 3000

    for attempt in range(max_attempts):
        try:
            reply = requests.get(f"{API_URL}/{job_id}", headers=auth_headers)
            print(f"[DEBUG] Status check attempt {attempt + 1}")
            print(f"[DEBUG] Status response: {reply.text}")

            job = reply.json()
            status = job.get("status")
            print(f"[DEBUG] Current status: {status}")

            if status == "COMPLETED":
                print(f"[DEBUG] Job completed. Output URL: {job.get('outputUrl')}")
                return job.get("outputUrl")
            if status in ("FAILED", "CANCELED"):
                print(f"[ERROR] Job failed or was canceled. Error: {job.get('error')}")
                return None

            # Still in progress: wait before asking again.
            time.sleep(10)
        except Exception as error:
            # Any error during a check ends polling immediately.
            print(f"[ERROR] Status check failed: {str(error)}")
            return None

    print("[ERROR] Max attempts reached")
    return None
def get_media_duration(file_path):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        The container duration as a float.

    Raises:
        ValueError: If ffprobe prints no duration (for example because the
            file is missing or unreadable) or prints something that is not a
            number. The message includes ffprobe's stderr or raw output.
    """
    print(f"\n[DEBUG] Getting duration for: {file_path}")
    cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    raw_duration = result.stdout.strip()
    if not raw_duration:
        # ffprobe explains failures on stderr; pass that on instead of an
        # opaque "could not convert string to float" error.
        details = result.stderr.strip() or "no output"
        raise ValueError(f"ffprobe could not read the duration of {file_path}: {details}")

    try:
        duration = float(raw_duration)
    except ValueError as e:
        raise ValueError(
            f"ffprobe returned a non-numeric duration for {file_path}: {raw_duration!r}"
        ) from e

    print(f"[DEBUG] Media duration: {duration} seconds")
    return duration
def combine_audio_video(video_path, audio_path, output_path):
    """Mux a video's picture with a separate audio track using ffmpeg.

    The output is cut to the audio's duration. When the video is longer than
    the audio it is trimmed; otherwise the video is looped until it covers the
    audio. The video stream is copied as-is and the audio is encoded as AAC.

    Args:
        video_path: Path of the source video.
        audio_path: Path of the source audio.
        output_path: Path of the file to write (overwritten if it exists).

    Raises:
        ValueError: If the video must be looped but its duration is not
            positive.
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    print("\n[DEBUG] Combining audio and video")
    print(f"[DEBUG] Video path: {video_path}")
    print(f"[DEBUG] Audio path: {audio_path}")
    print(f"[DEBUG] Output path: {output_path}")

    video_duration = get_media_duration(video_path)
    audio_duration = get_media_duration(audio_path)

    # Output arguments shared by both cases: cut at the audio length, take
    # the picture from input 0 and the sound from input 1.
    output_args = [
        '-t', str(audio_duration),
        '-map', '0:v', '-map', '1:a',
        '-c:v', 'copy', '-c:a', 'aac',
    ]

    if video_duration > audio_duration:
        print("[DEBUG] Video longer than audio - trimming video")
        cmd = [
            'ffmpeg', '-i', video_path, '-i', audio_path,
            *output_args,
            '-y', output_path
        ]
    else:
        if video_duration <= 0:
            # Looping divides by the video duration; fail with a clear
            # message rather than a ZeroDivisionError.
            raise ValueError(f"Cannot loop a video with non-positive duration: {video_path}")
        print("[DEBUG] Audio longer than video - looping video")
        loop_count = int(audio_duration // video_duration) + 1
        cmd = [
            'ffmpeg', '-stream_loop', str(loop_count), '-i', video_path, '-i', audio_path,
            *output_args,
            '-shortest', '-y', output_path
        ]

    print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True)
    print(f"[DEBUG] FFmpeg stdout: {result.stdout}")
    print(f"[DEBUG] FFmpeg stderr: {result.stderr}")

    if result.returncode != 0:
        # Without this check a failed mux would be reported as success by
        # callers even though no usable output file exists.
        raise RuntimeError(f"FFmpeg failed to combine audio and video (exit code {result.returncode})")
def is_image_url(url):
    """Tell whether *url* points at an image, judged by its file extension.

    Only the path component of the URL is inspected (query string and
    fragment are ignored) and the comparison is case-insensitive. Surrounding
    whitespace, as left by a copy-paste into a text box, is ignored.

    Args:
        url: The URL to classify. Values that are not strings are treated as
            "not an image" instead of raising.

    Returns:
        True if the URL path ends with a known image extension, else False.
    """
    image_extensions = (
        '.png', '.jpg', '.jpeg', '.gif', '.bmp',
        '.tiff', '.webp', '.heic', '.svg', '.ico',
    )

    cleaned_url = url.strip() if isinstance(url, str) else ""
    path = urlparse(cleaned_url).path.lower()
    result = path.endswith(image_extensions)

    print(f"\n[DEBUG] Checking if URL is image: {url}")
    print(f"[DEBUG] Result: {result}")
    return result
def create_video_from_image(image_url, output_path, duration=10):
    """Render a still image into an H.264 video of fixed length.

    The image is downloaded, then ffmpeg loops it for *duration* seconds,
    scaled to the image's probed resolution.

    Args:
        image_url: URL of the source image.
        output_path: Path of the video file to write (overwritten if present).
        duration: Length of the generated video in seconds.

    Returns:
        *output_path*.

    Raises:
        Exception: If the image download does not return HTTP 200.
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    print("\n[DEBUG] Creating video from image")
    print(f"[DEBUG] Image URL: {image_url}")
    print(f"[DEBUG] Output path: {output_path}")

    # Get the resolution before creating the video. The probe returns None
    # when its own download fails; use the same 1280x720 default it applies
    # for undetectable media so the scale filter below is always valid.
    resolution = get_media_resolution(image_url) or [1280, 720]
    # libx264 with yuv420p rejects odd dimensions, so round each side down to
    # the nearest even number (never below 2).
    width, height = [max(2, int(side) // 2 * 2) for side in resolution[:2]]

    response = requests.get(image_url, timeout=60)
    if response.status_code != 200:
        print(f"[ERROR] Failed to download image. Status code: {response.status_code}")
        raise Exception("Failed to download the image")

    temp_image_path = f"temp_image_{uuid.uuid4()}.jpg"
    print(f"[DEBUG] Temporary image path: {temp_image_path}")

    try:
        with open(temp_image_path, 'wb') as f:
            f.write(response.content)

        cmd = [
            'ffmpeg', '-loop', '1', '-i', temp_image_path,
            '-c:v', 'libx264', '-t', str(duration), '-pix_fmt', 'yuv420p',
            '-vf', f'scale={width}:{height}',
            '-y', output_path
        ]
        print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(f"[DEBUG] FFmpeg stdout: {result.stdout}")
        print(f"[DEBUG] FFmpeg stderr: {result.stderr}")
    finally:
        # Remove the downloaded image even when writing or ffmpeg fails.
        if os.path.exists(temp_image_path):
            os.remove(temp_image_path)
            print("[DEBUG] Temporary image removed")

    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg failed to create video from image (exit code {result.returncode})")

    return output_path
def upload_file(file_path):
    """Upload a local file as multipart form data and return the reply text.

    The file is sent to ``UPLOAD_URL`` in the ``fileToUpload`` field together
    with ``reqtype=fileupload``. ``UPLOAD_URL`` must be defined at module
    level; if it is missing, the resulting NameError is caught by the handler
    below and reported as a failed upload.

    Args:
        file_path: Path of the file to upload.

    Returns:
        The response body with surrounding whitespace removed when the server
        answers with HTTP 200; ``None`` for any other status or when the
        request raises.

    Raises:
        OSError: If *file_path* cannot be opened (the open happens outside
            the error handler).
    """
    print(f"\n[DEBUG] Uploading file: {file_path}")

    with open(file_path, 'rb') as handle:
        attachment = {'fileToUpload': (os.path.basename(file_path), handle)}
        form_fields = {'reqtype': 'fileupload'}

        try:
            reply = requests.post(UPLOAD_URL, files=attachment, data=form_fields)
            print(f"[DEBUG] Upload response status code: {reply.status_code}")
            print(f"[DEBUG] Upload response: {reply.text}")
            return reply.text.strip() if reply.status_code == 200 else None
        except Exception as error:
            print(f"[ERROR] File upload failed: {str(error)}")
            return None
def process_video(video_url, audio_url, progress=gr.Progress()):
    """Produce a lip-synced video from a video/image URL and an audio URL.

    Main path: an image input is first converted to a video and uploaded;
    then a lipsync job is submitted, polled, and its result downloaded.
    Fallback path (used when anything in the main path raises): the video and
    audio are simply muxed together with ffmpeg.

    Args:
        video_url: URL of the source video or image.
        audio_url: URL of the source audio.
        progress: Gradio progress tracker, called with a fraction and a
            description at each stage.

    Returns:
        A ``(output_path, status_message)`` tuple. ``output_path`` is ``None``
        when an input URL is missing or when both the main path and the
        fallback fail.
    """
    print("\n[DEBUG] Starting video processing")
    print(f"[DEBUG] Video URL: {video_url}")
    print(f"[DEBUG] Audio URL: {audio_url}")

    if not audio_url:
        print("[ERROR] No audio URL provided")
        return None, "No audio URL provided"
    if not video_url:
        print("[ERROR] No video URL provided")
        return None, "No video URL provided"

    session_id = str(uuid.uuid4())
    print(f"[DEBUG] Session ID: {session_id}")
    progress(0.2, desc="Processing media...")

    output_path = f"output_{session_id}.mp4"
    temp_video_path = f"temp_video_{session_id}.mp4"
    temp_audio_path = f"temp_audio_{session_id}.mp3"
    # True once an image has been converted into temp_video_path, so the
    # fallback can reuse that file instead of downloading the video again.
    have_local_video = False

    try:
        try:
            if is_image_url(video_url):
                progress(0.3, desc="Converting image to video...")
                create_video_from_image(video_url, temp_video_path)
                have_local_video = True

                progress(0.4, desc="Uploading converted video...")
                uploaded_url = upload_file(temp_video_path)
                if not uploaded_url:
                    raise Exception("Failed to upload converted video")
                # Replace the caller's URL only after a successful upload so
                # a failed upload never leaves video_url set to None.
                video_url = uploaded_url

            progress(0.5, desc="Initiating lipsync...")
            job_data = lipsync_api_call(video_url, audio_url)

            # Check if we have a valid job ID
            if "id" not in job_data:
                print("[ERROR] No job ID in response")
                raise Exception("No job ID received from API")

            # Only treat as error if error field has actual error message
            if job_data.get("error") not in [None, ""]:
                error_msg = job_data["error"]
                print(f"[ERROR] API error: {error_msg}")
                raise Exception(error_msg)

            job_id = job_data["id"]
            print(f"[DEBUG] Job ID: {job_id}")

            progress(0.6, desc="Processing lipsync...")
            result_url = check_job_status(job_id)
            if not result_url:
                raise Exception("Lipsync processing failed or timed out")

            progress(0.9, desc="Downloading result...")
            print(f"[DEBUG] Downloading from: {result_url}")
            response = requests.get(result_url)
            if response.status_code != 200:
                # Do not save an error page as if it were the video.
                raise Exception(f"Failed to download result. Status code: {response.status_code}")
            with open(output_path, "wb") as f:
                f.write(response.content)
            print(f"[DEBUG] Result saved to: {output_path}")

            progress(1.0, desc="Complete!")
            return output_path, "Lipsync completed successfully!"

        except Exception as e:
            print(f"[ERROR] Main process failed: {str(e)}")
            progress(0.8, desc="Falling back to simple combination...")

            try:
                print("[DEBUG] Attempting fallback method")

                if not have_local_video:
                    video_response = requests.get(video_url)
                    if video_response.status_code != 200:
                        raise Exception(f"Failed to download video. Status code: {video_response.status_code}")
                    with open(temp_video_path, "wb") as f:
                        f.write(video_response.content)

                audio_response = requests.get(audio_url)
                if audio_response.status_code != 200:
                    raise Exception(f"Failed to download audio. Status code: {audio_response.status_code}")
                with open(temp_audio_path, "wb") as f:
                    f.write(audio_response.content)

                combine_audio_video(temp_video_path, temp_audio_path, output_path)

                progress(1.0, desc="Complete!")
                return output_path, f"Used fallback method. Original error: {str(e)}"

            except Exception as fallback_error:
                print(f"[ERROR] Fallback method failed: {str(fallback_error)}")
                return None, f"All methods failed. Error: {str(fallback_error)}"
    finally:
        # Remove intermediate files on every exit path; the output file, if
        # any, is left in place for the caller.
        for leftover in (temp_video_path, temp_audio_path):
            if os.path.exists(leftover):
                os.remove(leftover)
def create_interface():
    """Build the Gradio Blocks UI for the lipsync generator.

    The page has two text boxes (video/image URL and audio URL), a button
    that runs ``process_video`` on their values, a video player for the
    result, and a read-only status box.

    Returns:
        The constructed ``gr.Blocks`` app (not yet launched).
    """
    # Theme definition
    theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="gray",
    )

    # Custom CSS (the rules that hid page elements have been removed)
    css = """
    .container {max-width: 1000px; margin: auto; padding: 20px;}
    .header {text-align: center; margin-bottom: 2rem;}
    .input-section {background: #f7f7f7; padding: 2rem; border-radius: 10px; margin-bottom: 1rem;}
    .output-section {background: #f0f0f0; padding: 2rem; border-radius: 10px;}
    .button-primary {background: #2196F3 !important; color: white !important;}
    """

    with gr.Blocks(theme=theme, css=css) as app:
        # Header
        gr.Markdown("# 🎬 AI Lipsync Video Generator")
        gr.Markdown("Upload a video/image and audio to create a lip-synced video")

        with gr.Row():
            # Input section
            with gr.Column():
                video_url_input = gr.Textbox(
                    label="Video or Image URL",
                    placeholder="Enter URL of video or image..."
                )
                audio_url_input = gr.Textbox(
                    label="Audio URL",
                    placeholder="Enter URL of audio file..."
                )
                generate_btn = gr.Button("Generate Video", variant="primary")

            # Output section
            with gr.Column():
                video_output = gr.Video(label="Generated Video")
                status_output = gr.Textbox(
                    label="Status",
                    interactive=False
                )

        # Event wiring: the button feeds both URLs to process_video and
        # routes its (video, status) result to the output widgets.
        generate_btn.click(
            fn=process_video,
            inputs=[video_url_input, audio_url_input],
            outputs=[video_output, status_output]
        )

    return app
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it.
    print("[DEBUG] Starting application")
    app = create_interface()
    # share=True is passed so that a shareable link is generated.
    app.launch(share=True)