Spaces:
Running
Running
import os | |
from typing import Optional | |
import gradio as gr | |
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.llms import HuggingFacePipeline | |
from langchain.chains import RetrievalQA | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline | |
import torch | |
import tempfile | |
# Configurações | |
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2" | |
LLM_MODEL = "google/flan-t5-large" # Modelo aberto e sem necessidade de autenticação | |
DOCS_DIR = "documentos" | |
class RAGSystem: | |
def __init__(self): | |
# Inicializa o modelo de linguagem | |
print("Carregando modelo de linguagem...") | |
self.tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL) | |
self.model = AutoModelForSeq2SeqLM.from_pretrained( | |
LLM_MODEL, | |
device_map="auto", | |
torch_dtype=torch.float32 # T5 funciona bem com float32 | |
) | |
# Configura o pipeline | |
pipe = pipeline( | |
"text2text-generation", | |
model=self.model, | |
tokenizer=self.tokenizer, | |
max_length=512, | |
temperature=0.7, | |
top_p=0.95 | |
) | |
# Configura o modelo LangChain | |
self.llm = HuggingFacePipeline(pipeline=pipe) | |
# Configura embeddings | |
print("Configurando embeddings...") | |
self.embeddings = HuggingFaceEmbeddings( | |
model_name=EMBEDDING_MODEL, | |
model_kwargs={'device': 'cpu'} | |
) | |
# Carrega a base de conhecimento permanente | |
self.base_db = self.load_base_knowledge() | |
def load_base_knowledge(self) -> Optional[FAISS]: | |
"""Carrega a base de conhecimento permanente da pasta de documentos""" | |
try: | |
if not os.path.exists(DOCS_DIR): | |
print(f"Pasta {DOCS_DIR} não encontrada. Criando...") | |
os.makedirs(DOCS_DIR) | |
return None | |
# Carrega todos os PDFs da pasta | |
loader = DirectoryLoader( | |
DOCS_DIR, | |
glob="**/*.pdf", | |
loader_cls=PyPDFLoader | |
) | |
documents = loader.load() | |
if not documents: | |
print("Nenhum documento encontrado na pasta base.") | |
return None | |
# Divide o texto em chunks | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=500, # Chunks menores para o T5 | |
chunk_overlap=100, | |
length_function=len, | |
separators=["\n\n", "\n", ".", " ", ""] | |
) | |
texts = text_splitter.split_documents(documents) | |
# Cria base de conhecimento | |
print(f"Criando base de conhecimento com {len(texts)} chunks...") | |
db = FAISS.from_documents(texts, self.embeddings) | |
return db | |
except Exception as e: | |
print(f"Erro ao carregar base de conhecimento: {str(e)}") | |
return None | |
def process_pdf(self, file_content: bytes) -> Optional[FAISS]: | |
"""Processa o PDF do usuário""" | |
try: | |
# Cria arquivo temporário | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
tmp_file.write(file_content) | |
tmp_path = tmp_file.name | |
# Carrega e processa o PDF | |
loader = PyPDFLoader(tmp_path) | |
documents = loader.load() | |
# Remove arquivo temporário | |
os.unlink(tmp_path) | |
if not documents: | |
return None | |
# Divide o texto em chunks | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=500, | |
chunk_overlap=100, | |
length_function=len, | |
separators=["\n\n", "\n", ".", " ", ""] | |
) | |
texts = text_splitter.split_documents(documents) | |
# Cria base de conhecimento | |
db = FAISS.from_documents(texts, self.embeddings) | |
# Se existir uma base permanente, mescla com ela | |
if self.base_db is not None: | |
db.merge_from(self.base_db) | |
return db | |
except Exception as e: | |
print(f"Erro ao processar PDF: {str(e)}") | |
return None | |
def generate_response(self, file_obj, query: str) -> str: | |
"""Gera resposta para a consulta""" | |
if not query.strip(): | |
return "Por favor, insira uma pergunta." | |
try: | |
# Se tiver arquivo do usuário, processa e mescla com a base | |
if file_obj is not None: | |
db = self.process_pdf(file_obj) | |
if db is None: | |
return "Não foi possível processar o PDF." | |
# Se não tiver arquivo do usuário, usa só a base permanente | |
elif self.base_db is not None: | |
db = self.base_db | |
else: | |
return "Nenhuma base de conhecimento disponível. Por favor, faça upload de um PDF ou adicione documentos à pasta base." | |
# Configura o chain RAG | |
qa_chain = RetrievalQA.from_chain_type( | |
llm=self.llm, | |
chain_type="stuff", | |
retriever=db.as_retriever( | |
search_kwargs={ | |
"k": 4, # Aumentamos o k para ter mais contexto | |
"fetch_k": 6 | |
} | |
), | |
return_source_documents=True | |
) | |
# Adiciona contexto sobre a fonte da resposta | |
prompt = f"""Baseado nos documentos fornecidos, responda em português à seguinte pergunta: | |
{query} | |
Se a resposta vier da base de documentos permanente, indique isso no início. | |
Se a resposta vier do PDF enviado, indique isso no início. | |
Se não encontrar informações suficientes, indique isso claramente.""" | |
# Gera resposta | |
result = qa_chain({"query": prompt}) | |
return result["result"] | |
except Exception as e: | |
return f"Erro ao gerar resposta: {str(e)}" | |
# Interface Gradio | |
def create_demo(): | |
rag = RAGSystem() | |
with gr.Blocks() as demo: | |
gr.Markdown("# 📚 Sistema RAG de Consulta a Documentos") | |
gr.Markdown(f""" | |
### Como usar: | |
1. Os documentos da pasta `{DOCS_DIR}` são usados como base de conhecimento | |
2. Você pode fazer upload de PDFs adicionais para consulta | |
3. Digite sua pergunta e aguarde a resposta | |
4. As respostas são baseadas no conteúdo dos documentos | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
file_input = gr.File( | |
label="Upload do PDF (opcional)", | |
type="binary", | |
file_types=[".pdf"] | |
) | |
query_input = gr.Textbox( | |
label="Sua Pergunta", | |
placeholder="Digite sua pergunta sobre o documento...", | |
lines=3 | |
) | |
submit_btn = gr.Button("🔍 Buscar Resposta", variant="primary") | |
with gr.Column(scale=1): | |
output = gr.Textbox( | |
label="Resposta", | |
lines=10 | |
) | |
submit_btn.click( | |
fn=rag.generate_response, | |
inputs=[file_input, query_input], | |
outputs=output | |
) | |
gr.Examples( | |
examples=[ | |
[None, "Qual é o tema principal dos documentos?"], | |
[None, "Pode fazer um resumo dos pontos principais?"], | |
[None, "Quais são as principais conclusões?"] | |
], | |
inputs=[file_input, query_input] | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = create_demo() | |
demo.launch() |