Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
""" | |
Script para baixar um vídeo do YouTube, extrair frames, analisar com GPT-4o e contar aves. | |
""" | |
import os | |
import subprocess | |
import cv2 | |
import base64 | |
import time | |
from openai import OpenAI # Importa a classe OpenAI | |
import json | |
import re | |
import shutil | |
import google.generativeai as genai | |
import requests | |
# --- Configurações (Substitua os placeholders) --- | |
OUTPUT_DIR = "./image_analysis_output" # Diretório para salvar o vídeo e os frames | |
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
GEMINI_MODEL = "gemini-2.0-flash" | |
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
GPT_MODEL = "gpt-4o" # Modelo GPT a ser usado (certifique-se que é o correto para análise de imagem) | |
PROMPT_TEXT = "Analyze the provided image of a chessboard, return the corresponding FEN (Forsyth–Edwards Notation), assuming black at the bottom and black turn. Include turn, castling rights, en passant (if possible), and full notation. Return only the FEN." | |
#PROMPT_TEXT = "You are a chessboard position analyzer. Given an image of a chessboard: - Assume standard orientation: White at the bottom, Black at the top. - Identify all visible pieces and their positions. - Return the FEN string corresponding to the exact position. - Be precise. Do not omit or infer captured pieces. - Return only the FEN, no explanations." | |
IMAGE_FILE = "arquivos-perguntas/cca530fc-4052-43b2-b130-b30968d8aa44.png" | |
RESULTS_FILE = os.path.join(OUTPUT_DIR, "analysis_results.json") | |
FEN_CORRETA = "3r2k1/pp3pp1/4b2p/7Q/3n4/PqBBR2P/5PP1/6K1 b - - 0 1" | |
CHESSVISION_TO_FEN_URL = "http://app.chessvision.ai/predict" | |
CHESS_MOVE_API = "https://chess-api.com/v1" | |
if GEMINI_API_KEY == "SUA_CHAVE_API_OPENAI_AQUI" or not GEMINI_API_KEY or len(GEMINI_API_KEY) ==0 : | |
print("AVISO: A chave da API GEMINI não foi definida. Por favor, edite o script e insira sua chave.") | |
# Considerar sair do script ou lançar um erro se a chave for essencial para a execução completa | |
# exit(1) | |
# --- Funções --- | |
def create_or_clear_output_directory(): | |
"""Cria o diretório de saída se não existir.""" | |
if not os.path.exists(OUTPUT_DIR): | |
os.makedirs(OUTPUT_DIR) | |
print(f"Diretório criado: {OUTPUT_DIR}") | |
else: | |
# Limpa todos os arquivos e subdiretórios | |
for filename in os.listdir(OUTPUT_DIR): | |
file_path = os.path.join(OUTPUT_DIR, filename) | |
try: | |
if os.path.isfile(file_path) or os.path.islink(file_path): | |
os.unlink(file_path) | |
elif os.path.isdir(file_path): | |
shutil.rmtree(file_path) | |
except Exception as e: | |
print(f"Erro ao excluir {file_path}: {e}") | |
print(f"Diretório limpo: {OUTPUT_DIR}") | |
def encode_image_to_base64(image_path): | |
"""Codifica um arquivo de imagem (frame) para base64.""" | |
try: | |
with open(image_path, "rb") as image_file: | |
return base64.b64encode(image_file.read()).decode('utf-8') | |
except FileNotFoundError: | |
print(f"Erro: Arquivo de frame não encontrado em {image_path}") | |
return None | |
except Exception as e: | |
print(f"Erro ao codificar o frame {image_path} para base64: {e}") | |
return None | |
def analyze_image_with_gpt(base64_image, prompt): | |
if OPENAI_API_KEY: | |
try: | |
openai_client = OpenAI(api_key=OPENAI_API_KEY) | |
print("Cliente OpenAI inicializado.") | |
except Exception as e: | |
print(f"Erro ao inicializar o cliente OpenAI: {e}. As chamadas de API serão puladas.") | |
else: | |
print("Chave da API OpenAI não configurada. As chamadas de API serão puladas.") | |
"""Envia um frame codificado em base64 para a API GPT-4o e retorna a análise.""" | |
print(f"Enviando imagem para análise no {GPT_MODEL}...") | |
payload = { | |
"model": GPT_MODEL, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": [ | |
{ | |
"type": "text", | |
"text": prompt | |
}, | |
{ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/png;base64,{base64_image}" | |
} | |
} | |
] | |
} | |
], | |
"max_tokens": 100 # Ajuste conforme necessário para a resposta esperada | |
} | |
try: | |
# Cria o cliente OpenAI dentro da função para garantir que use a chave mais recente | |
# (embora seja definida globalmente, isso pode ser útil se a chave for atualizada dinamicamente no futuro) | |
# client = OpenAI(api_key=OPENAI_API_KEY) | |
response = openai_client.chat.completions.create( | |
model=payload["model"], | |
messages=payload["messages"], | |
max_tokens=payload["max_tokens"], | |
temperature=0 | |
) | |
# Extrai o conteúdo da resposta | |
analysis_result = response.choices[0].message.content.strip() | |
fen = analysis_result.strip("`") | |
fen = fen.replace("_", " ") #retorna _ no lugar de espaço em branco | |
print(f"Análise recebida (raw): {analysis_result}") | |
print(f"Análise tratada : {fen}") | |
if fen != FEN_CORRETA: | |
print(f"FEN INCORRETA ") | |
else: | |
print(f"FEN CORRETA ") | |
return {"image_response": fen} | |
except Exception as e: | |
print(f"Erro ao chamar a API OpenAI: {e}") | |
return {"error": str(e)} | |
def analyze_image_with_gemini(base64_image, prompt): | |
genai.configure(api_key=GEMINI_API_KEY) | |
model = genai.GenerativeModel(GEMINI_MODEL) | |
"""Envia um frame codificado em base64 para a API GPT-4o e retorna a análise.""" | |
print(f"Enviando frame para análise no {GEMINI_MODEL}...") | |
try: | |
response = model.generate_content( | |
contents=[ | |
{ | |
"role": "user", | |
"parts": [ | |
{f"text": f"{prompt}"}, | |
{"inline_data": { | |
"mime_type": "image/jpeg", | |
"data": base64_image | |
}} | |
] | |
} | |
], | |
generation_config={ | |
"temperature": 0.7, | |
"max_output_tokens": 500 | |
}) | |
# Extrai o conteúdo da resposta | |
analysis_result = response.text.strip() | |
print(f"Análise recebida: {analysis_result}") | |
return {"image_response": analysis_result} | |
except Exception as e: | |
print(f"Erro ao chamar a API Gemini: {e}") | |
return {"error": str(e)} | |
def analyze_image_with_chessvision(base64_image): | |
base64_image_encoded = f"data:image/jpeg;base64,{base64_image}" | |
url = CHESSVISION_TO_FEN_URL | |
payload = { | |
"board_orientation": "predict", | |
"cropped": False, | |
"current_player": "black", | |
"image": base64_image_encoded, | |
"predict_turn": False | |
} | |
response = requests.post(url, json=payload) | |
if response.status_code == 200: | |
dados = response.json() | |
if dados.get("success"): | |
print(f"Retorno Chessvision {dados}") | |
fen = dados.get("result") | |
fen = fen.replace("_", " ") #retorna _ no lugar de espaço em branco | |
return fen | |
else: | |
raise Exception("Requisição feita, mas falhou na predição.") | |
else: | |
raise Exception(f"Erro na requisição: {response.status_code}") | |
def get_best_next_move(fen: str): | |
url = CHESS_MOVE_API | |
payload = { | |
"fen": fen, | |
"depth": 1 | |
} | |
print(f"Buscando melhor jogada em {CHESS_MOVE_API} - {payload}") | |
response = requests.post(url, json=payload) | |
if response.status_code == 200: | |
#print(f"Retorno melhor jogada --> {response.text}") | |
dados = response.json() | |
move_algebric_notation = dados.get("san") | |
move = dados.get("text") | |
print(f"Melhor jogada segundo chess-api.com -> {move}") | |
return move_algebric_notation | |
else: | |
raise Exception(f"Erro na requisição: {response.status_code}") | |
def save_results_to_json(results_list, output_file): | |
"""Salva a lista de resultados da análise em um arquivo JSON.""" | |
print(f"Salvando resultados da análise em {output_file}...") | |
try: | |
with open(output_file, 'w', encoding='utf-8') as f: | |
json.dump(results_list, f, ensure_ascii=False, indent=4) | |
print(f"Resultados salvos com sucesso em: {output_file}") | |
return True | |
except Exception as e: | |
print(f"Erro ao salvar os resultados em JSON: {e}") | |
return False | |
# --- Atualização do Bloco Principal --- | |
# (Adicionar inicialização do cliente OpenAI e o loop de análise) | |
if __name__ == "__main__": | |
create_or_clear_output_directory() | |
analysis_results_list = [] | |
print(f"\nIniciando análise da imagem {IMAGE_FILE} frames com {GEMINI_MODEL}...") | |
# Extrai timestamp do nome do arquivo, se possível | |
base64_image = encode_image_to_base64(IMAGE_FILE) | |
if base64_image: | |
# Analisa a imagem com Gemini | |
fen = analyze_image_with_chessvision(base64_image) #analyze_image_with_gpt(base64_image, PROMPT_TEXT) | |
move = get_best_next_move(fen) | |
result_entry = { | |
"image": IMAGE_FILE, | |
"fen": fen, | |
"move": move | |
} | |
analysis_results_list.append(result_entry) | |
else: | |
print(f"Falha ao codificar o frame {IMAGE_FILE}. Pulando análise.") | |
analysis_results_list.append({ | |
"frame_path": IMAGE_FILE, | |
"analysis": {"error": "Failed to encode frame to base64."} | |
}) | |
# break # teste somente uma chamada | |
print("\nAnálise de imagem concluída.") | |
# Próxima etapa: Compilar resultados | |
print(f"\nPróxima etapa a ser implementada: Compilação dos resultados ({len(analysis_results_list)} análises) em um relatório.") | |
# ... (código anterior para inicialização, download, extração, análise) ... | |
# Etapa 5: Compilar e Salvar Resultados | |
if analysis_results_list: | |
print(f"\nCompilando {len(analysis_results_list)} resultados da análise...") | |
if save_results_to_json(analysis_results_list, RESULTS_FILE): | |
print("Compilação e salvamento dos resultados concluídos.") | |
else: | |
print("Falha ao salvar os resultados da análise.") | |
else: | |
print("Nenhum resultado de análise para compilar.") | |
print("\n--- Processo de Análise de Vídeo Concluído ---") | |
print(f"Verifique o diretório '{OUTPUT_DIR}' para os frames extraídos (se aplicável).") | |
print(f"Verifique o arquivo '{RESULTS_FILE}' para os resultados da análise (se aplicável).") | |
print("Lembre-se de substituir os placeholders para URL_DO_SEU_VIDEO_AQUI e SUA_CHAVE_API_OPENAI_AQUI no script.") | |