|
import gradio as gr |
|
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline |
|
import PyPDF2 |
|
import torch |
|
import re |
|
from typing import List, Dict, Tuple, Optional, Union |
|
import nltk |
|
from nltk.tokenize import sent_tokenize |
|
import fitz |
|
import logging |
|
from tqdm import tqdm |
|
import os |
|
import tempfile |
|
from pathlib import Path |
|
import shutil |
|
import gc |
|
import threading |
|
from concurrent.futures import ThreadPoolExecutor |
|
import warnings |
|
from typing_extensions import TypedDict |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.StreamHandler(), |
|
logging.FileHandler('qa_system.log') |
|
] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
warnings.filterwarnings('ignore', category=UserWarning, module='gradio') |
|
|
|
class QAResult(TypedDict): |
|
answer: str |
|
score: float |
|
confidence: str |
|
context: str |
|
page_number: Optional[int] |
|
|
|
class ResourceManager: |
|
"""Gerencia recursos do sistema e limpeza de memória""" |
|
|
|
@staticmethod |
|
def clear_gpu_memory(): |
|
"""Limpa memória GPU se disponível""" |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
gc.collect() |
|
|
|
@staticmethod |
|
def create_temp_directory() -> Path: |
|
"""Cria diretório temporário para arquivos""" |
|
temp_dir = Path(tempfile.mkdtemp()) |
|
return temp_dir |
|
|
|
@staticmethod |
|
def cleanup_temp_directory(temp_dir: Path): |
|
"""Remove diretório temporário e seus arquivos""" |
|
try: |
|
shutil.rmtree(temp_dir) |
|
except Exception as e: |
|
logger.warning(f"Erro ao limpar diretório temporário: {e}") |
|
|
|
class NLTKManager: |
|
"""Gerencia recursos NLTK""" |
|
|
|
_instance = None |
|
_lock = threading.Lock() |
|
|
|
def __new__(cls): |
|
with cls._lock: |
|
if cls._instance is None: |
|
cls._instance = super().__new__(cls) |
|
cls._instance._initialized = False |
|
return cls._instance |
|
|
|
def __init__(self): |
|
if self._initialized: |
|
return |
|
|
|
self._initialized = True |
|
self.nltk_data_dir = Path.home() / "nltk_data" |
|
self.required_resources = ['punkt'] |
|
self.download_resources() |
|
|
|
def download_resources(self): |
|
"""Download e verifica recursos NLTK necessários""" |
|
try: |
|
self.nltk_data_dir.mkdir(parents=True, exist_ok=True) |
|
nltk.data.path.append(str(self.nltk_data_dir)) |
|
|
|
for resource in self.required_resources: |
|
try: |
|
nltk.data.find(f'tokenizers/{resource}') |
|
logger.debug(f"Recurso NLTK '{resource}' já instalado") |
|
except LookupError: |
|
logger.info(f"Baixando recurso NLTK '{resource}'...") |
|
nltk.download(resource, download_dir=str(self.nltk_data_dir), quiet=True) |
|
except Exception as e: |
|
logger.error(f"Erro ao configurar NLTK: {e}") |
|
raise |
|
|
|
class PDFExtractor: |
|
"""Classe especializada em extração de texto de PDFs""" |
|
|
|
def __init__(self): |
|
self.temp_dir = ResourceManager.create_temp_directory() |
|
|
|
def __del__(self): |
|
ResourceManager.cleanup_temp_directory(self.temp_dir) |
|
|
|
def extract_with_pymupdf(self, pdf_path: str) -> Tuple[str, Dict[int, str]]: |
|
"""Extrai texto usando PyMuPDF""" |
|
doc = None |
|
try: |
|
doc = fitz.open(pdf_path) |
|
full_text = "" |
|
page_text = {} |
|
|
|
for page_num in range(len(doc)): |
|
page = doc[page_num] |
|
text = page.get_text("text") |
|
if text.strip(): |
|
page_text[page_num] = text |
|
full_text += text + "\n" |
|
|
|
return full_text, page_text |
|
except Exception as e: |
|
logger.error(f"Erro PyMuPDF: {e}") |
|
raise |
|
finally: |
|
if doc: |
|
doc.close() |
|
|
|
def extract_with_pypdf2(self, pdf_path: str) -> Tuple[str, Dict[int, str]]: |
|
"""Extrai texto usando PyPDF2 como fallback""" |
|
try: |
|
with open(pdf_path, "rb") as file: |
|
reader = PyPDF2.PdfReader(file) |
|
full_text = "" |
|
page_text = {} |
|
|
|
for i, page in enumerate(reader.pages): |
|
text = page.extract_text() |
|
if text.strip(): |
|
page_text[i] = text |
|
full_text += text + "\n" |
|
|
|
return full_text, page_text |
|
except Exception as e: |
|
logger.error(f"Erro PyPDF2: {e}") |
|
raise |
|
|
|
def extract_text(self, pdf_file: Union[str, Path]) -> Tuple[str, Dict[int, str]]: |
|
"""Extrai texto do PDF com fallback""" |
|
pdf_path = str(pdf_file) |
|
|
|
try: |
|
return self.extract_with_pymupdf(pdf_path) |
|
except Exception as e1: |
|
logger.warning(f"Falha PyMuPDF, tentando PyPDF2: {e1}") |
|
try: |
|
return self.extract_with_pypdf2(pdf_path) |
|
except Exception as e2: |
|
logger.error(f"Falha total na extração: {e2}") |
|
raise |
|
|
|
class TextProcessor: |
|
"""Processa e prepara texto para análise""" |
|
|
|
def __init__(self): |
|
self.nltk_manager = NLTKManager() |
|
self.max_chunk_size = 512 |
|
self.overlap = 50 |
|
|
|
def preprocess_text(self, text: str) -> str: |
|
"""Pré-processa texto removendo formatação problemática""" |
|
try: |
|
|
|
text = re.sub(r'\r\n', '\n', text) |
|
|
|
text = re.sub(r'\n+', ' ', text) |
|
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
|
text = re.sub(r'[^\w\s.,!?;:()[\]{}\-–—""''´`^~#$%&*+=@/\\áéíóúâêîôûãõçàèìòùäëïöüÁÉÍÓÚÂÊÎÔÛÃÕÇÀÈÌÒÙÄËÏÖÜ]', |
|
' ', text) |
|
return text.strip() |
|
except Exception as e: |
|
logger.warning(f"Erro no pré-processamento: {e}") |
|
return text |
|
|
|
def split_into_chunks(self, text: str) -> List[str]: |
|
"""Divide texto em chunks com sobreposição""" |
|
try: |
|
sentences = sent_tokenize(text) |
|
chunks = [] |
|
current_chunk = [] |
|
current_length = 0 |
|
|
|
for sentence in sentences: |
|
sentence_length = len(sentence) |
|
|
|
if current_length + sentence_length <= self.max_chunk_size: |
|
current_chunk.append(sentence) |
|
current_length += sentence_length |
|
else: |
|
if current_chunk: |
|
chunks.append(' '.join(current_chunk)) |
|
current_chunk = [sentence] |
|
current_length = sentence_length |
|
|
|
if current_chunk: |
|
chunks.append(' '.join(current_chunk)) |
|
|
|
|
|
overlapped_chunks = [] |
|
for i in range(len(chunks)): |
|
if i > 0: |
|
prefix = chunks[i-1][-self.overlap:] |
|
chunks[i] = prefix + ' ' + chunks[i] |
|
overlapped_chunks.append(chunks[i]) |
|
|
|
return overlapped_chunks |
|
except Exception as e: |
|
logger.error(f"Erro ao dividir texto: {e}") |
|
return [text] |
|
|
|
class ModelManager: |
|
"""Gerencia o modelo de IA e processamento de perguntas""" |
|
|
|
def __init__(self): |
|
self.model_name = "deepset/roberta-base-squad2" |
|
self.device = 0 if torch.cuda.is_available() else -1 |
|
self.max_tokens_answer = 50 |
|
self.max_tokens_context = 300 |
|
self.load_model() |
|
|
|
def load_model(self): |
|
"""Carrega o modelo com tratamento de erros""" |
|
try: |
|
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) |
|
self.model = AutoModelForQuestionAnswering.from_pretrained(self.model_name) |
|
self.nlp = pipeline('question-answering', |
|
model=self.model, |
|
tokenizer=self.tokenizer, |
|
device=self.device) |
|
logger.info(f"Modelo {self.model_name} carregado com sucesso") |
|
except Exception as e: |
|
logger.error(f"Erro ao carregar modelo: {e}") |
|
raise |
|
|
|
def get_answer(self, question: str, context: str) -> Dict: |
|
"""Processa uma única pergunta/contexto""" |
|
try: |
|
return self.nlp( |
|
question=question, |
|
context=context, |
|
max_answer_len=self.max_tokens_answer |
|
) |
|
except Exception as e: |
|
logger.error(f"Erro ao processar resposta: {e}") |
|
return { |
|
'answer': '', |
|
'score': 0, |
|
'start': 0, |
|
'end': 0 |
|
} |
|
|
|
def get_best_answer(self, question: str, chunks: List[str]) -> QAResult: |
|
"""Obtém a melhor resposta de múltiplos chunks""" |
|
try: |
|
if not chunks: |
|
return { |
|
'answer': "Não foi possível processar o documento.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "", |
|
'page_number': None |
|
} |
|
|
|
answers = [] |
|
with ThreadPoolExecutor() as executor: |
|
futures = [executor.submit(self.get_answer, question, chunk) for chunk in chunks] |
|
answers = [future.result() for future in futures] |
|
|
|
|
|
answers = [ans for ans in answers if ans['answer'].strip()] |
|
|
|
if not answers: |
|
return { |
|
'answer': "Não foi possível encontrar uma resposta.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "", |
|
'page_number': None |
|
} |
|
|
|
best_answer = max(answers, key=lambda x: x['score']) |
|
|
|
|
|
limited_context = " ".join( |
|
self.tokenizer.convert_ids_to_tokens( |
|
self.tokenizer.encode(best_answer.get('context', ""), add_special_tokens=False)[:self.max_tokens_context] |
|
) |
|
) |
|
|
|
return { |
|
'answer': best_answer['answer'], |
|
'score': best_answer['score'], |
|
'confidence': f"{best_answer['score']*100:.2f}%", |
|
'context': limited_context, |
|
'page_number': None |
|
} |
|
except Exception as e: |
|
logger.error(f"Erro ao obter melhor resposta: {e}") |
|
return { |
|
'answer': "Erro ao processar o documento.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "", |
|
'page_number': None |
|
} |
|
|
|
class PDFQuestionAnswering: |
|
"""Classe principal que coordena o sistema de QA""" |
|
|
|
def __init__(self): |
|
self.pdf_extractor = PDFExtractor() |
|
self.text_processor = TextProcessor() |
|
self.model_manager = ModelManager() |
|
|
|
def answer_question(self, pdf_file: gr.File, question: str) -> QAResult: |
|
"""Processa PDF e responde pergunta""" |
|
try: |
|
if not pdf_file or not question: |
|
return { |
|
'answer': "Por favor, forneça um arquivo PDF e uma pergunta.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "", |
|
'page_number': None |
|
} |
|
|
|
|
|
full_text, page_text = self.pdf_extractor.extract_text(pdf_file.name) |
|
|
|
if not full_text.strip(): |
|
return { |
|
'answer': "Não foi possível extrair texto do PDF.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "", |
|
'page_number': None |
|
} |
|
|
|
|
|
processed_text = self.text_processor.preprocess_text(full_text) |
|
|
|
|
|
chunks = self.text_processor.split_into_chunks(processed_text) |
|
|
|
|
|
result = self.model_manager.get_best_answer(question, chunks) |
|
|
|
|
|
ResourceManager.clear_gpu_memory() |
|
|
|
return result |
|
|
|
except Exception as e: |
|
logger.error(f"Erro ao processar pergunta: {e}") |
|
return { |
|
'answer': "Ocorreu um erro ao processar sua pergunta.", |
|
'score': 0, |
|
'confidence': "0%", |
|
'context': "", |
|
'page_number': None |
|
} |
|
|
|
def create_interface() -> gr.Blocks: |
|
"""Cria interface Gradio""" |
|
qa_system = PDFQuestionAnswering() |
|
|
|
with gr.Blocks(title="Sistema Avançado de QA sobre PDFs") as iface: |
|
|
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
pdf_input = gr.File( |
|
label="Carregar PDF", |
|
file_types=[".pdf"], |
|
type="filepath" |
|
) |
|
question_input = gr.Textbox( |
|
label="Sua Pergunta", |
|
placeholder="Digite sua pergunta aqui...", |
|
lines=2 |
|
) |
|
submit_btn = gr.Button("Obter Resposta", variant="primary") |
|
|
|
with gr.Column(): |
|
answer_output = gr.Textbox( |
|
label="Resposta", |
|
lines=3 |
|
) |
|
confidence_output = gr.Textbox( |
|
label="Confiança da Resposta" |
|
) |
|
context_output = gr.Textbox( |
|
label="Contexto da Resposta", |
|
lines=5 |
|
) |
|
page_output = gr.Textbox( |
|
label="Página da Resposta", |
|
visible=False |
|
) |
|
|
|
def process_question(pdf, question): |
|
result = qa_system.answer_question(pdf, question) |
|
return ( |
|
result['answer'], |
|
result['confidence'], |
|
result['context'], |
|
f"Página {result['page_number']}" if result['page_number'] else "Página não disponível" |
|
) |
|
|
|
submit_btn.click( |
|
fn=process_question, |
|
inputs=[pdf_input, question_input], |
|
outputs=[answer_output, confidence_output, context_output, page_output] |
|
) |
|
|
|
gr.Markdown(""" |
|
### Dicas para Perguntas sobre Metrologia e Normas |
|
|
|
#### Documentos Recomendados: |
|
- Normas técnicas (ABNT, ISO, etc.) |
|
- Procedimentos de calibração |
|
- Manuais de qualidade |
|
- Relatórios técnicos |
|
- Documentos do sistema de gestão |
|
|
|
#### Tipos de Perguntas: |
|
|
|
**Metrologia e Calibração:** |
|
- "Quais são os requisitos de calibração para [instrumento]?" |
|
- "Qual é a incerteza de medição especificada para [processo]?" |
|
- "Como é feita a rastreabilidade metrológica?" |
|
- "Quais são os intervalos de calibração recomendados?" |
|
- "Que condições ambientais são especificadas?" |
|
|
|
**Normas e Requisitos:** |
|
- "Quais são os requisitos para [processo específico]?" |
|
- "Como a norma define [termo técnico]?" |
|
- "Quais são os critérios de aceitação?" |
|
- "Que documentação é exigida?" |
|
- "Quais são as referências normativas?" |
|
|
|
**Sistema de Gestão:** |
|
- "Como é feito o controle de qualidade?" |
|
- "Quais são os requisitos de competência?" |
|
- "Como são tratadas as não conformidades?" |
|
- "Que registros são obrigatórios?" |
|
- "Como é feita a validação?" |
|
|
|
#### Boas Práticas: |
|
1. Use termos técnicos precisos |
|
2. Referencie seções específicas |
|
3. Pergunte sobre um tópico por vez |
|
4. Verifique o nível de confiança |
|
5. Confirme informações críticas |
|
|
|
#### Interpretação: |
|
- Confiança > 90%: Alta probabilidade de resposta correta |
|
- Confiança 70-90%: Resposta provável, verificar contexto |
|
- Confiança < 70%: Considerar reformular a pergunta |
|
""") |
|
|
|
return iface |
|
|
|
def main(): |
|
"""Função principal""" |
|
try: |
|
|
|
logging.basicConfig( |
|
level=logging.DEBUG, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s', |
|
handlers=[ |
|
logging.StreamHandler(), |
|
logging.FileHandler('qa_system_detailed.log') |
|
] |
|
) |
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.StreamHandler(), |
|
logging.FileHandler('qa_system.log') |
|
] |
|
) |
|
|
|
|
|
demo = create_interface() |
|
|
|
|
|
launch_kwargs = { |
|
'server_name': "0.0.0.0", |
|
'server_port': 7860, |
|
'share': False, |
|
'debug': True, |
|
'show_error': True, |
|
'enable_queue': True |
|
} |
|
|
|
|
|
try: |
|
demo.launch(**launch_kwargs) |
|
except TypeError as e: |
|
|
|
logger.warning(f"Erro no lançamento completo: {e}") |
|
minimal_kwargs = { |
|
'server_name': "0.0.0.0", |
|
'server_port': 7860, |
|
'share': False |
|
} |
|
demo.launch(**minimal_kwargs) |
|
|
|
except Exception as e: |
|
logger.error(f"Erro fatal: {e}") |
|
raise |
|
|
|
if __name__ == "__main__": |
|
main() |