Spaces:
Running
Running
from fastapi import APIRouter, Query, HTTPException | |
from moviepy.editor import VideoFileClip | |
import tempfile | |
import requests | |
import os | |
import shutil | |
from groq import Groq | |
from audio_separator.separator import Separator | |
from google import genai | |
from google.genai import types | |
router = APIRouter() | |
def download_file(url: str, suffix: str) -> str: | |
"""Download genérico para arquivos de áudio e vídeo""" | |
print(f"Tentando baixar arquivo de: {url}") | |
headers = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', | |
'Accept': '*/*', | |
'Accept-Language': 'en-US,en;q=0.5', | |
'Accept-Encoding': 'gzip, deflate', | |
'Connection': 'keep-alive', | |
'Upgrade-Insecure-Requests': '1', | |
} | |
try: | |
response = requests.get(url, headers=headers, stream=True, timeout=30) | |
print(f"Status da resposta: {response.status_code}") | |
response.raise_for_status() | |
except requests.exceptions.RequestException as e: | |
print(f"Erro na requisição: {e}") | |
raise HTTPException(status_code=400, detail=f"Não foi possível baixar o arquivo: {str(e)}") | |
if response.status_code != 200: | |
raise HTTPException(status_code=400, detail=f"Erro ao baixar arquivo. Status: {response.status_code}") | |
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) | |
try: | |
total_size = 0 | |
for chunk in response.iter_content(chunk_size=8192): | |
if chunk: | |
tmp.write(chunk) | |
total_size += len(chunk) | |
tmp.close() | |
print(f"Arquivo baixado com sucesso. Tamanho: {total_size} bytes") | |
return tmp.name | |
except Exception as e: | |
tmp.close() | |
if os.path.exists(tmp.name): | |
os.unlink(tmp.name) | |
print(f"Erro ao salvar arquivo: {e}") | |
raise HTTPException(status_code=400, detail=f"Erro ao salvar arquivo: {str(e)}") | |
def extract_audio_from_video(video_path: str) -> str: | |
"""Extrai áudio de um arquivo de vídeo e salva como WAV""" | |
print(f"Extraindo áudio do vídeo: {video_path}") | |
audio_tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") | |
audio_path = audio_tmp.name | |
audio_tmp.close() | |
try: | |
video = VideoFileClip(video_path) | |
audio = video.audio | |
audio.write_audiofile(audio_path, verbose=False, logger=None) | |
audio.close() | |
video.close() | |
print(f"Áudio extraído com sucesso: {audio_path}") | |
return audio_path | |
except Exception as e: | |
if os.path.exists(audio_path): | |
os.unlink(audio_path) | |
print(f"Erro ao extrair áudio: {e}") | |
raise HTTPException(status_code=500, detail=f"Erro ao extrair áudio do vídeo: {str(e)}") | |
def separate_vocals(audio_path: str) -> str: | |
"""Separa vocais do áudio usando audio-separator com modelo UVR_MDXNET_KARA_2.onnx""" | |
print(f"Iniciando separação de vocais do arquivo: {audio_path}") | |
# Criar diretório temporário para saída | |
temp_output_dir = tempfile.mkdtemp(prefix="vocal_separation_") | |
try: | |
# Inicializar o separador | |
separator = Separator(output_dir=temp_output_dir) | |
# Carregar modelo específico para vocais (UVR_MDXNET_KARA_2.onnx é melhor para vocais) | |
print("Carregando modelo UVR_MDXNET_KARA_2.onnx...") | |
separator.load_model('UVR_MDXNET_KARA_2.onnx') | |
# Processar arquivo | |
print("Processando separação de vocais...") | |
separator.separate(audio_path) | |
# Encontrar o arquivo de vocais gerado | |
# O audio-separator geralmente gera arquivos com sufixos específicos | |
base_name = os.path.splitext(os.path.basename(audio_path))[0] | |
# Procurar pelo arquivo de vocais (pode ter diferentes sufixos dependendo do modelo) | |
possible_vocal_files = [ | |
f"{base_name}_(Vocals).wav", | |
f"{base_name}_vocals.wav", | |
f"{base_name}_Vocals.wav", | |
f"{base_name}_(Vocal).wav" | |
] | |
vocal_file_path = None | |
for possible_file in possible_vocal_files: | |
full_path = os.path.join(temp_output_dir, possible_file) | |
if os.path.exists(full_path): | |
vocal_file_path = full_path | |
break | |
# Se não encontrou pelos nomes padrão, procurar qualquer arquivo wav no diretório | |
if not vocal_file_path: | |
wav_files = [f for f in os.listdir(temp_output_dir) if f.endswith('.wav')] | |
if wav_files: | |
# Pegar o primeiro arquivo wav encontrado (assumindo que seja o vocal) | |
vocal_file_path = os.path.join(temp_output_dir, wav_files[0]) | |
if not vocal_file_path or not os.path.exists(vocal_file_path): | |
raise HTTPException(status_code=500, detail="Arquivo de vocais não foi gerado corretamente") | |
# Mover arquivo de vocais para um local temporário permanente | |
vocal_temp = tempfile.NamedTemporaryFile(delete=False, suffix="_vocals.wav") | |
vocal_final_path = vocal_temp.name | |
vocal_temp.close() | |
shutil.copy2(vocal_file_path, vocal_final_path) | |
print(f"Vocais separados com sucesso: {vocal_final_path}") | |
return vocal_final_path | |
except Exception as e: | |
print(f"Erro na separação de vocais: {e}") | |
raise HTTPException(status_code=500, detail=f"Erro ao separar vocais: {str(e)}") | |
finally: | |
# Limpar diretório temporário de separação | |
if os.path.exists(temp_output_dir): | |
try: | |
shutil.rmtree(temp_output_dir) | |
print(f"Diretório temporário removido: {temp_output_dir}") | |
except Exception as cleanup_error: | |
print(f"Erro ao remover diretório temporário: {cleanup_error}") | |
def format_time(seconds_float: float) -> str: | |
"""Converte segundos para formato de tempo SRT (HH:MM:SS,mmm) - versão melhorada""" | |
# Calcula segundos totais e milissegundos | |
total_seconds = int(seconds_float) | |
milliseconds = int((seconds_float - total_seconds) * 1000) | |
# Calcula horas, minutos e segundos restantes | |
hours = total_seconds // 3600 | |
minutes = (total_seconds % 3600) // 60 | |
seconds = total_seconds % 60 | |
return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}" | |
def json_to_srt(segments_data) -> str: | |
""" | |
Converte dados de segmentos para formato SRT | |
""" | |
if not segments_data: | |
return "" | |
srt_lines = [] | |
for segment in segments_data: | |
segment_id = segment.get('id', 0) + 1 | |
start_time = format_time(segment.get('start', 0.0)) | |
end_time = format_time(segment.get('end', 0.0)) | |
text = segment.get('text', '').strip() | |
if text: # Só adiciona se há texto | |
srt_lines.append(f"{segment_id}") | |
srt_lines.append(f"{start_time} --> {end_time}") | |
srt_lines.append(text) | |
srt_lines.append("") # Linha em branco | |
return '\n'.join(srt_lines) | |
def convert_to_srt(transcription_data) -> str: | |
""" | |
Função para conversão usando apenas segments | |
""" | |
if hasattr(transcription_data, 'segments') and transcription_data.segments: | |
return json_to_srt(transcription_data.segments) | |
else: | |
return "" | |
def translate_subtitle_internal(content: str) -> str: | |
""" | |
Função interna para traduzir legendas usando Gemini | |
Baseada na lógica do inference_sub.py | |
""" | |
try: | |
print("Iniciando tradução da legenda...") | |
api_key = os.environ.get("GEMINI_API_KEY") | |
if not api_key: | |
raise HTTPException(status_code=500, detail="GEMINI_API_KEY não configurada") | |
client = genai.Client(api_key=api_key) | |
model = "gemini-2.5-pro" | |
# Instruções do sistema aprimoradas | |
SYSTEM_INSTRUCTIONS = """ | |
Você é um tradutor profissional de legendas especializado em tradução do inglês para o português brasileiro. | |
Sua função é traduzir legendas mantendo a formatação SRT original intacta e seguindo os padrões da Netflix. | |
REGRAS FUNDAMENTAIS: | |
1. NUNCA altere os timestamps (00:00:00,000 --> 00:00:00,000) | |
2. NUNCA altere os números das legendas (1, 2, 3, etc.) | |
3. Mantenha a formatação SRT exata: número, timestamp, texto traduzido, linha em branco | |
4. Traduza APENAS o texto das falas | |
PADRÕES DE TRADUÇÃO: | |
- Tradução natural para português brasileiro | |
- Mantenha o tom e registro da fala original (formal/informal, gírias, etc.) | |
- Preserve nomes próprios, lugares e marcas | |
- Adapte expressões idiomáticas para equivalentes em português quando necessário | |
- Use contrações naturais do português brasileiro (você → cê, para → pra, quando apropriado) | |
FORMATAÇÃO NETFLIX: | |
- Máximo de 2 linhas por legenda | |
- Máximo de 42 caracteres por linha (incluindo espaços) | |
- Use quebra de linha quando o texto for muito longo | |
- Prefira quebras em pontos naturais da fala (após vírgulas, conjunções, etc.) | |
- Centralize o texto quando possível | |
PONTUAÇÃO E ESTILO: | |
- Use pontuação adequada em português | |
- Mantenha reticências (...) para hesitações ou falas interrompidas | |
- Use travessão (–) para diálogos quando necessário | |
- Evite pontos finais desnecessários em falas curtas | |
Sempre retorne APENAS o conteúdo das legendas traduzidas, mantendo a formatação SRT original. | |
""" | |
# Primeiro exemplo | |
EXAMPLE_INPUT_1 = """1 | |
00:00:00,000 --> 00:00:03,500 | |
You could argue he'd done it to curry favor with the guards. | |
2 | |
00:00:04,379 --> 00:00:07,299 | |
Or maybe make a few friends among us Khans. | |
3 | |
00:00:08,720 --> 00:00:12,199 | |
Me, I think he did it just to feel normal again. | |
4 | |
00:00:13,179 --> 00:00:14,740 | |
If only for a short while.""" | |
EXAMPLE_OUTPUT_1 = """1 | |
00:00:00,000 --> 00:00:03,500 | |
Você pode dizer que ele fez isso | |
para agradar os guardas. | |
2 | |
00:00:04,379 --> 00:00:07,299 | |
Ou talvez para fazer alguns amigos | |
entre nós, os Khans. | |
3 | |
00:00:08,720 --> 00:00:12,199 | |
Eu acho que ele fez isso só para se sentir | |
normal de novo. | |
4 | |
00:00:13,179 --> 00:00:14,740 | |
Mesmo que só por um tempo.""" | |
# Segundo exemplo | |
EXAMPLE_INPUT_2 = """1 | |
00:00:15,420 --> 00:00:18,890 | |
I'm not saying you're wrong, but have you considered the alternatives? | |
2 | |
00:00:19,234 --> 00:00:21,567 | |
What if we just... I don't know... talked to him? | |
3 | |
00:00:22,890 --> 00:00:26,234 | |
Listen, Jack, this isn't some Hollywood movie where everything works out. | |
4 | |
00:00:27,123 --> 00:00:29,456 | |
Sometimes you gotta make the hard choices.""" | |
EXAMPLE_OUTPUT_2 = """1 | |
00:00:15,420 --> 00:00:18,890 | |
Não tô dizendo que você tá errado, mas | |
já pensou nas alternativas? | |
2 | |
00:00:19,234 --> 00:00:21,567 | |
E se a gente só... sei lá... | |
conversasse com ele? | |
3 | |
00:00:22,890 --> 00:00:26,234 | |
Escuta, Jack, isso não é um filme de | |
Hollywood onde tudo dá certo. | |
4 | |
00:00:27,123 --> 00:00:29,456 | |
Às vezes você tem que fazer | |
as escolhas difíceis.""" | |
# Terceiro exemplo com diálogos | |
EXAMPLE_INPUT_3 = """1 | |
00:00:30,789 --> 00:00:32,456 | |
- Hey, what's up? | |
- Not much, just chilling. | |
2 | |
00:00:33,567 --> 00:00:36,123 | |
Did you see that new Netflix show everyone's talking about? | |
3 | |
00:00:37,234 --> 00:00:40,789 | |
Yeah, it's incredible! The cinematography is absolutely stunning. | |
4 | |
00:00:41,890 --> 00:00:44,567 | |
I can't believe they canceled it after just one season though.""" | |
EXAMPLE_OUTPUT_3 = """1 | |
00:00:30,789 --> 00:00:32,456 | |
– E aí, tudo bem? | |
– De boa, só relaxando. | |
2 | |
00:00:33,567 --> 00:00:36,123 | |
Você viu aquela série nova da Netflix | |
que todo mundo tá falando? | |
3 | |
00:00:37,234 --> 00:00:40,789 | |
Vi, é incrível! A cinematografia | |
é absolutamente deslumbrante. | |
4 | |
00:00:41,890 --> 00:00:44,567 | |
Não acredito que cancelaram depois | |
de só uma temporada.""" | |
# Estrutura de conversação correta com múltiplos exemplos | |
contents = [ | |
# Primeiro exemplo: usuário envia legenda | |
types.Content( | |
role="user", | |
parts=[ | |
types.Part.from_text(text=EXAMPLE_INPUT_1) | |
] | |
), | |
# Primeiro exemplo: modelo responde com tradução | |
types.Content( | |
role="model", | |
parts=[ | |
types.Part.from_text(text=EXAMPLE_OUTPUT_1) | |
] | |
), | |
# Segundo exemplo: usuário envia outra legenda | |
types.Content( | |
role="user", | |
parts=[ | |
types.Part.from_text(text=EXAMPLE_INPUT_2) | |
] | |
), | |
# Segundo exemplo: modelo responde com tradução | |
types.Content( | |
role="model", | |
parts=[ | |
types.Part.from_text(text=EXAMPLE_OUTPUT_2) | |
] | |
), | |
# Terceiro exemplo: usuário envia legenda com diálogos | |
types.Content( | |
role="user", | |
parts=[ | |
types.Part.from_text(text=EXAMPLE_INPUT_3) | |
] | |
), | |
# Terceiro exemplo: modelo responde com tradução | |
types.Content( | |
role="model", | |
parts=[ | |
types.Part.from_text(text=EXAMPLE_OUTPUT_3) | |
] | |
), | |
# Agora o usuário envia a legenda real para ser traduzida | |
types.Content( | |
role="user", | |
parts=[ | |
types.Part.from_text(text=content) | |
] | |
) | |
] | |
config = types.GenerateContentConfig( | |
system_instruction=SYSTEM_INSTRUCTIONS, | |
response_mime_type="text/plain", | |
max_output_tokens=4096, | |
temperature=0.3, # Menos criatividade, mais precisão na tradução | |
) | |
response_text = "" | |
for chunk in client.models.generate_content_stream( | |
model=model, | |
contents=contents, | |
config=config | |
): | |
if chunk.text: | |
response_text += chunk.text | |
translated_content = response_text.strip() | |
print("Tradução concluída com sucesso") | |
return translated_content | |
except Exception as e: | |
print(f"Erro na tradução interna: {e}") | |
# Retorna o conteúdo original se a tradução falhar | |
return content | |
def generate_srt_subtitle( | |
url: str = Query(..., description="URL do arquivo de áudio (.wav) ou vídeo") | |
): | |
""" | |
Gera legenda em formato SRT a partir de arquivo de áudio ou vídeo | |
- Se for .wav: separa vocais e transcreve | |
- Se for vídeo: extrai áudio, separa vocais e transcreve | |
- Usa modelo UVR_MDXNET_KARA_2.onnx para separação de vocais | |
- Usa segmentação natural do Whisper (segments) | |
- Detecção automática de idioma | |
- Tradução automática sempre ativada | |
""" | |
local_file = None | |
audio_file = None | |
vocal_file = None | |
try: | |
# Determinar tipo de arquivo pela URL | |
url_lower = url.lower() | |
is_audio = url_lower.endswith('.wav') | |
is_video = any(url_lower.endswith(ext) for ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm']) | |
if not (is_audio or is_video): | |
raise HTTPException( | |
status_code=400, | |
detail="URL deve ser de um arquivo de áudio (.wav) ou vídeo" | |
) | |
if is_audio: | |
local_file = download_file(url, ".wav") | |
audio_file = local_file | |
else: | |
local_file = download_file(url, ".mp4") | |
audio_file = extract_audio_from_video(local_file) | |
# Separar vocais do áudio | |
vocal_file = separate_vocals(audio_file) | |
# Transcrição com configurações fixas otimizadas | |
api_key = os.getenv("GROQ_API") | |
if not api_key: | |
raise HTTPException(status_code=500, detail="GROQ_API key não configurada") | |
client = Groq(api_key=api_key) | |
print(f"Iniciando transcrição com modelo: whisper-large-v3") | |
with open(vocal_file, "rb") as file: | |
transcription_params = { | |
"file": (os.path.basename(vocal_file), file.read()), | |
"model": "whisper-large-v3", | |
"response_format": "verbose_json", | |
"timestamp_granularities": ["segment"], | |
"temperature": 0.0, | |
# language é automaticamente detectado (não enviado) | |
} | |
transcription = client.audio.transcriptions.create(**transcription_params) | |
# Converter para SRT usando segments | |
srt_content_original = convert_to_srt(transcription) | |
# Traduzir sempre | |
srt_content = translate_subtitle_internal(srt_content_original) if srt_content_original else None | |
return { | |
"srt": srt_content, | |
"duration": getattr(transcription, 'duration', 0), | |
"language": getattr(transcription, 'language', 'unknown'), | |
"model_used": "whisper-large-v3", | |
"processing_method": "segments", | |
"vocal_separation": "UVR_MDXNET_KARA_2.onnx", | |
"translation_applied": True, | |
"segment_count": len(transcription.segments) if hasattr(transcription, 'segments') and transcription.segments else 0, | |
"subtitle_count": len([line for line in srt_content.split('\n') if line.strip().isdigit()]) if srt_content else 0 | |
} | |
except HTTPException: | |
raise | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Erro inesperado: {str(e)}") | |
finally: | |
# Limpeza de arquivos temporários | |
for temp_file in [local_file, audio_file, vocal_file]: | |
if temp_file and os.path.exists(temp_file): | |
try: | |
os.unlink(temp_file) | |
print(f"Arquivo temporário removido: {temp_file}") | |
except Exception as cleanup_error: | |
print(f"Erro ao remover arquivo temporário {temp_file}: {cleanup_error}") |