|
import os |
|
import requests |
|
import json |
|
import time |
|
import subprocess |
|
import gradio as gr |
|
import uuid |
|
from dotenv import load_dotenv |
|
from urllib.parse import urlparse |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
B_KEY = os.getenv("B_KEY") |
|
|
|
|
|
API_URL = "https://api.sync.so/v2/generate" |
|
|
|
def get_media_resolution(url): |
|
print(f"\n[DEBUG] Getting resolution for: {url}") |
|
|
|
|
|
response = requests.get(url) |
|
if response.status_code != 200: |
|
print(f"[ERROR] Failed to download media. Status code: {response.status_code}") |
|
return None |
|
|
|
temp_path = f"temp_media_{uuid.uuid4()}" |
|
with open(temp_path, 'wb') as f: |
|
f.write(response.content) |
|
|
|
|
|
cmd = [ |
|
'ffprobe', |
|
'-v', 'error', |
|
'-select_streams', 'v:0', |
|
'-show_entries', 'stream=width,height', |
|
'-of', 'json', |
|
temp_path |
|
] |
|
|
|
try: |
|
result = subprocess.run(cmd, capture_output=True, text=True) |
|
os.remove(temp_path) |
|
|
|
if result.returncode == 0: |
|
data = json.loads(result.stdout) |
|
if 'streams' in data and data['streams']: |
|
width = data['streams'][0].get('width') |
|
height = data['streams'][0].get('height') |
|
if width and height: |
|
print(f"[DEBUG] Detected resolution: {width}x{height}") |
|
return [width, height] |
|
except Exception as e: |
|
print(f"[ERROR] Failed to get resolution: {str(e)}") |
|
if os.path.exists(temp_path): |
|
os.remove(temp_path) |
|
|
|
print("[DEBUG] Failed to detect resolution, using default") |
|
return [1280, 720] |
|
|
|
def lipsync_api_call(video_url, audio_url): |
|
print(f"\n[DEBUG] Starting lipsync_api_call") |
|
print(f"[DEBUG] Video URL: {video_url}") |
|
print(f"[DEBUG] Audio URL: {audio_url}") |
|
|
|
|
|
resolution = get_media_resolution(video_url) |
|
|
|
headers = { |
|
"Content-Type": "application/json", |
|
"x-api-key": B_KEY |
|
} |
|
|
|
data = { |
|
"model": "lipsync-1.8.0-beta", |
|
"input": [ |
|
{ |
|
"type": "video", |
|
"url": video_url |
|
}, |
|
{ |
|
"type": "audio", |
|
"url": audio_url |
|
} |
|
], |
|
"options": { |
|
"pads": [0, 5, 0, 0], |
|
"speedup": 1, |
|
"output_format": "mp4", |
|
"sync_mode": "bounce", |
|
"fps": 24, |
|
"output_resolution": resolution |
|
} |
|
} |
|
|
|
print(f"[DEBUG] Request payload: {json.dumps(data, indent=2)}") |
|
|
|
try: |
|
response = requests.post(API_URL, headers=headers, data=json.dumps(data)) |
|
print(f"[DEBUG] API Response status code: {response.status_code}") |
|
print(f"[DEBUG] API Response: {response.text}") |
|
return response.json() |
|
except Exception as e: |
|
print(f"[ERROR] API call failed: {str(e)}") |
|
raise |
|
|
|
def check_job_status(job_id): |
|
print(f"\n[DEBUG] Checking job status for ID: {job_id}") |
|
headers = {"x-api-key": B_KEY} |
|
max_attempts = 3000 |
|
attempt = 0 |
|
|
|
while attempt < max_attempts: |
|
try: |
|
response = requests.get(f"{API_URL}/{job_id}", headers=headers) |
|
print(f"[DEBUG] Status check attempt {attempt + 1}") |
|
print(f"[DEBUG] Status response: {response.text}") |
|
|
|
data = response.json() |
|
status = data.get("status") |
|
print(f"[DEBUG] Current status: {status}") |
|
|
|
if status == "COMPLETED": |
|
print(f"[DEBUG] Job completed. Output URL: {data.get('outputUrl')}") |
|
return data.get("outputUrl") |
|
elif status == "FAILED" or status == "CANCELED": |
|
print(f"[ERROR] Job failed or was canceled. Error: {data.get('error')}") |
|
return None |
|
|
|
attempt += 1 |
|
time.sleep(10) |
|
|
|
except Exception as e: |
|
print(f"[ERROR] Status check failed: {str(e)}") |
|
return None |
|
|
|
print("[ERROR] Max attempts reached") |
|
return None |
|
|
|
def get_media_duration(file_path): |
|
print(f"\n[DEBUG] Getting duration for: {file_path}") |
|
cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path] |
|
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
duration = float(result.stdout.strip()) |
|
print(f"[DEBUG] Media duration: {duration} seconds") |
|
return duration |
|
|
|
def combine_audio_video(video_path, audio_path, output_path): |
|
print(f"\n[DEBUG] Combining audio and video") |
|
print(f"[DEBUG] Video path: {video_path}") |
|
print(f"[DEBUG] Audio path: {audio_path}") |
|
print(f"[DEBUG] Output path: {output_path}") |
|
|
|
video_duration = get_media_duration(video_path) |
|
audio_duration = get_media_duration(audio_path) |
|
|
|
if video_duration > audio_duration: |
|
print("[DEBUG] Video longer than audio - trimming video") |
|
cmd = [ |
|
'ffmpeg', '-i', video_path, '-i', audio_path, |
|
'-t', str(audio_duration), |
|
'-map', '0:v', '-map', '1:a', |
|
'-c:v', 'copy', '-c:a', 'aac', |
|
'-y', output_path |
|
] |
|
else: |
|
print("[DEBUG] Audio longer than video - looping video") |
|
loop_count = int(audio_duration // video_duration) + 1 |
|
cmd = [ |
|
'ffmpeg', '-stream_loop', str(loop_count), '-i', video_path, '-i', audio_path, |
|
'-t', str(audio_duration), |
|
'-map', '0:v', '-map', '1:a', |
|
'-c:v', 'copy', '-c:a', 'aac', |
|
'-shortest', '-y', output_path |
|
] |
|
|
|
print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}") |
|
result = subprocess.run(cmd, capture_output=True, text=True) |
|
print(f"[DEBUG] FFmpeg stdout: {result.stdout}") |
|
print(f"[DEBUG] FFmpeg stderr: {result.stderr}") |
|
|
|
def is_image_url(url): |
|
parsed = urlparse(url) |
|
path = parsed.path.lower() |
|
result = path.endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp', '.heic', '.svg', '.ico')) |
|
print(f"\n[DEBUG] Checking if URL is image: {url}") |
|
print(f"[DEBUG] Result: {result}") |
|
return result |
|
|
|
def create_video_from_image(image_url, output_path, duration=10): |
|
print(f"\n[DEBUG] Creating video from image") |
|
print(f"[DEBUG] Image URL: {image_url}") |
|
print(f"[DEBUG] Output path: {output_path}") |
|
|
|
|
|
resolution = get_media_resolution(image_url) |
|
|
|
response = requests.get(image_url) |
|
if response.status_code != 200: |
|
print(f"[ERROR] Failed to download image. Status code: {response.status_code}") |
|
raise Exception("Failed to download the image") |
|
|
|
temp_image_path = f"temp_image_{uuid.uuid4()}.jpg" |
|
print(f"[DEBUG] Temporary image path: {temp_image_path}") |
|
|
|
with open(temp_image_path, 'wb') as f: |
|
f.write(response.content) |
|
|
|
cmd = [ |
|
'ffmpeg', '-loop', '1', '-i', temp_image_path, |
|
'-c:v', 'libx264', '-t', str(duration), '-pix_fmt', 'yuv420p', |
|
'-vf', f'scale={resolution[0]}:{resolution[1]}', |
|
'-y', output_path |
|
] |
|
|
|
print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}") |
|
result = subprocess.run(cmd, capture_output=True, text=True) |
|
print(f"[DEBUG] FFmpeg stdout: {result.stdout}") |
|
print(f"[DEBUG] FFmpeg stderr: {result.stderr}") |
|
|
|
os.remove(temp_image_path) |
|
print(f"[DEBUG] Temporary image removed") |
|
|
|
return output_path |
|
|
|
def upload_file(file_path): |
|
print(f"\n[DEBUG] Uploading file: {file_path}") |
|
|
|
with open(file_path, 'rb') as file: |
|
files = {'fileToUpload': (os.path.basename(file_path), file)} |
|
data = {'reqtype': 'fileupload'} |
|
try: |
|
response = requests.post(UPLOAD_URL, files=files, data=data) |
|
print(f"[DEBUG] Upload response status code: {response.status_code}") |
|
print(f"[DEBUG] Upload response: {response.text}") |
|
|
|
if response.status_code == 200: |
|
return response.text.strip() |
|
return None |
|
except Exception as e: |
|
print(f"[ERROR] File upload failed: {str(e)}") |
|
return None |
|
|
|
def process_video(video_url, audio_url, progress=gr.Progress()): |
|
print(f"\n[DEBUG] Starting video processing") |
|
print(f"[DEBUG] Video URL: {video_url}") |
|
print(f"[DEBUG] Audio URL: {audio_url}") |
|
|
|
if not audio_url: |
|
print("[ERROR] No audio URL provided") |
|
return None, "No audio URL provided" |
|
if not video_url: |
|
print("[ERROR] No video URL provided") |
|
return None, "No video URL provided" |
|
|
|
session_id = str(uuid.uuid4()) |
|
print(f"[DEBUG] Session ID: {session_id}") |
|
|
|
progress(0.2, desc="Processing media...") |
|
|
|
try: |
|
if is_image_url(video_url): |
|
progress(0.3, desc="Converting image to video...") |
|
video_path = f"temp_video_{session_id}.mp4" |
|
create_video_from_image(video_url, video_path) |
|
progress(0.4, desc="Uploading converted video...") |
|
video_url = upload_file(video_path) |
|
if not video_url: |
|
raise Exception("Failed to upload converted video") |
|
os.remove(video_path) |
|
|
|
progress(0.5, desc="Initiating lipsync...") |
|
job_data = lipsync_api_call(video_url, audio_url) |
|
|
|
|
|
if "id" not in job_data: |
|
print("[ERROR] No job ID in response") |
|
raise Exception("No job ID received from API") |
|
|
|
|
|
if job_data.get("error") not in [None, ""]: |
|
error_msg = job_data["error"] |
|
print(f"[ERROR] API error: {error_msg}") |
|
raise Exception(error_msg) |
|
|
|
job_id = job_data["id"] |
|
print(f"[DEBUG] Job ID: {job_id}") |
|
|
|
progress(0.6, desc="Processing lipsync...") |
|
result_url = check_job_status(job_id) |
|
|
|
if result_url: |
|
progress(0.9, desc="Downloading result...") |
|
print(f"[DEBUG] Downloading from: {result_url}") |
|
response = requests.get(result_url) |
|
output_path = f"output_{session_id}.mp4" |
|
|
|
with open(output_path, "wb") as f: |
|
f.write(response.content) |
|
|
|
print(f"[DEBUG] Result saved to: {output_path}") |
|
progress(1.0, desc="Complete!") |
|
return output_path, "Lipsync completed successfully!" |
|
else: |
|
raise Exception("Lipsync processing failed or timed out") |
|
|
|
except Exception as e: |
|
print(f"[ERROR] Main process failed: {str(e)}") |
|
progress(0.8, desc="Falling back to simple combination...") |
|
try: |
|
print("[DEBUG] Attempting fallback method") |
|
video_response = requests.get(video_url) |
|
temp_video_path = f"temp_video_{session_id}.mp4" |
|
with open(temp_video_path, "wb") as f: |
|
f.write(video_response.content) |
|
|
|
audio_response = requests.get(audio_url) |
|
temp_audio_path = f"temp_audio_{session_id}.mp3" |
|
with open(temp_audio_path, "wb") as f: |
|
f.write(audio_response.content) |
|
|
|
output_path = f"output_{session_id}.mp4" |
|
combine_audio_video(temp_video_path, temp_audio_path, output_path) |
|
|
|
os.remove(temp_video_path) |
|
os.remove(temp_audio_path) |
|
|
|
progress(1.0, desc="Complete!") |
|
return output_path, f"Used fallback method. Original error: {str(e)}" |
|
except Exception as fallback_error: |
|
print(f"[ERROR] Fallback method failed: {str(fallback_error)}") |
|
return None, f"All methods failed. Error: {str(fallback_error)}" |
|
|
|
def create_interface(): |
|
css = """ |
|
#component-0 > :not(.prose) {display: none !important;} |
|
footer {display: none !important;} |
|
""" |
|
with gr.Blocks(css=css) as app: |
|
gr.Markdown("# Lipsync Video Generator") |
|
with gr.Row(): |
|
with gr.Column(): |
|
video_url_input = gr.Textbox(label="Video or Image URL") |
|
audio_url_input = gr.Textbox(label="Audio URL") |
|
generate_btn = gr.Button("Generate Video") |
|
with gr.Column(): |
|
video_output = gr.Video(label="Generated Video") |
|
status_output = gr.Textbox(label="Status", interactive=False) |
|
|
|
generate_btn.click( |
|
fn=process_video, |
|
inputs=[video_url_input, audio_url_input], |
|
outputs=[video_output, status_output] |
|
) |
|
|
|
return app |
|
|
|
if __name__ == "__main__": |
|
print("[DEBUG] Starting application") |
|
app = create_interface() |
|
app.launch() |