|
import gradio as gr |
|
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline |
|
import PyPDF2 |
|
import torch |
|
import re |
|
from typing import List, Dict, Tuple |
|
import nltk |
|
from nltk.tokenize import sent_tokenize |
|
import fitz |
|
import logging |
|
from tqdm import tqdm |
|
import os |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
class NLTKDownloader: |
|
@staticmethod |
|
def download_nltk_resources(): |
|
""" |
|
Download recursos NLTK necessários e configura o diretório de dados |
|
""" |
|
try: |
|
|
|
nltk_data_dir = os.path.join(os.path.expanduser("~"), "nltk_data") |
|
if not os.path.exists(nltk_data_dir): |
|
os.makedirs(nltk_data_dir) |
|
|
|
nltk.data.path.append(nltk_data_dir) |
|
|
|
|
|
resources = ['punkt'] |
|
|
|
for resource in resources: |
|
try: |
|
nltk.data.find(f'tokenizers/{resource}') |
|
logger.info(f"Recurso NLTK '{resource}' já está instalado") |
|
except LookupError: |
|
logger.info(f"Baixando recurso NLTK '{resource}'...") |
|
nltk.download(resource, download_dir=nltk_data_dir, quiet=True) |
|
|
|
return True |
|
except Exception as e: |
|
logger.error(f"Erro ao baixar recursos NLTK: {e}") |
|
return False |
|
|
|
class PDFQuestionAnswering: |
|
def __init__(self): |
|
|
|
if not NLTKDownloader.download_nltk_resources(): |
|
logger.warning("Alguns recursos NLTK podem não estar disponíveis") |
|
|
|
|
|
self.model_name = "deepset/roberta-base-squad2" |
|
try: |
|
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) |
|
self.model = AutoModelForQuestionAnswering.from_pretrained(self.model_name) |
|
self.nlp = pipeline('question-answering', |
|
model=self.model, |
|
tokenizer=self.tokenizer, |
|
device=0 if torch.cuda.is_available() else -1) |
|
logger.info(f"Modelo {self.model_name} carregado com sucesso") |
|
except Exception as e: |
|
logger.error(f"Erro ao carregar o modelo: {e}") |
|
raise |
|
|
|
def split_text_simple(self, text: str, max_length: int = 512) -> List[str]: |
|
""" |
|
Método alternativo de divisão de texto caso o NLTK não esteja disponível |
|
""" |
|
words = text.split() |
|
chunks = [] |
|
current_chunk = [] |
|
current_length = 0 |
|
|
|
for word in words: |
|
if current_length + len(word) + 1 <= max_length: |
|
current_chunk.append(word) |
|
current_length += len(word) + 1 |
|
else: |
|
chunks.append(' '.join(current_chunk)) |
|
current_chunk = [word] |
|
current_length = len(word) + 1 |
|
|
|
if current_chunk: |
|
chunks.append(' '.join(current_chunk)) |
|
|
|
return chunks |
|
|
|
def split_into_chunks(self, text: str, max_length: int = 512) -> List[str]: |
|
""" |
|
Divide o texto em chunks menores, com fallback para método simples |
|
""" |
|
try: |
|
return [chunk for chunk in self.split_text_simple(text, max_length)] |
|
except Exception as e: |
|
logger.warning(f"Erro ao dividir texto com NLTK: {e}. Usando método simples.") |
|
return self.split_text_simple(text, max_length) |
|
|
|
def extract_text_from_pdf(self, pdf_file: str) -> Tuple[str, Dict[int, str]]: |
|
""" |
|
Extrai texto do PDF com fallback para PyPDF2 |
|
""" |
|
try: |
|
|
|
doc = fitz.open(pdf_file) |
|
full_text = "" |
|
page_text = {} |
|
|
|
for page_num in range(len(doc)): |
|
page = doc[page_num] |
|
text = page.get_text("text") |
|
page_text[page_num] = text |
|
full_text += text + "\n" |
|
|
|
doc.close() |
|
return full_text, page_text |
|
|
|
except Exception as e: |
|
logger.warning(f"Erro com PyMuPDF: {e}. Tentando PyPDF2...") |
|
try: |
|
|
|
with open(pdf_file, "rb") as file: |
|
reader = PyPDF2.PdfReader(file) |
|
full_text = "" |
|
page_text = {} |
|
|
|
for i, page in enumerate(reader.pages): |
|
text = page.extract_text() |
|
page_text[i] = text |
|
full_text += text + "\n" |
|
|
|
return full_text, page_text |
|
|
|
except Exception as e2: |
|
logger.error(f"Erro ao extrair texto do PDF: {e2}") |
|
raise |
|
|
|
def preprocess_text(self, text: str) -> str: |
|
""" |
|
Pré-processa o texto removendo caracteres especiais e formatação indesejada |
|
""" |
|
try: |
|
|
|
text = re.sub(r'\n+', ' ', text) |
|
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
|
text = re.sub(r'[^\w\s.,!?-áéíóúâêîôûãõçà]', '', text) |
|
return text.strip() |
|
except Exception as e: |
|
logger.warning(f"Erro no pré-processamento: {e}") |
|
return text |
|
|
|
def get_best_answer(self, question: str, chunks: List[str]) -> Dict: |
|
""" |
|
Obtém a melhor resposta considerando todos os chunks de texto |
|
""" |
|
try: |
|
if not chunks: |
|
return { |
|
'answer': "Não foi possível processar o texto do documento.", |
|
'score': 0, |
|
'context': "" |
|
} |
|
|
|
answers = [] |
|
for chunk in chunks: |
|
if not chunk.strip(): |
|
continue |
|
|
|
try: |
|
result = self.nlp(question=question, context=chunk) |
|
answers.append(result) |
|
except Exception as e: |
|
logger.warning(f"Erro ao processar chunk: {e}") |
|
continue |
|
|
|
if not answers: |
|
return { |
|
'answer': "Não foi possível encontrar uma resposta no documento.", |
|
'score': 0, |
|
'context': "" |
|
} |
|
|
|
|
|
best_answer = max(answers, key=lambda x: x['score']) |
|
|
|
return { |
|
'answer': best_answer['answer'], |
|
'score': best_answer['score'], |
|
'context': best_answer['context'] |
|
} |
|
except Exception as e: |
|
logger.error(f"Erro ao processar resposta: {e}") |
|
return { |
|
'answer': "Ocorreu um erro ao processar sua pergunta.", |
|
'score': 0, |
|
'context': "" |
|
} |
|
|
|
def answer_question(self, pdf_file: gr.File, question: str) -> Dict: |
|
""" |
|
Processa o PDF e responde à pergunta |
|
""" |
|
try: |
|
if not pdf_file or not question: |
|
return { |
|
'answer': "Por favor, forneça um arquivo PDF e uma pergunta.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "" |
|
} |
|
|
|
|
|
full_text, page_text = self.extract_text_from_pdf(pdf_file.name) |
|
|
|
if not full_text.strip(): |
|
return { |
|
'answer': "Não foi possível extrair texto do PDF fornecido.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "" |
|
} |
|
|
|
|
|
processed_text = self.preprocess_text(full_text) |
|
|
|
|
|
chunks = self.split_into_chunks(processed_text) |
|
|
|
|
|
result = self.get_best_answer(question, chunks) |
|
|
|
|
|
result['confidence'] = f"{result['score']*100:.2f}%" |
|
|
|
return result |
|
except Exception as e: |
|
logger.error(f"Erro ao processar pergunta: {e}") |
|
return { |
|
'answer': "Ocorreu um erro ao processar sua pergunta.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "" |
|
} |
|
|
|
def create_interface(): |
|
qa_system = PDFQuestionAnswering() |
|
|
|
|
|
with gr.Blocks(title="Sistema Avançado de QA sobre PDFs") as iface: |
|
gr.Markdown(""" |
|
# Sistema de Perguntas e Respostas sobre PDFs |
|
|
|
Este sistema utiliza um modelo de linguagem avançado para responder perguntas sobre documentos PDF. |
|
Carregue um PDF e faça suas perguntas! |
|
""") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
pdf_input = gr.File( |
|
label="Carregar PDF", |
|
file_types=[".pdf"] |
|
) |
|
question_input = gr.Textbox( |
|
label="Sua Pergunta", |
|
placeholder="Digite sua pergunta aqui..." |
|
) |
|
submit_btn = gr.Button("Obter Resposta", variant="primary") |
|
|
|
with gr.Column(): |
|
answer_output = gr.Textbox(label="Resposta") |
|
confidence_output = gr.Textbox(label="Confiança da Resposta") |
|
context_output = gr.Textbox( |
|
label="Contexto da Resposta", |
|
lines=5 |
|
) |
|
|
|
def process_question(pdf, question): |
|
result = qa_system.answer_question(pdf, question) |
|
return ( |
|
result['answer'], |
|
result['confidence'], |
|
result['context'] |
|
) |
|
|
|
submit_btn.click( |
|
fn=process_question, |
|
inputs=[pdf_input, question_input], |
|
outputs=[answer_output, confidence_output, context_output] |
|
) |
|
|
|
gr.Markdown(""" |
|
### Dicas de Uso |
|
- Faça perguntas específicas e diretas |
|
- O sistema funciona melhor com PDFs bem formatados |
|
- A confiança indica o quanto o sistema está seguro da resposta |
|
""") |
|
|
|
return iface |
|
|
|
if __name__ == "__main__": |
|
|
|
demo = create_interface() |
|
|
|
demo.launch(share=False, debug=True, ssr=False) |