File size: 12,456 Bytes
2da2923
 
 
 
 
 
 
 
915333e
2da2923
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
915333e
 
 
 
 
 
 
 
 
2da2923
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
915333e
2da2923
 
915333e
 
 
2da2923
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
915333e
2da2923
 
 
 
915333e
 
2da2923
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import json
import numpy as np
import faiss
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoTokenizer, AutoModel, pipeline
from sentence_transformers import SentenceTransformer
import traceback
import gradio as gr
import pandas as pd

# --- Classes principais ---

class SistemaRecuperacaoAvancado:
    """Classe responsável pela recuperação de chunks relevantes usando embeddings semânticos."""
    def __init__(self, chunks):
        self.chunks = chunks
        self.textos = [chunk["texto"] for chunk in chunks]
        
        # Inicialização do modelo de embeddings
        self.embedder = SentenceTransformer('neuralmind/bert-base-portuguese-cased')
        self.embeddings = self.embedder.encode(self.textos)
        
        # Índice FAISS para busca eficiente
        self.index = faiss.IndexFlatL2(self.embeddings.shape[1])
        self.index.add(np.array(self.embeddings))

    def recuperar_chunks_relevantes(self, pergunta, top_k=10):
        """Recupera os top_k chunks mais relevantes para uma pergunta."""
        pergunta_embedding = self.embedder.encode([pergunta])
        _, indices = self.index.search(pergunta_embedding, top_k)
        return [self.chunks[i] for i in indices[0]]

    def recuperar_chunks_por_itens(self, itens, top_k=10):
        """Filtra chunks que contenham qualquer um dos itens mencionados."""
        return [chunk for chunk in self.chunks 
                if any(item in chunk["texto"] for item in itens)][:top_k]


class RAGAvançado:
    """Classe principal do sistema RAG com funcionalidades de análise e geração de relatórios."""
    def __init__(self, chunks):
        self.sistema_recuperacao = SistemaRecuperacaoAvancado(chunks)
        
        # Configuração do modelo BERTimbau e sumarizador
        self.modelo_nome = "neuralmind/bert-base-portuguese-cased"
        self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_nome)
        self.model = AutoModel.from_pretrained(self.modelo_nome)
        self.summarizer = pipeline("summarization", model="t5-small")

    def analisar_divergencias(self, chunks):
        """Analisa divergências em uma lista de chunks."""
        divergencias = []
        divergencias_detalhes = {}
        referencias = {}
        
        for chunk in chunks:
            texto, topico = chunk["texto"], chunk["topico"]
            if "Divergência detectada" in texto:
                partes = texto.split("|")
                item_id = partes[0].strip()
                descricao = partes[1].strip() if len(partes) > 1 else "Sem descrição"
                divergencias.append((item_id, descricao, topico))
                divergencias_detalhes[item_id] = texto
            
            for div_id, _, _ in divergencias:
                if div_id in texto and "Divergência detectada" not in texto:
                    referencias.setdefault(div_id, []).append((topico, texto))
        
        return divergencias, divergencias_detalhes, referencias

    def gerar_resposta_divergencias(self, divergencias, divergencias_detalhes, referencias):
        """Gera uma resposta formatada sobre divergências detectadas."""
        if not divergencias:
            return "Não foram encontradas divergências nos dados analisados."
        
        resposta = "## Análise de Divergências Detectadas\n\n"
        resposta += "Foram identificadas as seguintes divergências:\n\n"
        
        for i, (item_id, descricao, topico) in enumerate(divergencias, 1):
            resposta += f"### {i}. Divergência no item **{item_id}**\n"
            resposta += f"- **Tópico:** {topico}\n"
            resposta += f"- **Descrição:** {descricao}\n"
            resposta += f"- **Detalhe completo:** {divergencias_detalhes[item_id]}\n\n"
            
            if item_id in referencias:
                resposta += "**Informações contextuais relacionadas:**\n"
                for ref_topico, ref_texto in referencias[item_id]:
                    resposta += f"  - **{ref_topico}:** {ref_texto}\n"
                resposta += "\n"
        
        resposta += "## Conclusão\n"
        resposta += f"Foram encontradas **{len(divergencias)} divergências** que precisam ser resolvidas.\n"
        resposta += "Recomenda-se revisar os itens mencionados e consultar as especificações técnicas."
        return resposta

    def sumarizar_texto(self, texto, max_length=100):
        """Sumariza um texto longo automaticamente."""
        summary = self.summarizer(texto, max_length=max_length, min_length=30, do_sample=False)
        return summary[0]['summary_text']

    def gerar_relatorio(self, pergunta, top_k=10, exibir_chunks=False):
        """Gera um relatório completo com base em uma pergunta."""
        chunks_relevantes = self.sistema_recuperacao.recuperar_chunks_relevantes(pergunta, top_k)
        try:
            divergencias, detalhes, referencias = self.analisar_divergencias(chunks_relevantes)
            resposta_texto = self.gerar_resposta_divergencias(divergencias, detalhes, referencias)
        except Exception as e:
            resposta_texto = f"Erro ao gerar resposta: {str(e)}\n\n{traceback.format_exc()}"

        relatorio = f"=== RELATÓRIO GERADO ===\n\n**Pergunta:** {pergunta}\n\n**Resposta:**\n{resposta_texto}\n\n"
        if exibir_chunks:
            relatorio += "=== CHUNKS USADOS COMO CONTEXTO ===\n"
            for i, chunk in enumerate(chunks_relevantes, 1):
                relatorio += f"\n**Chunk {i}:**\n**Tópico:** {chunk['topico']}\n**Texto:** {chunk['texto']}\n"
        
        return relatorio, resposta_texto, divergencias


