newapi-clone / routers /subtitle.py
habulaj's picture
Upload 13 files
4ffe0a9 verified
from fastapi import APIRouter, Query, HTTPException
from moviepy.editor import VideoFileClip
import tempfile
import requests
import os
import shutil
from groq import Groq
from audio_separator.separator import Separator
from google import genai
from google.genai import types
router = APIRouter()
def download_file(url: str, suffix: str) -> str:
"""Download genérico para arquivos de áudio e vídeo"""
print(f"Tentando baixar arquivo de: {url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
try:
response = requests.get(url, headers=headers, stream=True, timeout=30)
print(f"Status da resposta: {response.status_code}")
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Erro na requisição: {e}")
raise HTTPException(status_code=400, detail=f"Não foi possível baixar o arquivo: {str(e)}")
if response.status_code != 200:
raise HTTPException(status_code=400, detail=f"Erro ao baixar arquivo. Status: {response.status_code}")
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
try:
total_size = 0
for chunk in response.iter_content(chunk_size=8192):
if chunk:
tmp.write(chunk)
total_size += len(chunk)
tmp.close()
print(f"Arquivo baixado com sucesso. Tamanho: {total_size} bytes")
return tmp.name
except Exception as e:
tmp.close()
if os.path.exists(tmp.name):
os.unlink(tmp.name)
print(f"Erro ao salvar arquivo: {e}")
raise HTTPException(status_code=400, detail=f"Erro ao salvar arquivo: {str(e)}")
def extract_audio_from_video(video_path: str) -> str:
"""Extrai áudio de um arquivo de vídeo e salva como WAV"""
print(f"Extraindo áudio do vídeo: {video_path}")
audio_tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
audio_path = audio_tmp.name
audio_tmp.close()
try:
video = VideoFileClip(video_path)
audio = video.audio
audio.write_audiofile(audio_path, verbose=False, logger=None)
audio.close()
video.close()
print(f"Áudio extraído com sucesso: {audio_path}")
return audio_path
except Exception as e:
if os.path.exists(audio_path):
os.unlink(audio_path)
print(f"Erro ao extrair áudio: {e}")
raise HTTPException(status_code=500, detail=f"Erro ao extrair áudio do vídeo: {str(e)}")
def separate_vocals(audio_path: str) -> str:
"""Separa vocais do áudio usando audio-separator com modelo UVR_MDXNET_KARA_2.onnx"""
print(f"Iniciando separação de vocais do arquivo: {audio_path}")
# Criar diretório temporário para saída
temp_output_dir = tempfile.mkdtemp(prefix="vocal_separation_")
try:
# Inicializar o separador
separator = Separator(output_dir=temp_output_dir)
# Carregar modelo específico para vocais (UVR_MDXNET_KARA_2.onnx é melhor para vocais)
print("Carregando modelo UVR_MDXNET_KARA_2.onnx...")
separator.load_model('UVR_MDXNET_KARA_2.onnx')
# Processar arquivo
print("Processando separação de vocais...")
separator.separate(audio_path)
# Encontrar o arquivo de vocais gerado
# O audio-separator geralmente gera arquivos com sufixos específicos
base_name = os.path.splitext(os.path.basename(audio_path))[0]
# Procurar pelo arquivo de vocais (pode ter diferentes sufixos dependendo do modelo)
possible_vocal_files = [
f"{base_name}_(Vocals).wav",
f"{base_name}_vocals.wav",
f"{base_name}_Vocals.wav",
f"{base_name}_(Vocal).wav"
]
vocal_file_path = None
for possible_file in possible_vocal_files:
full_path = os.path.join(temp_output_dir, possible_file)
if os.path.exists(full_path):
vocal_file_path = full_path
break
# Se não encontrou pelos nomes padrão, procurar qualquer arquivo wav no diretório
if not vocal_file_path:
wav_files = [f for f in os.listdir(temp_output_dir) if f.endswith('.wav')]
if wav_files:
# Pegar o primeiro arquivo wav encontrado (assumindo que seja o vocal)
vocal_file_path = os.path.join(temp_output_dir, wav_files[0])
if not vocal_file_path or not os.path.exists(vocal_file_path):
raise HTTPException(status_code=500, detail="Arquivo de vocais não foi gerado corretamente")
# Mover arquivo de vocais para um local temporário permanente
vocal_temp = tempfile.NamedTemporaryFile(delete=False, suffix="_vocals.wav")
vocal_final_path = vocal_temp.name
vocal_temp.close()
shutil.copy2(vocal_file_path, vocal_final_path)
print(f"Vocais separados com sucesso: {vocal_final_path}")
return vocal_final_path
except Exception as e:
print(f"Erro na separação de vocais: {e}")
raise HTTPException(status_code=500, detail=f"Erro ao separar vocais: {str(e)}")
finally:
# Limpar diretório temporário de separação
if os.path.exists(temp_output_dir):
try:
shutil.rmtree(temp_output_dir)
print(f"Diretório temporário removido: {temp_output_dir}")
except Exception as cleanup_error:
print(f"Erro ao remover diretório temporário: {cleanup_error}")
def format_time(seconds_float: float) -> str:
"""Converte segundos para formato de tempo SRT (HH:MM:SS,mmm) - versão melhorada"""
# Calcula segundos totais e milissegundos
total_seconds = int(seconds_float)
milliseconds = int((seconds_float - total_seconds) * 1000)
# Calcula horas, minutos e segundos restantes
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"
def json_to_srt(segments_data) -> str:
"""
Converte dados de segmentos para formato SRT
"""
if not segments_data:
return ""
srt_lines = []
for segment in segments_data:
segment_id = segment.get('id', 0) + 1
start_time = format_time(segment.get('start', 0.0))
end_time = format_time(segment.get('end', 0.0))
text = segment.get('text', '').strip()
if text: # Só adiciona se há texto
srt_lines.append(f"{segment_id}")
srt_lines.append(f"{start_time} --> {end_time}")
srt_lines.append(text)
srt_lines.append("") # Linha em branco
return '\n'.join(srt_lines)
def convert_to_srt(transcription_data) -> str:
"""
Função para conversão usando apenas segments
"""
if hasattr(transcription_data, 'segments') and transcription_data.segments:
return json_to_srt(transcription_data.segments)
else:
return ""
def translate_subtitle_internal(content: str) -> str:
"""
Função interna para traduzir legendas usando Gemini
Baseada na lógica do inference_sub.py
"""
try:
print("Iniciando tradução da legenda...")
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
raise HTTPException(status_code=500, detail="GEMINI_API_KEY não configurada")
client = genai.Client(api_key=api_key)
model = "gemini-2.5-pro"
# Instruções do sistema aprimoradas
SYSTEM_INSTRUCTIONS = """
Você é um tradutor profissional de legendas especializado em tradução do inglês para o português brasileiro.
Sua função é traduzir legendas mantendo a formatação SRT original intacta e seguindo os padrões da Netflix.
REGRAS FUNDAMENTAIS:
1. NUNCA altere os timestamps (00:00:00,000 --> 00:00:00,000)
2. NUNCA altere os números das legendas (1, 2, 3, etc.)
3. Mantenha a formatação SRT exata: número, timestamp, texto traduzido, linha em branco
4. Traduza APENAS o texto das falas
PADRÕES DE TRADUÇÃO:
- Tradução natural para português brasileiro
- Mantenha o tom e registro da fala original (formal/informal, gírias, etc.)
- Preserve nomes próprios, lugares e marcas
- Adapte expressões idiomáticas para equivalentes em português quando necessário
- Use contrações naturais do português brasileiro (você → cê, para → pra, quando apropriado)
FORMATAÇÃO NETFLIX:
- Máximo de 2 linhas por legenda
- Máximo de 42 caracteres por linha (incluindo espaços)
- Use quebra de linha quando o texto for muito longo
- Prefira quebras em pontos naturais da fala (após vírgulas, conjunções, etc.)
- Centralize o texto quando possível
PONTUAÇÃO E ESTILO:
- Use pontuação adequada em português
- Mantenha reticências (...) para hesitações ou falas interrompidas
- Use travessão (–) para diálogos quando necessário
- Evite pontos finais desnecessários em falas curtas
Sempre retorne APENAS o conteúdo das legendas traduzidas, mantendo a formatação SRT original.
"""
# Primeiro exemplo
EXAMPLE_INPUT_1 = """1
00:00:00,000 --> 00:00:03,500
You could argue he'd done it to curry favor with the guards.
2
00:00:04,379 --> 00:00:07,299
Or maybe make a few friends among us Khans.
3
00:00:08,720 --> 00:00:12,199
Me, I think he did it just to feel normal again.
4
00:00:13,179 --> 00:00:14,740
If only for a short while."""
EXAMPLE_OUTPUT_1 = """1
00:00:00,000 --> 00:00:03,500
Você pode dizer que ele fez isso
para agradar os guardas.
2
00:00:04,379 --> 00:00:07,299
Ou talvez para fazer alguns amigos
entre nós, os Khans.
3
00:00:08,720 --> 00:00:12,199
Eu acho que ele fez isso só para se sentir
normal de novo.
4
00:00:13,179 --> 00:00:14,740
Mesmo que só por um tempo."""
# Segundo exemplo
EXAMPLE_INPUT_2 = """1
00:00:15,420 --> 00:00:18,890
I'm not saying you're wrong, but have you considered the alternatives?
2
00:00:19,234 --> 00:00:21,567
What if we just... I don't know... talked to him?
3
00:00:22,890 --> 00:00:26,234
Listen, Jack, this isn't some Hollywood movie where everything works out.
4
00:00:27,123 --> 00:00:29,456
Sometimes you gotta make the hard choices."""
EXAMPLE_OUTPUT_2 = """1
00:00:15,420 --> 00:00:18,890
Não tô dizendo que você tá errado, mas
já pensou nas alternativas?
2
00:00:19,234 --> 00:00:21,567
E se a gente só... sei lá...
conversasse com ele?
3
00:00:22,890 --> 00:00:26,234
Escuta, Jack, isso não é um filme de
Hollywood onde tudo dá certo.
4
00:00:27,123 --> 00:00:29,456
Às vezes você tem que fazer
as escolhas difíceis."""
# Terceiro exemplo com diálogos
EXAMPLE_INPUT_3 = """1
00:00:30,789 --> 00:00:32,456
- Hey, what's up?
- Not much, just chilling.
2
00:00:33,567 --> 00:00:36,123
Did you see that new Netflix show everyone's talking about?
3
00:00:37,234 --> 00:00:40,789
Yeah, it's incredible! The cinematography is absolutely stunning.
4
00:00:41,890 --> 00:00:44,567
I can't believe they canceled it after just one season though."""
EXAMPLE_OUTPUT_3 = """1
00:00:30,789 --> 00:00:32,456
– E aí, tudo bem?
– De boa, só relaxando.
2
00:00:33,567 --> 00:00:36,123
Você viu aquela série nova da Netflix
que todo mundo tá falando?
3
00:00:37,234 --> 00:00:40,789
Vi, é incrível! A cinematografia
é absolutamente deslumbrante.
4
00:00:41,890 --> 00:00:44,567
Não acredito que cancelaram depois
de só uma temporada."""
# Estrutura de conversação correta com múltiplos exemplos
contents = [
# Primeiro exemplo: usuário envia legenda
types.Content(
role="user",
parts=[
types.Part.from_text(text=EXAMPLE_INPUT_1)
]
),
# Primeiro exemplo: modelo responde com tradução
types.Content(
role="model",
parts=[
types.Part.from_text(text=EXAMPLE_OUTPUT_1)
]
),
# Segundo exemplo: usuário envia outra legenda
types.Content(
role="user",
parts=[
types.Part.from_text(text=EXAMPLE_INPUT_2)
]
),
# Segundo exemplo: modelo responde com tradução
types.Content(
role="model",
parts=[
types.Part.from_text(text=EXAMPLE_OUTPUT_2)
]
),
# Terceiro exemplo: usuário envia legenda com diálogos
types.Content(
role="user",
parts=[
types.Part.from_text(text=EXAMPLE_INPUT_3)
]
),
# Terceiro exemplo: modelo responde com tradução
types.Content(
role="model",
parts=[
types.Part.from_text(text=EXAMPLE_OUTPUT_3)
]
),
# Agora o usuário envia a legenda real para ser traduzida
types.Content(
role="user",
parts=[
types.Part.from_text(text=content)
]
)
]
config = types.GenerateContentConfig(
system_instruction=SYSTEM_INSTRUCTIONS,
response_mime_type="text/plain",
max_output_tokens=4096,
temperature=0.3, # Menos criatividade, mais precisão na tradução
)
response_text = ""
for chunk in client.models.generate_content_stream(
model=model,
contents=contents,
config=config
):
if chunk.text:
response_text += chunk.text
translated_content = response_text.strip()
print("Tradução concluída com sucesso")
return translated_content
except Exception as e:
print(f"Erro na tradução interna: {e}")
# Retorna o conteúdo original se a tradução falhar
return content
@router.get("/subtitle/generate-srt")
def generate_srt_subtitle(
url: str = Query(..., description="URL do arquivo de áudio (.wav) ou vídeo")
):
"""
Gera legenda em formato SRT a partir de arquivo de áudio ou vídeo
- Se for .wav: separa vocais e transcreve
- Se for vídeo: extrai áudio, separa vocais e transcreve
- Usa modelo UVR_MDXNET_KARA_2.onnx para separação de vocais
- Usa segmentação natural do Whisper (segments)
- Detecção automática de idioma
- Tradução automática sempre ativada
"""
local_file = None
audio_file = None
vocal_file = None
try:
# Determinar tipo de arquivo pela URL
url_lower = url.lower()
is_audio = url_lower.endswith('.wav')
is_video = any(url_lower.endswith(ext) for ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm'])
if not (is_audio or is_video):
raise HTTPException(
status_code=400,
detail="URL deve ser de um arquivo de áudio (.wav) ou vídeo"
)
if is_audio:
local_file = download_file(url, ".wav")
audio_file = local_file
else:
local_file = download_file(url, ".mp4")
audio_file = extract_audio_from_video(local_file)
# Separar vocais do áudio
vocal_file = separate_vocals(audio_file)
# Transcrição com configurações fixas otimizadas
api_key = os.getenv("GROQ_API")
if not api_key:
raise HTTPException(status_code=500, detail="GROQ_API key não configurada")
client = Groq(api_key=api_key)
print(f"Iniciando transcrição com modelo: whisper-large-v3")
with open(vocal_file, "rb") as file:
transcription_params = {
"file": (os.path.basename(vocal_file), file.read()),
"model": "whisper-large-v3",
"response_format": "verbose_json",
"timestamp_granularities": ["segment"],
"temperature": 0.0,
# language é automaticamente detectado (não enviado)
}
transcription = client.audio.transcriptions.create(**transcription_params)
# Converter para SRT usando segments
srt_content_original = convert_to_srt(transcription)
# Traduzir sempre
srt_content = translate_subtitle_internal(srt_content_original) if srt_content_original else None
return {
"srt": srt_content,
"duration": getattr(transcription, 'duration', 0),
"language": getattr(transcription, 'language', 'unknown'),
"model_used": "whisper-large-v3",
"processing_method": "segments",
"vocal_separation": "UVR_MDXNET_KARA_2.onnx",
"translation_applied": True,
"segment_count": len(transcription.segments) if hasattr(transcription, 'segments') and transcription.segments else 0,
"subtitle_count": len([line for line in srt_content.split('\n') if line.strip().isdigit()]) if srt_content else 0
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro inesperado: {str(e)}")
finally:
# Limpeza de arquivos temporários
for temp_file in [local_file, audio_file, vocal_file]:
if temp_file and os.path.exists(temp_file):
try:
os.unlink(temp_file)
print(f"Arquivo temporário removido: {temp_file}")
except Exception as cleanup_error:
print(f"Erro ao remover arquivo temporário {temp_file}: {cleanup_error}")