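"""Intelligent Document Assistant: a RAG (Retrieval-Augmented Generation) demo.

Gradio app that answers questions about PDF documents. Retrieval runs over a
FAISS index built from the permanent `documents` folder plus an optional
per-question PDF upload; answers are generated with a Flan-T5 pipeline.
"""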
import os
import tempfile
import time
from typing import Optional, Tuple

import gradio as gr
import torch
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain_community.vectorstores import FAISS
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline

# Configuration
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
LLM_MODEL = "google/flan-t5-large"
DOCS_DIR = "documents"
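
# Note: google/flan-t5-large is a ~780M-parameter seq2seq model, so answer
# generation on a CPU-only machine can take a while; a smaller checkpoint such
# as "google/flan-t5-base" is a drop-in replacement if latency matters.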


class RAGSystem:
    def __init__(self):
        # Load the seq2seq model and wrap it as a LangChain-compatible LLM.
        self.tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(
            LLM_MODEL,
            device_map="auto",
            torch_dtype=torch.float32,
        )
        pipe = pipeline(
            "text2text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            max_length=512,
            do_sample=True,  # temperature/top_p only take effect when sampling is on
            temperature=0.7,
            top_p=0.95,
        )
        self.llm = HuggingFacePipeline(pipeline=pipe)
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
        )
        self.base_db = self.load_base_knowledge()
    def load_base_knowledge(self) -> Optional[FAISS]:
        """Index every PDF under DOCS_DIR into a FAISS store; None if empty."""
        try:
            if not os.path.exists(DOCS_DIR):
                os.makedirs(DOCS_DIR)
                return None
            loader = DirectoryLoader(
                DOCS_DIR,
                glob="**/*.pdf",
                loader_cls=PyPDFLoader,
            )
            documents = loader.load()
            if not documents:
                return None
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=100,
                length_function=len,
                separators=["\n\n", "\n", ".", " ", ""],
            )
            texts = text_splitter.split_documents(documents)
            return FAISS.from_documents(texts, self.embeddings)
        except Exception as e:
            print(f"Error loading knowledge base: {e}")
            return None
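    # Note: chunk_size and chunk_overlap above are measured in characters
    # (length_function=len); 500/100 keeps the "stuff" chain's combined context
    # small enough for Flan-T5's 512-token input window, and is tunable.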
    def process_pdf(self, file_content: bytes) -> Optional[FAISS]:
        """Index an uploaded PDF, merging in the permanent base if present."""
        try:
            # PyPDFLoader needs a file path, so spill the uploaded bytes to a temp file.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                tmp_file.write(file_content)
                tmp_path = tmp_file.name
            loader = PyPDFLoader(tmp_path)
            documents = loader.load()
            os.unlink(tmp_path)
            if not documents:
                return None
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=100,
                length_function=len,
                separators=["\n\n", "\n", ".", " ", ""],
            )
            texts = text_splitter.split_documents(documents)
            db = FAISS.from_documents(texts, self.embeddings)
            if self.base_db is not None:
                # merge_from mutates db in place, appending the base vectors.
                db.merge_from(self.base_db)
            return db
        except Exception as e:
            print(f"Error processing PDF: {e}")
            return None
    def generate_response(self, file_obj, query: str, progress=gr.Progress()) -> Tuple[str, str, str]:
        """Return (answer, status, elapsed_time)."""
        if not query.strip():
            return "Please enter a question.", "⚠️ Waiting for a question", "0s"
        start_time = time.time()
        try:
            progress(0, desc="Starting...")
            # Pick the vector store: uploaded PDF (merged with the base) or base only.
            progress(0.2, desc="Processing document...")
            if file_obj is not None:
                db = self.process_pdf(file_obj)
                if db is None:
                    return "The PDF could not be processed.", "❌ Processing error", "0s"
            elif self.base_db is not None:
                db = self.base_db
            else:
                return "No knowledge base available.", "❌ No documents", "0s"
            progress(0.4, desc="Retrieving relevant passages...")
            qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",  # concatenates retrieved chunks into one prompt
                retriever=db.as_retriever(
                    search_kwargs={"k": 4, "fetch_k": 6}  # fetch_k applies with filters/MMR
                ),
                return_source_documents=True,
            )
            progress(0.6, desc="Generating answer...")
            prompt = f"""Based on the documents provided, answer the following question in Portuguese:
{query}
If the answer comes from the permanent document base, say so at the start.
If the answer comes from the uploaded PDF, say so at the start.
If there is not enough information, say so clearly."""
            result = qa_chain.invoke({"query": prompt})
            elapsed_time = f"{time.time() - start_time:.1f}s"
            progress(1.0, desc="Done!")
            return result["result"], "✅ Success", elapsed_time
        except Exception as e:
            elapsed_time = f"{time.time() - start_time:.1f}s"
            return f"Error generating answer: {e}", "❌ Error", elapsed_time


def create_demo():
    rag = RAGSystem()
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        with gr.Column(elem_id="container"):
            # Header
            gr.Markdown(
                """
                # 🤖 Intelligent Document Assistant
                This system uses RAG (Retrieval-Augmented Generation) to answer questions about your documents.
                """
            )
            # Main area
            with gr.Row():
                # Input column
                with gr.Column():
                    with gr.Group():
                        gr.Markdown("### 📄 Documents")
                        file_input = gr.File(
                            label="PDF upload (optional)",
                            type="binary",
                            file_types=[".pdf"],
                            height=100,
                        )
                        gr.Markdown(
                            f"""
                            ℹ️ Besides uploads, the system also consults the `{DOCS_DIR}` folder
                            """
                        )
                    with gr.Group():
                        gr.Markdown("### ❓ Your Question")
                        query_input = gr.Textbox(
                            placeholder="Type your question here...",
                            lines=3,
                            max_lines=6,
                            show_label=False,
                        )
                        with gr.Row():
                            clear_btn = gr.Button("🗑️ Clear", variant="secondary")
                            submit_btn = gr.Button("🔍 Submit Question", variant="primary")
                # Output column
                with gr.Column():
                    with gr.Group():
                        gr.Markdown("### 📝 Answer")
                        with gr.Row():
                            status_output = gr.Textbox(
                                label="Status",
                                value="⏳ Waiting...",
                                interactive=False,
                                show_label=False,
                            )
                            time_output = gr.Textbox(
                                label="Time",
                                value="0s",
                                interactive=False,
                                show_label=False,
                            )
                        response_output = gr.Textbox(
                            label="Answer",
                            placeholder="The answer will appear here...",
                            interactive=False,
                            lines=12,
                            show_label=False,
                        )
            # Examples
            with gr.Accordion("📚 Example Questions", open=False):
                gr.Examples(
                    examples=[
                        [None, "What is the main topic of the documents?"],
                        [None, "Can you summarize the key points?"],
                        [None, "What are the main conclusions?"],
                        [None, "Explain the context of this document."],
                    ],
                    inputs=[file_input, query_input],
                )
            # Footer
            gr.Markdown(
                """
                ---
                ### 🔧 About the System
                * Uses a T5 model for answer generation
                * Document processing based on RAG
                * Supports multiple PDF documents
                * Answers are grounded solely in the documents' content
                """
            )
        # Events
        submit_btn.click(
            fn=rag.generate_response,
            inputs=[file_input, query_input],
            outputs=[response_output, status_output, time_output],
        )
        clear_btn.click(
            # Also reset the previous answer, not just the inputs.
            lambda: (None, "", "", "⏳ Waiting...", "0s"),
            outputs=[file_input, query_input, response_output, status_output, time_output],
        )
        # Clear the answer whenever the question changes
        query_input.change(
            lambda: ("", "⏳ Waiting...", "0s"),
            outputs=[response_output, status_output, time_output],
        )
    return demo


if __name__ == "__main__":
    demo = create_demo()
    demo.launch()
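# Rough dependency list implied by the imports (usual PyPI names; pin versions
# as needed): gradio, langchain, langchain-community, transformers, torch,
# faiss-cpu, pypdf, sentence-transformers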