Spaces:
Running
Running
from fastapi import APIRouter, Query, HTTPException | |
from fastapi.responses import StreamingResponse | |
from moviepy.editor import VideoFileClip, CompositeVideoClip, ColorClip, ImageClip, TextClip | |
from moviepy.video.VideoClip import VideoClip | |
from moviepy.video.fx.all import resize | |
from io import BytesIO | |
import tempfile | |
import requests | |
import os | |
import numpy as np | |
from PIL import Image, ImageDraw, ImageFont | |
import gc | |
import re | |
from typing import List, Tuple, Optional | |
router = APIRouter() | |
def download_file(url: str, suffix: str = ".mp4") -> str: | |
"""Download genérico para vídeos e arquivos SRT""" | |
print(f"Tentando baixar arquivo de: {url}") | |
headers = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', | |
'Accept': '*/*', | |
'Accept-Language': 'en-US,en;q=0.5', | |
'Accept-Encoding': 'gzip, deflate', | |
'Connection': 'keep-alive', | |
'Upgrade-Insecure-Requests': '1', | |
} | |
try: | |
response = requests.get(url, headers=headers, stream=True, timeout=30) | |
print(f"Status da resposta: {response.status_code}") | |
response.raise_for_status() | |
except requests.exceptions.RequestException as e: | |
print(f"Erro na requisição: {e}") | |
raise HTTPException(status_code=400, detail=f"Não foi possível baixar o arquivo: {str(e)}") | |
if response.status_code != 200: | |
raise HTTPException(status_code=400, detail=f"Erro ao baixar arquivo. Status: {response.status_code}") | |
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) | |
try: | |
total_size = 0 | |
for chunk in response.iter_content(chunk_size=8192): | |
if chunk: | |
tmp.write(chunk) | |
total_size += len(chunk) | |
tmp.close() | |
print(f"Arquivo baixado com sucesso. Tamanho: {total_size} bytes") | |
return tmp.name | |
except Exception as e: | |
tmp.close() | |
if os.path.exists(tmp.name): | |
os.unlink(tmp.name) | |
print(f"Erro ao salvar arquivo: {e}") | |
raise HTTPException(status_code=400, detail=f"Erro ao salvar arquivo: {str(e)}") | |
def download_video(video_url: str) -> str: | |
return download_file(video_url, ".mp4") | |
def download_srt(srt_url: str) -> str: | |
return download_file(srt_url, ".srt") | |
def parse_srt(srt_path: str) -> List[Tuple[float, float, str]]: | |
"""Parse arquivo SRT e retorna lista de tuplas (start_time, end_time, text)""" | |
subtitles = [] | |
with open(srt_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
# Regex para extrair informações do SRT | |
pattern = r'(\d+)\s*\n(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})\s*\n(.*?)(?=\n\d+\s*\n|\n*$)' | |
matches = re.findall(pattern, content, re.DOTALL) | |
for match in matches: | |
start_time_str = match[1] | |
end_time_str = match[2] | |
text = match[3].strip() | |
# Converter timestamp para segundos | |
start_time = time_to_seconds(start_time_str) | |
end_time = time_to_seconds(end_time_str) | |
subtitles.append((start_time, end_time, text)) | |
print(f"Parsed {len(subtitles)} subtítulos do arquivo SRT") | |
return subtitles | |
def time_to_seconds(time_str: str) -> float: | |
"""Converte timestamp SRT (HH:MM:SS,mmm) para segundos""" | |
time_str = time_str.replace(',', '.') | |
parts = time_str.split(':') | |
hours = int(parts[0]) | |
minutes = int(parts[1]) | |
seconds = float(parts[2]) | |
return hours * 3600 + minutes * 60 + seconds | |
def create_rounded_mask(w: int, h: int, radius: int) -> np.ndarray: | |
"""Cria uma máscara numpy com cantos arredondados otimizada""" | |
img = Image.new("L", (w, h), 0) | |
draw = ImageDraw.Draw(img) | |
draw.rounded_rectangle((0, 0, w, h), radius=radius, fill=255) | |
mask = np.array(img, dtype=np.float32) / 255.0 | |
return mask | |
def create_text_image(text: str, font_path: str, font_size: int, color: str = "white", width: int = 900, background_color: str = None) -> np.ndarray: | |
"""Cria uma imagem com texto usando PIL e retorna array numpy diretamente com quebra de linha""" | |
try: | |
font = ImageFont.truetype(font_path, font_size) | |
except: | |
font = ImageFont.load_default() | |
# Função para quebrar texto em múltiplas linhas | |
def wrap_text(text, font, max_width): | |
# Primeiro, dividir por quebras de linha existentes (importantes para SRT) | |
existing_lines = text.split('\n') | |
final_lines = [] | |
for line in existing_lines: | |
if not line.strip(): # Pular linhas vazias | |
continue | |
words = line.split(' ') | |
current_line = [] | |
for word in words: | |
test_line = ' '.join(current_line + [word]) | |
bbox = font.getbbox(test_line) | |
test_width = bbox[2] - bbox[0] | |
if test_width <= max_width - 40: # 40px de margem total | |
current_line.append(word) | |
else: | |
if current_line: | |
final_lines.append(' '.join(current_line)) | |
current_line = [word] | |
else: | |
final_lines.append(word) | |
if current_line: | |
final_lines.append(' '.join(current_line)) | |
return final_lines | |
# Quebrar o texto em linhas | |
lines = wrap_text(text, font, width) | |
# Calcular dimensões totais baseadas na altura real da fonte | |
font_metrics = font.getmetrics() | |
ascent, descent = font_metrics | |
line_height = ascent + descent | |
line_spacing = int(line_height * 0.2) | |
total_height = len(lines) * line_height + (len(lines) - 1) * line_spacing | |
# Definir padding para o fundo | |
padding_vertical = 16 if background_color else 10 | |
padding_horizontal = 24 if background_color else 10 | |
# Criar imagem com altura ajustada para múltiplas linhas | |
img = Image.new("RGBA", (width, total_height + padding_vertical * 2), (0, 0, 0, 0)) | |
draw = ImageDraw.Draw(img) | |
# Desenhar fundo se especificado | |
if background_color: | |
# Calcular largura máxima do texto para um fundo mais ajustado | |
max_text_width = 0 | |
for line in lines: | |
bbox = font.getbbox(line) | |
line_width = bbox[2] - bbox[0] | |
max_text_width = max(max_text_width, line_width) | |
# Calcular dimensões do fundo | |
bg_width = max_text_width + padding_horizontal * 2 | |
bg_height = total_height + padding_vertical * 2 | |
bg_x = (width - bg_width) // 2 | |
bg_y = 0 | |
# Desenhar fundo com cantos arredondados | |
draw.rounded_rectangle( | |
(bg_x, bg_y, bg_x + bg_width, bg_y + bg_height), | |
radius=6, | |
fill=background_color | |
) | |
# Desenhar cada linha centralizada usando baseline correto | |
current_y = padding_vertical | |
for line in lines: | |
bbox = font.getbbox(line) | |
line_width = bbox[2] - bbox[0] | |
line_x = (width - line_width) // 2 # Centralizar cada linha | |
draw.text((line_x, current_y), line, font=font, fill=color) | |
current_y += line_height + line_spacing | |
return np.array(img, dtype=np.uint8) | |
def create_subtitle_clips(subtitles: List[Tuple[float, float, str]], video_duration: float) -> List[ImageClip]: | |
"""Cria clips de legenda otimizados usando ImageClip""" | |
subtitle_clips = [] | |
for start_time, end_time, text in subtitles: | |
# Ignorar legendas que ultrapassam a duração do vídeo | |
if start_time >= video_duration: | |
continue | |
# Ajustar end_time se necessário | |
if end_time > video_duration: | |
end_time = video_duration | |
# Criar imagem da legenda com fonte Medium e fundo escuro | |
subtitle_array = create_text_image( | |
text, | |
"fonts/Montserrat-Medium.ttf", # Fonte Medium para legendas | |
32, # Tamanho para legendas | |
"white", | |
900, | |
"#1A1A1A" # Fundo escuro para legendas | |
) | |
# Criar clip de imagem | |
subtitle_clip = ImageClip(subtitle_array, duration=end_time - start_time) | |
subtitle_clip = subtitle_clip.set_start(start_time) | |
subtitle_clips.append(subtitle_clip) | |
print(f"Criados {len(subtitle_clips)} clips de legenda") | |
return subtitle_clips | |
def create_centered_video_on_black_background( | |
video_path: str, | |
text: str = "Season 1, episode 1", | |
srt_path: Optional[str] = None, | |
output_resolution=(1080, 1920), | |
max_height=500, | |
max_width=900 | |
) -> BytesIO: | |
print(f"Iniciando processamento do vídeo: {video_path}") | |
clip = None | |
background = None | |
text_clip = None | |
centered_clip = None | |
final = None | |
subtitle_clips = [] | |
try: | |
# Carregar vídeo | |
clip = VideoFileClip(video_path, audio=True, verbose=False) | |
print(f"Vídeo carregado - Dimensões: {clip.w}x{clip.h}, Duração: {clip.duration}s, FPS: {clip.fps}") | |
# Redimensionar vídeo para 500px de altura máxima | |
if clip.w != max_width or clip.h > max_height: | |
scale_w = max_width / clip.w | |
scale_h = max_height / clip.h | |
scale = min(scale_w, scale_h) | |
new_width = int(clip.w * scale) | |
new_height = int(clip.h * scale) | |
print(f"Redimensionando para: {new_width}x{new_height} (max_height={max_height})") | |
clip = clip.resize(newsize=(new_width, new_height)) | |
# Criar fundo preto | |
background = ColorClip(size=output_resolution, color=(0, 0, 0), duration=clip.duration) | |
# Criar máscara arredondada baseada no tamanho atual do vídeo | |
print(f"Criando máscara para vídeo: {clip.w}x{clip.h}") | |
mask_array = create_rounded_mask(clip.w, clip.h, radius=80) | |
def make_mask_frame(t): | |
return mask_array | |
mask_clip = VideoClip(make_mask_frame, ismask=True, duration=clip.duration) | |
clip = clip.set_mask(mask_clip) | |
# Criar texto principal | |
text_array = create_text_image(text, "fonts/Montserrat-SemiBold.ttf", 38, "white", 900) | |
text_clip = ImageClip(text_array, duration=clip.duration) | |
# Centralizar o vídeo | |
centered_clip = clip.set_position(("center", "center")) | |
# Posicionar texto principal (45px de distância do vídeo) | |
video_top = (output_resolution[1] - clip.h) // 2 | |
text_y = video_top - 45 - text_clip.h | |
text_clip = text_clip.set_position(("center", text_y)) | |
# Processar legendas se fornecidas | |
if srt_path: | |
print("Processando legendas SRT...") | |
subtitles = parse_srt(srt_path) | |
subtitle_clips = create_subtitle_clips(subtitles, clip.duration) | |
# Posicionar legendas abaixo do vídeo (45px de distância) | |
video_bottom = (output_resolution[1] + clip.h) // 2 | |
subtitle_y = video_bottom + 45 # 45px de espaçamento | |
# Aplicar posicionamento a cada clip individual | |
for i, subtitle_clip in enumerate(subtitle_clips): | |
subtitle_clips[i] = subtitle_clip.set_position(("center", subtitle_y)) | |
# Compor todos os elementos | |
all_clips = [background, text_clip, centered_clip] + subtitle_clips | |
final = CompositeVideoClip(all_clips) | |
print("Composição finalizada, iniciando renderização...") | |
buffer = BytesIO() | |
tmp_output_path = None | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output: | |
tmp_output_path = tmp_output.name | |
print(f"Renderizando para arquivo temporário: {tmp_output_path}") | |
final.write_videofile( | |
tmp_output_path, | |
codec="libx264", | |
audio_codec="aac", | |
fps=clip.fps, | |
preset="ultrafast", | |
threads=os.cpu_count(), | |
temp_audiofile="temp-audio.m4a", | |
remove_temp=True, | |
audio=True, | |
logger=None, | |
verbose=False, | |
ffmpeg_params=[ | |
"-crf", "23", | |
"-movflags", "+faststart", | |
"-tune", "fastdecode", | |
"-x264opts", "no-scenecut" | |
] | |
) | |
print("Renderização concluída, lendo arquivo...") | |
with open(tmp_output_path, "rb") as f: | |
buffer.write(f.read()) | |
buffer.seek(0) | |
print(f"Vídeo processado com sucesso. Tamanho final: {buffer.getbuffer().nbytes} bytes") | |
finally: | |
if tmp_output_path and os.path.exists(tmp_output_path): | |
os.unlink(tmp_output_path) | |
except Exception as e: | |
print(f"Erro durante processamento: {e}") | |
raise | |
finally: | |
# Limpeza de memória | |
clips_to_close = [clip, background, text_clip, centered_clip, final] + subtitle_clips | |
for c in clips_to_close: | |
if c is not None: | |
try: | |
c.close() | |
except: | |
pass | |
gc.collect() | |
return buffer | |
def get_video_with_black_background( | |
video_url: str = Query(..., description="URL do vídeo em .mp4 para centralizar em fundo preto com cantos arredondados"), | |
text: str = Query("Season 1, episode 1", description="Texto a ser exibido acima do vídeo"), | |
srt_url: Optional[str] = Query(None, description="URL do arquivo SRT de legendas (opcional)") | |
): | |
local_video = None | |
local_srt = None | |
try: | |
# Baixar vídeo | |
local_video = download_video(video_url) | |
# Baixar SRT se fornecido | |
if srt_url: | |
local_srt = download_srt(srt_url) | |
# Processar vídeo com altura máxima de 500px | |
video_buffer = create_centered_video_on_black_background( | |
local_video, | |
text, | |
local_srt | |
) | |
return StreamingResponse(video_buffer, media_type="video/mp4") | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Erro ao processar vídeo: {e}") | |
finally: | |
# Limpeza de arquivos temporários | |
for temp_file in [local_video, local_srt]: | |
if temp_file and os.path.exists(temp_file): | |
os.unlink(temp_file) |