Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
""" | |
Script para baixar um vídeo do YouTube, extrair frames, analisar com GPT-4o e contar aves. | |
""" | |
import os | |
import subprocess | |
import cv2 | |
import base64 | |
import time | |
from openai import OpenAI # Importa a classe OpenAI | |
import json | |
import re | |
# --- Configurações (Substitua os placeholders) --- | |
VIDEO_URL = "https://www.youtube.com/watch?v=L1vXCYZAYYM" # Substitua pela URL do vídeo do YouTube | |
OUTPUT_DIR = "./video_analysis_output" # Diretório para salvar o vídeo e os frames | |
FRAME_INTERVAL_SECONDS = 3 # Intervalo entre frames a serem extraídos | |
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
GPT_MODEL = "gpt-4o" # Modelo GPT a ser usado (certifique-se que é o correto para análise de imagem) | |
PROMPT_TEXT = "Quantas aves existem nesta imagem? Responda apenas com o número." # Prompt para o GPT-4o | |
RESULTS_FILE = os.path.join(OUTPUT_DIR, "analysis_results.json") | |
VIDEO_FILENAME = "downloaded_video.mp4" | |
VIDEO_PATH = os.path.join(OUTPUT_DIR, VIDEO_FILENAME) | |
# Verifica se a chave da API foi definida | |
if OPENAI_API_KEY == "SUA_CHAVE_API_OPENAI_AQUI": | |
print("AVISO: A chave da API OpenAI não foi definida. Por favor, edite o script e insira sua chave.") | |
# Considerar sair do script ou lançar um erro se a chave for essencial para a execução completa | |
# exit(1) | |
# Verifica se a URL foi definida | |
if VIDEO_URL == "URL_DO_SEU_VIDEO_AQUI": | |
print("AVISO: A URL do vídeo não foi definida. Por favor, edite o script e insira a URL desejada.") | |
# exit(1) | |
# --- Funções --- | |
def create_output_directory(): | |
"""Cria o diretório de saída se não existir.""" | |
if not os.path.exists(OUTPUT_DIR): | |
os.makedirs(OUTPUT_DIR) | |
print(f"Diretório criado: {OUTPUT_DIR}") | |
def retirar_sufixo_codec_arquivo(directory) -> None: | |
for filename in os.listdir(directory): | |
# Procura padrão como ".f123" antes da extensão | |
new_filename = re.sub(r'\.f\d{3}(?=\.\w+$)', '', filename) | |
if new_filename != filename: | |
old_path = os.path.join(directory, filename) | |
new_path = os.path.join(directory, new_filename) | |
os.rename(old_path, new_path) | |
print(f"Renomeado: {filename} → {new_filename}") | |
def download_video(url, output_path): | |
"""Baixa o vídeo do YouTube usando yt-dlp.""" | |
print(f"Baixando vídeo de {url} para {output_path}...") | |
try: | |
# Comando yt-dlp para baixar o melhor formato mp4 | |
command = [ | |
'yt-dlp', | |
'-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', | |
'-o', output_path, | |
url | |
] | |
result = subprocess.run(command, check=True, capture_output=True, text=True) | |
print("Download concluído com sucesso.") | |
retirar_sufixo_codec_arquivo(OUTPUT_DIR) | |
# print(result.stdout) # Descomente para ver a saída do yt-dlp | |
return True | |
except subprocess.CalledProcessError as e: | |
print(f"Erro ao baixar o vídeo: {e}") | |
print(f"Saída do erro: {e.stderr}") | |
return False | |
except FileNotFoundError: | |
print("Erro: O comando 'yt-dlp' não foi encontrado. Certifique-se de que ele está instalado e no PATH do sistema.") | |
print("Você pode instalá-lo com: pip install yt-dlp") | |
return False | |
def extract_frames(video_path, output_dir, interval_sec): | |
"""Extrai frames de um vídeo em intervalos específicos.""" | |
print(f"Extraindo frames de {video_path} a cada {interval_sec} segundos...") | |
if not os.path.exists(video_path): | |
print(f"Erro: Arquivo de vídeo não encontrado em {video_path}") | |
return [] | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
print(f"Erro ao abrir o arquivo de vídeo: {video_path}") | |
return [] | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
if fps == 0: | |
print("Erro: Não foi possível obter o FPS do vídeo. Usando FPS padrão de 30.") | |
fps = 30 # Valor padrão caso a leitura falhe | |
frame_interval = int(fps * interval_sec) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
print(f"Vídeo FPS: {fps:.2f}, Intervalo de frames: {frame_interval}, Total de frames: {total_frames}") | |
extracted_frames_paths = [] | |
frame_count = 0 | |
saved_frame_index = 5 # o importante nunca começa no inicio, é um deslocamento inicial para iniciar depois da introdução | |
while True: | |
# Define a posição do próximo frame a ser lido | |
# Adiciona frame_interval para pegar o frame *após* o intervalo de tempo | |
target_frame_pos = saved_frame_index * frame_interval | |
if target_frame_pos >= total_frames: | |
break # Sai se o próximo frame alvo estiver além do final do vídeo | |
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame_pos) | |
ret, frame = cap.read() | |
if not ret: | |
print(f"Não foi possível ler o frame na posição {target_frame_pos}. Pode ser o fim do vídeo ou um erro.") | |
break # Sai se não conseguir ler o frame | |
# redimensiona o frame (custo chamada) | |
frame = cv2.resize(frame, (1280, 720)) | |
# Calcula o timestamp em segundos | |
timestamp_sec = target_frame_pos / fps | |
# Salva o frame | |
frame_filename = f"frame_{saved_frame_index:04d}_time_{timestamp_sec:.2f}s.png" | |
frame_path = os.path.join(output_dir, frame_filename) | |
try: | |
cv2.imwrite(frame_path, frame) | |
extracted_frames_paths.append(frame_path) | |
print(f"Frame salvo: {frame_path} (Timestamp: {timestamp_sec:.2f}s)") | |
saved_frame_index += 1 | |
except Exception as e: | |
print(f"Erro ao salvar o frame {frame_path}: {e}") | |
# Continua para o próximo intervalo mesmo se um frame falhar | |
# Segurança para evitar loop infinito caso algo dê errado com a lógica de posição | |
if saved_frame_index > (total_frames / frame_interval) + 2: | |
print("Aviso: Número de frames salvos parece exceder o esperado. Interrompendo extração.") | |
break | |
cap.release() | |
print(f"Extração de frames concluída. Total de frames salvos: {len(extracted_frames_paths)}") | |
return extracted_frames_paths | |
# --- Atualização do Bloco Principal --- | |
# (Remover a linha print anterior sobre próximas etapas e adicionar a chamada da nova função) | |
def encode_frame_to_base64(frame_path): | |
"""Codifica um arquivo de imagem (frame) para base64.""" | |
try: | |
with open(frame_path, "rb") as image_file: | |
return base64.b64encode(image_file.read()).decode('utf-8') | |
except FileNotFoundError: | |
print(f"Erro: Arquivo de frame não encontrado em {frame_path}") | |
return None | |
except Exception as e: | |
print(f"Erro ao codificar o frame {frame_path} para base64: {e}") | |
return None | |
def analyze_frame_with_gpt4o(client, base64_image, prompt): | |
print("NAO CHAMAR AINDA") | |
return | |
"""Envia um frame codificado em base64 para a API GPT-4o e retorna a análise.""" | |
print(f"Enviando frame para análise no {GPT_MODEL}...") | |
headers = { | |
"Content-Type": "application/json", | |
"Authorization": f"Bearer {OPENAI_API_KEY}" | |
} | |
payload = { | |
"model": GPT_MODEL, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": [ | |
{ | |
"type": "text", | |
"text": prompt | |
}, | |
{ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/png;base64,{base64_image}" | |
} | |
} | |
] | |
} | |
], | |
"max_tokens": 100 # Ajuste conforme necessário para a resposta esperada | |
} | |
try: | |
# Verifica se a chave da API é o placeholder | |
if OPENAI_API_KEY == "SUA_CHAVE_API_OPENAI_AQUI": | |
print("AVISO: Chave da API OpenAI não configurada. Pulando análise.") | |
return {"error": "API key not configured."} | |
# Cria o cliente OpenAI dentro da função para garantir que use a chave mais recente | |
# (embora seja definida globalmente, isso pode ser útil se a chave for atualizada dinamicamente no futuro) | |
# client = OpenAI(api_key=OPENAI_API_KEY) | |
response = client.chat.completions.create( | |
model=payload["model"], | |
messages=payload["messages"], | |
max_tokens=payload["max_tokens"] | |
) | |
# Extrai o conteúdo da resposta | |
analysis_result = response.choices[0].message.content.strip() | |
print(f"Análise recebida: {analysis_result}") | |
# Tenta converter a resposta para um inteiro (contagem de aves) | |
try: | |
bird_count = int(analysis_result) | |
return {"bird_count": bird_count, "raw_response": analysis_result} | |
except ValueError: | |
print(f"Aviso: Não foi possível converter a resposta '{analysis_result}' para um número inteiro.") | |
return {"error": "Failed to parse bird count from response.", "raw_response": analysis_result} | |
except Exception as e: | |
print(f"Erro ao chamar a API OpenAI: {e}") | |
return {"error": str(e)} | |
def save_results_to_json(results_list, output_file): | |
"""Salva a lista de resultados da análise em um arquivo JSON.""" | |
print(f"Salvando resultados da análise em {output_file}...") | |
try: | |
with open(output_file, 'w', encoding='utf-8') as f: | |
json.dump(results_list, f, ensure_ascii=False, indent=4) | |
print(f"Resultados salvos com sucesso em: {output_file}") | |
return True | |
except Exception as e: | |
print(f"Erro ao salvar os resultados em JSON: {e}") | |
return False | |
# --- Atualização do Bloco Principal --- | |
# (Adicionar inicialização do cliente OpenAI e o loop de análise) | |
if __name__ == "__main__": | |
create_output_directory() | |
extracted_frames = [] | |
analysis_results_list = [] | |
# Inicializa o cliente OpenAI (se a chave estiver definida) | |
openai_client = None | |
if OPENAI_API_KEY != "SUA_CHAVE_API_OPENAI_AQUI": | |
try: | |
openai_client = OpenAI(api_key=OPENAI_API_KEY) | |
print("Cliente OpenAI inicializado.") | |
except Exception as e: | |
print(f"Erro ao inicializar o cliente OpenAI: {e}. As chamadas de API serão puladas.") | |
else: | |
print("Chave da API OpenAI não configurada. As chamadas de API serão puladas.") | |
# Etapa 1: Baixar o vídeo | |
video_downloaded_or_exists = False | |
if VIDEO_URL != "URL_DO_SEU_VIDEO_AQUI": | |
if download_video(VIDEO_URL, VIDEO_PATH): | |
print(f"Vídeo salvo em: {VIDEO_PATH}") | |
video_downloaded_or_exists = True | |
else: | |
print("Falha no download do vídeo. Pulando etapas dependentes.") | |
elif os.path.exists(VIDEO_PATH): | |
print(f"URL não fornecida, mas vídeo encontrado em {VIDEO_PATH}. Tentando processar.") | |
video_downloaded_or_exists = True | |
else: | |
print("URL do vídeo não fornecida e vídeo local não encontrado. Pulando download e extração.") | |
# Etapa 2: Extrair frames | |
if video_downloaded_or_exists: | |
extracted_frames = extract_frames(VIDEO_PATH, OUTPUT_DIR, FRAME_INTERVAL_SECONDS) | |
else: | |
print("Pulando extração de frames pois o vídeo não está disponível.") | |
# Etapa 3 e 4: Codificar e Analisar Frames | |
if extracted_frames and openai_client: | |
print(f"\nIniciando análise de {len(extracted_frames)} frames com {GPT_MODEL}...") | |
for frame_path in extracted_frames: | |
print(f"\nProcessando frame: {frame_path}") | |
# Extrai timestamp do nome do arquivo, se possível | |
timestamp_str = "unknown" | |
try: | |
# Exemplo: frame_0000_time_0.00s.png | |
parts = os.path.basename(frame_path).split('_') | |
if len(parts) >= 4 and parts[2] == 'time': | |
timestamp_str = parts[3].replace('s.png','') | |
except Exception: | |
pass # Mantém 'unknown' se o parsing falhar | |
# Codifica o frame | |
base64_image = encode_frame_to_base64(frame_path) | |
if base64_image: | |
# Analisa o frame com GPT-4o | |
# analysis_result = analyze_frame_with_gpt4o(openai_client, base64_image, PROMPT_TEXT) | |
result_entry = { | |
"frame_path": frame_path, | |
"timestamp_approx_sec": timestamp_str, | |
"analysis": f' pulado frame {frame_path}' #analysis_result | |
} | |
analysis_results_list.append(result_entry) | |
# Pausa opcional para evitar rate limiting | |
time.sleep(1) # Pausa de 1 segundo entre as chamadas | |
else: | |
print(f"Falha ao codificar o frame {frame_path}. Pulando análise.") | |
analysis_results_list.append({ | |
"frame_path": frame_path, | |
"timestamp_approx_sec": timestamp_str, | |
"analysis": {"error": "Failed to encode frame to base64."} | |
}) | |
print("\nAnálise de todos os frames concluída.") | |
elif not extracted_frames: | |
print("Nenhum frame foi extraído. Pulando etapa de análise.") | |
elif not openai_client: | |
print("Cliente OpenAI não inicializado (verifique a API Key). Pulando etapa de análise.") | |
# Próxima etapa: Compilar resultados | |
print(f"\nPróxima etapa a ser implementada: Compilação dos resultados ({len(analysis_results_list)} análises) em um relatório.") | |
# ... (código anterior para inicialização, download, extração, análise) ... | |
# Etapa 5: Compilar e Salvar Resultados | |
if analysis_results_list: | |
print(f"\nCompilando {len(analysis_results_list)} resultados da análise...") | |
if save_results_to_json(analysis_results_list, RESULTS_FILE): | |
print("Compilação e salvamento dos resultados concluídos.") | |
else: | |
print("Falha ao salvar os resultados da análise.") | |
else: | |
print("Nenhum resultado de análise para compilar.") | |
print("\n--- Processo de Análise de Vídeo Concluído ---") | |
print(f"Verifique o diretório '{OUTPUT_DIR}' para os frames extraídos (se aplicável).") | |
print(f"Verifique o arquivo '{RESULTS_FILE}' para os resultados da análise (se aplicável).") | |
print("Lembre-se de substituir os placeholders para URL_DO_SEU_VIDEO_AQUI e SUA_CHAVE_API_OPENAI_AQUI no script.") | |