Spaces:
Running
Running
import os | |
from typing import Optional, Tuple, Dict | |
import gradio as gr | |
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.llms import HuggingFacePipeline | |
from langchain.chains import RetrievalQA | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline | |
import torch | |
import tempfile | |
import time | |
# Configurações | |
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2" | |
LLM_MODEL = "google/flan-t5-large" | |
DOCS_DIR = "documents" | |
class RAGSystem: | |
def __init__(self): | |
self.tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL) | |
self.model = AutoModelForSeq2SeqLM.from_pretrained( | |
LLM_MODEL, | |
device_map="auto", | |
torch_dtype=torch.float32 | |
) | |
pipe = pipeline( | |
"text2text-generation", | |
model=self.model, | |
tokenizer=self.tokenizer, | |
max_length=512, | |
temperature=0.3, # Reduzido para respostas mais precisas | |
top_p=0.9, # Ajustado para permitir diversidade controlada | |
repetition_penalty=1.2 # Evita repetições | |
) | |
self.llm = HuggingFacePipeline(pipeline=pipe) | |
self.embeddings = HuggingFaceEmbeddings( | |
model_name=EMBEDDING_MODEL, | |
model_kwargs={'device': 'cpu'} | |
) | |
self.base_db = self.load_base_knowledge() | |
def load_base_knowledge(self) -> Optional[FAISS]: | |
try: | |
if not os.path.exists(DOCS_DIR): | |
os.makedirs(DOCS_DIR) | |
return None | |
loader = DirectoryLoader( | |
DOCS_DIR, | |
glob="**/*.pdf", | |
loader_cls=PyPDFLoader | |
) | |
documents = loader.load() | |
if not documents: | |
return None | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=500, | |
chunk_overlap=100, | |
length_function=len, | |
separators=["\n\n", "\n", ".", " ", ""] | |
) | |
texts = text_splitter.split_documents(documents) | |
return FAISS.from_documents(texts, self.embeddings) | |
except Exception as e: | |
print(f"Erro ao carregar base de conhecimento: {str(e)}") | |
return None | |
def process_pdf(self, file_content: bytes) -> Optional[FAISS]: | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
tmp_file.write(file_content) | |
tmp_path = tmp_file.name | |
loader = PyPDFLoader(tmp_path) | |
documents = loader.load() | |
os.unlink(tmp_path) | |
if not documents: | |
return None | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=500, | |
chunk_overlap=100, | |
length_function=len, | |
separators=["\n\n", "\n", ".", " ", ""] | |
) | |
texts = text_splitter.split_documents(documents) | |
db = FAISS.from_documents(texts, self.embeddings) | |
if self.base_db is not None: | |
db.merge_from(self.base_db) | |
return db | |
except Exception as e: | |
print(f"Erro ao processar PDF: {str(e)}") | |
return None | |
def format_response(self, raw_response: str, source_type: str, context_found: bool) -> str: | |
"""Formata a resposta para um formato padronizado e claro""" | |
if not context_found: | |
return "🔍 Não foram encontradas informações suficientes nos documentos para responder esta pergunta." | |
prefix = "" | |
if source_type == "pdf": | |
prefix = "📄 [Resposta baseada no PDF enviado]\n\n" | |
elif source_type == "base": | |
prefix = "📚 [Resposta baseada na base de documentos]\n\n" | |
elif source_type == "both": | |
prefix = "📚📄 [Resposta baseada em ambas as fontes]\n\n" | |
# Limpa e formata a resposta | |
response = raw_response.strip() | |
if not response: | |
return "🔍 Não foi possível gerar uma resposta adequada com as informações disponíveis." | |
return f"{prefix}{response}" | |
def generate_response(self, file_obj, query: str, progress=gr.Progress()) -> Tuple[str, str, str]: | |
"""Retorna (resposta, status, tempo_decorrido)""" | |
if not query.strip(): | |
return "Por favor, insira uma pergunta.", "⚠️ Aguardando pergunta", "0s" | |
start_time = time.time() | |
try: | |
progress(0.2, desc="Processando documentos...") | |
# Determina a fonte dos documentos | |
has_pdf = file_obj is not None | |
has_base = self.base_db is not None | |
source_type = "both" if has_pdf and has_base else "pdf" if has_pdf else "base" if has_base else None | |
if not source_type: | |
return "Nenhuma fonte de documentos disponível.", "❌ Sem documentos", "0s" | |
# Processa documento | |
if has_pdf: | |
db = self.process_pdf(file_obj) | |
if db is None: | |
return "Não foi possível processar o PDF.", "❌ Erro no processamento", "0s" | |
else: | |
db = self.base_db | |
progress(0.4, desc="Buscando informações relevantes...") | |
# Configuração do RAG | |
qa_chain = RetrievalQA.from_chain_type( | |
llm=self.llm, | |
chain_type="stuff", | |
retriever=db.as_retriever( | |
search_kwargs={ | |
"k": 4, | |
"fetch_k": 6, | |
"score_threshold": 0.6 # Filtra resultados pouco relevantes | |
} | |
), | |
return_source_documents=True | |
) | |
progress(0.6, desc="Gerando resposta...") | |
# Prompt mais estruturado | |
prompt = f"""Instruções: | |
1. Analise cuidadosamente os documentos fornecidos. | |
2. Responda à seguinte pergunta em português de forma clara e direta: {query} | |
3. Use apenas informações encontradas nos documentos. | |
4. Se não houver informações suficientes, indique explicitamente. | |
5. Mantenha a resposta objetiva e baseada em fatos. | |
6. Cite exemplos específicos dos documentos quando relevante. | |
7. Evite inventar informações que não estão presentes nos documentos. | |
Pergunta: {query}""" | |
# Gera resposta | |
result = qa_chain({"query": prompt}) | |
# Verifica se encontrou contexto relevante | |
context_found = bool(result.get("source_documents", [])) | |
# Formata a resposta | |
formatted_response = self.format_response( | |
result["result"], | |
source_type, | |
context_found | |
) | |
elapsed_time = f"{time.time() - start_time:.1f}s" | |
progress(1.0, desc="Concluído!") | |
return formatted_response, "✅ Sucesso", elapsed_time | |
except Exception as e: | |
elapsed_time = f"{time.time() - start_time:.1f}s" | |
return f"Erro ao gerar resposta: {str(e)}", "❌ Erro", elapsed_time | |
def create_demo(): | |
rag = RAGSystem() | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
with gr.Column(elem_id="container"): | |
# Cabeçalho | |
gr.Markdown( | |
""" | |
# 🤖 Assistente de Documentos Inteligente | |
Sistema de consulta avançada que responde perguntas sobre seus documentos usando RAG. | |
""" | |
) | |
# Área principal | |
with gr.Row(): | |
# Coluna de entrada | |
with gr.Column(): | |
with gr.Group(): | |
gr.Markdown("### 📄 Documentos") | |
file_input = gr.File( | |
label="Upload de PDF (opcional)", | |
type="binary", | |
file_types=[".pdf"], | |
height=100, | |
) | |
info = gr.Markdown( | |
f""" | |
ℹ️ O sistema consulta: | |
- PDFs enviados por você | |
- Documentos na pasta `{DOCS_DIR}` | |
""" | |
) | |
with gr.Group(): | |
gr.Markdown("### ❓ Sua Pergunta") | |
query_input = gr.Textbox( | |
placeholder="Digite sua pergunta sobre os documentos...", | |
lines=3, | |
max_lines=6, | |
show_label=False, | |
) | |
with gr.Row(): | |
clear_btn = gr.Button("🗑️ Limpar", variant="secondary") | |
submit_btn = gr.Button("🔍 Enviar Pergunta", variant="primary") | |
# Coluna de saída | |
with gr.Column(): | |
with gr.Group(): | |
gr.Markdown("### 📝 Resposta") | |
with gr.Row(): | |
status_output = gr.Textbox( | |
label="Status", | |
value="⏳ Aguardando...", | |
interactive=False, | |
show_label=False, | |
) | |
time_output = gr.Textbox( | |
label="Tempo", | |
value="0s", | |
interactive=False, | |
show_label=False, | |
) | |
response_output = gr.Textbox( | |
label="Resposta", | |
placeholder="A resposta aparecerá aqui...", | |
interactive=False, | |
lines=12, | |
show_label=False, | |
) | |
# Exemplos | |
with gr.Accordion("📚 Exemplos de Perguntas", open=False): | |
gr.Examples( | |
examples=[ | |
[None, "Quais são os principais tópicos abordados neste documento?"], | |
[None, "Resuma as conclusões mais importantes."], | |
[None, "O que o documento diz sobre [tema específico]?"], | |
[None, "Quais são as recomendações apresentadas?"], | |
], | |
inputs=[file_input, query_input], | |
) | |
# Rodapé | |
gr.Markdown( | |
""" | |
--- | |
### 🔧 Informações do Sistema | |
* Respostas geradas usando tecnologia RAG (Retrieval-Augmented Generation) | |
* Processamento inteligente de documentos PDF | |
* Respostas baseadas exclusivamente no conteúdo dos documentos | |
* Suporte a múltiplos documentos e contextos | |
""" | |
) | |
# Eventos | |
submit_btn.click( | |
fn=rag.generate_response, | |
inputs=[file_input, query_input], | |
outputs=[response_output, status_output, time_output], | |
) | |
clear_btn.click( | |
lambda: (None, "", "⏳ Aguardando...", "0s"), | |
outputs=[file_input, query_input, status_output, time_output], | |
) | |
# Limpa a resposta quando a pergunta muda | |
query_input.change( | |
lambda: ("", "⏳ Aguardando...", "0s"), | |
outputs=[response_output, status_output, time_output], | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = create_demo() | |
demo.launch() |