# --- Funções auxiliares ---

def carregar_chunks_xlsx(arquivo):
    """Carrega chunks de um arquivo .xlsx."""
    try:
        df = pd.read_excel(arquivo, engine='openpyxl')
        # Converte o DataFrame em uma lista de dicionários com as colunas esperadas
        chunks = df[['texto', 'topico']].to_dict(orient='records')
        return chunks
    except Exception as e:
        raise Exception(f"Erro ao carregar o arquivo .xlsx: {str(e)}")


def gerar_resposta_itens_relacionados(rag, itens_divergentes, top_k=10):
    """Gera resposta para itens relacionados às divergências."""
    chunks_relevantes = rag.sistema_recuperacao.recuperar_chunks_por_itens(itens_divergentes, top_k)
    
    if not chunks_relevantes:
        return "Não foram encontradas informações relacionadas aos itens mencionados."
    
    resposta = "## Informações Relacionadas\n\n"
    for i, chunk in enumerate(chunks_relevantes, 1):
        texto_completo = chunk["texto"]
        sumario = rag.sumarizar_texto(texto_completo)
        resposta += f"### Chunk {i}\n"
        resposta += f"**Tópico:** {chunk['topico']}\n"
        resposta += "**Informações relevantes:**\n"
        resposta += f"- **Sumário:** {sumario}\n"
        resposta += "- **Detalhes completos:**\n"
        for linha in texto_completo.split("\n"):
            if linha.strip():
                resposta += f"  - {linha.strip()}\n"
        resposta += "\n"
    
    resposta += "## Conclusão\n"
    resposta += "As informações acima estão relacionadas aos itens mencionados.\n"
    return resposta


def calcular_probabilidade_divergencia(chunks):
    """Calcula a probabilidade de um chunk conter 'Divergência detectada'."""
    total_chunks = len(chunks)
    if total_chunks == 0:
        return 0.0, 0, 0
    
    chunks_com_divergencia = sum(1 for chunk in chunks if "Divergência detectada" in chunk["texto"])
    probabilidade = chunks_com_divergencia / total_chunks
    return probabilidade, chunks_com_divergencia, total_chunks


def encontrar_chunk_mais_representativo(chunks, aba_nome):
    """Encontra o chunk com maior 'relevância' entre os que têm divergência."""
    chunks_divergencia = [chunk for chunk in chunks if "Divergência detectada" in chunk["texto"]]
    if not chunks_divergencia:
        return None, f"Não há chunks com divergência na {aba_nome}."
    
    chunk_mais_representativo = max(chunks_divergencia, key=lambda x: len(x["texto"]))
    return chunk_mais_representativo, f"Chunk mais representativo da {aba_nome} (baseado no comprimento do texto)."


# --- Função para interface Gradio ---

