# NOTE: "Spaces: Sleeping" banner text (Hugging Face Space status) was captured
# during extraction — it is page residue, not part of the program.
# Standard library imports
import json
import os
import subprocess
import time
import uuid

# Third-party imports
import gradio as gr
import requests
from dotenv import load_dotenv

# Load environment variables from a local .env file
load_dotenv()

# API keys: A_KEY authenticates with ElevenLabs (TTS),
# B_KEY authenticates with the lipsync job API.
A_KEY = os.getenv("A_KEY")
B_KEY = os.getenv("B_KEY")

# Service endpoints (kept out of source control via .env)
API_URL = os.getenv("API_URL")        # lipsync job API base URL
UPLOAD_URL = os.getenv("UPLOAD_URL")  # file-hosting upload endpoint
def get_voices():
    """Fetch the available ElevenLabs voices.

    Returns:
        A list of ``(name, voice_id)`` tuples. Returns an empty list on
        any network error or non-200 response so the UI can still start
        without voices instead of crashing at import time.
    """
    url = "https://api.elevenlabs.io/v1/voices"
    headers = {
        "Accept": "application/json",
        "xi-api-key": A_KEY,
    }
    try:
        # A timeout keeps the UI from hanging at startup if the API is slow.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException:
        return []
    if response.status_code != 200:
        return []
    return [(voice['name'], voice['voice_id']) for voice in response.json().get('voices', [])]
def get_video_models():
    """List template video files in the local ``models`` directory.

    Returns:
        File names ending in .mp4/.avi/.mov. Returns an empty list when
        the directory does not exist (instead of raising
        FileNotFoundError), and skips directories whose names happen to
        match a video extension.
    """
    models_dir = "models"
    if not os.path.isdir(models_dir):
        return []
    return [
        f for f in os.listdir(models_dir)
        if f.endswith((".mp4", ".avi", ".mov"))
        and os.path.isfile(os.path.join(models_dir, f))
    ]
def text_to_speech(voice_id, text, session_id):
    """Synthesize ``text`` with ElevenLabs and save the audio as an MP3.

    Parameters:
        voice_id: ElevenLabs voice identifier.
        text: Text to synthesize.
        session_id: Unique id used in the temp file name so concurrent
            sessions do not clobber each other's audio.

    Returns:
        The path of the saved MP3, or None on any API/network failure
        (matching the non-200 failure path).
    """
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": A_KEY,
    }
    data = {
        "text": text,
        "model_id": "eleven_turbo_v2_5",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.5,
        },
    }
    try:
        # Synthesis of long text can take a while; cap it rather than hang.
        response = requests.post(url, json=data, headers=headers, timeout=120)
    except requests.RequestException:
        return None
    if response.status_code != 200:
        return None
    # Save temporary audio file, named with the session ID.
    audio_file_path = f'temp_voice_{session_id}.mp3'
    with open(audio_file_path, 'wb') as audio_file:
        audio_file.write(response.content)
    return audio_file_path
def upload_file(file_path):
    """Upload ``file_path`` to the configured hosting endpoint.

    Returns:
        The public URL of the uploaded file (response body, stripped),
        or None on any upload or network failure.
    """
    with open(file_path, 'rb') as file:
        files = {'fileToUpload': (os.path.basename(file_path), file)}
        data = {'reqtype': 'fileupload'}
        try:
            # Large video uploads need a generous but bounded timeout.
            response = requests.post(UPLOAD_URL, files=files, data=data, timeout=120)
        except requests.RequestException:
            return None
    if response.status_code == 200:
        return response.text.strip()
    return None
def lipsync_api_call(video_url, audio_url):
    """Submit a lipsync job for the given hosted video/audio URLs.

    Returns:
        The parsed JSON response; it carries the job ``id`` on success
        or an ``error``/``message`` field on failure. Network errors
        propagate to the caller, which falls back to local muxing.
    """
    headers = {
        "Content-Type": "application/json",
        "x-api-key": B_KEY,
    }
    payload = {
        "audioUrl": audio_url,
        "videoUrl": video_url,
        "maxCredits": 1000,
        "model": "sync-1.6.0",
        "synergize": True,
        "pads": [0, 5, 0, 0],
        "synergizerStrength": 1,
    }
    # ``json=`` serializes and sets the body exactly as data=json.dumps(...) did.
    response = requests.post(API_URL, headers=headers, json=payload, timeout=60)
    return response.json()
def check_job_status(job_id):
    """Poll the lipsync API until the job finishes or we give up.

    Polls every 10 seconds for up to 30 attempts (~5 minutes).

    Returns:
        The result video URL on completion; None when the job fails,
        times out, or the API returns an unreachable/malformed response.
    """
    headers = {"x-api-key": B_KEY}
    max_attempts = 30  # Limit the number of polling attempts
    for attempt in range(max_attempts):
        try:
            response = requests.get(f"{API_URL}/{job_id}", headers=headers, timeout=30)
            data = response.json()
        except (requests.RequestException, ValueError):
            # Network failure or non-JSON body: treat as a failed job.
            return None
        status = data.get("status")
        if status == "COMPLETED":
            return data.get("videoUrl")
        if status == "FAILED":
            return None
        # Skip the sleep after the final attempt — we're about to give up.
        if attempt < max_attempts - 1:
            time.sleep(10)
    return None
