my-rag-space / app.py
DHEIVER's picture
Update app.py
6f1dbe9 verified
raw
history blame
10.3 kB
import os
from typing import Optional, Tuple
import gradio as gr
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.llms import HuggingFacePipeline
from langchain.chains import RetrievalQA
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
import torch
import tempfile
import time
# Configurações
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
LLM_MODEL = "google/flan-t5-large"
DOCS_DIR = "documents"
class RAGSystem:
def __init__(self):
self.tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
self.model = AutoModelForSeq2SeqLM.from_pretrained(
LLM_MODEL,
device_map="auto",
torch_dtype=torch.float32
)
pipe = pipeline(
"text2text-generation",
model=self.model,
tokenizer=self.tokenizer,
max_length=512,
temperature=0.7,
top_p=0.95
)
self.llm = HuggingFacePipeline(pipeline=pipe)
self.embeddings = HuggingFaceEmbeddings(
model_name=EMBEDDING_MODEL,
model_kwargs={'device': 'cpu'}
)
self.base_db = self.load_base_knowledge()
def load_base_knowledge(self) -> Optional[FAISS]:
try:
if not os.path.exists(DOCS_DIR):
os.makedirs(DOCS_DIR)
return None
loader = DirectoryLoader(
DOCS_DIR,
glob="**/*.pdf",
loader_cls=PyPDFLoader
)
documents = loader.load()
if not documents:
return None
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=100,
length_function=len,
separators=["\n\n", "\n", ".", " ", ""]
)
texts = text_splitter.split_documents(documents)
return FAISS.from_documents(texts, self.embeddings)
except Exception as e:
print(f"Erro ao carregar base de conhecimento: {str(e)}")
return None
def process_pdf(self, file_content: bytes) -> Optional[FAISS]:
try:
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
tmp_file.write(file_content)
tmp_path = tmp_file.name
loader = PyPDFLoader(tmp_path)
documents = loader.load()
os.unlink(tmp_path)
if not documents:
return None
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=100,
length_function=len,
separators=["\n\n", "\n", ".", " ", ""]
)
texts = text_splitter.split_documents(documents)
db = FAISS.from_documents(texts, self.embeddings)
if self.base_db is not None:
db.merge_from(self.base_db)
return db
except Exception as e:
print(f"Erro ao processar PDF: {str(e)}")
return None
def generate_response(self, file_obj, query: str, progress=gr.Progress()) -> Tuple[str, str, str]:
"""Retorna (resposta, status, tempo_decorrido)"""
if not query.strip():
return "Por favor, insira uma pergunta.", "⚠️ Aguardando pergunta", "0s"
start_time = time.time()
try:
progress(0, desc="Iniciando processamento...")
# Processa documento
progress(0.2, desc="Processando documento...")
if file_obj is not None:
db = self.process_pdf(file_obj)
if db is None:
return "Não foi possível processar o PDF.", "❌ Erro no processamento", "0s"
elif self.base_db is not None:
db = self.base_db
else:
return "Nenhuma base de conhecimento disponível.", "❌ Sem documentos", "0s"
progress(0.4, desc="Buscando informações relevantes...")
qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=db.as_retriever(
search_kwargs={"k": 4, "fetch_k": 6}
),
return_source_documents=True
)
progress(0.6, desc="Gerando resposta...")
prompt = f"""Baseado nos documentos fornecidos, responda em português à seguinte pergunta:
{query}
Se a resposta vier da base de documentos permanente, indique isso no início.
Se a resposta vier do PDF enviado, indique isso no início.
Se não encontrar informações suficientes, indique isso claramente."""
result = qa_chain({"query": prompt})
elapsed_time = f"{time.time() - start_time:.1f}s"
progress(1.0, desc="Concluído!")
return result["result"], "✅ Sucesso", elapsed_time
except Exception as e:
elapsed_time = f"{time.time() - start_time:.1f}s"
return f"Erro ao gerar resposta: {str(e)}", "❌ Erro", elapsed_time
def create_demo():
rag = RAGSystem()
with gr.Blocks(theme=gr.themes.Soft()) as demo:
with gr.Column(elem_id="container"):
# Cabeçalho
gr.Markdown(
"""
# 🤖 Assistente de Documentos Inteligente
Este sistema usa tecnologia RAG (Retrieval-Augmented Generation) para responder perguntas sobre seus documentos.
"""
)
# Área principal
with gr.Row():
# Coluna de entrada
with gr.Column():
with gr.Group():
gr.Markdown("### 📄 Documentos")
file_input = gr.File(
label="Upload de PDF (opcional)",
type="binary",
file_types=[".pdf"],
height=100,
)
info = gr.Markdown(
f"""
ℹ️ Além do upload, o sistema também consulta a pasta `{DOCS_DIR}`
"""
)
with gr.Group():
gr.Markdown("### ❓ Sua Pergunta")
query_input = gr.Textbox(
placeholder="Digite sua pergunta aqui...",
lines=3,
max_lines=6,
show_label=False,
)
with gr.Row():
clear_btn = gr.Button("🗑️ Limpar", variant="secondary")
submit_btn = gr.Button("🔍 Enviar Pergunta", variant="primary")
# Coluna de saída
with gr.Column():
with gr.Group():
gr.Markdown("### 📝 Resposta")
with gr.Row():
status_output = gr.Textbox(
label="Status",
value="⏳ Aguardando...",
interactive=False,
show_label=False,
)
time_output = gr.Textbox(
label="Tempo",
value="0s",
interactive=False,
show_label=False,
)
response_output = gr.Textbox(
label="Resposta",
placeholder="A resposta aparecerá aqui...",
interactive=False,
lines=12,
show_label=False,
)
# Exemplos
with gr.Accordion("📚 Exemplos de Perguntas", open=False):
gr.Examples(
examples=[
[None, "Qual é o tema principal dos documentos?"],
[None, "Pode resumir os pontos principais?"],
[None, "Quais são as principais conclusões?"],
[None, "Explique o contexto deste documento."],
],
inputs=[file_input, query_input],
)
# Rodapé
gr.Markdown(
"""
---
### 🔧 Sobre o Sistema
* Usa modelo T5 para geração de respostas
* Processamento de documentos com tecnologia RAG
* Suporte a múltiplos documentos PDF
* Respostas baseadas apenas no conteúdo dos documentos
"""
)
# Eventos
submit_btn.click(
fn=rag.generate_response,
inputs=[file_input, query_input],
outputs=[response_output, status_output, time_output],
)
clear_btn.click(
lambda: (None, "", "⏳ Aguardando...", "0s"),
outputs=[file_input, query_input, status_output, time_output],
)
# Limpa a resposta quando a pergunta muda
query_input.change(
lambda: ("", "⏳ Aguardando...", "0s"),
outputs=[response_output, status_output, time_output],
)
return demo
if __name__ == "__main__":
demo = create_demo()
demo.launch()