jsonop / app.py
sheikhed's picture
Update app.py
f69554c verified
raw
history blame
11.7 kB
import requests
import json
import time
import subprocess
import gradio as gr
import uuid
import os
import logging
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# API Keys
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
REPLICATE_API_TOKEN = os.getenv("REPLICATE_API_TOKEN")
# URLs
REPLICATE_API_URL = "https://api.replicate.com/v1/predictions"
UPLOAD_URL = os.getenv("UPLOAD_URL")
def get_voices():
# OpenAI TTS voices
return [
("alloy", "alloy"),
("echo", "echo"),
("fable", "fable"),
("onyx", "onyx"),
("nova", "nova"),
("shimmer", "shimmer")
]
def text_to_speech(voice, text, session_id):
logger.info(f"Starting text-to-speech conversion for session {session_id}")
url = "https://api.openai.com/v1/audio/speech"
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "tts-1",
"input": text,
"voice": voice
}
logger.debug(f"Sending request to OpenAI TTS API for session {session_id}")
response = requests.post(url, json=data, headers=headers)
if response.status_code != 200:
logger.error(f"Failed to generate speech audio for session {session_id}. Status code: {response.status_code}")
return None
# Save temporary audio file with session ID
audio_file_path = f'tempvoice{session_id}.mp3'
with open(audio_file_path, 'wb') as audio_file:
audio_file.write(response.content)
logger.info(f"Audio file saved: {audio_file_path}")
return audio_file_path
def upload_file(file_path):
logger.info(f"Uploading file: {file_path}")
with open(file_path, 'rb') as file:
files = {'fileToUpload': (os.path.basename(file_path), file)}
data = {'reqtype': 'fileupload'}
response = requests.post(UPLOAD_URL, files=files, data=data)
if response.status_code == 200:
logger.info(f"File uploaded successfully: {file_path}")
return response.text.strip()
logger.error(f"Failed to upload file: {file_path}. Status code: {response.status_code}")
return None
def lipsync_api_call(video_url, audio_url):
logger.info(f"Initiating lip-sync API call with video: {video_url} and audio: {audio_url}")
headers = {
"Authorization": f"Bearer {REPLICATE_API_TOKEN}",
"Content-Type": "application/json",
"Prefer": "wait"
}
data = {
"version": "db5a650c807b007dc5f9e5abe27c53e1b62880d1f94d218d27ce7fa802711d67",
"input": {
"face": video_url,
"input_audio": audio_url
}
}
logger.debug(f"Sending request to Replicate API with data: {json.dumps(data)}")
response = requests.post(REPLICATE_API_URL, headers=headers, json=data)
logger.debug(f"Received response from Replicate API: {response.text}")
return response.json()
def check_job_status(prediction_id):
logger.info(f"Checking job status for prediction ID: {prediction_id}")
headers = {"Authorization": f"Bearer {REPLICATE_API_TOKEN}"}
max_attempts = 30 # Limit the number of attempts
for attempt in range(max_attempts):
logger.debug(f"Attempt {attempt + 1} to check job status")
response = requests.get(f"{REPLICATE_API_URL}/{prediction_id}", headers=headers)
data = response.json()
logger.debug(f"Job status response: {json.dumps(data)}")
if data["status"] == "succeeded":
logger.info(f"Job completed successfully for prediction ID: {prediction_id}")
return data["output"]
elif data["status"] == "failed":
logger.error(f"Job failed for prediction ID: {prediction_id}")
return None
logger.info(f"Job still in progress. Waiting for 10 seconds before next check.")
time.sleep(10)
logger.warning(f"Max attempts reached for prediction ID: {prediction_id}")
return None
def get_media_duration(file_path):
logger.info(f"Getting media duration for: {file_path}")
cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
duration = float(result.stdout.strip())
logger.info(f"Media duration: {duration} seconds")
return duration
def combine_audio_video(video_path, audio_path, output_path):
logger.info(f"Combining audio and video: video={video_path}, audio={audio_path}, output={output_path}")
video_duration = get_media_duration(video_path)
audio_duration = get_media_duration(audio_path)
if video_duration > audio_duration:
logger.info("Video longer than audio. Trimming video.")
cmd = [
'ffmpeg', '-i', video_path, '-i', audio_path,
'-t', str(audio_duration), # Trim video to audio duration
'-map', '0:v', '-map', '1:a',
'-c:v', 'copy', '-c:a', 'aac',
'-y', output_path
]
else:
logger.info("Audio longer than video. Looping video.")
loop_count = int(audio_duration // video_duration) + 1
cmd = [
'ffmpeg', '-stream_loop', str(loop_count), '-i', video_path, '-i', audio_path,
'-t', str(audio_duration),
'-map', '0:v', '-map', '1:a',
'-c:v', 'copy', '-c:a', 'aac',
'-shortest', '-y', output_path
]
logger.debug(f"Running ffmpeg command: {' '.join(cmd)}")
subprocess.run(cmd, check=True)
logger.info(f"Audio and video combined successfully: {output_path}")
def create_video_from_image(image_url, session_id):
logger.info(f"Creating video from image: {image_url}")
response = requests.get(image_url)
image_path = f"tempimage{session_id}.jpg"
with open(image_path, "wb") as f:
f.write(response.content)
logger.info(f"Image downloaded: {image_path}")
video_path = f"tempvideo{session_id}.mp4"
cmd = [
'ffmpeg', '-loop', '1', '-i', image_path,
'-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
'-c:v', 'libx264', '-t', '10', '-pix_fmt', 'yuv420p',
video_path
]
logger.debug(f"Running ffmpeg command: {' '.join(cmd)}")
subprocess.run(cmd, check=True)
logger.info(f"Video created from image: {video_path}")
os.remove(image_path)
logger.info(f"Temporary image file removed: {image_path}")
return video_path
def process_video(voice, url, text, progress=gr.Progress()):
session_id = str(uuid.uuid4())
logger.info(f"Starting video processing for session {session_id}")
progress(0, desc="Generating speech...")
audio_path = text_to_speech(voice, text, session_id)
if not audio_path:
logger.error(f"Failed to generate speech audio for session {session_id}")
return None, "Failed to generate speech audio."
progress(0.2, desc="Processing media...")
try:
logger.info(f"Checking content type of URL: {url}")
response = requests.head(url)
content_type = response.headers.get('Content-Type', '')
logger.info(f"Content type of URL: {content_type}")
if content_type.startswith('image'):
progress(0.3, desc="Converting image to video...")
video_path = create_video_from_image(url, session_id)
video_url = upload_file(video_path)
else:
video_url = url
logger.info(f"Video URL: {video_url}")
progress(0.4, desc="Uploading audio...")
audio_url = upload_file(audio_path)
logger.info(f"Audio URL: {audio_url}")
if not audio_url or not video_url:
raise Exception("Failed to upload audio or video file")
progress(0.5, desc="Initiating lipsync...")
job_data = lipsync_api_call(video_url, audio_url)
logger.info(f"Lipsync job data: {json.dumps(job_data)}")
if "error" in job_data:
raise Exception(job_data.get("error", "Unknown error"))
prediction_id = job_data["id"]
logger.info(f"Lipsync prediction ID: {prediction_id}")
progress(0.6, desc="Processing lipsync...")
result_url = check_job_status(prediction_id)
if result_url:
logger.info(f"Lipsync result URL: {result_url}")
progress(0.9, desc="Downloading result...")
response = requests.get(result_url)
output_path = f"output{session_id}.mp4"
with open(output_path, 'wb') as f:
f.write(response.content)
logger.info(f"Lipsync result saved to: {output_path}")
progress(1.0, desc="Complete!")
return output_path, "Lipsync completed successfully!"
else:
raise Exception("Lipsync processing failed or timed out")
except Exception as e:
logger.error(f"Error during lipsync process: {str(e)}")
progress(0.8, desc="Falling back to simple combination...")
try:
if 'video_path' not in locals():
logger.info("Downloading video from URL")
video_response = requests.get(video_url)
video_path = f"tempvideo{session_id}.mp4"
with open(video_path, 'wb') as f:
f.write(video_response.content)
output_path = f"output{session_id}.mp4"
combine_audio_video(video_path, audio_path, output_path)
progress(1.0, desc="Complete!")
return output_path, f"Used fallback method. Original error: {str(e)}"
except Exception as fallback_error:
logger.error(f"Fallback method failed: {str(fallback_error)}")
return None, f"All methods failed. Error: {str(fallback_error)}"
finally:
# Cleanup
if os.path.exists(audio_path):
os.remove(audio_path)
logger.info(f"Removed temporary audio file: {audio_path}")
if os.path.exists(f"tempvideo{session_id}.mp4"):
os.remove(f"tempvideo{session_id}.mp4")
logger.info(f"Removed temporary video file: tempvideo{session_id}.mp4")
def create_interface():
voices = get_voices()
with gr.Blocks() as app:
gr.Markdown("# Generator")
with gr.Row():
with gr.Column():
voice_dropdown = gr.Dropdown(choices=[v[0] for v in voices], label="Select Voice", value=voices[0][0] if voices else None)
url_input = gr.Textbox(label="Enter Video or Image URL")
text_input = gr.Textbox(label="Enter text", lines=3)
generate_btn = gr.Button("Generate Video")
with gr.Column():
video_output = gr.Video(label="Generated Video")
status_output = gr.Textbox(label="Status", interactive=False)
def on_generate(voice_name, url, text):
logger.info(f"Generation started with voice: {voice_name}, URL: {url}")
voice_id = next((v[1] for v in voices if v[0] == voice_name), None)
if not voice_id:
logger.error(f"Invalid voice selected: {voice_name}")
return None, "Invalid voice selected."
return process_video(voice_id, url, text)
generate_btn.click(
fn=on_generate,
inputs=[voice_dropdown, url_input, text_input],
outputs=[video_output, status_output]
)
return app
if __name__ == "__main__":
logger.info("Starting the application")
app = create_interface()
app.launch()