def get_media_duration(file_path):
    """Return the duration of a media file in seconds, via ffprobe.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits non-zero.
        ValueError: if ffprobe's output is not a parseable number.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        file_path,
    ]
    # check=True surfaces ffprobe failures directly instead of letting
    # float('') below raise a confusing ValueError on empty output.
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    return float(result.stdout.strip())
def combine_audio_video(video_path, audio_path, output_path):
    """Mux the audio track onto the video, writing to ``output_path``.

    The video is trimmed when it outlasts the audio and looped when it
    is shorter, so the output always matches the audio's length. ffmpeg
    failures raise subprocess.CalledProcessError.
    """
    video_len = get_media_duration(video_path)
    audio_len = get_media_duration(audio_path)

    if video_len > audio_len:
        # Video outlasts the audio: just cut it down to the audio length.
        input_args = ['-i', video_path, '-i', audio_path]
        tail_args = []
    else:
        # Video is shorter: repeat it enough times to cover the audio.
        repeats = int(audio_len // video_len) + 1
        input_args = ['-stream_loop', str(repeats), '-i', video_path, '-i', audio_path]
        tail_args = ['-shortest']

    command = (
        ['ffmpeg']
        + input_args
        + ['-t', str(audio_len),          # final duration follows the audio
           '-map', '0:v', '-map', '1:a',  # video from input 0, audio from input 1
           '-c:v', 'copy', '-c:a', 'aac']
        + tail_args
        + ['-y', output_path]
    )
    subprocess.run(command, check=True)
def process_video(voice, model, text, progress=gr.Progress()):
    """Run the full pipeline: TTS -> upload -> lipsync -> download.

    Falls back to a plain local audio/video mux if any step of the
    hosted lipsync pipeline fails.

    Parameters:
        voice: ElevenLabs voice id.
        model: File name of a template video inside ``models/``.
        text: Text to speak.
        progress: Gradio progress reporter.

    Returns:
        ``(output_video_path_or_None, status_message)``.
    """
    session_id = str(uuid.uuid4())  # Unique per request so parallel runs don't collide

    progress(0, desc="Generating speech...")
    audio_path = text_to_speech(voice, text, session_id)
    if not audio_path:
        return None, "Failed to generate speech audio."

    progress(0.2, desc="Processing video...")
    video_path = os.path.join("models", model)

    try:
        progress(0.3, desc="Uploading files...")
        video_url = upload_file(video_path)
        audio_url = upload_file(audio_path)
        if not video_url or not audio_url:
            raise Exception("Failed to upload files")

        progress(0.4, desc="Initiating lipsync...")
        job_data = lipsync_api_call(video_url, audio_url)
        # The API reports problems via "error" or "message" fields.
        if "error" in job_data or "message" in job_data:
            raise Exception(job_data.get("error", job_data.get("message", "Unknown error")))
        job_id = job_data["id"]

        progress(0.5, desc="Processing lipsync...")
        result_url = check_job_status(job_id)
        if result_url:
            progress(0.9, desc="Downloading result...")
            # Bound the download and fail loudly on an HTTP error so the
            # fallback path runs instead of saving an error page as .mp4.
            response = requests.get(result_url, timeout=300)
            response.raise_for_status()
            output_path = f"output_{session_id}.mp4"
            with open(output_path, "wb") as f:
                f.write(response.content)
            progress(1.0, desc="Complete!")
            return output_path, "Lipsync completed successfully!"
        else:
            raise Exception("Lipsync processing failed or timed out")
    except Exception as e:
        # Any hosted-pipeline failure falls back to a simple local mux.
        progress(0.8, desc="Falling back to simple combination...")
        try:
            output_path = f"output_{session_id}.mp4"
            combine_audio_video(video_path, audio_path, output_path)
            progress(1.0, desc="Complete!")
            return output_path, f"Used fallback method. Original error: {str(e)}"
        except Exception as fallback_error:
            return None, f"All methods failed. Error: {str(fallback_error)}"
    finally:
        # Cleanup: remove the temporary TTS audio regardless of outcome.
        if os.path.exists(audio_path):
            os.remove(audio_path)
def create_interface():
    """Build and return the Gradio Blocks UI for the lipsync pipeline."""
    voices = get_voices()
    models = get_video_models()
    voice_names = [name for name, _ in voices]

    with gr.Blocks() as app:
        gr.Markdown("# JSON Train")

        with gr.Row():
            with gr.Column():
                voice_dropdown = gr.Dropdown(
                    choices=voice_names,
                    label="Select",
                    value=voice_names[0] if voices else None,
                )
                model_dropdown = gr.Dropdown(
                    choices=models,
                    label="Select",
                    value=models[0] if models else None,
                )
                text_input = gr.Textbox(label="Enter text", lines=3)
                generate_btn = gr.Button("Generate Video")
            with gr.Column():
                video_output = gr.Video(label="Generated Video")
                status_output = gr.Textbox(label="Status", interactive=False)

        def on_generate(voice_name, model_name, text):
            # Map the displayed voice name back to its ElevenLabs id.
            voice_id = next((vid for name, vid in voices if name == voice_name), None)
            if not voice_id:
                return None, "Invalid voice selected."
            return process_video(voice_id, model_name, text)

        generate_btn.click(
            fn=on_generate,
            inputs=[voice_dropdown, model_dropdown, text_input],
            outputs=[video_output, status_output],
        )

    return app
if __name__ == "__main__":
    # Build and serve the Gradio app.
    demo = create_interface()
    demo.launch()