DHEIVER's picture
Update app.py
915333e verified
import json
import numpy as np
import faiss
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoTokenizer, AutoModel, pipeline
from sentence_transformers import SentenceTransformer
import traceback
import gradio as gr
import pandas as pd
# --- Classes principais ---
class SistemaRecuperacaoAvancado:
"""Classe responsável pela recuperação de chunks relevantes usando embeddings semânticos."""
def __init__(self, chunks):
self.chunks = chunks
self.textos = [chunk["texto"] for chunk in chunks]
# Inicialização do modelo de embeddings
self.embedder = SentenceTransformer('neuralmind/bert-base-portuguese-cased')
self.embeddings = self.embedder.encode(self.textos)
# Índice FAISS para busca eficiente
self.index = faiss.IndexFlatL2(self.embeddings.shape[1])
self.index.add(np.array(self.embeddings))
def recuperar_chunks_relevantes(self, pergunta, top_k=10):
"""Recupera os top_k chunks mais relevantes para uma pergunta."""
pergunta_embedding = self.embedder.encode([pergunta])
_, indices = self.index.search(pergunta_embedding, top_k)
return [self.chunks[i] for i in indices[0]]
def recuperar_chunks_por_itens(self, itens, top_k=10):
"""Filtra chunks que contenham qualquer um dos itens mencionados."""
return [chunk for chunk in self.chunks
if any(item in chunk["texto"] for item in itens)][:top_k]
class RAGAvançado:
"""Classe principal do sistema RAG com funcionalidades de análise e geração de relatórios."""
def __init__(self, chunks):
self.sistema_recuperacao = SistemaRecuperacaoAvancado(chunks)
# Configuração do modelo BERTimbau e sumarizador
self.modelo_nome = "neuralmind/bert-base-portuguese-cased"
self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_nome)
self.model = AutoModel.from_pretrained(self.modelo_nome)
self.summarizer = pipeline("summarization", model="t5-small")
def analisar_divergencias(self, chunks):
"""Analisa divergências em uma lista de chunks."""
divergencias = []
divergencias_detalhes = {}
referencias = {}
for chunk in chunks:
texto, topico = chunk["texto"], chunk["topico"]
if "Divergência detectada" in texto:
partes = texto.split("|")
item_id = partes[0].strip()
descricao = partes[1].strip() if len(partes) > 1 else "Sem descrição"
divergencias.append((item_id, descricao, topico))
divergencias_detalhes[item_id] = texto
for div_id, _, _ in divergencias:
if div_id in texto and "Divergência detectada" not in texto:
referencias.setdefault(div_id, []).append((topico, texto))
return divergencias, divergencias_detalhes, referencias
def gerar_resposta_divergencias(self, divergencias, divergencias_detalhes, referencias):
"""Gera uma resposta formatada sobre divergências detectadas."""
if not divergencias:
return "Não foram encontradas divergências nos dados analisados."
resposta = "## Análise de Divergências Detectadas\n\n"
resposta += "Foram identificadas as seguintes divergências:\n\n"
for i, (item_id, descricao, topico) in enumerate(divergencias, 1):
resposta += f"### {i}. Divergência no item **{item_id}**\n"
resposta += f"- **Tópico:** {topico}\n"
resposta += f"- **Descrição:** {descricao}\n"
resposta += f"- **Detalhe completo:** {divergencias_detalhes[item_id]}\n\n"
if item_id in referencias:
resposta += "**Informações contextuais relacionadas:**\n"
for ref_topico, ref_texto in referencias[item_id]:
resposta += f" - **{ref_topico}:** {ref_texto}\n"
resposta += "\n"
resposta += "## Conclusão\n"
resposta += f"Foram encontradas **{len(divergencias)} divergências** que precisam ser resolvidas.\n"
resposta += "Recomenda-se revisar os itens mencionados e consultar as especificações técnicas."
return resposta
def sumarizar_texto(self, texto, max_length=100):
"""Sumariza um texto longo automaticamente."""
summary = self.summarizer(texto, max_length=max_length, min_length=30, do_sample=False)
return summary[0]['summary_text']
def gerar_relatorio(self, pergunta, top_k=10, exibir_chunks=False):
"""Gera um relatório completo com base em uma pergunta."""
chunks_relevantes = self.sistema_recuperacao.recuperar_chunks_relevantes(pergunta, top_k)
try:
divergencias, detalhes, referencias = self.analisar_divergencias(chunks_relevantes)
resposta_texto = self.gerar_resposta_divergencias(divergencias, detalhes, referencias)
except Exception as e:
resposta_texto = f"Erro ao gerar resposta: {str(e)}\n\n{traceback.format_exc()}"
relatorio = f"=== RELATÓRIO GERADO ===\n\n**Pergunta:** {pergunta}\n\n**Resposta:**\n{resposta_texto}\n\n"
if exibir_chunks:
relatorio += "=== CHUNKS USADOS COMO CONTEXTO ===\n"
for i, chunk in enumerate(chunks_relevantes, 1):
relatorio += f"\n**Chunk {i}:**\n**Tópico:** {chunk['topico']}\n**Texto:** {chunk['texto']}\n"
return relatorio, resposta_texto, divergencias
# --- Funções auxiliares ---
def carregar_chunks_xlsx(arquivo):
"""Carrega chunks de um arquivo .xlsx."""
try:
df = pd.read_excel(arquivo, engine='openpyxl')
# Converte o DataFrame em uma lista de dicionários com as colunas esperadas
chunks = df[['texto', 'topico']].to_dict(orient='records')
return chunks
except Exception as e:
raise Exception(f"Erro ao carregar o arquivo .xlsx: {str(e)}")
def gerar_resposta_itens_relacionados(rag, itens_divergentes, top_k=10):
"""Gera resposta para itens relacionados às divergências."""
chunks_relevantes = rag.sistema_recuperacao.recuperar_chunks_por_itens(itens_divergentes, top_k)
if not chunks_relevantes:
return "Não foram encontradas informações relacionadas aos itens mencionados."
resposta = "## Informações Relacionadas\n\n"
for i, chunk in enumerate(chunks_relevantes, 1):
texto_completo = chunk["texto"]
sumario = rag.sumarizar_texto(texto_completo)
resposta += f"### Chunk {i}\n"
resposta += f"**Tópico:** {chunk['topico']}\n"
resposta += "**Informações relevantes:**\n"
resposta += f"- **Sumário:** {sumario}\n"
resposta += "- **Detalhes completos:**\n"
for linha in texto_completo.split("\n"):
if linha.strip():
resposta += f" - {linha.strip()}\n"
resposta += "\n"
resposta += "## Conclusão\n"
resposta += "As informações acima estão relacionadas aos itens mencionados.\n"
return resposta
def calcular_probabilidade_divergencia(chunks):
"""Calcula a probabilidade de um chunk conter 'Divergência detectada'."""
total_chunks = len(chunks)
if total_chunks == 0:
return 0.0, 0, 0
chunks_com_divergencia = sum(1 for chunk in chunks if "Divergência detectada" in chunk["texto"])
probabilidade = chunks_com_divergencia / total_chunks
return probabilidade, chunks_com_divergencia, total_chunks
def encontrar_chunk_mais_representativo(chunks, aba_nome):
"""Encontra o chunk com maior 'relevância' entre os que têm divergência."""
chunks_divergencia = [chunk for chunk in chunks if "Divergência detectada" in chunk["texto"]]
if not chunks_divergencia:
return None, f"Não há chunks com divergência na {aba_nome}."
chunk_mais_representativo = max(chunks_divergencia, key=lambda x: len(x["texto"]))
return chunk_mais_representativo, f"Chunk mais representativo da {aba_nome} (baseado no comprimento do texto)."
# --- Função para interface Gradio ---
def processar_pergunta(pergunta, arquivo_aba1, arquivo_aba2, top_k=10):
"""Processa a pergunta do usuário e retorna relatórios formatados."""
try:
# Carregar dados dos arquivos .xlsx
chunks_aba1 = carregar_chunks_xlsx(arquivo_aba1.name if arquivo_aba1 else None)
chunks_aba2 = carregar_chunks_xlsx(arquivo_aba2.name if arquivo_aba2 else None)
# Inicializar sistemas RAG
rag_aba1 = RAGAvançado(chunks_aba1)
rag_aba2 = RAGAvançado(chunks_aba2)
# Pergunta 1: Relatório de divergências
relatorio1, resposta1, divergencias1 = rag_aba1.gerar_relatorio(pergunta, top_k=top_k, exibir_chunks=True)
# Análise de probabilidade ABA1
prob_aba1, num_div_aba1, total_aba1 = calcular_probabilidade_divergencia(chunks_aba1)
prob_aba1_text = (
f"### Análise de Probabilidade (ABA1)\n"
f"- Total de chunks: {total_aba1}\n"
f"- Chunks com divergência: {num_div_aba1}\n"
f"- Probabilidade: {prob_aba1:.2%}\n"
)
# Chunk mais representativo ABA1
chunk_aba1, msg_aba1 = encontrar_chunk_mais_representativo(chunks_aba1, "ABA1")
chunk_aba1_text = f"### Chunk Mais Representativo (ABA1)\n{msg_aba1}\n"
if chunk_aba1:
chunk_aba1_text += f"- Tópico: {chunk_aba1['topico']}\n- Texto: {chunk_aba1['texto']}\n"
# Análise de probabilidade ABA2
prob_aba2, num_div_aba2, total_aba2 = calcular_probabilidade_divergencia(chunks_aba2)
prob_aba2_text = (
f"### Análise de Probabilidade (ABA2)\n"
f"- Total de chunks: {total_aba2}\n"
f"- Chunks com divergência: {num_div_aba2}\n"
f"- Probabilidade: {prob_aba2:.2%}\n"
)
# Chunk mais representativo ABA2
chunk_aba2, msg_aba2 = encontrar_chunk_mais_representativo(chunks_aba2, "ABA2")
chunk_aba2_text = f"### Chunk Mais Representativo (ABA2)\n{msg_aba2}\n"
if chunk_aba2:
chunk_aba2_text += f"- Tópico: {chunk_aba2['topico']}\n- Texto: {chunk_aba2['texto']}\n"
# Pergunta 2: Informações relacionadas (se houver divergências)
resposta2 = ""
if divergencias1:
itens_divergentes = [item_id for item_id, _, _ in divergencias1]
resposta2 = gerar_resposta_itens_relacionados(rag_aba2, itens_divergentes, top_k=top_k)
else:
resposta2 = "Nenhuma divergência encontrada para gerar informações relacionadas."
# Combinar resultados
resultado_final = (
f"{prob_aba1_text}\n{chunk_aba1_text}\n\n"
f"{prob_aba2_text}\n{chunk_aba2_text}\n\n"
f"### Resposta à Pergunta\n{resposta1}\n\n"
f"### Informações Relacionadas\n{resposta2}"
)
return resultado_final
except Exception as e:
return f"Erro ao processar a pergunta: {str(e)}\n\n{traceback.format_exc()}"
# --- Interface Gradio ---
def criar_interface():
"""Cria a interface gráfica com Gradio."""
with gr.Blocks(title="Sistema RAG Avançado") as interface:
gr.Markdown("# Sistema RAG Avançado")
gr.Markdown("Faça uma pergunta e envie arquivos .xlsx para análise de divergências.")
with gr.Row():
with gr.Column():
pergunta_input = gr.Textbox(label="Sua Pergunta", placeholder="Ex: Quais os itens de Divergência detectada?")
aba1_input = gr.File(label="Arquivo ABA1 (.xlsx)", file_types=[".xlsx"])
aba2_input = gr.File(label="Arquivo ABA2 (.xlsx)", file_types=[".xlsx"])
top_k_input = gr.Slider(label="Top K Chunks", minimum=1, maximum=20, value=10, step=1)
submit_btn = gr.Button("Gerar Relatório")
with gr.Column():
output_text = gr.Markdown(label="Relatório Gerado")
submit_btn.click(
fn=processar_pergunta,
inputs=[pergunta_input, aba1_input, aba2_input, top_k_input],
outputs=output_text
)
return interface
# --- Executar a interface ---
if __name__ == "__main__":
interface = criar_interface()
interface.launch()