# Source: audio / app.py (uploaded by dinhhan)
# Last commit: "Update app.py" (6f3ece3, verified)
import os
import requests
import json
import time
import subprocess
import gradio as gr
import uuid
from dotenv import load_dotenv
from urllib.parse import urlparse
# Load environment variables (python-dotenv reads them from a .env file).
load_dotenv()

# API key sent in the "x-api-key" header of every request to API_URL.
B_KEY = os.getenv("B_KEY")

# Endpoint used to create lipsync jobs; job status is polled at "<API_URL>/<job_id>".
API_URL = "https://api.sync.so/v2/generate"

# Endpoint that upload_file() posts converted videos to.
# NOTE(review): the default is an assumption. The form fields upload_file()
# sends ("reqtype=fileupload", "fileToUpload") match the Catbox upload API,
# so that host is used unless the UPLOAD_URL environment variable overrides
# it. Confirm this is the intended host.
UPLOAD_URL = os.getenv("UPLOAD_URL", "https://catbox.moe/user/api.php")
def get_media_resolution(url):
    """Download the media at ``url`` and probe the size of its first video stream.

    Args:
        url: URL of a video or image file.

    Returns:
        ``[width, height]`` as reported by ffprobe; ``[1280, 720]`` when the
        probe fails or reports no usable size; ``None`` when the download
        does not answer with HTTP 200.
    """
    print(f"\n[DEBUG] Getting resolution for: {url}")

    # Without a timeout an unresponsive host would block this call forever.
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        print(f"[ERROR] Failed to download media. Status code: {response.status_code}")
        return None

    temp_path = f"temp_media_{uuid.uuid4()}"
    try:
        # ffprobe is run against a local copy of the downloaded bytes.
        with open(temp_path, 'wb') as f:
            f.write(response.content)

        cmd = [
            'ffprobe',
            '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height',
            '-of', 'json',
            temp_path,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                data = json.loads(result.stdout)
                if 'streams' in data and data['streams']:
                    width = data['streams'][0].get('width')
                    height = data['streams'][0].get('height')
                    if width and height:
                        print(f"[DEBUG] Detected resolution: {width}x{height}")
                        return [width, height]
        except Exception as e:
            # Probing is best-effort: any failure falls through to the default.
            print(f"[ERROR] Failed to get resolution: {str(e)}")
    finally:
        # Single cleanup point, reached on every exit path (return or error).
        if os.path.exists(temp_path):
            os.remove(temp_path)

    print("[DEBUG] Failed to detect resolution, using default")
    return [1280, 720]  # Default resolution
def lipsync_api_call(video_url, audio_url):
    """Submit a lipsync generation job to ``API_URL``.

    Args:
        video_url: URL of the input video.
        audio_url: URL of the audio track to sync to.

    Returns:
        The API response body decoded from JSON.

    Raises:
        Exception: whatever the HTTP request or JSON decoding raises; the
            error is logged and re-raised unchanged.
    """
    print("\n[DEBUG] Starting lipsync_api_call")
    print(f"[DEBUG] Video URL: {video_url}")
    print(f"[DEBUG] Audio URL: {audio_url}")

    # Get the resolution of the input video/image
    resolution = get_media_resolution(video_url)

    headers = {
        "Content-Type": "application/json",
        "x-api-key": B_KEY,
    }
    options = {
        "pads": [0, 5, 0, 0],
        "speedup": 1,
        "output_format": "mp4",
        "sync_mode": "bounce",
        "fps": 24,
    }
    # get_media_resolution() returns None when it cannot download the media.
    # Leave the option out in that case instead of sending an explicit null.
    # NOTE(review): assumes the API treats output_resolution as optional - confirm.
    if resolution is not None:
        options["output_resolution"] = resolution

    data = {
        "model": "lipsync-1.8.0-beta",
        "input": [
            {"type": "video", "url": video_url},
            {"type": "audio", "url": audio_url},
        ],
        "options": options,
    }
    print(f"[DEBUG] Request payload: {json.dumps(data, indent=2)}")

    try:
        # Without a timeout an unresponsive API would block the UI request forever.
        response = requests.post(
            API_URL, headers=headers, data=json.dumps(data), timeout=60
        )
        print(f"[DEBUG] API Response status code: {response.status_code}")
        print(f"[DEBUG] API Response: {response.text}")
        return response.json()
    except Exception as e:
        print(f"[ERROR] API call failed: {str(e)}")
        raise
def check_job_status(job_id, max_attempts=3000, poll_interval=10):
    """Poll the API until the job reaches a terminal state.

    Args:
        job_id: Identifier returned when the job was created.
        max_attempts: Maximum number of status requests before giving up.
        poll_interval: Seconds to sleep between two status requests.

    Returns:
        The job's ``outputUrl`` once its status is ``"COMPLETED"``; ``None``
        when the job is ``"FAILED"``/``"CANCELED"``, when a status request
        raises, or when ``max_attempts`` is exhausted.
    """
    print(f"\n[DEBUG] Checking job status for ID: {job_id}")
    headers = {"x-api-key": B_KEY}

    for attempt in range(max_attempts):
        try:
            # Per-request timeout: a single hung connection must not be able
            # to defeat the max_attempts bound.
            response = requests.get(
                f"{API_URL}/{job_id}", headers=headers, timeout=30
            )
            print(f"[DEBUG] Status check attempt {attempt + 1}")
            print(f"[DEBUG] Status response: {response.text}")
            data = response.json()
            status = data.get("status")
            print(f"[DEBUG] Current status: {status}")

            if status == "COMPLETED":
                print(f"[DEBUG] Job completed. Output URL: {data.get('outputUrl')}")
                return data.get("outputUrl")
            if status in ("FAILED", "CANCELED"):
                print(f"[ERROR] Job failed or was canceled. Error: {data.get('error')}")
                return None

            # Any other status means the job is still in progress.
            time.sleep(poll_interval)
        except Exception as e:
            print(f"[ERROR] Status check failed: {str(e)}")
            return None

    print("[ERROR] Max attempts reached")
    return None
def get_media_duration(file_path):
    """Return the duration of a local media file in seconds, via ffprobe.

    Args:
        file_path: Path to the media file.

    Returns:
        The container duration as a float.

    Raises:
        ValueError: if ffprobe exits with an error or prints something that
            is not a number. The message carries ffprobe's stderr.
    """
    print(f"\n[DEBUG] Getting duration for: {file_path}")
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        file_path,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    raw = result.stdout.strip()

    try:
        if result.returncode != 0:
            raise ValueError("ffprobe exited with an error")
        duration = float(raw)
    except ValueError:
        # Report what ffprobe said instead of a bare float() conversion error.
        detail = result.stderr.decode(errors="replace").strip()
        if not detail:
            detail = f"unexpected ffprobe output {raw!r}"
        raise ValueError(
            f"Could not determine duration of {file_path}: {detail}"
        ) from None

    print(f"[DEBUG] Media duration: {duration} seconds")
    return duration
def combine_audio_video(video_path, audio_path, output_path):
    """Mux ``audio_path`` onto ``video_path`` with ffmpeg, matching the audio length.

    When the video is longer than the audio it is cut to the audio's
    duration; otherwise the video is looped and the output is limited to
    the audio's duration. The video stream is copied, the audio is encoded
    as AAC.

    Args:
        video_path: Local path of the video file.
        audio_path: Local path of the audio file.
        output_path: Path the combined file is written to (overwritten).

    Raises:
        ValueError: if the video's duration is not positive.
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    print("\n[DEBUG] Combining audio and video")
    print(f"[DEBUG] Video path: {video_path}")
    print(f"[DEBUG] Audio path: {audio_path}")
    print(f"[DEBUG] Output path: {output_path}")

    video_duration = get_media_duration(video_path)
    audio_duration = get_media_duration(audio_path)

    # The loop count below divides by the video duration.
    if video_duration <= 0:
        raise ValueError(f"Video has no usable duration: {video_path}")

    if video_duration > audio_duration:
        print("[DEBUG] Video longer than audio - trimming video")
        loop_opts = []
        tail_opts = []
    else:
        print("[DEBUG] Audio longer than video - looping video")
        loop_count = int(audio_duration // video_duration) + 1
        loop_opts = ['-stream_loop', str(loop_count)]
        tail_opts = ['-shortest']

    cmd = [
        'ffmpeg', *loop_opts, '-i', video_path, '-i', audio_path,
        '-t', str(audio_duration),
        '-map', '0:v', '-map', '1:a',
        '-c:v', 'copy', '-c:a', 'aac',
        *tail_opts, '-y', output_path,
    ]
    print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True)
    print(f"[DEBUG] FFmpeg stdout: {result.stdout}")
    print(f"[DEBUG] FFmpeg stderr: {result.stderr}")

    # Surface failures so callers do not report success for a missing file.
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg failed with exit code {result.returncode}")
def is_image_url(url):
    """Tell whether the path component of ``url`` ends in an image extension.

    Only the URL path is inspected (query string and fragment are ignored),
    and the comparison is case-insensitive.
    """
    image_suffixes = (
        '.png', '.jpg', '.jpeg', '.gif', '.bmp',
        '.tiff', '.webp', '.heic', '.svg', '.ico',
    )
    lowered_path = urlparse(url).path.lower()
    result = lowered_path.endswith(image_suffixes)
    print(f"\n[DEBUG] Checking if URL is image: {url}")
    print(f"[DEBUG] Result: {result}")
    return result
def create_video_from_image(image_url, output_path, duration=10):
    """Download an image and render it as a still H.264 video with ffmpeg.

    Args:
        image_url: URL of the source image.
        output_path: Path the video is written to (overwritten).
        duration: Length of the generated video in seconds.

    Returns:
        ``output_path``.

    Raises:
        Exception: if the image cannot be downloaded or ffmpeg exits with a
            non-zero status.
    """
    print("\n[DEBUG] Creating video from image")
    print(f"[DEBUG] Image URL: {image_url}")
    print(f"[DEBUG] Output path: {output_path}")

    # Get the resolution before creating the video
    resolution = get_media_resolution(image_url)

    response = requests.get(image_url, timeout=60)
    if response.status_code != 200:
        print(f"[ERROR] Failed to download image. Status code: {response.status_code}")
        raise Exception("Failed to download the image")

    # libx264 with yuv420p requires even frame dimensions, so round down.
    if resolution:
        width, height = (max(2, int(side) - int(side) % 2) for side in resolution)
        scale_filter = f'scale={width}:{height}'
    else:
        # get_media_resolution() returns None when its own download failed;
        # let ffmpeg derive even dimensions from the decoded image instead.
        scale_filter = 'scale=trunc(iw/2)*2:trunc(ih/2)*2'

    temp_image_path = f"temp_image_{uuid.uuid4()}.jpg"
    print(f"[DEBUG] Temporary image path: {temp_image_path}")
    try:
        with open(temp_image_path, 'wb') as f:
            f.write(response.content)

        cmd = [
            'ffmpeg', '-loop', '1', '-i', temp_image_path,
            '-c:v', 'libx264', '-t', str(duration), '-pix_fmt', 'yuv420p',
            '-vf', scale_filter,
            '-y', output_path,
        ]
        print(f"[DEBUG] FFmpeg command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(f"[DEBUG] FFmpeg stdout: {result.stdout}")
        print(f"[DEBUG] FFmpeg stderr: {result.stderr}")
        if result.returncode != 0:
            raise Exception(
                f"ffmpeg failed to create video from image (exit code {result.returncode})"
            )
    finally:
        # Remove the downloaded image whether or not ffmpeg succeeded.
        if os.path.exists(temp_image_path):
            os.remove(temp_image_path)
            print("[DEBUG] Temporary image removed")

    return output_path
def upload_file(file_path):
    """Upload a local file to ``UPLOAD_URL`` as a multipart form post.

    Args:
        file_path: Path of the file to upload.

    Returns:
        The stripped response body when the server answers HTTP 200
        (process_video() uses it as the uploaded file's URL); ``None`` when
        the request fails or any other status code is returned.
    """
    print(f"\n[DEBUG] Uploading file: {file_path}")
    with open(file_path, 'rb') as file:
        files = {'fileToUpload': (os.path.basename(file_path), file)}
        data = {'reqtype': 'fileupload'}
        try:
            response = requests.post(UPLOAD_URL, files=files, data=data, timeout=120)
        except requests.RequestException as e:
            # Only network/HTTP failures are treated as "upload failed".
            # Programming errors (e.g. a missing UPLOAD_URL) now propagate
            # instead of being silently reported as a failed upload.
            print(f"[ERROR] File upload failed: {str(e)}")
            return None

    print(f"[DEBUG] Upload response status code: {response.status_code}")
    print(f"[DEBUG] Upload response: {response.text}")
    if response.status_code == 200:
        return response.text.strip()
    return None
def process_video(video_url, audio_url, progress=gr.Progress()):
    """Run the lipsync pipeline, falling back to a plain audio/video mux.

    Primary path: convert an image input to a video and upload it, submit
    the lipsync job, poll it and download the result. If any step raises,
    the fallback downloads the inputs and combines them locally with ffmpeg.

    Args:
        video_url: URL of the input video or image.
        audio_url: URL of the input audio.
        progress: Gradio progress tracker, called with a fraction and a label.

    Returns:
        A ``(output_path, status_message)`` tuple. ``output_path`` is
        ``None`` when an input is missing or both methods fail.
    """
    print("\n[DEBUG] Starting video processing")
    print(f"[DEBUG] Video URL: {video_url}")
    print(f"[DEBUG] Audio URL: {audio_url}")

    if not audio_url:
        print("[ERROR] No audio URL provided")
        return None, "No audio URL provided"
    if not video_url:
        print("[ERROR] No video URL provided")
        return None, "No video URL provided"

    session_id = str(uuid.uuid4())
    print(f"[DEBUG] Session ID: {session_id}")

    temp_video_path = f"temp_video_{session_id}.mp4"
    temp_audio_path = f"temp_audio_{session_id}.mp3"
    output_path = f"output_{session_id}.mp4"
    # True while a successfully converted image video sits at temp_video_path.
    have_local_video = False

    def _remove_if_exists(path):
        # Best-effort removal of a temporary file.
        if os.path.exists(path):
            os.remove(path)

    progress(0.2, desc="Processing media...")
    try:
        if is_image_url(video_url):
            progress(0.3, desc="Converting image to video...")
            create_video_from_image(video_url, temp_video_path)
            have_local_video = True

            progress(0.4, desc="Uploading converted video...")
            uploaded_url = upload_file(temp_video_path)
            if not uploaded_url:
                raise Exception("Failed to upload converted video")
            # Only replace the URL once the upload succeeded, so the fallback
            # never sees video_url set to None.
            video_url = uploaded_url
            os.remove(temp_video_path)
            have_local_video = False

        progress(0.5, desc="Initiating lipsync...")
        job_data = lipsync_api_call(video_url, audio_url)

        # Check if we have a valid job ID
        if "id" not in job_data:
            print("[ERROR] No job ID in response")
            raise Exception("No job ID received from API")

        # Only treat as error if error field has actual error message
        if job_data.get("error") not in [None, ""]:
            error_msg = job_data["error"]
            print(f"[ERROR] API error: {error_msg}")
            raise Exception(error_msg)

        job_id = job_data["id"]
        print(f"[DEBUG] Job ID: {job_id}")

        progress(0.6, desc="Processing lipsync...")
        result_url = check_job_status(job_id)
        if not result_url:
            raise Exception("Lipsync processing failed or timed out")

        progress(0.9, desc="Downloading result...")
        print(f"[DEBUG] Downloading from: {result_url}")
        response = requests.get(result_url, timeout=60)
        # Do not save an HTTP error page as the result video.
        response.raise_for_status()
        with open(output_path, "wb") as f:
            f.write(response.content)
        print(f"[DEBUG] Result saved to: {output_path}")
        progress(1.0, desc="Complete!")
        return output_path, "Lipsync completed successfully!"
    except Exception as e:
        print(f"[ERROR] Main process failed: {str(e)}")
        progress(0.8, desc="Falling back to simple combination...")
        try:
            print("[DEBUG] Attempting fallback method")
            if have_local_video and os.path.exists(temp_video_path):
                # The image was converted but never uploaded: reuse that file.
                print("[DEBUG] Reusing locally converted video")
            else:
                video_response = requests.get(video_url, timeout=60)
                video_response.raise_for_status()
                with open(temp_video_path, "wb") as f:
                    f.write(video_response.content)

            audio_response = requests.get(audio_url, timeout=60)
            audio_response.raise_for_status()
            with open(temp_audio_path, "wb") as f:
                f.write(audio_response.content)

            # Drop any partial output left by the primary path so the
            # existence check below reflects this ffmpeg run only.
            _remove_if_exists(output_path)
            combine_audio_video(temp_video_path, temp_audio_path, output_path)
            if not os.path.exists(output_path):
                raise Exception("FFmpeg did not produce an output file")

            progress(1.0, desc="Complete!")
            return output_path, f"Used fallback method. Original error: {str(e)}"
        except Exception as fallback_error:
            print(f"[ERROR] Fallback method failed: {str(fallback_error)}")
            return None, f"All methods failed. Error: {str(fallback_error)}"
        finally:
            # Temporary inputs are removed on success and on failure alike.
            _remove_if_exists(temp_video_path)
            _remove_if_exists(temp_audio_path)
def create_interface():
    """Assemble the Gradio Blocks UI and return it without launching.

    Layout: a heading, then one row with two columns - the two URL inputs
    and the button on the left, the resulting video and a read-only status
    box on the right. The button runs ``process_video``.
    """
    # Custom CSS: hides the footer and every non-".prose" child of #component-0.
    custom_css = """
#component-0 > :not(.prose) {display: none !important;}
footer {display: none !important;}
"""
    with gr.Blocks(css=custom_css) as demo:
        gr.Markdown("# Lipsync Video Generator")

        with gr.Row():
            with gr.Column():
                media_box = gr.Textbox(label="Video or Image URL")
                audio_box = gr.Textbox(label="Audio URL")
                run_button = gr.Button("Generate Video")
            with gr.Column():
                result_video = gr.Video(label="Generated Video")
                status_box = gr.Textbox(label="Status", interactive=False)

        click_inputs = [media_box, audio_box]
        click_outputs = [result_video, status_box]
        run_button.click(
            fn=process_video,
            inputs=click_inputs,
            outputs=click_outputs,
        )

    return demo
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    print("[DEBUG] Starting application")
    create_interface().launch()