import os import requests import json import time import subprocess import gradio as gr import uuid from dotenv import load_dotenv from urllib.parse import urlparse # Load environment variables load_dotenv() # API Keys B_KEY = os.getenv("B_KEY") # URLs API_URL = "https://api.sync.so/v2/generate" def get_media_resolution(url): print(f"\n[DEBUG] Getting resolution for: {url}") # Download the file to a temporary location response = requests.get(url) if response.status_code != 200: print(f"[ERROR] Failed to download media. Status code: {response.status_code}") return None temp_path = f"temp_media_{uuid.uuid4()}" with open(temp_path, 'wb') as f: f.write(response.content) # Get resolution using FFprobe cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', '-of', 'json', temp_path ] try: result = subprocess.run(cmd, capture_output=True, text=True) os.remove(temp_path) # Clean up temp file if result.returncode == 0: data = json.loads(result.stdout) if 'streams' in data and data['streams']: width = data['streams'][0].get('width') height = data['streams'][0].get('height') if width and height: print(f"[DEBUG] Detected resolution: {width}x{height}") return [width, height] except Exception as e: print(f"[ERROR] Failed to get resolution: {str(e)}") if os.path.exists(temp_path): os.remove(temp_path) print("[DEBUG] Failed to detect resolution, using default") return [1280, 720] # Default resolution def lipsync_api_call(video_url, audio_url): print(f"\n[DEBUG] Starting lipsync_api_call") print(f"[DEBUG] Video URL: {video_url}") print(f"[DEBUG] Audio URL: {audio_url}") # Get the resolution of the input video/image resolution = get_media_resolution(video_url) headers = { "Content-Type": "application/json", "x-api-key": B_KEY } data = { "model": "lipsync-1.8.0-beta", "input": [ { "type": "video", "url": video_url }, { "type": "audio", "url": audio_url } ], "options": { "pads": [0, 5, 0, 0], "speedup": 1, "output_format": "mp4", "sync_mode": "bounce", "fps": 24, "output_resolution": resolution } } print(f"[DEBUG] Request payload: {json.dumps(data, indent=2)}") try: response = requests.post(API_URL, headers=headers, data=json.dumps(data)) print(f"[DEBUG] API Response status code: {response.status_code}") print(f"[DEBUG] API Response: {response.text}") return response.json() except Exception as e: print(f"[ERROR] API call failed: {str(e)}") raise def check_job_status(job_id): print(f"\n[DEBUG] Checking job status for ID: {job_id}") headers = {"x-api-key": B_KEY} max_attempts = 3000 attempt = 0 while attempt < max_attempts: try: response = requests.get(f"{API_URL}/{job_id}", headers=headers) print(f"[DEBUG] Status check attempt {attempt + 1}") print(f"[DEBUG] Status response: {response.text}") data = response.json() status = data.get("status") print(f"[DEBUG] Current status: {status}") if status == "COMPLETED": print(f"[DEBUG] Job completed. Output URL: {data.get('outputUrl')}") return data.get("outputUrl") elif status == "FAILED" or status == "CANCELED": print(f"[ERROR] Job failed or was canceled. Error: {data.get('error')}") return None attempt += 1 time.sleep(10) except Exception as e: print(f"[ERROR] Status check failed: {str(e)}") return None print("[ERROR] Max attempts reached") return None def get_media_duration(file_path): print(f"\n[DEBUG] Getting duration for: {file_path}") cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) duration = float(result.stdout.strip()) print(f"[DEBUG] Media duration: {duration} seconds") return duration def combine_audio_video(video_path, audio_path, output_path): print(f"\n[DEBUG] Combining audio and video") print(f"[DEBUG] Video path: {video_path}") print(f"[DEBUG] Audio path: {audio_path}") print(f"[DEBUG] Output path: {output_path}") video_duration = get_media_duration(video_path) audio_duration = get_media_duration(audio_path) if video_duration > audio_duration: print("[DEBUG] Video longer than audio - trimming video") cmd = [ 'ffmpeg', '-i', video_path, '-i', audio_path, '-t', str(audio_duration), '-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'aac', '-y', output_path ] else: print("[DEBUG] Audio longer than video - looping video") loop_count = int(audio_duration // video_duration) + 1 cmd = [ 'ffmpeg', '-stream_loop', str(loop_count), '-i', video_path, '-i', audio_path, '-t', str(audio_duration), '-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'aac', '-shortest', '-y', output_path ] print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) print(f"[DEBUG] FFmpeg stdout: {result.stdout}") print(f"[DEBUG] FFmpeg stderr: {result.stderr}") def is_image_url(url): parsed = urlparse(url) path = parsed.path.lower() result = path.endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp', '.heic', '.svg', '.ico')) print(f"\n[DEBUG] Checking if URL is image: {url}") print(f"[DEBUG] Result: {result}") return result def create_video_from_image(image_url, output_path, duration=10): print(f"\n[DEBUG] Creating video from image") print(f"[DEBUG] Image URL: {image_url}") print(f"[DEBUG] Output path: {output_path}") # Get the resolution before creating the video resolution = get_media_resolution(image_url) response = requests.get(image_url) if response.status_code != 200: print(f"[ERROR] Failed to download image. Status code: {response.status_code}") raise Exception("Failed to download the image") temp_image_path = f"temp_image_{uuid.uuid4()}.jpg" print(f"[DEBUG] Temporary image path: {temp_image_path}") with open(temp_image_path, 'wb') as f: f.write(response.content) cmd = [ 'ffmpeg', '-loop', '1', '-i', temp_image_path, '-c:v', 'libx264', '-t', str(duration), '-pix_fmt', 'yuv420p', '-vf', f'scale={resolution[0]}:{resolution[1]}', '-y', output_path ] print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) print(f"[DEBUG] FFmpeg stdout: {result.stdout}") print(f"[DEBUG] FFmpeg stderr: {result.stderr}") os.remove(temp_image_path) print(f"[DEBUG] Temporary image removed") return output_path def upload_file(file_path): print(f"\n[DEBUG] Uploading file: {file_path}") with open(file_path, 'rb') as file: files = {'fileToUpload': (os.path.basename(file_path), file)} data = {'reqtype': 'fileupload'} try: response = requests.post(UPLOAD_URL, files=files, data=data) print(f"[DEBUG] Upload response status code: {response.status_code}") print(f"[DEBUG] Upload response: {response.text}") if response.status_code == 200: return response.text.strip() return None except Exception as e: print(f"[ERROR] File upload failed: {str(e)}") return None def process_video(video_url, audio_url, progress=gr.Progress()): print(f"\n[DEBUG] Starting video processing") print(f"[DEBUG] Video URL: {video_url}") print(f"[DEBUG] Audio URL: {audio_url}") if not audio_url: print("[ERROR] No audio URL provided") return None, "No audio URL provided" if not video_url: print("[ERROR] No video URL provided") return None, "No video URL provided" session_id = str(uuid.uuid4()) print(f"[DEBUG] Session ID: {session_id}") progress(0.2, desc="Processing media...") try: if is_image_url(video_url): progress(0.3, desc="Converting image to video...") video_path = f"temp_video_{session_id}.mp4" create_video_from_image(video_url, video_path) progress(0.4, desc="Uploading converted video...") video_url = upload_file(video_path) if not video_url: raise Exception("Failed to upload converted video") os.remove(video_path) progress(0.5, desc="Initiating lipsync...") job_data = lipsync_api_call(video_url, audio_url) # Check if we have a valid job ID if "id" not in job_data: print("[ERROR] No job ID in response") raise Exception("No job ID received from API") # Only treat as error if error field has actual error message if job_data.get("error") not in [None, ""]: error_msg = job_data["error"] print(f"[ERROR] API error: {error_msg}") raise Exception(error_msg) job_id = job_data["id"] print(f"[DEBUG] Job ID: {job_id}") progress(0.6, desc="Processing lipsync...") result_url = check_job_status(job_id) if result_url: progress(0.9, desc="Downloading result...") print(f"[DEBUG] Downloading from: {result_url}") response = requests.get(result_url) output_path = f"output_{session_id}.mp4" with open(output_path, "wb") as f: f.write(response.content) print(f"[DEBUG] Result saved to: {output_path}") progress(1.0, desc="Complete!") return output_path, "Lipsync completed successfully!" else: raise Exception("Lipsync processing failed or timed out") except Exception as e: print(f"[ERROR] Main process failed: {str(e)}") progress(0.8, desc="Falling back to simple combination...") try: print("[DEBUG] Attempting fallback method") video_response = requests.get(video_url) temp_video_path = f"temp_video_{session_id}.mp4" with open(temp_video_path, "wb") as f: f.write(video_response.content) audio_response = requests.get(audio_url) temp_audio_path = f"temp_audio_{session_id}.mp3" with open(temp_audio_path, "wb") as f: f.write(audio_response.content) output_path = f"output_{session_id}.mp4" combine_audio_video(temp_video_path, temp_audio_path, output_path) os.remove(temp_video_path) os.remove(temp_audio_path) progress(1.0, desc="Complete!") return output_path, f"Used fallback method. Original error: {str(e)}" except Exception as fallback_error: print(f"[ERROR] Fallback method failed: {str(fallback_error)}") return None, f"All methods failed. Error: {str(fallback_error)}" def create_interface(): css = """ #component-0 > :not(.prose) {display: none !important;} footer {display: none !important;} """ with gr.Blocks(css=css) as app: gr.Markdown("# Lipsync Video Generator") with gr.Row(): with gr.Column(): video_url_input = gr.Textbox(label="Video or Image URL") audio_url_input = gr.Textbox(label="Audio URL") generate_btn = gr.Button("Generate Video") with gr.Column(): video_output = gr.Video(label="Generated Video") status_output = gr.Textbox(label="Status", interactive=False) generate_btn.click( fn=process_video, inputs=[video_url_input, audio_url_input], outputs=[video_output, status_output] ) return app if __name__ == "__main__": print("[DEBUG] Starting application") app = create_interface() app.launch()