Final_Assignment_Template / video_analyzer_temp.py
gdms's picture
Tools
5459b92
raw
history blame
17 kB
# -*- coding: utf-8 -*-
"""
Script para baixar um vídeo do YouTube, extrair frames, analisar com GPT-4o e contar aves.
"""
import os
import subprocess
import cv2
import base64
import time
from openai import OpenAI # Importa a classe OpenAI
import json
# --- Configurações (Substitua os placeholders) ---
VIDEO_URL = "URL_DO_SEU_VIDEO_AQUI" # Substitua pela URL do vídeo do YouTube
OUTPUT_DIR = "./video_analysis_output" # Diretório para salvar o vídeo e os frames
FRAME_INTERVAL_SECONDS = 5 # Intervalo entre frames a serem extraídos
OPENAI_API_KEY = "SUA_CHAVE_API_OPENAI_AQUI" # Substitua pela sua chave da API OpenAI
GPT_MODEL = "gpt-4o" # Modelo GPT a ser usado (certifique-se que é o correto para análise de imagem)
PROMPT_TEXT = "Quantas aves existem nesta imagem? Responda apenas com o número." # Prompt para o GPT-4o
RESULTS_FILE = os.path.join(OUTPUT_DIR, "analysis_results.json")
VIDEO_FILENAME = "downloaded_video.mp4"
VIDEO_PATH = os.path.join(OUTPUT_DIR, VIDEO_FILENAME)
# Verifica se a chave da API foi definida
if OPENAI_API_KEY == "SUA_CHAVE_API_OPENAI_AQUI":
print("AVISO: A chave da API OpenAI não foi definida. Por favor, edite o script e insira sua chave.")
# Considerar sair do script ou lançar um erro se a chave for essencial para a execução completa
# exit(1)
# Verifica se a URL foi definida
if VIDEO_URL == "URL_DO_SEU_VIDEO_AQUI":
print("AVISO: A URL do vídeo não foi definida. Por favor, edite o script e insira a URL desejada.")
# exit(1)
# --- Funções ---
def create_output_directory():
"""Cria o diretório de saída se não existir."""
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
print(f"Diretório criado: {OUTPUT_DIR}")
def download_video(url, output_path):
"""Baixa o vídeo do YouTube usando yt-dlp."""
print(f"Baixando vídeo de {url} para {output_path}...")
try:
# Comando yt-dlp para baixar o melhor formato mp4
command = [
'yt-dlp',
'-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'-o', output_path,
url
]
result = subprocess.run(command, check=True, capture_output=True, text=True)
print("Download concluído com sucesso.")
# print(result.stdout) # Descomente para ver a saída do yt-dlp
return True
except subprocess.CalledProcessError as e:
print(f"Erro ao baixar o vídeo: {e}")
print(f"Saída do erro: {e.stderr}")
return False
except FileNotFoundError:
print("Erro: O comando 'yt-dlp' não foi encontrado. Certifique-se de que ele está instalado e no PATH do sistema.")
print("Você pode instalá-lo com: pip install yt-dlp")
return False
# --- Bloco Principal (Inicial) ---
if __name__ == "__main__":
create_output_directory()
# Etapa 1: Baixar o vídeo
if VIDEO_URL != "URL_DO_SEU_VIDEO_AQUI":
if not download_video(VIDEO_URL, VIDEO_PATH):
print("Falha no download do vídeo. Abortando.")
# exit(1) # Descomente se quiser que o script pare aqui em caso de falha no download
else:
print(f"Vídeo salvo em: {VIDEO_PATH}")
else:
print("URL do vídeo não fornecida. Pulando etapa de download.")
print("Certifique-se de que o arquivo 'downloaded_video.mp4' existe em '{OUTPUT_DIR}' se quiser continuar.")
# Próximas etapas (extração de frames, análise, etc.) serão adicionadas aqui
print("\nPróximas etapas a serem implementadas: extração de frames, análise com GPT-4o, compilação de resultados.")
def extract_frames(video_path, output_dir, interval_sec):
"""Extrai frames de um vídeo em intervalos específicos."""
print(f"Extraindo frames de {video_path} a cada {interval_sec} segundos...")
if not os.path.exists(video_path):
print(f"Erro: Arquivo de vídeo não encontrado em {video_path}")
return []
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Erro ao abrir o arquivo de vídeo: {video_path}")
return []
fps = cap.get(cv2.CAP_PROP_FPS)
if fps == 0:
print("Erro: Não foi possível obter o FPS do vídeo. Usando FPS padrão de 30.")
fps = 30 # Valor padrão caso a leitura falhe
frame_interval = int(fps * interval_sec)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Vídeo FPS: {fps:.2f}, Intervalo de frames: {frame_interval}, Total de frames: {total_frames}")
extracted_frames_paths = []
frame_count = 0
saved_frame_index = 0
while True:
# Define a posição do próximo frame a ser lido
# Adiciona frame_interval para pegar o frame *após* o intervalo de tempo
target_frame_pos = saved_frame_index * frame_interval
if target_frame_pos >= total_frames:
break # Sai se o próximo frame alvo estiver além do final do vídeo
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame_pos)
ret, frame = cap.read()
if not ret:
print(f"Não foi possível ler o frame na posição {target_frame_pos}. Pode ser o fim do vídeo ou um erro.")
break # Sai se não conseguir ler o frame
# Calcula o timestamp em segundos
timestamp_sec = target_frame_pos / fps
# Salva o frame
frame_filename = f"frame_{saved_frame_index:04d}_time_{timestamp_sec:.2f}s.png"
frame_path = os.path.join(output_dir, frame_filename)
try:
cv2.imwrite(frame_path, frame)
extracted_frames_paths.append(frame_path)
print(f"Frame salvo: {frame_path} (Timestamp: {timestamp_sec:.2f}s)")
saved_frame_index += 1
except Exception as e:
print(f"Erro ao salvar o frame {frame_path}: {e}")
# Continua para o próximo intervalo mesmo se um frame falhar
# Segurança para evitar loop infinito caso algo dê errado com a lógica de posição
if saved_frame_index > (total_frames / frame_interval) + 2:
print("Aviso: Número de frames salvos parece exceder o esperado. Interrompendo extração.")
break
cap.release()
print(f"Extração de frames concluída. Total de frames salvos: {len(extracted_frames_paths)}")
return extracted_frames_paths
# --- Atualização do Bloco Principal ---
# (Remover a linha print anterior sobre próximas etapas e adicionar a chamada da nova função)
def encode_frame_to_base64(frame_path):
"""Codifica um arquivo de imagem (frame) para base64."""
try:
with open(frame_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except FileNotFoundError:
print(f"Erro: Arquivo de frame não encontrado em {frame_path}")
return None
except Exception as e:
print(f"Erro ao codificar o frame {frame_path} para base64: {e}")
return None
# --- Atualização do Bloco Principal ---
# (Remover a linha print anterior sobre próximas etapas e adicionar a chamada da nova função)
if __name__ == "__main__":
create_output_directory()
extracted_frames = [] # Inicializa a lista de frames extraídos
# Etapa 1: Baixar o vídeo
if VIDEO_URL != "URL_DO_SEU_VIDEO_AQUI":
if not download_video(VIDEO_URL, VIDEO_PATH):
print("Falha no download do vídeo. Pulando etapas dependentes.")
# Considerar sair se o vídeo for essencial
# exit(1)
else:
print(f"Vídeo salvo em: {VIDEO_PATH}")
# Etapa 2: Extrair frames (só executa se o download foi bem-sucedido ou se a URL não foi fornecida mas o vídeo existe)
if os.path.exists(VIDEO_PATH):
extracted_frames = extract_frames(VIDEO_PATH, OUTPUT_DIR, FRAME_INTERVAL_SECONDS)
else:
print(f"Arquivo de vídeo {VIDEO_PATH} não encontrado após tentativa de download. Pulando extração de frames.")
elif os.path.exists(VIDEO_PATH):
print(f"URL não fornecida, mas vídeo encontrado em {VIDEO_PATH}. Tentando extrair frames.")
extracted_frames = extract_frames(VIDEO_PATH, OUTPUT_DIR, FRAME_INTERVAL_SECONDS)
else:
print("URL do vídeo não fornecida e vídeo local não encontrado. Pulando download e extração de frames.")
print(f"Certifique-se de que o arquivo '{VIDEO_FILENAME}' existe em '{OUTPUT_DIR}' para processamento manual.")
# Etapa 3: Codificar frames para base64 (será usado na próxima etapa de análise)
if extracted_frames:
print(f"\nIniciando codificação de {len(extracted_frames)} frames para base64...")
# A codificação será feita dentro do loop de análise na próxima etapa
# para otimizar o uso de memória, processando um frame por vez.
print("Codificação será realizada frame a frame antes do envio para a API.")
else:
print("Nenhum frame foi extraído. Pulando etapas de codificação e análise.")
# Próxima etapa: Enviar frames para GPT-4o
print("\nPróxima etapa a ser implementada: Envio dos frames (codificados em base64) para análise com GPT-4o.")
def analyze_frame_with_gpt4o(client, base64_image, prompt):
"""Envia um frame codificado em base64 para a API GPT-4o e retorna a análise."""
print(f"Enviando frame para análise no {GPT_MODEL}...")
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
}
payload = {
"model": GPT_MODEL,
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": prompt
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_image}"
}
}
]
}
],
"max_tokens": 100 # Ajuste conforme necessário para a resposta esperada
}
try:
# Verifica se a chave da API é o placeholder
if OPENAI_API_KEY == "SUA_CHAVE_API_OPENAI_AQUI":
print("AVISO: Chave da API OpenAI não configurada. Pulando análise.")
return {"error": "API key not configured."}
# Cria o cliente OpenAI dentro da função para garantir que use a chave mais recente
# (embora seja definida globalmente, isso pode ser útil se a chave for atualizada dinamicamente no futuro)
# client = OpenAI(api_key=OPENAI_API_KEY)
response = client.chat.completions.create(
model=payload["model"],
messages=payload["messages"],
max_tokens=payload["max_tokens"]
)
# Extrai o conteúdo da resposta
analysis_result = response.choices[0].message.content.strip()
print(f"Análise recebida: {analysis_result}")
# Tenta converter a resposta para um inteiro (contagem de aves)
try:
bird_count = int(analysis_result)
return {"bird_count": bird_count, "raw_response": analysis_result}
except ValueError:
print(f"Aviso: Não foi possível converter a resposta '{analysis_result}' para um número inteiro.")
return {"error": "Failed to parse bird count from response.", "raw_response": analysis_result}
except Exception as e:
print(f"Erro ao chamar a API OpenAI: {e}")
return {"error": str(e)}
# --- Atualização do Bloco Principal ---
# (Adicionar inicialização do cliente OpenAI e o loop de análise)
if __name__ == "__main__":
create_output_directory()
extracted_frames = []
analysis_results_list = []
# Inicializa o cliente OpenAI (se a chave estiver definida)
openai_client = None
if OPENAI_API_KEY != "SUA_CHAVE_API_OPENAI_AQUI":
try:
openai_client = OpenAI(api_key=OPENAI_API_KEY)
print("Cliente OpenAI inicializado.")
except Exception as e:
print(f"Erro ao inicializar o cliente OpenAI: {e}. As chamadas de API serão puladas.")
else:
print("Chave da API OpenAI não configurada. As chamadas de API serão puladas.")
# Etapa 1: Baixar o vídeo
video_downloaded_or_exists = False
if VIDEO_URL != "URL_DO_SEU_VIDEO_AQUI":
if download_video(VIDEO_URL, VIDEO_PATH):
print(f"Vídeo salvo em: {VIDEO_PATH}")
video_downloaded_or_exists = True
else:
print("Falha no download do vídeo. Pulando etapas dependentes.")
elif os.path.exists(VIDEO_PATH):
print(f"URL não fornecida, mas vídeo encontrado em {VIDEO_PATH}. Tentando processar.")
video_downloaded_or_exists = True
else:
print("URL do vídeo não fornecida e vídeo local não encontrado. Pulando download e extração.")
# Etapa 2: Extrair frames
if video_downloaded_or_exists:
extracted_frames = extract_frames(VIDEO_PATH, OUTPUT_DIR, FRAME_INTERVAL_SECONDS)
else:
print("Pulando extração de frames pois o vídeo não está disponível.")
# Etapa 3 e 4: Codificar e Analisar Frames
if extracted_frames and openai_client:
print(f"\nIniciando análise de {len(extracted_frames)} frames com {GPT_MODEL}...")
for frame_path in extracted_frames:
print(f"\nProcessando frame: {frame_path}")
# Extrai timestamp do nome do arquivo, se possível
timestamp_str = "unknown"
try:
# Exemplo: frame_0000_time_0.00s.png
parts = os.path.basename(frame_path).split('_')
if len(parts) >= 4 and parts[2] == 'time':
timestamp_str = parts[3].replace('s.png','')
except Exception:
pass # Mantém 'unknown' se o parsing falhar
# Codifica o frame
base64_image = encode_frame_to_base64(frame_path)
if base64_image:
# Analisa o frame com GPT-4o
analysis_result = analyze_frame_with_gpt4o(openai_client, base64_image, PROMPT_TEXT)
result_entry = {
"frame_path": frame_path,
"timestamp_approx_sec": timestamp_str,
"analysis": analysis_result
}
analysis_results_list.append(result_entry)
# Pausa opcional para evitar rate limiting
time.sleep(1) # Pausa de 1 segundo entre as chamadas
else:
print(f"Falha ao codificar o frame {frame_path}. Pulando análise.")
analysis_results_list.append({
"frame_path": frame_path,
"timestamp_approx_sec": timestamp_str,
"analysis": {"error": "Failed to encode frame to base64."}
})
print("\nAnálise de todos os frames concluída.")
elif not extracted_frames:
print("Nenhum frame foi extraído. Pulando etapa de análise.")
elif not openai_client:
print("Cliente OpenAI não inicializado (verifique a API Key). Pulando etapa de análise.")
# Próxima etapa: Compilar resultados
print(f"\nPróxima etapa a ser implementada: Compilação dos resultados ({len(analysis_results_list)} análises) em um relatório.")
def save_results_to_json(results_list, output_file):
"""Salva a lista de resultados da análise em um arquivo JSON."""
print(f"Salvando resultados da análise em {output_file}...")
try:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results_list, f, ensure_ascii=False, indent=4)
print(f"Resultados salvos com sucesso em: {output_file}")
return True
except Exception as e:
print(f"Erro ao salvar os resultados em JSON: {e}")
return False
# --- Finalização do Bloco Principal ---
if __name__ == "__main__":
# ... (código anterior para inicialização, download, extração, análise) ...
# Etapa 5: Compilar e Salvar Resultados
if analysis_results_list:
print(f"\nCompilando {len(analysis_results_list)} resultados da análise...")
if save_results_to_json(analysis_results_list, RESULTS_FILE):
print("Compilação e salvamento dos resultados concluídos.")
else:
print("Falha ao salvar os resultados da análise.")
else:
print("Nenhum resultado de análise para compilar.")
print("\n--- Processo de Análise de Vídeo Concluído ---")
print(f"Verifique o diretório '{OUTPUT_DIR}' para os frames extraídos (se aplicável).")
print(f"Verifique o arquivo '{RESULTS_FILE}' para os resultados da análise (se aplicável).")
print("Lembre-se de substituir os placeholders para URL_DO_SEU_VIDEO_AQUI e SUA_CHAVE_API_OPENAI_AQUI no script.")