|
import gradio as gr |
|
from transformers import pipeline |
|
import PyPDF2 |
|
import os |
|
import re |
|
import numpy as np |
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import torch |
|
|
|
|
|
# Uploaded PDFs are copied into a "pdf_data" folder that lives next to this
# script; the folder is created at import time if it does not exist yet.
BASE_DIR = os.path.split(os.path.abspath(__file__))[0]
PDF_DIR = os.path.join(BASE_DIR, "pdf_data")
os.makedirs(PDF_DIR, exist_ok=True)

# Device index handed to the pipeline: 0 when CUDA is available, -1 otherwise.
if torch.cuda.is_available():
    device = 0
else:
    device = -1

# Summarization model used to produce the answers shown in the chat.
summarizer = pipeline(
    task="summarization",
    model="facebook/bart-large-cnn",
    device=device,
)
|
|
|
|
|
class FreeRAG:
    """Small retrieval engine over PDF text backed by TF-IDF vectors.

    Attributes:
        documents: List of ``{"source": filename, "content": chunk}`` dicts,
            one per text chunk, in insertion order.
        vectorizer: ``TfidfVectorizer`` that is re-fitted on every chunk each
            time a PDF is added.
        vectors: TF-IDF matrix built from ``documents`` (row ``i`` belongs to
            ``documents[i]``), or ``None`` while nothing is loaded.
    """

    def __init__(self):
        self.documents = []
        self.vectorizer = TfidfVectorizer(stop_words='english')
        self.vectors = None

    def clear(self):
        """Forget every loaded chunk and its vectors."""
        self.documents = []
        self.vectors = None

    def process_text(self, text, max_chunk_size=1000, min_chunk_size=100):
        """Split *text* into whitespace-normalised chunks.

        Lines are treated as paragraphs and packed greedily into chunks while
        the running length stays below ``max_chunk_size``. A paragraph longer
        than ``max_chunk_size`` is first broken at sentence ends (``.``,
        ``!``, ``?``). A single sentence longer than the limit is kept whole,
        so a chunk can exceed ``max_chunk_size`` in that case.

        Args:
            text: Raw text, typically the concatenated pages of a PDF.
            max_chunk_size: Upper bound (in characters) used while packing.
            min_chunk_size: Chunks whose length is not greater than this are
                discarded.

        Returns:
            The list of chunk strings, in document order.
        """
        chunks = []
        current = ""

        # Split on line breaks BEFORE collapsing whitespace; collapsing first
        # would erase the newlines and leave a single giant paragraph.
        for line in text.splitlines():
            paragraph = re.sub(r'\s+', ' ', line).strip()
            if not paragraph:
                continue

            if len(paragraph) > max_chunk_size:
                pieces = re.split(r'(?<=[.!?])\s+', paragraph)
            else:
                pieces = [paragraph]

            for piece in pieces:
                if len(current) + len(piece) < max_chunk_size:
                    current = f"{current} {piece}" if current else piece
                else:
                    if current:
                        chunks.append(current)
                    current = piece

        if current:
            chunks.append(current)

        return [chunk for chunk in chunks if len(chunk) > min_chunk_size]

    @staticmethod
    def _read_upload(file_obj):
        """Return ``(filename, raw_bytes)`` for an uploaded file.

        Accepts either a filesystem path (``str`` / ``os.PathLike``) or an
        object exposing ``.name`` (and optionally ``.read()``).
        """
        if isinstance(file_obj, (str, os.PathLike)):
            path = os.fspath(file_obj)
        else:
            path = file_obj.name
        filename = os.path.basename(path)

        # Prefer reading from disk: it does not depend on the handle's
        # current position or on the handle still being open.
        if os.path.isfile(path):
            with open(path, 'rb') as src:
                return filename, src.read()
        if hasattr(file_obj, "read"):
            return filename, file_obj.read()
        raise FileNotFoundError(path)

    def load_pdf(self, file_obj):
        """Ingest one PDF and index its text.

        The upload is copied into ``PDF_DIR``, its text is extracted page by
        page, chunked with ``process_text`` and appended to ``documents``;
        the TF-IDF vectors are then rebuilt.

        Args:
            file_obj: Path to the PDF, or a file-like object with ``.name``.

        Returns:
            A status message for the UI. Failures are reported through the
            returned message rather than raised.
        """
        try:
            filename, data = self._read_upload(file_obj)
            file_path = os.path.join(PDF_DIR, filename)

            # Keep a local copy of the uploaded file.
            with open(file_path, 'wb') as f:
                f.write(data)

            # Extraction must happen while the file is still open.
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                page_texts = [page.extract_text() for page in reader.pages]
            text = "".join(f"{page_text}\n" for page_text in page_texts if page_text)

            if not text.strip():
                return "Erro: Não foi possível extrair texto do PDF."

            chunks = self.process_text(text)
            if not chunks:
                return "Erro: Conteúdo do PDF não pôde ser processado adequadamente."

            previous_count = len(self.documents)
            self.documents.extend(
                {"source": filename, "content": chunk} for chunk in chunks
            )
            try:
                self._update_vectors()
            except Exception:
                # Roll back so ``documents`` never gets ahead of ``vectors``.
                del self.documents[previous_count:]
                raise

            return f"PDF carregado com sucesso: {filename} ({len(chunks)} segmentos)"

        except Exception as e:
            # UI boundary: surface the failure as a status message.
            return f"Erro ao processar PDF: {str(e)}"

    def _update_vectors(self):
        """Re-fit the vectorizer on every stored chunk and refresh ``vectors``."""
        if not self.documents:
            self.vectors = None
            return

        texts = [doc["content"] for doc in self.documents]
        self.vectors = self.vectorizer.fit_transform(texts)

    def search(self, query, top_k=3):
        """Return the chunks most similar to *query*.

        Args:
            query: Free-text question.
            top_k: Maximum number of results; non-positive values yield ``[]``.

        Returns:
            A list of ``{"score": similarity, "document": doc}`` dicts sorted
            by descending cosine similarity. Empty when nothing is loaded.
        """
        # ``[-0:]`` would slice the whole array, so reject top_k <= 0 up front.
        if top_k <= 0 or not self.documents or self.vectors is None:
            return []

        query_vec = self.vectorizer.transform([query])
        similarity_scores = cosine_similarity(query_vec, self.vectors)[0]

        # argsort is ascending: take the last ``top_k`` and reverse them.
        top_indices = similarity_scores.argsort()[-top_k:][::-1]

        return [
            {"score": similarity_scores[idx], "document": self.documents[idx]}
            for idx in top_indices
        ]
