import json import numpy as np import faiss from sklearn.feature_extraction.text import TfidfVectorizer from transformers import AutoTokenizer, AutoModel, pipeline from sentence_transformers import SentenceTransformer import traceback import gradio as gr import pandas as pd # --- Classes principais --- class SistemaRecuperacaoAvancado: """Classe responsável pela recuperação de chunks relevantes usando embeddings semânticos.""" def __init__(self, chunks): self.chunks = chunks self.textos = [chunk["texto"] for chunk in chunks] # Inicialização do modelo de embeddings self.embedder = SentenceTransformer('neuralmind/bert-base-portuguese-cased') self.embeddings = self.embedder.encode(self.textos) # Índice FAISS para busca eficiente self.index = faiss.IndexFlatL2(self.embeddings.shape[1]) self.index.add(np.array(self.embeddings)) def recuperar_chunks_relevantes(self, pergunta, top_k=10): """Recupera os top_k chunks mais relevantes para uma pergunta.""" pergunta_embedding = self.embedder.encode([pergunta]) _, indices = self.index.search(pergunta_embedding, top_k) return [self.chunks[i] for i in indices[0]] def recuperar_chunks_por_itens(self, itens, top_k=10): """Filtra chunks que contenham qualquer um dos itens mencionados.""" return [chunk for chunk in self.chunks if any(item in chunk["texto"] for item in itens)][:top_k] class RAGAvançado: """Classe principal do sistema RAG com funcionalidades de análise e geração de relatórios.""" def __init__(self, chunks): self.sistema_recuperacao = SistemaRecuperacaoAvancado(chunks) # Configuração do modelo BERTimbau e sumarizador self.modelo_nome = "neuralmind/bert-base-portuguese-cased" self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_nome) self.model = AutoModel.from_pretrained(self.modelo_nome) self.summarizer = pipeline("summarization", model="t5-small") def analisar_divergencias(self, chunks): """Analisa divergências em uma lista de chunks.""" divergencias = [] divergencias_detalhes = {} referencias = {} for chunk in chunks: texto, topico = chunk["texto"], chunk["topico"] if "Divergência detectada" in texto: partes = texto.split("|") item_id = partes[0].strip() descricao = partes[1].strip() if len(partes) > 1 else "Sem descrição" divergencias.append((item_id, descricao, topico)) divergencias_detalhes[item_id] = texto for div_id, _, _ in divergencias: if div_id in texto and "Divergência detectada" not in texto: referencias.setdefault(div_id, []).append((topico, texto)) return divergencias, divergencias_detalhes, referencias def gerar_resposta_divergencias(self, divergencias, divergencias_detalhes, referencias): """Gera uma resposta formatada sobre divergências detectadas.""" if not divergencias: return "Não foram encontradas divergências nos dados analisados." resposta = "## Análise de Divergências Detectadas\n\n" resposta += "Foram identificadas as seguintes divergências:\n\n" for i, (item_id, descricao, topico) in enumerate(divergencias, 1): resposta += f"### {i}. Divergência no item **{item_id}**\n" resposta += f"- **Tópico:** {topico}\n" resposta += f"- **Descrição:** {descricao}\n" resposta += f"- **Detalhe completo:** {divergencias_detalhes[item_id]}\n\n" if item_id in referencias: resposta += "**Informações contextuais relacionadas:**\n" for ref_topico, ref_texto in referencias[item_id]: resposta += f" - **{ref_topico}:** {ref_texto}\n" resposta += "\n" resposta += "## Conclusão\n" resposta += f"Foram encontradas **{len(divergencias)} divergências** que precisam ser resolvidas.\n" resposta += "Recomenda-se revisar os itens mencionados e consultar as especificações técnicas." return resposta def sumarizar_texto(self, texto, max_length=100): """Sumariza um texto longo automaticamente.""" summary = self.summarizer(texto, max_length=max_length, min_length=30, do_sample=False) return summary[0]['summary_text'] def gerar_relatorio(self, pergunta, top_k=10, exibir_chunks=False): """Gera um relatório completo com base em uma pergunta.""" chunks_relevantes = self.sistema_recuperacao.recuperar_chunks_relevantes(pergunta, top_k) try: divergencias, detalhes, referencias = self.analisar_divergencias(chunks_relevantes) resposta_texto = self.gerar_resposta_divergencias(divergencias, detalhes, referencias) except Exception as e: resposta_texto = f"Erro ao gerar resposta: {str(e)}\n\n{traceback.format_exc()}" relatorio = f"=== RELATÓRIO GERADO ===\n\n**Pergunta:** {pergunta}\n\n**Resposta:**\n{resposta_texto}\n\n" if exibir_chunks: relatorio += "=== CHUNKS USADOS COMO CONTEXTO ===\n" for i, chunk in enumerate(chunks_relevantes, 1): relatorio += f"\n**Chunk {i}:**\n**Tópico:** {chunk['topico']}\n**Texto:** {chunk['texto']}\n" return relatorio, resposta_texto, divergencias # --- Funções auxiliares --- def carregar_chunks_xlsx(arquivo): """Carrega chunks de um arquivo .xlsx.""" try: df = pd.read_excel(arquivo, engine='openpyxl') # Converte o DataFrame em uma lista de dicionários com as colunas esperadas chunks = df[['texto', 'topico']].to_dict(orient='records') return chunks except Exception as e: raise Exception(f"Erro ao carregar o arquivo .xlsx: {str(e)}") def gerar_resposta_itens_relacionados(rag, itens_divergentes, top_k=10): """Gera resposta para itens relacionados às divergências.""" chunks_relevantes = rag.sistema_recuperacao.recuperar_chunks_por_itens(itens_divergentes, top_k) if not chunks_relevantes: return "Não foram encontradas informações relacionadas aos itens mencionados." resposta = "## Informações Relacionadas\n\n" for i, chunk in enumerate(chunks_relevantes, 1): texto_completo = chunk["texto"] sumario = rag.sumarizar_texto(texto_completo) resposta += f"### Chunk {i}\n" resposta += f"**Tópico:** {chunk['topico']}\n" resposta += "**Informações relevantes:**\n" resposta += f"- **Sumário:** {sumario}\n" resposta += "- **Detalhes completos:**\n" for linha in texto_completo.split("\n"): if linha.strip(): resposta += f" - {linha.strip()}\n" resposta += "\n" resposta += "## Conclusão\n" resposta += "As informações acima estão relacionadas aos itens mencionados.\n" return resposta def calcular_probabilidade_divergencia(chunks): """Calcula a probabilidade de um chunk conter 'Divergência detectada'.""" total_chunks = len(chunks) if total_chunks == 0: return 0.0, 0, 0 chunks_com_divergencia = sum(1 for chunk in chunks if "Divergência detectada" in chunk["texto"]) probabilidade = chunks_com_divergencia / total_chunks return probabilidade, chunks_com_divergencia, total_chunks def encontrar_chunk_mais_representativo(chunks, aba_nome): """Encontra o chunk com maior 'relevância' entre os que têm divergência.""" chunks_divergencia = [chunk for chunk in chunks if "Divergência detectada" in chunk["texto"]] if not chunks_divergencia: return None, f"Não há chunks com divergência na {aba_nome}." chunk_mais_representativo = max(chunks_divergencia, key=lambda x: len(x["texto"])) return chunk_mais_representativo, f"Chunk mais representativo da {aba_nome} (baseado no comprimento do texto)." # --- Função para interface Gradio --- def processar_pergunta(pergunta, arquivo_aba1, arquivo_aba2, top_k=10): """Processa a pergunta do usuário e retorna relatórios formatados.""" try: # Carregar dados dos arquivos .xlsx chunks_aba1 = carregar_chunks_xlsx(arquivo_aba1.name if arquivo_aba1 else None) chunks_aba2 = carregar_chunks_xlsx(arquivo_aba2.name if arquivo_aba2 else None) # Inicializar sistemas RAG rag_aba1 = RAGAvançado(chunks_aba1) rag_aba2 = RAGAvançado(chunks_aba2) # Pergunta 1: Relatório de divergências relatorio1, resposta1, divergencias1 = rag_aba1.gerar_relatorio(pergunta, top_k=top_k, exibir_chunks=True) # Análise de probabilidade ABA1 prob_aba1, num_div_aba1, total_aba1 = calcular_probabilidade_divergencia(chunks_aba1) prob_aba1_text = ( f"### Análise de Probabilidade (ABA1)\n" f"- Total de chunks: {total_aba1}\n" f"- Chunks com divergência: {num_div_aba1}\n" f"- Probabilidade: {prob_aba1:.2%}\n" ) # Chunk mais representativo ABA1 chunk_aba1, msg_aba1 = encontrar_chunk_mais_representativo(chunks_aba1, "ABA1") chunk_aba1_text = f"### Chunk Mais Representativo (ABA1)\n{msg_aba1}\n" if chunk_aba1: chunk_aba1_text += f"- Tópico: {chunk_aba1['topico']}\n- Texto: {chunk_aba1['texto']}\n" # Análise de probabilidade ABA2 prob_aba2, num_div_aba2, total_aba2 = calcular_probabilidade_divergencia(chunks_aba2) prob_aba2_text = ( f"### Análise de Probabilidade (ABA2)\n" f"- Total de chunks: {total_aba2}\n" f"- Chunks com divergência: {num_div_aba2}\n" f"- Probabilidade: {prob_aba2:.2%}\n" ) # Chunk mais representativo ABA2 chunk_aba2, msg_aba2 = encontrar_chunk_mais_representativo(chunks_aba2, "ABA2") chunk_aba2_text = f"### Chunk Mais Representativo (ABA2)\n{msg_aba2}\n" if chunk_aba2: chunk_aba2_text += f"- Tópico: {chunk_aba2['topico']}\n- Texto: {chunk_aba2['texto']}\n" # Pergunta 2: Informações relacionadas (se houver divergências) resposta2 = "" if divergencias1: itens_divergentes = [item_id for item_id, _, _ in divergencias1] resposta2 = gerar_resposta_itens_relacionados(rag_aba2, itens_divergentes, top_k=top_k) else: resposta2 = "Nenhuma divergência encontrada para gerar informações relacionadas." # Combinar resultados resultado_final = ( f"{prob_aba1_text}\n{chunk_aba1_text}\n\n" f"{prob_aba2_text}\n{chunk_aba2_text}\n\n" f"### Resposta à Pergunta\n{resposta1}\n\n" f"### Informações Relacionadas\n{resposta2}" ) return resultado_final except Exception as e: return f"Erro ao processar a pergunta: {str(e)}\n\n{traceback.format_exc()}" # --- Interface Gradio --- def criar_interface(): """Cria a interface gráfica com Gradio.""" with gr.Blocks(title="Sistema RAG Avançado") as interface: gr.Markdown("# Sistema RAG Avançado") gr.Markdown("Faça uma pergunta e envie arquivos .xlsx para análise de divergências.") with gr.Row(): with gr.Column(): pergunta_input = gr.Textbox(label="Sua Pergunta", placeholder="Ex: Quais os itens de Divergência detectada?") aba1_input = gr.File(label="Arquivo ABA1 (.xlsx)", file_types=[".xlsx"]) aba2_input = gr.File(label="Arquivo ABA2 (.xlsx)", file_types=[".xlsx"]) top_k_input = gr.Slider(label="Top K Chunks", minimum=1, maximum=20, value=10, step=1) submit_btn = gr.Button("Gerar Relatório") with gr.Column(): output_text = gr.Markdown(label="Relatório Gerado") submit_btn.click( fn=processar_pergunta, inputs=[pergunta_input, aba1_input, aba2_input, top_k_input], outputs=output_text ) return interface # --- Executar a interface --- if __name__ == "__main__": interface = criar_interface() interface.launch()