PDFQAApp / app.py
DHEIVER's picture
Update app.py
f84f0d0 verified
import gradio as gr
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline
import PyPDF2
import torch
import re
from typing import List, Dict, Tuple, Optional, Union
import nltk
from nltk.tokenize import sent_tokenize
import fitz # PyMuPDF
import logging
from tqdm import tqdm
import os
import tempfile
from pathlib import Path
import shutil
import gc
import threading
from concurrent.futures import ThreadPoolExecutor
import warnings
from typing_extensions import TypedDict
# Configurar logging avançado
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('qa_system.log')
]
)
logger = logging.getLogger(__name__)
# Suprimir warnings desnecessários
warnings.filterwarnings('ignore', category=UserWarning, module='gradio')
class QAResult(TypedDict):
answer: str
score: float
confidence: str
context: str
page_number: Optional[int]
class ResourceManager:
"""Gerencia recursos do sistema e limpeza de memória"""
@staticmethod
def clear_gpu_memory():
"""Limpa memória GPU se disponível"""
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
@staticmethod
def create_temp_directory() -> Path:
"""Cria diretório temporário para arquivos"""
temp_dir = Path(tempfile.mkdtemp())
return temp_dir
@staticmethod
def cleanup_temp_directory(temp_dir: Path):
"""Remove diretório temporário e seus arquivos"""
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.warning(f"Erro ao limpar diretório temporário: {e}")
class NLTKManager:
"""Gerencia recursos NLTK"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self.nltk_data_dir = Path.home() / "nltk_data"
self.required_resources = ['punkt']
self.download_resources()
def download_resources(self):
"""Download e verifica recursos NLTK necessários"""
try:
self.nltk_data_dir.mkdir(parents=True, exist_ok=True)
nltk.data.path.append(str(self.nltk_data_dir))
for resource in self.required_resources:
try:
nltk.data.find(f'tokenizers/{resource}')
logger.debug(f"Recurso NLTK '{resource}' já instalado")
except LookupError:
logger.info(f"Baixando recurso NLTK '{resource}'...")
nltk.download(resource, download_dir=str(self.nltk_data_dir), quiet=True)
except Exception as e:
logger.error(f"Erro ao configurar NLTK: {e}")
raise
class PDFExtractor:
"""Classe especializada em extração de texto de PDFs"""
def __init__(self):
self.temp_dir = ResourceManager.create_temp_directory()
def __del__(self):
ResourceManager.cleanup_temp_directory(self.temp_dir)
def extract_with_pymupdf(self, pdf_path: str) -> Tuple[str, Dict[int, str]]:
"""Extrai texto usando PyMuPDF"""
doc = None
try:
doc = fitz.open(pdf_path)
full_text = ""
page_text = {}
for page_num in range(len(doc)):
page = doc[page_num]
text = page.get_text("text")
if text.strip(): # Ignorar páginas vazias
page_text[page_num] = text
full_text += text + "\n"
return full_text, page_text
except Exception as e:
logger.error(f"Erro PyMuPDF: {e}")
raise
finally:
if doc:
doc.close()
def extract_with_pypdf2(self, pdf_path: str) -> Tuple[str, Dict[int, str]]:
"""Extrai texto usando PyPDF2 como fallback"""
try:
with open(pdf_path, "rb") as file:
reader = PyPDF2.PdfReader(file)
full_text = ""
page_text = {}
for i, page in enumerate(reader.pages):
text = page.extract_text()
if text.strip(): # Ignorar páginas vazias
page_text[i] = text
full_text += text + "\n"
return full_text, page_text
except Exception as e:
logger.error(f"Erro PyPDF2: {e}")
raise
def extract_text(self, pdf_file: Union[str, Path]) -> Tuple[str, Dict[int, str]]:
"""Extrai texto do PDF com fallback"""
pdf_path = str(pdf_file)
try:
return self.extract_with_pymupdf(pdf_path)
except Exception as e1:
logger.warning(f"Falha PyMuPDF, tentando PyPDF2: {e1}")
try:
return self.extract_with_pypdf2(pdf_path)
except Exception as e2:
logger.error(f"Falha total na extração: {e2}")
raise
class TextProcessor:
"""Processa e prepara texto para análise"""
def __init__(self):
self.nltk_manager = NLTKManager()
self.max_chunk_size = 512
self.overlap = 50
def preprocess_text(self, text: str) -> str:
"""Pré-processa texto removendo formatação problemática"""
try:
# Normalizar quebras de linha
text = re.sub(r'\r\n', '\n', text)
# Remover quebras de linha extras
text = re.sub(r'\n+', ' ', text)
# Remover espaços múltiplos
text = re.sub(r'\s+', ' ', text)
# Remover caracteres especiais mantendo pontuação importante
text = re.sub(r'[^\w\s.,!?;:()[\]{}\-–—""''´`^~#$%&*+=@/\\áéíóúâêîôûãõçàèìòùäëïöüÁÉÍÓÚÂÊÎÔÛÃÕÇÀÈÌÒÙÄËÏÖÜ]',
' ', text)
return text.strip()
except Exception as e:
logger.warning(f"Erro no pré-processamento: {e}")
return text
def split_into_chunks(self, text: str) -> List[str]:
"""Divide texto em chunks com sobreposição"""
try:
sentences = sent_tokenize(text)
chunks = []
current_chunk = []
current_length = 0
for sentence in sentences:
sentence_length = len(sentence)
if current_length + sentence_length <= self.max_chunk_size:
current_chunk.append(sentence)
current_length += sentence_length
else:
if current_chunk:
chunks.append(' '.join(current_chunk))
current_chunk = [sentence]
current_length = sentence_length
if current_chunk:
chunks.append(' '.join(current_chunk))
# Adicionar sobreposição
overlapped_chunks = []
for i in range(len(chunks)):
if i > 0:
prefix = chunks[i-1][-self.overlap:]
chunks[i] = prefix + ' ' + chunks[i]
overlapped_chunks.append(chunks[i])
return overlapped_chunks
except Exception as e:
logger.error(f"Erro ao dividir texto: {e}")
return [text]
class ModelManager:
"""Gerencia o modelo de IA e processamento de perguntas"""
def __init__(self):
self.model_name = "deepset/roberta-base-squad2"
self.device = 0 if torch.cuda.is_available() else -1
self.max_tokens_answer = 50 # Máximo de tokens na resposta
self.max_tokens_context = 300 # Máximo de tokens no contexto exibido
self.load_model()
def load_model(self):
"""Carrega o modelo com tratamento de erros"""
try:
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
self.model = AutoModelForQuestionAnswering.from_pretrained(self.model_name)
self.nlp = pipeline('question-answering',
model=self.model,
tokenizer=self.tokenizer,
device=self.device)
logger.info(f"Modelo {self.model_name} carregado com sucesso")
except Exception as e:
logger.error(f"Erro ao carregar modelo: {e}")
raise
def get_answer(self, question: str, context: str) -> Dict:
"""Processa uma única pergunta/contexto"""
try:
return self.nlp(
question=question,
context=context,
max_answer_len=self.max_tokens_answer # Limitar resposta
)
except Exception as e:
logger.error(f"Erro ao processar resposta: {e}")
return {
'answer': '',
'score': 0,
'start': 0,
'end': 0
}
def get_best_answer(self, question: str, chunks: List[str]) -> QAResult:
"""Obtém a melhor resposta de múltiplos chunks"""
try:
if not chunks:
return {
'answer': "Não foi possível processar o documento.",
'score': 0,
'confidence': "0%",
'context': "",
'page_number': None
}
answers = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(self.get_answer, question, chunk) for chunk in chunks]
answers = [future.result() for future in futures]
# Filtrar respostas vazias
answers = [ans for ans in answers if ans['answer'].strip()]
if not answers:
return {
'answer': "Não foi possível encontrar uma resposta.",
'score': 0,
'confidence': "0%",
'context': "",
'page_number': None
}
best_answer = max(answers, key=lambda x: x['score'])
# Limitar contexto para o máximo de tokens configurado
limited_context = " ".join(
self.tokenizer.convert_ids_to_tokens(
self.tokenizer.encode(best_answer.get('context', ""), add_special_tokens=False)[:self.max_tokens_context]
)
)
return {
'answer': best_answer['answer'],
'score': best_answer['score'],
'confidence': f"{best_answer['score']*100:.2f}%",
'context': limited_context,
'page_number': None # Adapte conforme necessário
}
except Exception as e:
logger.error(f"Erro ao obter melhor resposta: {e}")
return {
'answer': "Erro ao processar o documento.",
'score': 0,
'confidence': "0%",
'context': "",
'page_number': None
}
class PDFQuestionAnswering:
"""Classe principal que coordena o sistema de QA"""
def __init__(self):
self.pdf_extractor = PDFExtractor()
self.text_processor = TextProcessor()
self.model_manager = ModelManager()
def answer_question(self, pdf_file: gr.File, question: str) -> QAResult:
"""Processa PDF e responde pergunta"""
try:
if not pdf_file or not question:
return {
'answer': "Por favor, forneça um arquivo PDF e uma pergunta.",
'score': 0,
'confidence': "0%",
'context': "",
'page_number': None
}
# Extrair texto do PDF
full_text, page_text = self.pdf_extractor.extract_text(pdf_file.name)
if not full_text.strip():
return {
'answer': "Não foi possível extrair texto do PDF.",
'score': 0,
'confidence': "0%",
'context': "",
'page_number': None
}
# Pré-processar texto
processed_text = self.text_processor.preprocess_text(full_text)
# Dividir em chunks
chunks = self.text_processor.split_into_chunks(processed_text)
# Obter melhor resposta
result = self.model_manager.get_best_answer(question, chunks)
# Limpar GPU se necessário
ResourceManager.clear_gpu_memory()
return result
except Exception as e:
logger.error(f"Erro ao processar pergunta: {e}")
return {
'answer': "Ocorreu um erro ao processar sua pergunta.",
'score': 0,
'confidence': "0%",
'context': "",
'page_number': None
}
def create_interface() -> gr.Blocks:
"""Cria interface Gradio"""
qa_system = PDFQuestionAnswering()
with gr.Blocks(title="Sistema Avançado de QA sobre PDFs") as iface:
# Interface HTML/Markdown aqui...
with gr.Row():
with gr.Column():
pdf_input = gr.File(
label="Carregar PDF",
file_types=[".pdf"],
type="filepath"
)
question_input = gr.Textbox(
label="Sua Pergunta",
placeholder="Digite sua pergunta aqui...",
lines=2
)
submit_btn = gr.Button("Obter Resposta", variant="primary")
with gr.Column():
answer_output = gr.Textbox(
label="Resposta",
lines=3
)
confidence_output = gr.Textbox(
label="Confiança da Resposta"
)
context_output = gr.Textbox(
label="Contexto da Resposta",
lines=5
)
page_output = gr.Textbox(
label="Página da Resposta",
visible=False # TODO: Implementar
)
def process_question(pdf, question):
result = qa_system.answer_question(pdf, question)
return (
result['answer'],
result['confidence'],
result['context'],
f"Página {result['page_number']}" if result['page_number'] else "Página não disponível"
)
submit_btn.click(
fn=process_question,
inputs=[pdf_input, question_input],
outputs=[answer_output, confidence_output, context_output, page_output]
)
gr.Markdown("""
### Dicas para Perguntas sobre Metrologia e Normas
#### Documentos Recomendados:
- Normas técnicas (ABNT, ISO, etc.)
- Procedimentos de calibração
- Manuais de qualidade
- Relatórios técnicos
- Documentos do sistema de gestão
#### Tipos de Perguntas:
**Metrologia e Calibração:**
- "Quais são os requisitos de calibração para [instrumento]?"
- "Qual é a incerteza de medição especificada para [processo]?"
- "Como é feita a rastreabilidade metrológica?"
- "Quais são os intervalos de calibração recomendados?"
- "Que condições ambientais são especificadas?"
**Normas e Requisitos:**
- "Quais são os requisitos para [processo específico]?"
- "Como a norma define [termo técnico]?"
- "Quais são os critérios de aceitação?"
- "Que documentação é exigida?"
- "Quais são as referências normativas?"
**Sistema de Gestão:**
- "Como é feito o controle de qualidade?"
- "Quais são os requisitos de competência?"
- "Como são tratadas as não conformidades?"
- "Que registros são obrigatórios?"
- "Como é feita a validação?"
#### Boas Práticas:
1. Use termos técnicos precisos
2. Referencie seções específicas
3. Pergunte sobre um tópico por vez
4. Verifique o nível de confiança
5. Confirme informações críticas
#### Interpretação:
- Confiança > 90%: Alta probabilidade de resposta correta
- Confiança 70-90%: Resposta provável, verificar contexto
- Confiança < 70%: Considerar reformular a pergunta
""")
return iface
def main():
"""Função principal"""
try:
# Configurar logging detalhado
logging.basicConfig(
level=logging.DEBUG, # Aumentado para DEBUG
format='%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('qa_system_detailed.log')
]
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('qa_system.log')
]
)
# Criar e iniciar interface
demo = create_interface()
# Configurações de lançamento
launch_kwargs = {
'server_name': "0.0.0.0", # Permite acesso externo
'server_port': 7860, # Porta padrão
'share': False, # Não criar túnel público
'debug': True, # Modo debug ativado
'show_error': True, # Mostrar erros detalhados
'enable_queue': True # Habilitar fila de requisições
}
# Tentar lançar com configurações completas
try:
demo.launch(**launch_kwargs)
except TypeError as e:
# Se algum parâmetro não for suportado, remover e tentar novamente
logger.warning(f"Erro no lançamento completo: {e}")
minimal_kwargs = {
'server_name': "0.0.0.0",
'server_port': 7860,
'share': False
}
demo.launch(**minimal_kwargs)
except Exception as e:
logger.error(f"Erro fatal: {e}")
raise
if __name__ == "__main__":
main()