Update app.py
Browse files
app.py
CHANGED
@@ -1,12 +1,11 @@
|
|
1 |
-
import os
|
2 |
import requests
|
3 |
import json
|
4 |
import time
|
5 |
import subprocess
|
6 |
import gradio as gr
|
7 |
import uuid
|
|
|
8 |
from dotenv import load_dotenv
|
9 |
-
from urllib.parse import urlparse
|
10 |
|
11 |
# Load environment variables
|
12 |
load_dotenv()
|
@@ -32,24 +31,24 @@ def get_voices():
|
|
32 |
|
33 |
def text_to_speech(voice, text, session_id):
|
34 |
url = "https://api.openai.com/v1/audio/speech"
|
35 |
-
|
36 |
headers = {
|
37 |
"Authorization": f"Bearer {OPENAI_API_KEY}",
|
38 |
"Content-Type": "application/json"
|
39 |
}
|
40 |
-
|
41 |
data = {
|
42 |
"model": "tts-1",
|
43 |
"input": text,
|
44 |
"voice": voice
|
45 |
}
|
46 |
-
|
47 |
response = requests.post(url, json=data, headers=headers)
|
48 |
if response.status_code != 200:
|
49 |
return None
|
50 |
-
|
51 |
# Save temporary audio file with session ID
|
52 |
-
audio_file_path = f'
|
53 |
with open(audio_file_path, 'wb') as audio_file:
|
54 |
audio_file.write(response.content)
|
55 |
return audio_file_path
|
@@ -59,7 +58,7 @@ def upload_file(file_path):
|
|
59 |
files = {'fileToUpload': (os.path.basename(file_path), file)}
|
60 |
data = {'reqtype': 'fileupload'}
|
61 |
response = requests.post(UPLOAD_URL, files=files, data=data)
|
62 |
-
|
63 |
if response.status_code == 200:
|
64 |
return response.text.strip()
|
65 |
return None
|
@@ -69,7 +68,7 @@ def lipsync_api_call(video_url, audio_url):
|
|
69 |
"Content-Type": "application/json",
|
70 |
"x-api-key": B_KEY
|
71 |
}
|
72 |
-
|
73 |
data = {
|
74 |
"audioUrl": audio_url,
|
75 |
"videoUrl": video_url,
|
@@ -79,23 +78,23 @@ def lipsync_api_call(video_url, audio_url):
|
|
79 |
"pads": [0, 5, 0, 0],
|
80 |
"synergizerStrength": 1
|
81 |
}
|
82 |
-
|
83 |
response = requests.post(API_URL, headers=headers, data=json.dumps(data))
|
84 |
return response.json()
|
85 |
|
86 |
def check_job_status(job_id):
|
87 |
headers = {"x-api-key": B_KEY}
|
88 |
max_attempts = 30 # Limit the number of attempts
|
89 |
-
|
90 |
for _ in range(max_attempts):
|
91 |
response = requests.get(f"{API_URL}/{job_id}", headers=headers)
|
92 |
data = response.json()
|
93 |
-
|
94 |
if data["status"] == "COMPLETED":
|
95 |
return data["videoUrl"]
|
96 |
elif data["status"] == "FAILED":
|
97 |
return None
|
98 |
-
|
99 |
time.sleep(10)
|
100 |
return None
|
101 |
|
@@ -132,95 +131,88 @@ def combine_audio_video(video_path, audio_path, output_path):
|
|
132 |
|
133 |
subprocess.run(cmd, check=True)
|
134 |
|
135 |
-
def
|
136 |
-
parsed = urlparse(url)
|
137 |
-
path = parsed.path.lower()
|
138 |
-
return path.endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))
|
139 |
-
|
140 |
-
def create_video_from_image(image_url, output_path, duration=10):
|
141 |
# Download the image
|
142 |
response = requests.get(image_url)
|
143 |
-
|
144 |
-
|
145 |
-
|
146 |
-
temp_image_path = f"temp_image_{uuid.uuid4()}.jpg"
|
147 |
-
with open(temp_image_path, 'wb') as f:
|
148 |
f.write(response.content)
|
149 |
|
150 |
# Create a 10-second video from the image
|
|
|
151 |
cmd = [
|
152 |
-
'ffmpeg', '-loop', '1', '-i',
|
153 |
-
'-c:v', 'libx264', '-t',
|
154 |
-
'-vf', 'scale=
|
155 |
-
|
156 |
]
|
157 |
subprocess.run(cmd, check=True)
|
158 |
|
159 |
# Clean up the temporary image file
|
160 |
-
os.remove(
|
161 |
|
162 |
-
return
|
163 |
|
164 |
-
def process_video(voice,
|
165 |
-
session_id = str(uuid.uuid4())
|
166 |
progress(0, desc="Generating speech...")
|
167 |
audio_path = text_to_speech(voice, text, session_id)
|
168 |
if not audio_path:
|
169 |
return None, "Failed to generate speech audio."
|
170 |
-
|
171 |
progress(0.2, desc="Processing media...")
|
172 |
-
|
173 |
try:
|
174 |
-
if
|
|
|
|
|
|
|
|
|
175 |
progress(0.3, desc="Converting image to video...")
|
176 |
-
video_path =
|
177 |
-
create_video_from_image(media_url, video_path)
|
178 |
-
progress(0.4, desc="Uploading converted video...")
|
179 |
video_url = upload_file(video_path)
|
180 |
-
if not video_url:
|
181 |
-
raise Exception("Failed to upload converted video")
|
182 |
else:
|
183 |
-
video_url =
|
184 |
-
|
185 |
-
progress(0.
|
186 |
audio_url = upload_file(audio_path)
|
187 |
-
|
188 |
-
if not audio_url:
|
189 |
-
raise Exception("Failed to upload audio file")
|
190 |
-
|
191 |
-
progress(0.
|
192 |
job_data = lipsync_api_call(video_url, audio_url)
|
193 |
-
|
194 |
if "error" in job_data or "message" in job_data:
|
195 |
raise Exception(job_data.get("error", job_data.get("message", "Unknown error")))
|
196 |
-
|
197 |
job_id = job_data["id"]
|
198 |
-
|
199 |
-
progress(0.
|
200 |
result_url = check_job_status(job_id)
|
201 |
-
|
202 |
if result_url:
|
203 |
progress(0.9, desc="Downloading result...")
|
204 |
response = requests.get(result_url)
|
205 |
-
output_path = f"
|
206 |
with open(output_path, "wb") as f:
|
207 |
f.write(response.content)
|
208 |
progress(1.0, desc="Complete!")
|
209 |
return output_path, "Lipsync completed successfully!"
|
210 |
else:
|
211 |
raise Exception("Lipsync processing failed or timed out")
|
212 |
-
|
213 |
except Exception as e:
|
214 |
progress(0.8, desc="Falling back to simple combination...")
|
215 |
try:
|
216 |
if 'video_path' not in locals():
|
217 |
# Download the video from the URL if it wasn't created from an image
|
218 |
video_response = requests.get(video_url)
|
219 |
-
video_path = f"
|
220 |
with open(video_path, "wb") as f:
|
221 |
f.write(video_response.content)
|
222 |
-
|
223 |
-
output_path = f"
|
224 |
combine_audio_video(video_path, audio_path, output_path)
|
225 |
progress(1.0, desc="Complete!")
|
226 |
return output_path, f"Used fallback method. Original error: {str(e)}"
|
@@ -230,36 +222,36 @@ def process_video(voice, media_url, text, progress=gr.Progress()):
|
|
230 |
# Cleanup
|
231 |
if os.path.exists(audio_path):
|
232 |
os.remove(audio_path)
|
233 |
-
if os.path.exists(f"
|
234 |
-
os.remove(f"
|
235 |
|
236 |
def create_interface():
|
237 |
voices = get_voices()
|
238 |
-
|
239 |
with gr.Blocks() as app:
|
240 |
gr.Markdown("# Lipsync Video Generator")
|
241 |
with gr.Row():
|
242 |
with gr.Column():
|
243 |
voice_dropdown = gr.Dropdown(choices=[v[0] for v in voices], label="Select Voice", value=voices[0][0] if voices else None)
|
244 |
-
|
245 |
text_input = gr.Textbox(label="Enter text", lines=3)
|
246 |
generate_btn = gr.Button("Generate Video")
|
247 |
with gr.Column():
|
248 |
video_output = gr.Video(label="Generated Video")
|
249 |
status_output = gr.Textbox(label="Status", interactive=False)
|
250 |
-
|
251 |
-
def on_generate(voice_name,
|
252 |
voice_id = next((v[1] for v in voices if v[0] == voice_name), None)
|
253 |
if not voice_id:
|
254 |
return None, "Invalid voice selected."
|
255 |
-
return process_video(voice_id,
|
256 |
-
|
257 |
generate_btn.click(
|
258 |
fn=on_generate,
|
259 |
-
inputs=[voice_dropdown,
|
260 |
outputs=[video_output, status_output]
|
261 |
)
|
262 |
-
|
263 |
return app
|
264 |
|
265 |
if __name__ == "__main__":
|
|
|
|
|
1 |
import requests
|
2 |
import json
|
3 |
import time
|
4 |
import subprocess
|
5 |
import gradio as gr
|
6 |
import uuid
|
7 |
+
import os
|
8 |
from dotenv import load_dotenv
|
|
|
9 |
|
10 |
# Load environment variables
|
11 |
load_dotenv()
|
|
|
31 |
|
32 |
def text_to_speech(voice, text, session_id):
|
33 |
url = "https://api.openai.com/v1/audio/speech"
|
34 |
+
|
35 |
headers = {
|
36 |
"Authorization": f"Bearer {OPENAI_API_KEY}",
|
37 |
"Content-Type": "application/json"
|
38 |
}
|
39 |
+
|
40 |
data = {
|
41 |
"model": "tts-1",
|
42 |
"input": text,
|
43 |
"voice": voice
|
44 |
}
|
45 |
+
|
46 |
response = requests.post(url, json=data, headers=headers)
|
47 |
if response.status_code != 200:
|
48 |
return None
|
49 |
+
|
50 |
# Save temporary audio file with session ID
|
51 |
+
audio_file_path = f'tempvoice{session_id}.mp3'
|
52 |
with open(audio_file_path, 'wb') as audio_file:
|
53 |
audio_file.write(response.content)
|
54 |
return audio_file_path
|
|
|
58 |
files = {'fileToUpload': (os.path.basename(file_path), file)}
|
59 |
data = {'reqtype': 'fileupload'}
|
60 |
response = requests.post(UPLOAD_URL, files=files, data=data)
|
61 |
+
|
62 |
if response.status_code == 200:
|
63 |
return response.text.strip()
|
64 |
return None
|
|
|
68 |
"Content-Type": "application/json",
|
69 |
"x-api-key": B_KEY
|
70 |
}
|
71 |
+
|
72 |
data = {
|
73 |
"audioUrl": audio_url,
|
74 |
"videoUrl": video_url,
|
|
|
78 |
"pads": [0, 5, 0, 0],
|
79 |
"synergizerStrength": 1
|
80 |
}
|
81 |
+
|
82 |
response = requests.post(API_URL, headers=headers, data=json.dumps(data))
|
83 |
return response.json()
|
84 |
|
85 |
def check_job_status(job_id):
|
86 |
headers = {"x-api-key": B_KEY}
|
87 |
max_attempts = 30 # Limit the number of attempts
|
88 |
+
|
89 |
for _ in range(max_attempts):
|
90 |
response = requests.get(f"{API_URL}/{job_id}", headers=headers)
|
91 |
data = response.json()
|
92 |
+
|
93 |
if data["status"] == "COMPLETED":
|
94 |
return data["videoUrl"]
|
95 |
elif data["status"] == "FAILED":
|
96 |
return None
|
97 |
+
|
98 |
time.sleep(10)
|
99 |
return None
|
100 |
|
|
|
131 |
|
132 |
subprocess.run(cmd, check=True)
|
133 |
|
134 |
+
def create_video_from_image(image_url, session_id):
|
|
|
|
|
|
|
|
|
|
|
135 |
# Download the image
|
136 |
response = requests.get(image_url)
|
137 |
+
image_path = f"tempimage{session_id}.jpg"
|
138 |
+
with open(image_path, "wb") as f:
|
|
|
|
|
|
|
139 |
f.write(response.content)
|
140 |
|
141 |
# Create a 10-second video from the image
|
142 |
+
video_path = f"tempvideo{session_id}.mp4"
|
143 |
cmd = [
|
144 |
+
'ffmpeg', '-loop', '1', '-i', image_path,
|
145 |
+
'-c:v', 'libx264', '-t', '10', '-pix_fmt', 'yuv420p',
|
146 |
+
'-vf', 'scale=1280:720', # Adjust resolution as needed
|
147 |
+
video_path
|
148 |
]
|
149 |
subprocess.run(cmd, check=True)
|
150 |
|
151 |
# Clean up the temporary image file
|
152 |
+
os.remove(image_path)
|
153 |
|
154 |
+
return video_path
|
155 |
|
156 |
+
def process_video(voice, url, text, progress=gr.Progress()):
|
157 |
+
session_id = str(uuid.uuid4()) # Generate a unique session ID
|
158 |
progress(0, desc="Generating speech...")
|
159 |
audio_path = text_to_speech(voice, text, session_id)
|
160 |
if not audio_path:
|
161 |
return None, "Failed to generate speech audio."
|
162 |
+
|
163 |
progress(0.2, desc="Processing media...")
|
164 |
+
|
165 |
try:
|
166 |
+
# Check if the URL is an image
|
167 |
+
response = requests.head(url)
|
168 |
+
content_type = response.headers.get('Content-Type', '')
|
169 |
+
|
170 |
+
if content_type.startswith('image'):
|
171 |
progress(0.3, desc="Converting image to video...")
|
172 |
+
video_path = create_video_from_image(url, session_id)
|
|
|
|
|
173 |
video_url = upload_file(video_path)
|
|
|
|
|
174 |
else:
|
175 |
+
video_url = url
|
176 |
+
|
177 |
+
progress(0.4, desc="Uploading audio...")
|
178 |
audio_url = upload_file(audio_path)
|
179 |
+
|
180 |
+
if not audio_url or not video_url:
|
181 |
+
raise Exception("Failed to upload audio or video file")
|
182 |
+
|
183 |
+
progress(0.5, desc="Initiating lipsync...")
|
184 |
job_data = lipsync_api_call(video_url, audio_url)
|
185 |
+
|
186 |
if "error" in job_data or "message" in job_data:
|
187 |
raise Exception(job_data.get("error", job_data.get("message", "Unknown error")))
|
188 |
+
|
189 |
job_id = job_data["id"]
|
190 |
+
|
191 |
+
progress(0.6, desc="Processing lipsync...")
|
192 |
result_url = check_job_status(job_id)
|
193 |
+
|
194 |
if result_url:
|
195 |
progress(0.9, desc="Downloading result...")
|
196 |
response = requests.get(result_url)
|
197 |
+
output_path = f"output{session_id}.mp4"
|
198 |
with open(output_path, "wb") as f:
|
199 |
f.write(response.content)
|
200 |
progress(1.0, desc="Complete!")
|
201 |
return output_path, "Lipsync completed successfully!"
|
202 |
else:
|
203 |
raise Exception("Lipsync processing failed or timed out")
|
204 |
+
|
205 |
except Exception as e:
|
206 |
progress(0.8, desc="Falling back to simple combination...")
|
207 |
try:
|
208 |
if 'video_path' not in locals():
|
209 |
# Download the video from the URL if it wasn't created from an image
|
210 |
video_response = requests.get(video_url)
|
211 |
+
video_path = f"tempvideo{session_id}.mp4"
|
212 |
with open(video_path, "wb") as f:
|
213 |
f.write(video_response.content)
|
214 |
+
|
215 |
+
output_path = f"output{session_id}.mp4"
|
216 |
combine_audio_video(video_path, audio_path, output_path)
|
217 |
progress(1.0, desc="Complete!")
|
218 |
return output_path, f"Used fallback method. Original error: {str(e)}"
|
|
|
222 |
# Cleanup
|
223 |
if os.path.exists(audio_path):
|
224 |
os.remove(audio_path)
|
225 |
+
if os.path.exists(f"tempvideo{session_id}.mp4"):
|
226 |
+
os.remove(f"tempvideo{session_id}.mp4")
|
227 |
|
228 |
def create_interface():
|
229 |
voices = get_voices()
|
230 |
+
|
231 |
with gr.Blocks() as app:
|
232 |
gr.Markdown("# Lipsync Video Generator")
|
233 |
with gr.Row():
|
234 |
with gr.Column():
|
235 |
voice_dropdown = gr.Dropdown(choices=[v[0] for v in voices], label="Select Voice", value=voices[0][0] if voices else None)
|
236 |
+
url_input = gr.Textbox(label="Enter Video or Image URL")
|
237 |
text_input = gr.Textbox(label="Enter text", lines=3)
|
238 |
generate_btn = gr.Button("Generate Video")
|
239 |
with gr.Column():
|
240 |
video_output = gr.Video(label="Generated Video")
|
241 |
status_output = gr.Textbox(label="Status", interactive=False)
|
242 |
+
|
243 |
+
def on_generate(voice_name, url, text):
|
244 |
voice_id = next((v[1] for v in voices if v[0] == voice_name), None)
|
245 |
if not voice_id:
|
246 |
return None, "Invalid voice selected."
|
247 |
+
return process_video(voice_id, url, text)
|
248 |
+
|
249 |
generate_btn.click(
|
250 |
fn=on_generate,
|
251 |
+
inputs=[voice_dropdown, url_input, text_input],
|
252 |
outputs=[video_output, status_output]
|
253 |
)
|
254 |
+
|
255 |
return app
|
256 |
|
257 |
if __name__ == "__main__":
|