Spaces:
Sleeping
Sleeping
import json | |
import numpy as np | |
import faiss | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from transformers import AutoTokenizer, AutoModel, pipeline | |
from sentence_transformers import SentenceTransformer | |
import traceback | |
import gradio as gr | |
import pandas as pd | |
# --- Classes principais --- | |
class SistemaRecuperacaoAvancado: | |
"""Classe responsável pela recuperação de chunks relevantes usando embeddings semânticos.""" | |
def __init__(self, chunks): | |
self.chunks = chunks | |
self.textos = [chunk["texto"] for chunk in chunks] | |
# Inicialização do modelo de embeddings | |
self.embedder = SentenceTransformer('neuralmind/bert-base-portuguese-cased') | |
self.embeddings = self.embedder.encode(self.textos) | |
# Índice FAISS para busca eficiente | |
self.index = faiss.IndexFlatL2(self.embeddings.shape[1]) | |
self.index.add(np.array(self.embeddings)) | |
def recuperar_chunks_relevantes(self, pergunta, top_k=10): | |
"""Recupera os top_k chunks mais relevantes para uma pergunta.""" | |
pergunta_embedding = self.embedder.encode([pergunta]) | |
_, indices = self.index.search(pergunta_embedding, top_k) | |
return [self.chunks[i] for i in indices[0]] | |
def recuperar_chunks_por_itens(self, itens, top_k=10): | |
"""Filtra chunks que contenham qualquer um dos itens mencionados.""" | |
return [chunk for chunk in self.chunks | |
if any(item in chunk["texto"] for item in itens)][:top_k] | |
class RAGAvançado: | |
"""Classe principal do sistema RAG com funcionalidades de análise e geração de relatórios.""" | |
def __init__(self, chunks): | |
self.sistema_recuperacao = SistemaRecuperacaoAvancado(chunks) | |
# Configuração do modelo BERTimbau e sumarizador | |
self.modelo_nome = "neuralmind/bert-base-portuguese-cased" | |
self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_nome) | |
self.model = AutoModel.from_pretrained(self.modelo_nome) | |
self.summarizer = pipeline("summarization", model="t5-small") | |
def analisar_divergencias(self, chunks): | |
"""Analisa divergências em uma lista de chunks.""" | |
divergencias = [] | |
divergencias_detalhes = {} | |
referencias = {} | |
for chunk in chunks: | |
texto, topico = chunk["texto"], chunk["topico"] | |
if "Divergência detectada" in texto: | |
partes = texto.split("|") | |
item_id = partes[0].strip() | |
descricao = partes[1].strip() if len(partes) > 1 else "Sem descrição" | |
divergencias.append((item_id, descricao, topico)) | |
divergencias_detalhes[item_id] = texto | |
for div_id, _, _ in divergencias: | |
if div_id in texto and "Divergência detectada" not in texto: | |
referencias.setdefault(div_id, []).append((topico, texto)) | |
return divergencias, divergencias_detalhes, referencias | |
def gerar_resposta_divergencias(self, divergencias, divergencias_detalhes, referencias): | |
"""Gera uma resposta formatada sobre divergências detectadas.""" | |
if not divergencias: | |
return "Não foram encontradas divergências nos dados analisados." | |
resposta = "## Análise de Divergências Detectadas\n\n" | |
resposta += "Foram identificadas as seguintes divergências:\n\n" | |
for i, (item_id, descricao, topico) in enumerate(divergencias, 1): | |
resposta += f"### {i}. Divergência no item **{item_id}**\n" | |
resposta += f"- **Tópico:** {topico}\n" | |
resposta += f"- **Descrição:** {descricao}\n" | |
resposta += f"- **Detalhe completo:** {divergencias_detalhes[item_id]}\n\n" | |
if item_id in referencias: | |
resposta += "**Informações contextuais relacionadas:**\n" | |
for ref_topico, ref_texto in referencias[item_id]: | |
resposta += f" - **{ref_topico}:** {ref_texto}\n" | |
resposta += "\n" | |
resposta += "## Conclusão\n" | |
resposta += f"Foram encontradas **{len(divergencias)} divergências** que precisam ser resolvidas.\n" | |
resposta += "Recomenda-se revisar os itens mencionados e consultar as especificações técnicas." | |
return resposta | |
def sumarizar_texto(self, texto, max_length=100): | |
"""Sumariza um texto longo automaticamente.""" | |
summary = self.summarizer(texto, max_length=max_length, min_length=30, do_sample=False) | |
return summary[0]['summary_text'] | |
def gerar_relatorio(self, pergunta, top_k=10, exibir_chunks=False): | |
"""Gera um relatório completo com base em uma pergunta.""" | |
chunks_relevantes = self.sistema_recuperacao.recuperar_chunks_relevantes(pergunta, top_k) | |
try: | |
divergencias, detalhes, referencias = self.analisar_divergencias(chunks_relevantes) | |
resposta_texto = self.gerar_resposta_divergencias(divergencias, detalhes, referencias) | |
except Exception as e: | |
resposta_texto = f"Erro ao gerar resposta: {str(e)}\n\n{traceback.format_exc()}" | |
relatorio = f"=== RELATÓRIO GERADO ===\n\n**Pergunta:** {pergunta}\n\n**Resposta:**\n{resposta_texto}\n\n" | |
if exibir_chunks: | |
relatorio += "=== CHUNKS USADOS COMO CONTEXTO ===\n" | |
for i, chunk in enumerate(chunks_relevantes, 1): | |
relatorio += f"\n**Chunk {i}:**\n**Tópico:** {chunk['topico']}\n**Texto:** {chunk['texto']}\n" | |
return relatorio, resposta_texto, divergencias | |
# --- Funções auxiliares --- | |
def carregar_chunks_xlsx(arquivo): | |
"""Carrega chunks de um arquivo .xlsx.""" | |
try: | |
df = pd.read_excel(arquivo, engine='openpyxl') | |
# Converte o DataFrame em uma lista de dicionários com as colunas esperadas | |
chunks = df[['texto', 'topico']].to_dict(orient='records') | |
return chunks | |
except Exception as e: | |
raise Exception(f"Erro ao carregar o arquivo .xlsx: {str(e)}") | |
def gerar_resposta_itens_relacionados(rag, itens_divergentes, top_k=10): | |
"""Gera resposta para itens relacionados às divergências.""" | |
chunks_relevantes = rag.sistema_recuperacao.recuperar_chunks_por_itens(itens_divergentes, top_k) | |
if not chunks_relevantes: | |
return "Não foram encontradas informações relacionadas aos itens mencionados." | |
resposta = "## Informações Relacionadas\n\n" | |
for i, chunk in enumerate(chunks_relevantes, 1): | |
texto_completo = chunk["texto"] | |
sumario = rag.sumarizar_texto(texto_completo) | |
resposta += f"### Chunk {i}\n" | |
resposta += f"**Tópico:** {chunk['topico']}\n" | |
resposta += "**Informações relevantes:**\n" | |
resposta += f"- **Sumário:** {sumario}\n" | |
resposta += "- **Detalhes completos:**\n" | |
for linha in texto_completo.split("\n"): | |
if linha.strip(): | |
resposta += f" - {linha.strip()}\n" | |
resposta += "\n" | |
resposta += "## Conclusão\n" | |
resposta += "As informações acima estão relacionadas aos itens mencionados.\n" | |
return resposta | |
def calcular_probabilidade_divergencia(chunks): | |
"""Calcula a probabilidade de um chunk conter 'Divergência detectada'.""" | |
total_chunks = len(chunks) | |
if total_chunks == 0: | |
return 0.0, 0, 0 | |
chunks_com_divergencia = sum(1 for chunk in chunks if "Divergência detectada" in chunk["texto"]) | |
probabilidade = chunks_com_divergencia / total_chunks | |
return probabilidade, chunks_com_divergencia, total_chunks | |
def encontrar_chunk_mais_representativo(chunks, aba_nome): | |
"""Encontra o chunk com maior 'relevância' entre os que têm divergência.""" | |
chunks_divergencia = [chunk for chunk in chunks if "Divergência detectada" in chunk["texto"]] | |
if not chunks_divergencia: | |
return None, f"Não há chunks com divergência na {aba_nome}." | |
chunk_mais_representativo = max(chunks_divergencia, key=lambda x: len(x["texto"])) | |
return chunk_mais_representativo, f"Chunk mais representativo da {aba_nome} (baseado no comprimento do texto)." | |
# --- Função para interface Gradio --- | |
def processar_pergunta(pergunta, arquivo_aba1, arquivo_aba2, top_k=10): | |
"""Processa a pergunta do usuário e retorna relatórios formatados.""" | |
try: | |
# Carregar dados dos arquivos .xlsx | |
chunks_aba1 = carregar_chunks_xlsx(arquivo_aba1.name if arquivo_aba1 else None) | |
chunks_aba2 = carregar_chunks_xlsx(arquivo_aba2.name if arquivo_aba2 else None) | |
# Inicializar sistemas RAG | |
rag_aba1 = RAGAvançado(chunks_aba1) | |
rag_aba2 = RAGAvançado(chunks_aba2) | |
# Pergunta 1: Relatório de divergências | |
relatorio1, resposta1, divergencias1 = rag_aba1.gerar_relatorio(pergunta, top_k=top_k, exibir_chunks=True) | |
# Análise de probabilidade ABA1 | |
prob_aba1, num_div_aba1, total_aba1 = calcular_probabilidade_divergencia(chunks_aba1) | |
prob_aba1_text = ( | |
f"### Análise de Probabilidade (ABA1)\n" | |
f"- Total de chunks: {total_aba1}\n" | |
f"- Chunks com divergência: {num_div_aba1}\n" | |
f"- Probabilidade: {prob_aba1:.2%}\n" | |
) | |
# Chunk mais representativo ABA1 | |
chunk_aba1, msg_aba1 = encontrar_chunk_mais_representativo(chunks_aba1, "ABA1") | |
chunk_aba1_text = f"### Chunk Mais Representativo (ABA1)\n{msg_aba1}\n" | |
if chunk_aba1: | |
chunk_aba1_text += f"- Tópico: {chunk_aba1['topico']}\n- Texto: {chunk_aba1['texto']}\n" | |
# Análise de probabilidade ABA2 | |
prob_aba2, num_div_aba2, total_aba2 = calcular_probabilidade_divergencia(chunks_aba2) | |
prob_aba2_text = ( | |
f"### Análise de Probabilidade (ABA2)\n" | |
f"- Total de chunks: {total_aba2}\n" | |
f"- Chunks com divergência: {num_div_aba2}\n" | |
f"- Probabilidade: {prob_aba2:.2%}\n" | |
) | |
# Chunk mais representativo ABA2 | |
chunk_aba2, msg_aba2 = encontrar_chunk_mais_representativo(chunks_aba2, "ABA2") | |
chunk_aba2_text = f"### Chunk Mais Representativo (ABA2)\n{msg_aba2}\n" | |
if chunk_aba2: | |
chunk_aba2_text += f"- Tópico: {chunk_aba2['topico']}\n- Texto: {chunk_aba2['texto']}\n" | |
# Pergunta 2: Informações relacionadas (se houver divergências) | |
resposta2 = "" | |
if divergencias1: | |
itens_divergentes = [item_id for item_id, _, _ in divergencias1] | |
resposta2 = gerar_resposta_itens_relacionados(rag_aba2, itens_divergentes, top_k=top_k) | |
else: | |
resposta2 = "Nenhuma divergência encontrada para gerar informações relacionadas." | |
# Combinar resultados | |
resultado_final = ( | |
f"{prob_aba1_text}\n{chunk_aba1_text}\n\n" | |
f"{prob_aba2_text}\n{chunk_aba2_text}\n\n" | |
f"### Resposta à Pergunta\n{resposta1}\n\n" | |
f"### Informações Relacionadas\n{resposta2}" | |
) | |
return resultado_final | |
except Exception as e: | |
return f"Erro ao processar a pergunta: {str(e)}\n\n{traceback.format_exc()}" | |
# --- Interface Gradio --- | |
def criar_interface(): | |
"""Cria a interface gráfica com Gradio.""" | |
with gr.Blocks(title="Sistema RAG Avançado") as interface: | |
gr.Markdown("# Sistema RAG Avançado") | |
gr.Markdown("Faça uma pergunta e envie arquivos .xlsx para análise de divergências.") | |
with gr.Row(): | |
with gr.Column(): | |
pergunta_input = gr.Textbox(label="Sua Pergunta", placeholder="Ex: Quais os itens de Divergência detectada?") | |
aba1_input = gr.File(label="Arquivo ABA1 (.xlsx)", file_types=[".xlsx"]) | |
aba2_input = gr.File(label="Arquivo ABA2 (.xlsx)", file_types=[".xlsx"]) | |
top_k_input = gr.Slider(label="Top K Chunks", minimum=1, maximum=20, value=10, step=1) | |
submit_btn = gr.Button("Gerar Relatório") | |
with gr.Column(): | |
output_text = gr.Markdown(label="Relatório Gerado") | |
submit_btn.click( | |
fn=processar_pergunta, | |
inputs=[pergunta_input, aba1_input, aba2_input, top_k_input], | |
outputs=output_text | |
) | |
return interface | |
# --- Executar a interface --- | |
if __name__ == "__main__": | |
interface = criar_interface() | |
interface.launch() |