video / app.py
Luongsosad's picture
Update app.py
494fb2c verified
import ffmpeg
import os
import numpy as np
import cv2
import json
import gradio as gr
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import tempfile
from PIL import Image
# Thêm đường dẫn FFmpeg
os.environ["PATH"] += os.pathsep + r"D:\Downloads\ffmpeg-7.1.1-essentials_build\ffmpeg-7.1.1-essentials_build\bin"
TIMEOUT = 300 # 5 phút
def wrap_text(text, max_width=1060, font_size=20):
words = text.split()
lines = []
current_line = ""
for word in words:
if len(current_line + " " + word) * font_size <= max_width:
current_line += " " + word
else:
lines.append(current_line.strip())
current_line = word
if current_line:
lines.append(current_line.strip())
return "\n".join(lines)
def calculate_text_height(text, font_size=20, line_spacing=10):
lines = text.split("\n")
return len(lines) * font_size + (len(lines) - 1) * line_spacing
def create_single_video(args):
img, script, dur, output_path, width, height, fps = args
temp_img_path = tempfile.NamedTemporaryFile(suffix='.png', delete=False).name
Image.fromarray(img).save(temp_img_path)
d_frames = int(dur * fps)
wrapped_text = wrap_text(script, width + 460, font_size=20)
wrapped_text = wrapped_text.replace(":", "\\:").replace("'", "\\'")
text_height = calculate_text_height(wrapped_text, font_size=20, line_spacing=10)
y_position = height - text_height - 36
font_path = "fonts/Roboto-VariableFont_wdth\,wght.ttf"
# vf = (
# f"scale=2400:-1,"
# f"zoompan=z='min(zoom+0.0001,1.5)':x='floor(iw/2-(iw/zoom/2))':y='floor(ih/2-(ih/zoom/2))':d={d_frames}:s={width}x{height}:fps={fps},"
# f"drawtext=text='{wrapped_text}':fontsize=20:fontcolor=white:x=(w-text_w)/2:y={y_position}:"
# f"fontfile={font_path}:box=1:[email protected]:boxborderw=10:line_spacing=10"
# )
vf = (
f"drawtext=text='{wrapped_text}':fontsize=20:fontcolor=white:x=(w-text_w)/2:y={y_position}:"
f"fontfile={font_path}:box=1:[email protected]:boxborderw=10:line_spacing=10"
)
try:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
stream = ffmpeg.input(temp_img_path, loop=1, t=dur)
stream = stream.output(output_path, **{
'vf': vf,
't': dur,
'pix_fmt': 'yuv420p',
'crf': '17',
'c:v': 'libx264',
'an': None
})
print("FFmpeg command:", stream.compile())
out, err = stream.run(capture_stdout=True, capture_stderr=True)
except ffmpeg.Error as e:
print('FFmpeg Error:', e.stderr.decode('utf-8'))
raise
finally:
if os.path.exists(temp_img_path):
os.remove(temp_img_path)
return output_path
def create_video_from_images(images, scripts, durations, audio_path, output_path, fps=60):
height, width, _ = images[0].shape
temp_dir = tempfile.mkdtemp()
video_paths = []
with ThreadPoolExecutor(max_workers=2) as executor:
tasks = [
(img, script, dur, os.path.join(temp_dir, f"temp_{i}.mp4"), width, height, fps)
for i, (img, script, dur) in enumerate(zip(images, scripts, durations))
]
try:
video_paths = list(executor.map(create_single_video, tasks, timeout=TIMEOUT))
except TimeoutError:
print("Timeout Error: Operation took too long to complete.")
return None
concat_file = os.path.join(temp_dir, "concat.txt")
with open(concat_file, 'w') as f:
for path in video_paths:
f.write(f"file '{path}'\n")
ffmpeg.input(concat_file, format='concat', safe=0).output(output_path, c='copy', an=None).overwrite_output().run()
if audio_path:
final_output_path = output_path.replace(".mp4", "_with_audio.mp4")
video_input = ffmpeg.input(output_path)
audio_input = ffmpeg.input(audio_path)
ffmpeg.output(video_input, audio_input, final_output_path, vcodec='libx264', acodec='aac', shortest=None).overwrite_output().run()
else:
final_output_path = output_path
for path in video_paths:
if os.path.exists(path):
os.remove(path)
if os.path.exists(concat_file):
os.remove(concat_file)
if os.path.exists(temp_dir):
os.rmdir(temp_dir)
return final_output_path
def generate_video(image_files, script_input, duration_input, audio_file, fps=60):
try:
scripts = json.loads(script_input)
durations = json.loads(duration_input)
except Exception as e:
return None, f"❌ Lỗi khi phân tích đầu vào:\n{e}"
if len(image_files) != len(scripts) or len(scripts) != len(durations):
return None, "❌ Số lượng ảnh, scripts và durations phải bằng nhau!"
try:
os.makedirs("outputs", exist_ok=True)
# Xử lý audio_file
audio_path = None
if audio_file:
audio_path = audio_file.name
if not os.path.exists(audio_path):
raise ValueError(f"Tệp âm thanh không tồn tại: {audio_path}")
output_video_path = os.path.join("outputs", "temp_output.mp4")
images = []
for idx, img_file in enumerate(image_files):
try:
# Lấy đường dẫn tệp từ NamedString
img_path = img_file.name
if not os.path.exists(img_path):
raise ValueError(f"Tệp ảnh không tồn tại tại index {idx+1}: {img_path}")
# Đọc ảnh từ đường dẫn
img = cv2.imread(img_path)
if img is None:
raise ValueError(f"Không đọc được ảnh tại index {idx+1}: {img_path}")
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
images.append(img)
print(f"✅ Ảnh {idx+1}: {img_path} - kích thước {img.shape}")
except Exception as e:
print(f"❌ Không đọc được ảnh {idx+1} - lỗi: {e}")
return None, f"❌ Lỗi đọc ảnh tại index {idx+1}: {e}"
if not images:
return None, "❌ Không có ảnh nào hợp lệ để tạo video!"
final_video_path = create_video_from_images(images, scripts, durations, audio_path, output_video_path, fps)
if final_video_path is None:
return None, "❌ Quá trình tạo video đã bị timeout!"
return final_video_path, "✅ Video tạo thành công!"
except Exception as e:
import traceback
return None, f"❌ Lỗi khi tạo video:\n{traceback.format_exc()}"
demo = gr.Interface(
fn=generate_video,
inputs=[
gr.File(file_types=None, label="Ảnh (nhiều)", file_count="multiple"),
gr.Textbox(label="Scripts (danh sách)", placeholder="['Chào bạn', 'Video demo']"),
gr.Textbox(label="Durations (giây)", placeholder="[3, 4]"),
gr.File(file_types=None, label="Nhạc nền (bất kỳ định dạng)"), # Bỏ kiểm tra định dạng
gr.Slider(minimum=1, maximum=120, step=1, label="FPS (frame/giây)", value=60),
],
outputs=[
gr.Video(label="Video kết quả"),
gr.Textbox(label="Trạng thái", interactive=False),
],
title="Tạo video từ ảnh, chữ và nhạc",
description="Upload nhiều ảnh + đoạn chữ + nhạc nền để tạo video tự động."
)
if __name__ == "__main__":
demo.launch(show_error=True, share=True)