Gemma-3 / app.py
DHEIVER's picture
Update app.py
2fa7d8f verified
import gradio as gr
from transformers import pipeline
import PyPDF2
import os
import re
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import torch
# Configuração de diretórios
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PDF_DIR = os.path.join(BASE_DIR, "pdf_data")
os.makedirs(PDF_DIR, exist_ok=True)
# Inicialização de modelos gratuitos
# Carregando o modelo de geração de texto
device = 0 if torch.cuda.is_available() else -1
summarizer = pipeline(
"summarization",
model="facebook/bart-large-cnn",
device=device
)
# Classe RAG totalmente gratuita
class FreeRAG:
def __init__(self):
self.documents = []
self.vectorizer = TfidfVectorizer(stop_words='english')
self.vectors = None
def clear(self):
self.documents = []
self.vectors = None
def process_text(self, text):
"""Processa e divide o texto em chunks de tamanho razoável"""
# Limpa espaços extras
text = re.sub(r'\s+', ' ', text).strip()
# Divide por parágrafos
paragraphs = text.split('\n')
chunks = []
current_chunk = ""
for p in paragraphs:
p = p.strip()
if not p:
continue
# Se o parágrafo for muito grande, divide-o
if len(p) > 1000:
sentences = re.split(r'(?<=[.!?])\s+', p)
for sentence in sentences:
if len(current_chunk) + len(sentence) < 1000:
current_chunk += " " + sentence
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence
else:
if len(current_chunk) + len(p) < 1000:
current_chunk += " " + p
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = p
if current_chunk:
chunks.append(current_chunk.strip())
return [c for c in chunks if len(c) > 100] # Filtra chunks muito pequenos
def load_pdf(self, file_obj):
"""Carrega um arquivo PDF"""
try:
# Determina o caminho do arquivo
filename = os.path.basename(file_obj.name)
file_path = os.path.join(PDF_DIR, filename)
# Salva o arquivo
with open(file_path, 'wb') as f:
f.write(file_obj.read())
# Extrai o texto
text = ""
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
for page in reader.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
if not text.strip():
return "Erro: Não foi possível extrair texto do PDF."
# Processa o texto
chunks = self.process_text(text)
if not chunks:
return "Erro: Conteúdo do PDF não pôde ser processado adequadamente."
# Adiciona à base de conhecimento
for chunk in chunks:
self.documents.append({
"source": filename,
"content": chunk
})
# Atualiza vetores TF-IDF
self._update_vectors()
return f"PDF carregado com sucesso: {filename} ({len(chunks)} segmentos)"
except Exception as e:
return f"Erro ao processar PDF: {str(e)}"
def _update_vectors(self):
"""Atualiza os vetores TF-IDF para todos os documentos"""
if not self.documents:
self.vectors = None
return
texts = [doc["content"] for doc in self.documents]
self.vectors = self.vectorizer.fit_transform(texts)
def search(self, query, top_k=3):
"""Busca documentos relevantes para a query"""
if not self.documents or self.vectors is None:
return []
# Vetoriza a query
query_vec = self.vectorizer.transform([query])
# Calcula similaridade
similarity_scores = cosine_similarity(query_vec, self.vectors)[0]
# Encontra os top-k resultados
top_indices = similarity_scores.argsort()[-top_k:][::-1]
results = []
for idx in top_indices:
results.append({
"score": similarity_scores[idx],
"document": self.documents[idx]
})
return results
# Instância do RAG
rag_engine = FreeRAG()
def generate_response(prompt, max_length=300):
"""Gera uma resposta baseada no prompt usando o modelo carregado"""
try:
# Limita o tamanho do prompt para evitar erros
if len(prompt) > 1024:
prompt = prompt[:1024]
# Gera a resposta
response = summarizer(
prompt,
max_length=max_length,
min_length=50,
do_sample=False
)[0]['summary_text']
return response
except Exception as e:
return f"Erro ao gerar resposta: {str(e)}"
def process_query(query, history):
"""Processa uma consulta do usuário"""
# Verifica se há documentos carregados
if not rag_engine.documents:
return "Por favor, carregue alguns PDFs primeiro.", "Nenhum documento carregado."
# Busca documentos relevantes
results = rag_engine.search(query, top_k=3)
# Formata o contexto
context = ""
for i, result in enumerate(results):
context += f"[{i+1}] Documento: {result['document']['source']}\n"
context += f"Trecho: {result['document']['content'][:300]}...\n"
context += f"Relevância: {result['score']:.2f}\n\n"
# Constrói o prompt
prompt = f"""
Com base nos seguintes documentos, responda à pergunta de forma concisa e informativa.
Se a resposta não estiver nos documentos, diga que não há informações suficientes.
DOCUMENTOS:
{context}
PERGUNTA: {query}
RESPOSTA:
"""
# Gera a resposta
response = generate_response(prompt)
return response, context
# Interface Gradio
with gr.Blocks(title="RAG PDF Gratuito") as demo:
gr.Markdown("# Sistema de RAG PDF (Retrieval Augmented Generation)")
gr.Markdown("Carregue PDFs e faça perguntas sobre eles.")
with gr.Tab("Carregar PDFs"):
with gr.Row():
with gr.Column():
pdf_upload = gr.File(
label="Selecionar PDF",
file_types=[".pdf"],
file_count="single"
)
upload_btn = gr.Button("Carregar PDF")
clear_btn = gr.Button("Limpar Base de Conhecimento")
status = gr.Textbox(label="Status", interactive=False)
with gr.Tab("Consultar"):
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(label="Conversa")
query = gr.Textbox(
label="Sua pergunta",
placeholder="Digite sua pergunta sobre os documentos..."
)
submit_btn = gr.Button("Enviar")
with gr.Column(scale=1):
context_box = gr.Textbox(
label="Contexto Recuperado",
interactive=False,
lines=15
)
# Callbacks
def upload_pdf(file):
if file is None:
return "Nenhum arquivo selecionado."
return rag_engine.load_pdf(file)
def clear_knowledge_base():
rag_engine.clear()
return "Base de conhecimento limpa."
def handle_query(question, chat_history):
chat_history = chat_history or []
# Processa a consulta
answer, context = process_query(question, chat_history)
# Atualiza o histórico
chat_history.append((question, answer))
return chat_history, "", context
# Eventos
upload_btn.click(
upload_pdf,
inputs=[pdf_upload],
outputs=[status]
)
clear_btn.click(
clear_knowledge_base,
inputs=[],
outputs=[status]
)
submit_btn.click(
handle_query,
inputs=[query, chatbot],
outputs=[chatbot, query, context_box]
)
query.submit(
handle_query,
inputs=[query, chatbot],
outputs=[chatbot, query, context_box]
)
if __name__ == "__main__":
# Inicializa a interface
demo.launch()