import gradio as gr from transformers import pipeline import PyPDF2 import os import re import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity import torch # Configuração de diretórios BASE_DIR = os.path.dirname(os.path.abspath(__file__)) PDF_DIR = os.path.join(BASE_DIR, "pdf_data") os.makedirs(PDF_DIR, exist_ok=True) # Inicialização de modelos gratuitos # Carregando o modelo de geração de texto device = 0 if torch.cuda.is_available() else -1 summarizer = pipeline( "summarization", model="facebook/bart-large-cnn", device=device ) # Classe RAG totalmente gratuita class FreeRAG: def __init__(self): self.documents = [] self.vectorizer = TfidfVectorizer(stop_words='english') self.vectors = None def clear(self): self.documents = [] self.vectors = None def process_text(self, text): """Processa e divide o texto em chunks de tamanho razoável""" # Limpa espaços extras text = re.sub(r'\s+', ' ', text).strip() # Divide por parágrafos paragraphs = text.split('\n') chunks = [] current_chunk = "" for p in paragraphs: p = p.strip() if not p: continue # Se o parágrafo for muito grande, divide-o if len(p) > 1000: sentences = re.split(r'(?<=[.!?])\s+', p) for sentence in sentences: if len(current_chunk) + len(sentence) < 1000: current_chunk += " " + sentence else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence else: if len(current_chunk) + len(p) < 1000: current_chunk += " " + p else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = p if current_chunk: chunks.append(current_chunk.strip()) return [c for c in chunks if len(c) > 100] # Filtra chunks muito pequenos def load_pdf(self, file_obj): """Carrega um arquivo PDF""" try: # Determina o caminho do arquivo filename = os.path.basename(file_obj.name) file_path = os.path.join(PDF_DIR, filename) # Salva o arquivo with open(file_path, 'wb') as f: f.write(file_obj.read()) # Extrai o texto text = "" with open(file_path, 'rb') as f: reader = PyPDF2.PdfReader(f) for page in reader.pages: page_text = page.extract_text() if page_text: text += page_text + "\n" if not text.strip(): return "Erro: Não foi possível extrair texto do PDF." # Processa o texto chunks = self.process_text(text) if not chunks: return "Erro: Conteúdo do PDF não pôde ser processado adequadamente." # Adiciona à base de conhecimento for chunk in chunks: self.documents.append({ "source": filename, "content": chunk }) # Atualiza vetores TF-IDF self._update_vectors() return f"PDF carregado com sucesso: {filename} ({len(chunks)} segmentos)" except Exception as e: return f"Erro ao processar PDF: {str(e)}" def _update_vectors(self): """Atualiza os vetores TF-IDF para todos os documentos""" if not self.documents: self.vectors = None return texts = [doc["content"] for doc in self.documents] self.vectors = self.vectorizer.fit_transform(texts) def search(self, query, top_k=3): """Busca documentos relevantes para a query""" if not self.documents or self.vectors is None: return [] # Vetoriza a query query_vec = self.vectorizer.transform([query]) # Calcula similaridade similarity_scores = cosine_similarity(query_vec, self.vectors)[0] # Encontra os top-k resultados top_indices = similarity_scores.argsort()[-top_k:][::-1] results = [] for idx in top_indices: results.append({ "score": similarity_scores[idx], "document": self.documents[idx] }) return results # Instância do RAG rag_engine = FreeRAG() def generate_response(prompt, max_length=300): """Gera uma resposta baseada no prompt usando o modelo carregado""" try: # Limita o tamanho do prompt para evitar erros if len(prompt) > 1024: prompt = prompt[:1024] # Gera a resposta response = summarizer( prompt, max_length=max_length, min_length=50, do_sample=False )[0]['summary_text'] return response except Exception as e: return f"Erro ao gerar resposta: {str(e)}" def process_query(query, history): """Processa uma consulta do usuário""" # Verifica se há documentos carregados if not rag_engine.documents: return "Por favor, carregue alguns PDFs primeiro.", "Nenhum documento carregado." # Busca documentos relevantes results = rag_engine.search(query, top_k=3) # Formata o contexto context = "" for i, result in enumerate(results): context += f"[{i+1}] Documento: {result['document']['source']}\n" context += f"Trecho: {result['document']['content'][:300]}...\n" context += f"Relevância: {result['score']:.2f}\n\n" # Constrói o prompt prompt = f""" Com base nos seguintes documentos, responda à pergunta de forma concisa e informativa. Se a resposta não estiver nos documentos, diga que não há informações suficientes. DOCUMENTOS: {context} PERGUNTA: {query} RESPOSTA: """ # Gera a resposta response = generate_response(prompt) return response, context # Interface Gradio with gr.Blocks(title="RAG PDF Gratuito") as demo: gr.Markdown("# Sistema de RAG PDF (Retrieval Augmented Generation)") gr.Markdown("Carregue PDFs e faça perguntas sobre eles.") with gr.Tab("Carregar PDFs"): with gr.Row(): with gr.Column(): pdf_upload = gr.File( label="Selecionar PDF", file_types=[".pdf"], file_count="single" ) upload_btn = gr.Button("Carregar PDF") clear_btn = gr.Button("Limpar Base de Conhecimento") status = gr.Textbox(label="Status", interactive=False) with gr.Tab("Consultar"): with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot(label="Conversa") query = gr.Textbox( label="Sua pergunta", placeholder="Digite sua pergunta sobre os documentos..." ) submit_btn = gr.Button("Enviar") with gr.Column(scale=1): context_box = gr.Textbox( label="Contexto Recuperado", interactive=False, lines=15 ) # Callbacks def upload_pdf(file): if file is None: return "Nenhum arquivo selecionado." return rag_engine.load_pdf(file) def clear_knowledge_base(): rag_engine.clear() return "Base de conhecimento limpa." def handle_query(question, chat_history): chat_history = chat_history or [] # Processa a consulta answer, context = process_query(question, chat_history) # Atualiza o histórico chat_history.append((question, answer)) return chat_history, "", context # Eventos upload_btn.click( upload_pdf, inputs=[pdf_upload], outputs=[status] ) clear_btn.click( clear_knowledge_base, inputs=[], outputs=[status] ) submit_btn.click( handle_query, inputs=[query, chatbot], outputs=[chatbot, query, context_box] ) query.submit( handle_query, inputs=[query, chatbot], outputs=[chatbot, query, context_box] ) if __name__ == "__main__": # Inicializa a interface demo.launch()