import os from typing import Optional, Tuple, Dict import gradio as gr from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_community.vectorstores import FAISS from langchain_community.llms import HuggingFacePipeline from langchain.chains import RetrievalQA from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline import torch import tempfile import time # Configurações EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2" LLM_MODEL = "google/flan-t5-large" DOCS_DIR = "documents" class RAGSystem: def __init__(self): self.tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL) self.model = AutoModelForSeq2SeqLM.from_pretrained( LLM_MODEL, device_map="auto", torch_dtype=torch.float32 ) pipe = pipeline( "text2text-generation", model=self.model, tokenizer=self.tokenizer, max_length=512, temperature=0.3, # Reduzido para respostas mais precisas top_p=0.9, # Ajustado para permitir diversidade controlada repetition_penalty=1.2 # Evita repetições ) self.llm = HuggingFacePipeline(pipeline=pipe) self.embeddings = HuggingFaceEmbeddings( model_name=EMBEDDING_MODEL, model_kwargs={'device': 'cpu'} ) self.base_db = self.load_base_knowledge() def load_base_knowledge(self) -> Optional[FAISS]: try: if not os.path.exists(DOCS_DIR): os.makedirs(DOCS_DIR) return None loader = DirectoryLoader( DOCS_DIR, glob="**/*.pdf", loader_cls=PyPDFLoader ) documents = loader.load() if not documents: return None text_splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=100, length_function=len, separators=["\n\n", "\n", ".", " ", ""] ) texts = text_splitter.split_documents(documents) return FAISS.from_documents(texts, self.embeddings) except Exception as e: print(f"Erro ao carregar base de conhecimento: {str(e)}") return None def process_pdf(self, file_content: bytes) -> Optional[FAISS]: try: with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: tmp_file.write(file_content) tmp_path = tmp_file.name loader = PyPDFLoader(tmp_path) documents = loader.load() os.unlink(tmp_path) if not documents: return None text_splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=100, length_function=len, separators=["\n\n", "\n", ".", " ", ""] ) texts = text_splitter.split_documents(documents) db = FAISS.from_documents(texts, self.embeddings) if self.base_db is not None: db.merge_from(self.base_db) return db except Exception as e: print(f"Erro ao processar PDF: {str(e)}") return None def format_response(self, raw_response: str, source_type: str, context_found: bool) -> str: """Formata a resposta para um formato padronizado e claro""" if not context_found: return "🔍 Não foram encontradas informações suficientes nos documentos para responder esta pergunta." prefix = "" if source_type == "pdf": prefix = "📄 [Resposta baseada no PDF enviado]\n\n" elif source_type == "base": prefix = "📚 [Resposta baseada na base de documentos]\n\n" elif source_type == "both": prefix = "📚📄 [Resposta baseada em ambas as fontes]\n\n" # Limpa e formata a resposta response = raw_response.strip() if not response: return "🔍 Não foi possível gerar uma resposta adequada com as informações disponíveis." return f"{prefix}{response}" def generate_response(self, file_obj, query: str, progress=gr.Progress()) -> Tuple[str, str, str]: """Retorna (resposta, status, tempo_decorrido)""" if not query.strip(): return "Por favor, insira uma pergunta.", "⚠️ Aguardando pergunta", "0s" start_time = time.time() try: progress(0.2, desc="Processando documentos...") # Determina a fonte dos documentos has_pdf = file_obj is not None has_base = self.base_db is not None source_type = "both" if has_pdf and has_base else "pdf" if has_pdf else "base" if has_base else None if not source_type: return "Nenhuma fonte de documentos disponível.", "❌ Sem documentos", "0s" # Processa documento if has_pdf: db = self.process_pdf(file_obj) if db is None: return "Não foi possível processar o PDF.", "❌ Erro no processamento", "0s" else: db = self.base_db progress(0.4, desc="Buscando informações relevantes...") # Configuração do RAG qa_chain = RetrievalQA.from_chain_type( llm=self.llm, chain_type="stuff", retriever=db.as_retriever( search_kwargs={ "k": 4, "fetch_k": 6, "score_threshold": 0.6 # Filtra resultados pouco relevantes } ), return_source_documents=True ) progress(0.6, desc="Gerando resposta...") # Prompt mais estruturado prompt = f"""Instruções: 1. Analise cuidadosamente os documentos fornecidos. 2. Responda à seguinte pergunta em português de forma clara e direta: {query} 3. Use apenas informações encontradas nos documentos. 4. Se não houver informações suficientes, indique explicitamente. 5. Mantenha a resposta objetiva e baseada em fatos. 6. Cite exemplos específicos dos documentos quando relevante. 7. Evite inventar informações que não estão presentes nos documentos. Pergunta: {query}""" # Gera resposta result = qa_chain({"query": prompt}) # Verifica se encontrou contexto relevante context_found = bool(result.get("source_documents", [])) # Formata a resposta formatted_response = self.format_response( result["result"], source_type, context_found ) elapsed_time = f"{time.time() - start_time:.1f}s" progress(1.0, desc="Concluído!") return formatted_response, "✅ Sucesso", elapsed_time except Exception as e: elapsed_time = f"{time.time() - start_time:.1f}s" return f"Erro ao gerar resposta: {str(e)}", "❌ Erro", elapsed_time def create_demo(): rag = RAGSystem() with gr.Blocks(theme=gr.themes.Soft()) as demo: with gr.Column(elem_id="container"): # Cabeçalho gr.Markdown( """ # 🤖 Assistente de Documentos Inteligente Sistema de consulta avançada que responde perguntas sobre seus documentos usando RAG. """ ) # Área principal with gr.Row(): # Coluna de entrada with gr.Column(): with gr.Group(): gr.Markdown("### 📄 Documentos") file_input = gr.File( label="Upload de PDF (opcional)", type="binary", file_types=[".pdf"], height=100, ) info = gr.Markdown( f""" ℹ️ O sistema consulta: - PDFs enviados por você - Documentos na pasta `{DOCS_DIR}` """ ) with gr.Group(): gr.Markdown("### ❓ Sua Pergunta") query_input = gr.Textbox( placeholder="Digite sua pergunta sobre os documentos...", lines=3, max_lines=6, show_label=False, ) with gr.Row(): clear_btn = gr.Button("🗑️ Limpar", variant="secondary") submit_btn = gr.Button("🔍 Enviar Pergunta", variant="primary") # Coluna de saída with gr.Column(): with gr.Group(): gr.Markdown("### 📝 Resposta") with gr.Row(): status_output = gr.Textbox( label="Status", value="⏳ Aguardando...", interactive=False, show_label=False, ) time_output = gr.Textbox( label="Tempo", value="0s", interactive=False, show_label=False, ) response_output = gr.Textbox( label="Resposta", placeholder="A resposta aparecerá aqui...", interactive=False, lines=12, show_label=False, ) # Exemplos with gr.Accordion("📚 Exemplos de Perguntas", open=False): gr.Examples( examples=[ [None, "Quais são os principais tópicos abordados neste documento?"], [None, "Resuma as conclusões mais importantes."], [None, "O que o documento diz sobre [tema específico]?"], [None, "Quais são as recomendações apresentadas?"], ], inputs=[file_input, query_input], ) # Rodapé gr.Markdown( """ --- ### 🔧 Informações do Sistema * Respostas geradas usando tecnologia RAG (Retrieval-Augmented Generation) * Processamento inteligente de documentos PDF * Respostas baseadas exclusivamente no conteúdo dos documentos * Suporte a múltiplos documentos e contextos """ ) # Eventos submit_btn.click( fn=rag.generate_response, inputs=[file_input, query_input], outputs=[response_output, status_output, time_output], ) clear_btn.click( lambda: (None, "", "⏳ Aguardando...", "0s"), outputs=[file_input, query_input, status_output, time_output], ) # Limpa a resposta quando a pergunta muda query_input.change( lambda: ("", "⏳ Aguardando...", "0s"), outputs=[response_output, status_output, time_output], ) return demo if __name__ == "__main__": demo = create_demo() demo.launch()