def processar_pergunta(pergunta, arquivo_aba1, arquivo_aba2, top_k=10):
    """Processa a pergunta do usuário e retorna relatórios formatados."""
    try:
        # Carregar dados dos arquivos .xlsx
        chunks_aba1 = carregar_chunks_xlsx(arquivo_aba1.name if arquivo_aba1 else None)
        chunks_aba2 = carregar_chunks_xlsx(arquivo_aba2.name if arquivo_aba2 else None)

        # Inicializar sistemas RAG
        rag_aba1 = RAGAvançado(chunks_aba1)
        rag_aba2 = RAGAvançado(chunks_aba2)

        # Pergunta 1: Relatório de divergências
        relatorio1, resposta1, divergencias1 = rag_aba1.gerar_relatorio(pergunta, top_k=top_k, exibir_chunks=True)

        # Análise de probabilidade ABA1
        prob_aba1, num_div_aba1, total_aba1 = calcular_probabilidade_divergencia(chunks_aba1)
        prob_aba1_text = (
            f"### Análise de Probabilidade (ABA1)\n"
            f"- Total de chunks: {total_aba1}\n"
            f"- Chunks com divergência: {num_div_aba1}\n"
            f"- Probabilidade: {prob_aba1:.2%}\n"
        )

        # Chunk mais representativo ABA1
        chunk_aba1, msg_aba1 = encontrar_chunk_mais_representativo(chunks_aba1, "ABA1")
        chunk_aba1_text = f"### Chunk Mais Representativo (ABA1)\n{msg_aba1}\n"
        if chunk_aba1:
            chunk_aba1_text += f"- Tópico: {chunk_aba1['topico']}\n- Texto: {chunk_aba1['texto']}\n"

        # Análise de probabilidade ABA2
        prob_aba2, num_div_aba2, total_aba2 = calcular_probabilidade_divergencia(chunks_aba2)
        prob_aba2_text = (
            f"### Análise de Probabilidade (ABA2)\n"
            f"- Total de chunks: {total_aba2}\n"
            f"- Chunks com divergência: {num_div_aba2}\n"
            f"- Probabilidade: {prob_aba2:.2%}\n"
        )

        # Chunk mais representativo ABA2
        chunk_aba2, msg_aba2 = encontrar_chunk_mais_representativo(chunks_aba2, "ABA2")
        chunk_aba2_text = f"### Chunk Mais Representativo (ABA2)\n{msg_aba2}\n"
        if chunk_aba2:
            chunk_aba2_text += f"- Tópico: {chunk_aba2['topico']}\n- Texto: {chunk_aba2['texto']}\n"

        # Pergunta 2: Informações relacionadas (se houver divergências)
        resposta2 = ""
        if divergencias1:
            itens_divergentes = [item_id for item_id, _, _ in divergencias1]
            resposta2 = gerar_resposta_itens_relacionados(rag_aba2, itens_divergentes, top_k=top_k)
        else:
            resposta2 = "Nenhuma divergência encontrada para gerar informações relacionadas."

        # Combinar resultados
        resultado_final = (
            f"{prob_aba1_text}\n{chunk_aba1_text}\n\n"
            f"{prob_aba2_text}\n{chunk_aba2_text}\n\n"
            f"### Resposta à Pergunta\n{resposta1}\n\n"
            f"### Informações Relacionadas\n{resposta2}"
        )

        return resultado_final

    except Exception as e:
        return f"Erro ao processar a pergunta: {str(e)}\n\n{traceback.format_exc()}"

# --- Interface Gradio ---

def criar_interface():
    """Cria a interface gráfica com Gradio."""
    with gr.Blocks(title="Sistema RAG Avançado") as interface:
        gr.Markdown("# Sistema RAG Avançado")
        gr.Markdown("Faça uma pergunta e envie arquivos .xlsx para análise de divergências.")

        with gr.Row():
            with gr.Column():
                pergunta_input = gr.Textbox(label="Sua Pergunta", placeholder="Ex: Quais os itens de Divergência detectada?")
                aba1_input = gr.File(label="Arquivo ABA1 (.xlsx)", file_types=[".xlsx"])
                aba2_input = gr.File(label="Arquivo ABA2 (.xlsx)", file_types=[".xlsx"])
                top_k_input = gr.Slider(label="Top K Chunks", minimum=1, maximum=20, value=10, step=1)
                submit_btn = gr.Button("Gerar Relatório")

            with gr.Column():
                output_text = gr.Markdown(label="Relatório Gerado")

        submit_btn.click(
            fn=processar_pergunta,
            inputs=[pergunta_input, aba1_input, aba2_input, top_k_input],
            outputs=output_text
        )

    return interface

# --- Executar a interface ---

if __name__ == "__main__":
    interface = criar_interface()
    interface.launch()