|
|
|
|
|
# Module-wide engine instance used by the query and upload callbacks.
rag_engine: FreeRAG = FreeRAG()
|
|
|
def generate_response(prompt, max_length=300):
    """Generate a response for *prompt* with the module-level summarizer.

    Args:
        prompt: Full text handed to the summarization pipeline.
        max_length: Maximum length of the generated text, forwarded to the
            pipeline.

    Returns:
        The generated text, or an error message string if generation fails
        (exceptions are reported, not raised).
    """
    try:
        # Let the pipeline truncate over-long input at token level. Slicing
        # the prompt to a fixed number of characters cut it far earlier than
        # necessary and dropped its tail, which is where the question sits.
        outputs = summarizer(
            prompt,
            max_length=max_length,
            min_length=50,
            do_sample=False,
            truncation=True,
        )
        return outputs[0]['summary_text']
    except Exception as e:
        # UI boundary: surface the failure as the answer text.
        return f"Erro ao gerar resposta: {str(e)}"
|
|
|
def process_query(query, history):
    """Answer a user question from the loaded PDFs.

    Retrieves the three best-matching chunks, formats them into a context
    listing, embeds that listing and the question in a prompt, and asks
    ``generate_response`` for the answer.

    Args:
        query: The user's question.
        history: Chat history; accepted but not used here.

    Returns:
        A ``(response, context)`` tuple of strings.
    """
    # Nothing to retrieve from until at least one PDF has been ingested.
    if not rag_engine.documents:
        return "Por favor, carregue alguns PDFs primeiro.", "Nenhum documento carregado."

    hits = rag_engine.search(query, top_k=3)

    # One numbered entry per hit: source file, first 300 characters, score.
    context = "".join(
        f"[{rank}] Documento: {hit['document']['source']}\n"
        f"Trecho: {hit['document']['content'][:300]}...\n"
        f"Relevância: {hit['score']:.2f}\n\n"
        for rank, hit in enumerate(hits, start=1)
    )

    prompt = f"""
Com base nos seguintes documentos, responda à pergunta de forma concisa e informativa.
Se a resposta não estiver nos documentos, diga que não há informações suficientes.

DOCUMENTOS:
{context}

PERGUNTA: {query}

RESPOSTA:
"""

    return generate_response(prompt), context
|
|
|
|
|
# Two-tab interface: one tab to ingest PDFs, one to question them.
with gr.Blocks(title="RAG PDF Gratuito") as demo:
    gr.Markdown("# Sistema de RAG PDF (Retrieval Augmented Generation)")
    gr.Markdown("Carregue PDFs e faça perguntas sobre eles.")

    # --- Tab 1: upload PDFs / reset the knowledge base ---
    with gr.Tab("Carregar PDFs"):
        with gr.Row():
            with gr.Column():
                pdf_upload = gr.File(
                    label="Selecionar PDF",
                    file_types=[".pdf"],
                    file_count="single",
                )
                upload_btn = gr.Button("Carregar PDF")
                clear_btn = gr.Button("Limpar Base de Conhecimento")
                status = gr.Textbox(label="Status", interactive=False)

    # --- Tab 2: chat on the left, retrieved context on the right ---
    with gr.Tab("Consultar"):
        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(label="Conversa")
                query = gr.Textbox(
                    label="Sua pergunta",
                    placeholder="Digite sua pergunta sobre os documentos...",
                )
                submit_btn = gr.Button("Enviar")

            with gr.Column(scale=1):
                context_box = gr.Textbox(
                    label="Contexto Recuperado",
                    interactive=False,
                    lines=15,
                )

    def upload_pdf(file):
        """Index the uploaded PDF and return the status message to display."""
        return "Nenhum arquivo selecionado." if file is None else rag_engine.load_pdf(file)

    def clear_knowledge_base():
        """Empty the engine and return the confirmation message."""
        rag_engine.clear()
        return "Base de conhecimento limpa."

    def handle_query(question, chat_history):
        """Answer *question* and return (updated chat, cleared input, context)."""
        turns = chat_history or []
        reply, retrieved = process_query(question, turns)
        turns.append((question, reply))
        # The empty string resets the question textbox after sending.
        return turns, "", retrieved

    # --- Event wiring ---
    upload_btn.click(fn=upload_pdf, inputs=[pdf_upload], outputs=[status])
    clear_btn.click(fn=clear_knowledge_base, inputs=[], outputs=[status])

    # The send button and pressing Enter in the textbox do the same thing.
    submit_btn.click(
        fn=handle_query,
        inputs=[query, chatbot],
        outputs=[chatbot, query, context_box],
    )
    query.submit(
        fn=handle_query,
        inputs=[query, chatbot],
        outputs=[chatbot, query, context_box],
    )


# Start the web app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()