Spaces:
Build error
Build error
import gradio as gr | |
import torch | |
from transformers import AutoProcessor, AutoModelForVision2Seq | |
from PIL import Image | |
import numpy as np | |
import os | |
import logging | |
import cv2 | |
import shutil | |
import subprocess | |
from pathlib import Path | |
import tempfile | |
# Configuração de logging | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger(__name__) | |
class ForceImageProcessor: | |
"""Processador agressivo de imagens com múltiplos fallbacks""" | |
def force_convert_image(input_path): | |
"""Converte imagem usando múltiplos métodos até funcionar""" | |
try: | |
# Cria diretório temporário | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_path = Path(temp_dir) / "converted_image.jpg" | |
# Tenta diferentes métodos de conversão | |
methods = [ | |
ForceImageProcessor._try_pillow, | |
ForceImageProcessor._try_opencv, | |
ForceImageProcessor._try_imagemagick, | |
ForceImageProcessor._try_ffmpeg | |
] | |
for method in methods: | |
try: | |
result = method(input_path, temp_path) | |
if result: | |
return Image.open(temp_path) | |
except Exception as e: | |
logger.debug(f"Método falhou: {str(e)}") | |
continue | |
raise ValueError("Todos os métodos de conversão falharam") | |
except Exception as e: | |
logger.error(f"Erro na conversão: {str(e)}") | |
raise | |
def _try_pillow(input_path, output_path): | |
"""Tenta converter usando Pillow""" | |
try: | |
img = Image.open(input_path) | |
img = img.convert('RGB') | |
img.save(output_path, 'JPEG') | |
return True | |
except: | |
return False | |
def _try_opencv(input_path, output_path): | |
"""Tenta converter usando OpenCV""" | |
try: | |
img = cv2.imread(str(input_path)) | |
if img is None: | |
return False | |
cv2.imwrite(str(output_path), img) | |
return True | |
except: | |
return False | |
def _try_imagemagick(input_path, output_path): | |
"""Tenta converter usando ImageMagick""" | |
try: | |
result = subprocess.run( | |
['convert', str(input_path), str(output_path)], | |
capture_output=True, | |
text=True | |
) | |
return result.returncode == 0 | |
except: | |
return False | |
def _try_ffmpeg(input_path, output_path): | |
"""Tenta converter usando FFmpeg""" | |
try: | |
result = subprocess.run( | |
['ffmpeg', '-i', str(input_path), '-y', str(output_path)], | |
capture_output=True, | |
text=True | |
) | |
return result.returncode == 0 | |
except: | |
return False | |
class NutritionalAnalyzer: | |
def __init__(self): | |
self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
self.models = {} | |
self.processors = {} | |
self.image_processor = ForceImageProcessor() | |
async def initialize_model(self, model_name): | |
"""Inicializa modelo com tratamento de erros melhorado""" | |
try: | |
if model_name not in self.models: | |
logger.info(f"Inicializando {model_name}...") | |
model_configs = { | |
"llava": { | |
"repo": "llava-hf/llava-1.5-7b-hf", | |
"local_cache": "models/llava" | |
}, | |
"git": { | |
"repo": "microsoft/git-base-coco", | |
"local_cache": "models/git" | |
} | |
} | |
config = model_configs.get(model_name) | |
if not config: | |
raise ValueError(f"Modelo não suportado: {model_name}") | |
# Garante que o diretório de cache existe | |
os.makedirs(config["local_cache"], exist_ok=True) | |
# Carrega processador e modelo | |
try: | |
self.processors[model_name] = await gr.asyncio.asyncio.to_thread( | |
AutoProcessor.from_pretrained, | |
config["repo"], | |
trust_remote_code=True | |
) | |
self.models[model_name] = await gr.asyncio.asyncio.to_thread( | |
AutoModelForVision2Seq.from_pretrained, | |
config["repo"], | |
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32, | |
device_map="auto", | |
trust_remote_code=True | |
) | |
except Exception as e: | |
logger.error(f"Erro ao carregar modelo: {str(e)}") | |
raise | |
logger.info(f"{model_name} inicializado com sucesso") | |
return True | |
return True | |
except Exception as e: | |
logger.error(f"Erro na inicialização do {model_name}: {str(e)}") | |
return False | |
async def analyze_image(self, image, question, model_choice): | |
"""Analisa imagem com foco nutricional""" | |
try: | |
if image is None: | |
return "Por favor, envie uma imagem para análise." | |
# Converte escolha do modelo | |
model_name = model_choice.lower().replace("-", "") | |
# Inicializa modelo | |
if not await self.initialize_model(model_name): | |
return "Erro: Falha ao inicializar o modelo. Tente novamente." | |
# Processa imagem com conversão forçada | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: | |
if isinstance(image, np.ndarray): | |
cv2.imwrite(temp_file.name, cv2.cvtColor(image, cv2.COLOR_RGB2BGR)) | |
else: | |
shutil.copy2(image, temp_file.name) | |
processed_image = await gr.asyncio.asyncio.to_thread( | |
self.image_processor.force_convert_image, | |
temp_file.name | |
) | |
except Exception as e: | |
logger.error(f"Erro no processamento da imagem: {str(e)}") | |
return f"Erro ao processar imagem: {str(e)}" | |
# Gera prompt | |
nutritional_prompt = self.generate_nutritional_prompt(question) | |
# Processa input | |
try: | |
inputs = await gr.asyncio.asyncio.to_thread( | |
self.processors[model_name], | |
images=processed_image, | |
text=nutritional_prompt, | |
return_tensors="pt" | |
) | |
inputs = {k: v.to(self.device) for k, v in inputs.items()} | |
except Exception as e: | |
return f"Erro no processamento: {str(e)}" | |
# Gera resposta | |
try: | |
with torch.no_grad(): | |
outputs = await gr.asyncio.asyncio.to_thread( | |
self.models[model_name].generate, | |
**inputs, | |
max_new_tokens=300, | |
num_beams=5, | |
do_sample=True, # Habilitado para usar temperature e top_p | |
temperature=0.7, | |
top_p=0.9, | |
repetition_penalty=1.2, | |
no_repeat_ngram_size=3, # Evita repetições de frases | |
early_stopping=True # Para quando a geração estiver completa | |
) | |
response = self.processors[model_name].decode(outputs[0], skip_special_tokens=True) | |
return self.format_response(response) | |
except Exception as e: | |
return f"Erro na geração da análise: {str(e)}" | |
except Exception as e: | |
logger.error(f"Erro na análise: {str(e)}") | |
return f"Erro: {str(e)}\nPor favor, tente novamente." | |
def generate_nutritional_prompt(self, question): | |
"""Gera prompt para análise nutricional""" | |
return f"""Como nutricionista especializado, analise esta refeição detalhadamente: | |
1. Composição do Prato: | |
- Ingredientes principais | |
- Proporções aproximadas | |
- Método de preparo aparente | |
2. Análise Nutricional: | |
- Estimativa calórica | |
- Macronutrientes (proteínas, carboidratos, gorduras) | |
- Principais micronutrientes | |
3. Recomendações: | |
- Sugestões para versão mais saudável | |
- Porção recomendada | |
- Adequação para dietas específicas | |
Pergunta específica do usuário: {question} | |
Por favor, forneça uma análise detalhada em português.""" | |
def format_response(self, response): | |
"""Formata a resposta para melhor legibilidade""" | |
sections = [ | |
"Composição do Prato", | |
"Análise Nutricional", | |
"Recomendações" | |
] | |
formatted = "# 📊 Análise Nutricional\n\n" | |
current_section = "" | |
for paragraph in response.split("\n"): | |
for section in sections: | |
if section.lower() in paragraph.lower(): | |
current_section = f"\n## {section}\n" | |
formatted += current_section | |
break | |
if paragraph.strip() and current_section: | |
formatted += f"- {paragraph.strip()}\n" | |
elif paragraph.strip(): | |
formatted += f"{paragraph.strip()}\n" | |
return formatted | |
def create_interface(): | |
"""Cria interface Gradio""" | |
analyzer = NutritionalAnalyzer() | |
with gr.Blocks(theme=gr.themes.Soft()) as iface: | |
gr.Markdown(""" | |
# 🥗 Análise Nutricional Inteligente | |
Upload da foto do seu prato para análise nutricional detalhada. | |
""") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
image_input = gr.Image( | |
type="filepath", # Mudado para filepath para melhor compatibilidade | |
label="📸 Foto do Prato", | |
height=400 | |
) | |
question_input = gr.Textbox( | |
label="💭 Sua Pergunta", | |
placeholder="Ex: Quais são os nutrientes principais deste prato?", | |
lines=2 | |
) | |
model_choice = gr.Radio( | |
choices=["LLaVA", "GIT"], | |
value="LLaVA", | |
label="🤖 Escolha o Modelo" | |
) | |
analyze_btn = gr.Button( | |
"🔍 Analisar Prato", | |
variant="primary" | |
) | |
with gr.Column(scale=3): | |
output = gr.Markdown(label="Resultado da Análise") | |
with gr.Accordion("💡 Dicas", open=False): | |
gr.Markdown(""" | |
### Formatos Suportados: | |
- JPG/JPEG | |
- PNG | |
- WEBP | |
- AVIF | |
- Outros formatos de imagem comuns | |
### Para Melhores Resultados: | |
1. Boa iluminação na foto | |
2. Capture todo o prato | |
3. Evite ângulos muito inclinados | |
4. Perguntas específicas ajudam | |
""") | |
analyze_btn.click( | |
fn=analyzer.analyze_image, | |
inputs=[image_input, question_input, model_choice], | |
outputs=output | |
) | |
return iface | |
if __name__ == "__main__": | |
iface = create_interface() | |
iface.launch( | |
share=False, | |
debug=True, | |
server_name="0.0.0.0", | |
server_port=7860, | |
show_error=True | |
) |