Spaces:
Sleeping
Sleeping
File size: 12,456 Bytes
2da2923 915333e 2da2923 915333e 2da2923 915333e 2da2923 915333e 2da2923 915333e 2da2923 915333e 2da2923 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
import json
import numpy as np
import faiss
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoTokenizer, AutoModel, pipeline
from sentence_transformers import SentenceTransformer
import traceback
import gradio as gr
import pandas as pd
# --- Classes principais ---
class SistemaRecuperacaoAvancado:
"""Classe responsável pela recuperação de chunks relevantes usando embeddings semânticos."""
def __init__(self, chunks):
self.chunks = chunks
self.textos = [chunk["texto"] for chunk in chunks]
# Inicialização do modelo de embeddings
self.embedder = SentenceTransformer('neuralmind/bert-base-portuguese-cased')
self.embeddings = self.embedder.encode(self.textos)
# Índice FAISS para busca eficiente
self.index = faiss.IndexFlatL2(self.embeddings.shape[1])
self.index.add(np.array(self.embeddings))
def recuperar_chunks_relevantes(self, pergunta, top_k=10):
"""Recupera os top_k chunks mais relevantes para uma pergunta."""
pergunta_embedding = self.embedder.encode([pergunta])
_, indices = self.index.search(pergunta_embedding, top_k)
return [self.chunks[i] for i in indices[0]]
def recuperar_chunks_por_itens(self, itens, top_k=10):
"""Filtra chunks que contenham qualquer um dos itens mencionados."""
return [chunk for chunk in self.chunks
if any(item in chunk["texto"] for item in itens)][:top_k]
class RAGAvançado:
"""Classe principal do sistema RAG com funcionalidades de análise e geração de relatórios."""
def __init__(self, chunks):
self.sistema_recuperacao = SistemaRecuperacaoAvancado(chunks)
# Configuração do modelo BERTimbau e sumarizador
self.modelo_nome = "neuralmind/bert-base-portuguese-cased"
self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_nome)
self.model = AutoModel.from_pretrained(self.modelo_nome)
self.summarizer = pipeline("summarization", model="t5-small")
def analisar_divergencias(self, chunks):
"""Analisa divergências em uma lista de chunks."""
divergencias = []
divergencias_detalhes = {}
referencias = {}
for chunk in chunks:
texto, topico = chunk["texto"], chunk["topico"]
if "Divergência detectada" in texto:
partes = texto.split("|")
item_id = partes[0].strip()
descricao = partes[1].strip() if len(partes) > 1 else "Sem descrição"
divergencias.append((item_id, descricao, topico))
divergencias_detalhes[item_id] = texto
for div_id, _, _ in divergencias:
if div_id in texto and "Divergência detectada" not in texto:
referencias.setdefault(div_id, []).append((topico, texto))
return divergencias, divergencias_detalhes, referencias
def gerar_resposta_divergencias(self, divergencias, divergencias_detalhes, referencias):
"""Gera uma resposta formatada sobre divergências detectadas."""
if not divergencias:
return "Não foram encontradas divergências nos dados analisados."
resposta = "## Análise de Divergências Detectadas\n\n"
resposta += "Foram identificadas as seguintes divergências:\n\n"
for i, (item_id, descricao, topico) in enumerate(divergencias, 1):
resposta += f"### {i}. Divergência no item **{item_id}**\n"
resposta += f"- **Tópico:** {topico}\n"
resposta += f"- **Descrição:** {descricao}\n"
resposta += f"- **Detalhe completo:** {divergencias_detalhes[item_id]}\n\n"
if item_id in referencias:
resposta += "**Informações contextuais relacionadas:**\n"
for ref_topico, ref_texto in referencias[item_id]:
resposta += f" - **{ref_topico}:** {ref_texto}\n"
resposta += "\n"
resposta += "## Conclusão\n"
resposta += f"Foram encontradas **{len(divergencias)} divergências** que precisam ser resolvidas.\n"
resposta += "Recomenda-se revisar os itens mencionados e consultar as especificações técnicas."
return resposta
def sumarizar_texto(self, texto, max_length=100):
"""Sumariza um texto longo automaticamente."""
summary = self.summarizer(texto, max_length=max_length, min_length=30, do_sample=False)
return summary[0]['summary_text']
def gerar_relatorio(self, pergunta, top_k=10, exibir_chunks=False):
"""Gera um relatório completo com base em uma pergunta."""
chunks_relevantes = self.sistema_recuperacao.recuperar_chunks_relevantes(pergunta, top_k)
try:
divergencias, detalhes, referencias = self.analisar_divergencias(chunks_relevantes)
resposta_texto = self.gerar_resposta_divergencias(divergencias, detalhes, referencias)
except Exception as e:
resposta_texto = f"Erro ao gerar resposta: {str(e)}\n\n{traceback.format_exc()}"
relatorio = f"=== RELATÓRIO GERADO ===\n\n**Pergunta:** {pergunta}\n\n**Resposta:**\n{resposta_texto}\n\n"
if exibir_chunks:
relatorio += "=== CHUNKS USADOS COMO CONTEXTO ===\n"
for i, chunk in enumerate(chunks_relevantes, 1):
relatorio += f"\n**Chunk {i}:**\n**Tópico:** {chunk['topico']}\n**Texto:** {chunk['texto']}\n"
return relatorio, resposta_texto, divergencias
# --- Funções auxiliares ---
def carregar_chunks_xlsx(arquivo):
"""Carrega chunks de um arquivo .xlsx."""
try:
df = pd.read_excel(arquivo, engine='openpyxl')
# Converte o DataFrame em uma lista de dicionários com as colunas esperadas
chunks = df[['texto', 'topico']].to_dict(orient='records')
return chunks
except Exception as e:
raise Exception(f"Erro ao carregar o arquivo .xlsx: {str(e)}")
def gerar_resposta_itens_relacionados(rag, itens_divergentes, top_k=10):
"""Gera resposta para itens relacionados às divergências."""
chunks_relevantes = rag.sistema_recuperacao.recuperar_chunks_por_itens(itens_divergentes, top_k)
if not chunks_relevantes:
return "Não foram encontradas informações relacionadas aos itens mencionados."
resposta = "## Informações Relacionadas\n\n"
for i, chunk in enumerate(chunks_relevantes, 1):
texto_completo = chunk["texto"]
sumario = rag.sumarizar_texto(texto_completo)
resposta += f"### Chunk {i}\n"
resposta += f"**Tópico:** {chunk['topico']}\n"
resposta += "**Informações relevantes:**\n"
resposta += f"- **Sumário:** {sumario}\n"
resposta += "- **Detalhes completos:**\n"
for linha in texto_completo.split("\n"):
if linha.strip():
resposta += f" - {linha.strip()}\n"
resposta += "\n"
resposta += "## Conclusão\n"
resposta += "As informações acima estão relacionadas aos itens mencionados.\n"
return resposta
def calcular_probabilidade_divergencia(chunks):
"""Calcula a probabilidade de um chunk conter 'Divergência detectada'."""
total_chunks = len(chunks)
if total_chunks == 0:
return 0.0, 0, 0
chunks_com_divergencia = sum(1 for chunk in chunks if "Divergência detectada" in chunk["texto"])
probabilidade = chunks_com_divergencia / total_chunks
return probabilidade, chunks_com_divergencia, total_chunks
def encontrar_chunk_mais_representativo(chunks, aba_nome):
"""Encontra o chunk com maior 'relevância' entre os que têm divergência."""
chunks_divergencia = [chunk for chunk in chunks if "Divergência detectada" in chunk["texto"]]
if not chunks_divergencia:
return None, f"Não há chunks com divergência na {aba_nome}."
chunk_mais_representativo = max(chunks_divergencia, key=lambda x: len(x["texto"]))
return chunk_mais_representativo, f"Chunk mais representativo da {aba_nome} (baseado no comprimento do texto)."
# --- Função para interface Gradio ---
def processar_pergunta(pergunta, arquivo_aba1, arquivo_aba2, top_k=10):
"""Processa a pergunta do usuário e retorna relatórios formatados."""
try:
# Carregar dados dos arquivos .xlsx
chunks_aba1 = carregar_chunks_xlsx(arquivo_aba1.name if arquivo_aba1 else None)
chunks_aba2 = carregar_chunks_xlsx(arquivo_aba2.name if arquivo_aba2 else None)
# Inicializar sistemas RAG
rag_aba1 = RAGAvançado(chunks_aba1)
rag_aba2 = RAGAvançado(chunks_aba2)
# Pergunta 1: Relatório de divergências
relatorio1, resposta1, divergencias1 = rag_aba1.gerar_relatorio(pergunta, top_k=top_k, exibir_chunks=True)
# Análise de probabilidade ABA1
prob_aba1, num_div_aba1, total_aba1 = calcular_probabilidade_divergencia(chunks_aba1)
prob_aba1_text = (
f"### Análise de Probabilidade (ABA1)\n"
f"- Total de chunks: {total_aba1}\n"
f"- Chunks com divergência: {num_div_aba1}\n"
f"- Probabilidade: {prob_aba1:.2%}\n"
)
# Chunk mais representativo ABA1
chunk_aba1, msg_aba1 = encontrar_chunk_mais_representativo(chunks_aba1, "ABA1")
chunk_aba1_text = f"### Chunk Mais Representativo (ABA1)\n{msg_aba1}\n"
if chunk_aba1:
chunk_aba1_text += f"- Tópico: {chunk_aba1['topico']}\n- Texto: {chunk_aba1['texto']}\n"
# Análise de probabilidade ABA2
prob_aba2, num_div_aba2, total_aba2 = calcular_probabilidade_divergencia(chunks_aba2)
prob_aba2_text = (
f"### Análise de Probabilidade (ABA2)\n"
f"- Total de chunks: {total_aba2}\n"
f"- Chunks com divergência: {num_div_aba2}\n"
f"- Probabilidade: {prob_aba2:.2%}\n"
)
# Chunk mais representativo ABA2
chunk_aba2, msg_aba2 = encontrar_chunk_mais_representativo(chunks_aba2, "ABA2")
chunk_aba2_text = f"### Chunk Mais Representativo (ABA2)\n{msg_aba2}\n"
if chunk_aba2:
chunk_aba2_text += f"- Tópico: {chunk_aba2['topico']}\n- Texto: {chunk_aba2['texto']}\n"
# Pergunta 2: Informações relacionadas (se houver divergências)
resposta2 = ""
if divergencias1:
itens_divergentes = [item_id for item_id, _, _ in divergencias1]
resposta2 = gerar_resposta_itens_relacionados(rag_aba2, itens_divergentes, top_k=top_k)
else:
resposta2 = "Nenhuma divergência encontrada para gerar informações relacionadas."
# Combinar resultados
resultado_final = (
f"{prob_aba1_text}\n{chunk_aba1_text}\n\n"
f"{prob_aba2_text}\n{chunk_aba2_text}\n\n"
f"### Resposta à Pergunta\n{resposta1}\n\n"
f"### Informações Relacionadas\n{resposta2}"
)
return resultado_final
except Exception as e:
return f"Erro ao processar a pergunta: {str(e)}\n\n{traceback.format_exc()}"
# --- Interface Gradio ---
def criar_interface():
"""Cria a interface gráfica com Gradio."""
with gr.Blocks(title="Sistema RAG Avançado") as interface:
gr.Markdown("# Sistema RAG Avançado")
gr.Markdown("Faça uma pergunta e envie arquivos .xlsx para análise de divergências.")
with gr.Row():
with gr.Column():
pergunta_input = gr.Textbox(label="Sua Pergunta", placeholder="Ex: Quais os itens de Divergência detectada?")
aba1_input = gr.File(label="Arquivo ABA1 (.xlsx)", file_types=[".xlsx"])
aba2_input = gr.File(label="Arquivo ABA2 (.xlsx)", file_types=[".xlsx"])
top_k_input = gr.Slider(label="Top K Chunks", minimum=1, maximum=20, value=10, step=1)
submit_btn = gr.Button("Gerar Relatório")
with gr.Column():
output_text = gr.Markdown(label="Relatório Gerado")
submit_btn.click(
fn=processar_pergunta,
inputs=[pergunta_input, aba1_input, aba2_input, top_k_input],
outputs=output_text
)
return interface
# --- Executar a interface ---
if __name__ == "__main__":
interface = criar_interface()
interface.launch() |