# -*- coding: utf-8 -*- """ Script para baixar um vídeo do YouTube, extrair frames, analisar com GPT-4o e contar aves. """ import os import subprocess import cv2 import base64 import time from openai import OpenAI # Importa a classe OpenAI import json import re # --- Configurações (Substitua os placeholders) --- VIDEO_URL = "https://www.youtube.com/watch?v=L1vXCYZAYYM" # Substitua pela URL do vídeo do YouTube OUTPUT_DIR = "./video_analysis_output" # Diretório para salvar o vídeo e os frames FRAME_INTERVAL_SECONDS = 3 # Intervalo entre frames a serem extraídos OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") GPT_MODEL = "gpt-4o" # Modelo GPT a ser usado (certifique-se que é o correto para análise de imagem) PROMPT_TEXT = "Quantas aves existem nesta imagem? Responda apenas com o número." # Prompt para o GPT-4o RESULTS_FILE = os.path.join(OUTPUT_DIR, "analysis_results.json") VIDEO_FILENAME = "downloaded_video.mp4" VIDEO_PATH = os.path.join(OUTPUT_DIR, VIDEO_FILENAME) # Verifica se a chave da API foi definida if OPENAI_API_KEY == "SUA_CHAVE_API_OPENAI_AQUI": print("AVISO: A chave da API OpenAI não foi definida. Por favor, edite o script e insira sua chave.") # Considerar sair do script ou lançar um erro se a chave for essencial para a execução completa # exit(1) # Verifica se a URL foi definida if VIDEO_URL == "URL_DO_SEU_VIDEO_AQUI": print("AVISO: A URL do vídeo não foi definida. Por favor, edite o script e insira a URL desejada.") # exit(1) # --- Funções --- def create_output_directory(): """Cria o diretório de saída se não existir.""" if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) print(f"Diretório criado: {OUTPUT_DIR}") def retirar_sufixo_codec_arquivo(directory) -> None: for filename in os.listdir(directory): # Procura padrão como ".f123" antes da extensão new_filename = re.sub(r'\.f\d{3}(?=\.\w+$)', '', filename) if new_filename != filename: old_path = os.path.join(directory, filename) new_path = os.path.join(directory, new_filename) os.rename(old_path, new_path) print(f"Renomeado: {filename} → {new_filename}") def download_video(url, output_path): """Baixa o vídeo do YouTube usando yt-dlp.""" print(f"Baixando vídeo de {url} para {output_path}...") try: # Comando yt-dlp para baixar o melhor formato mp4 command = [ 'yt-dlp', '-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', '-o', output_path, url ] result = subprocess.run(command, check=True, capture_output=True, text=True) print("Download concluído com sucesso.") retirar_sufixo_codec_arquivo(OUTPUT_DIR) # print(result.stdout) # Descomente para ver a saída do yt-dlp return True except subprocess.CalledProcessError as e: print(f"Erro ao baixar o vídeo: {e}") print(f"Saída do erro: {e.stderr}") return False except FileNotFoundError: print("Erro: O comando 'yt-dlp' não foi encontrado. Certifique-se de que ele está instalado e no PATH do sistema.") print("Você pode instalá-lo com: pip install yt-dlp") return False def extract_frames(video_path, output_dir, interval_sec): """Extrai frames de um vídeo em intervalos específicos.""" print(f"Extraindo frames de {video_path} a cada {interval_sec} segundos...") if not os.path.exists(video_path): print(f"Erro: Arquivo de vídeo não encontrado em {video_path}") return [] cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"Erro ao abrir o arquivo de vídeo: {video_path}") return [] fps = cap.get(cv2.CAP_PROP_FPS) if fps == 0: print("Erro: Não foi possível obter o FPS do vídeo. Usando FPS padrão de 30.") fps = 30 # Valor padrão caso a leitura falhe frame_interval = int(fps * interval_sec) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"Vídeo FPS: {fps:.2f}, Intervalo de frames: {frame_interval}, Total de frames: {total_frames}") extracted_frames_paths = [] frame_count = 0 saved_frame_index = 5 # o importante nunca começa no inicio, é um deslocamento inicial para iniciar depois da introdução while True: # Define a posição do próximo frame a ser lido # Adiciona frame_interval para pegar o frame *após* o intervalo de tempo target_frame_pos = saved_frame_index * frame_interval if target_frame_pos >= total_frames: break # Sai se o próximo frame alvo estiver além do final do vídeo cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame_pos) ret, frame = cap.read() if not ret: print(f"Não foi possível ler o frame na posição {target_frame_pos}. Pode ser o fim do vídeo ou um erro.") break # Sai se não conseguir ler o frame # redimensiona o frame (custo chamada) frame = cv2.resize(frame, (1280, 720)) # Calcula o timestamp em segundos timestamp_sec = target_frame_pos / fps # Salva o frame frame_filename = f"frame_{saved_frame_index:04d}_time_{timestamp_sec:.2f}s.png" frame_path = os.path.join(output_dir, frame_filename) try: cv2.imwrite(frame_path, frame) extracted_frames_paths.append(frame_path) print(f"Frame salvo: {frame_path} (Timestamp: {timestamp_sec:.2f}s)") saved_frame_index += 1 except Exception as e: print(f"Erro ao salvar o frame {frame_path}: {e}") # Continua para o próximo intervalo mesmo se um frame falhar # Segurança para evitar loop infinito caso algo dê errado com a lógica de posição if saved_frame_index > (total_frames / frame_interval) + 2: print("Aviso: Número de frames salvos parece exceder o esperado. Interrompendo extração.") break cap.release() print(f"Extração de frames concluída. Total de frames salvos: {len(extracted_frames_paths)}") return extracted_frames_paths # --- Atualização do Bloco Principal --- # (Remover a linha print anterior sobre próximas etapas e adicionar a chamada da nova função) def encode_frame_to_base64(frame_path): """Codifica um arquivo de imagem (frame) para base64.""" try: with open(frame_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') except FileNotFoundError: print(f"Erro: Arquivo de frame não encontrado em {frame_path}") return None except Exception as e: print(f"Erro ao codificar o frame {frame_path} para base64: {e}") return None def analyze_frame_with_gpt4o(client, base64_image, prompt): print("NAO CHAMAR AINDA") return """Envia um frame codificado em base64 para a API GPT-4o e retorna a análise.""" print(f"Enviando frame para análise no {GPT_MODEL}...") headers = { "Content-Type": "application/json", "Authorization": f"Bearer {OPENAI_API_KEY}" } payload = { "model": GPT_MODEL, "messages": [ { "role": "user", "content": [ { "type": "text", "text": prompt }, { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{base64_image}" } } ] } ], "max_tokens": 100 # Ajuste conforme necessário para a resposta esperada } try: # Verifica se a chave da API é o placeholder if OPENAI_API_KEY == "SUA_CHAVE_API_OPENAI_AQUI": print("AVISO: Chave da API OpenAI não configurada. Pulando análise.") return {"error": "API key not configured."} # Cria o cliente OpenAI dentro da função para garantir que use a chave mais recente # (embora seja definida globalmente, isso pode ser útil se a chave for atualizada dinamicamente no futuro) # client = OpenAI(api_key=OPENAI_API_KEY) response = client.chat.completions.create( model=payload["model"], messages=payload["messages"], max_tokens=payload["max_tokens"] ) # Extrai o conteúdo da resposta analysis_result = response.choices[0].message.content.strip() print(f"Análise recebida: {analysis_result}") # Tenta converter a resposta para um inteiro (contagem de aves) try: bird_count = int(analysis_result) return {"bird_count": bird_count, "raw_response": analysis_result} except ValueError: print(f"Aviso: Não foi possível converter a resposta '{analysis_result}' para um número inteiro.") return {"error": "Failed to parse bird count from response.", "raw_response": analysis_result} except Exception as e: print(f"Erro ao chamar a API OpenAI: {e}") return {"error": str(e)} def save_results_to_json(results_list, output_file): """Salva a lista de resultados da análise em um arquivo JSON.""" print(f"Salvando resultados da análise em {output_file}...") try: with open(output_file, 'w', encoding='utf-8') as f: json.dump(results_list, f, ensure_ascii=False, indent=4) print(f"Resultados salvos com sucesso em: {output_file}") return True except Exception as e: print(f"Erro ao salvar os resultados em JSON: {e}") return False # --- Atualização do Bloco Principal --- # (Adicionar inicialização do cliente OpenAI e o loop de análise) if __name__ == "__main__": create_output_directory() extracted_frames = [] analysis_results_list = [] # Inicializa o cliente OpenAI (se a chave estiver definida) openai_client = None if OPENAI_API_KEY != "SUA_CHAVE_API_OPENAI_AQUI": try: openai_client = OpenAI(api_key=OPENAI_API_KEY) print("Cliente OpenAI inicializado.") except Exception as e: print(f"Erro ao inicializar o cliente OpenAI: {e}. As chamadas de API serão puladas.") else: print("Chave da API OpenAI não configurada. As chamadas de API serão puladas.") # Etapa 1: Baixar o vídeo video_downloaded_or_exists = False if VIDEO_URL != "URL_DO_SEU_VIDEO_AQUI": if download_video(VIDEO_URL, VIDEO_PATH): print(f"Vídeo salvo em: {VIDEO_PATH}") video_downloaded_or_exists = True else: print("Falha no download do vídeo. Pulando etapas dependentes.") elif os.path.exists(VIDEO_PATH): print(f"URL não fornecida, mas vídeo encontrado em {VIDEO_PATH}. Tentando processar.") video_downloaded_or_exists = True else: print("URL do vídeo não fornecida e vídeo local não encontrado. Pulando download e extração.") # Etapa 2: Extrair frames if video_downloaded_or_exists: extracted_frames = extract_frames(VIDEO_PATH, OUTPUT_DIR, FRAME_INTERVAL_SECONDS) else: print("Pulando extração de frames pois o vídeo não está disponível.") # Etapa 3 e 4: Codificar e Analisar Frames if extracted_frames and openai_client: print(f"\nIniciando análise de {len(extracted_frames)} frames com {GPT_MODEL}...") for frame_path in extracted_frames: print(f"\nProcessando frame: {frame_path}") # Extrai timestamp do nome do arquivo, se possível timestamp_str = "unknown" try: # Exemplo: frame_0000_time_0.00s.png parts = os.path.basename(frame_path).split('_') if len(parts) >= 4 and parts[2] == 'time': timestamp_str = parts[3].replace('s.png','') except Exception: pass # Mantém 'unknown' se o parsing falhar # Codifica o frame base64_image = encode_frame_to_base64(frame_path) if base64_image: # Analisa o frame com GPT-4o # analysis_result = analyze_frame_with_gpt4o(openai_client, base64_image, PROMPT_TEXT) result_entry = { "frame_path": frame_path, "timestamp_approx_sec": timestamp_str, "analysis": f' pulado frame {frame_path}' #analysis_result } analysis_results_list.append(result_entry) # Pausa opcional para evitar rate limiting time.sleep(1) # Pausa de 1 segundo entre as chamadas else: print(f"Falha ao codificar o frame {frame_path}. Pulando análise.") analysis_results_list.append({ "frame_path": frame_path, "timestamp_approx_sec": timestamp_str, "analysis": {"error": "Failed to encode frame to base64."} }) print("\nAnálise de todos os frames concluída.") elif not extracted_frames: print("Nenhum frame foi extraído. Pulando etapa de análise.") elif not openai_client: print("Cliente OpenAI não inicializado (verifique a API Key). Pulando etapa de análise.") # Próxima etapa: Compilar resultados print(f"\nPróxima etapa a ser implementada: Compilação dos resultados ({len(analysis_results_list)} análises) em um relatório.") # ... (código anterior para inicialização, download, extração, análise) ... # Etapa 5: Compilar e Salvar Resultados if analysis_results_list: print(f"\nCompilando {len(analysis_results_list)} resultados da análise...") if save_results_to_json(analysis_results_list, RESULTS_FILE): print("Compilação e salvamento dos resultados concluídos.") else: print("Falha ao salvar os resultados da análise.") else: print("Nenhum resultado de análise para compilar.") print("\n--- Processo de Análise de Vídeo Concluído ---") print(f"Verifique o diretório '{OUTPUT_DIR}' para os frames extraídos (se aplicável).") print(f"Verifique o arquivo '{RESULTS_FILE}' para os resultados da análise (se aplicável).") print("Lembre-se de substituir os placeholders para URL_DO_SEU_VIDEO_AQUI e SUA_CHAVE_API_OPENAI_AQUI no script.")