import gradio as gr from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline import PyPDF2 import torch import re from typing import List, Dict, Tuple, Optional, Union import nltk from nltk.tokenize import sent_tokenize import fitz # PyMuPDF import logging from tqdm import tqdm import os import tempfile from pathlib import Path import shutil import gc import threading from concurrent.futures import ThreadPoolExecutor import warnings from typing_extensions import TypedDict # Configurar logging avançado logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('qa_system.log') ] ) logger = logging.getLogger(__name__) # Suprimir warnings desnecessários warnings.filterwarnings('ignore', category=UserWarning, module='gradio') class QAResult(TypedDict): answer: str score: float confidence: str context: str page_number: Optional[int] class ResourceManager: """Gerencia recursos do sistema e limpeza de memória""" @staticmethod def clear_gpu_memory(): """Limpa memória GPU se disponível""" if torch.cuda.is_available(): torch.cuda.empty_cache() gc.collect() @staticmethod def create_temp_directory() -> Path: """Cria diretório temporário para arquivos""" temp_dir = Path(tempfile.mkdtemp()) return temp_dir @staticmethod def cleanup_temp_directory(temp_dir: Path): """Remove diretório temporário e seus arquivos""" try: shutil.rmtree(temp_dir) except Exception as e: logger.warning(f"Erro ao limpar diretório temporário: {e}") class NLTKManager: """Gerencia recursos NLTK""" _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True self.nltk_data_dir = Path.home() / "nltk_data" self.required_resources = ['punkt'] self.download_resources() def download_resources(self): """Download e verifica recursos NLTK necessários""" try: self.nltk_data_dir.mkdir(parents=True, exist_ok=True) nltk.data.path.append(str(self.nltk_data_dir)) for resource in self.required_resources: try: nltk.data.find(f'tokenizers/{resource}') logger.debug(f"Recurso NLTK '{resource}' já instalado") except LookupError: logger.info(f"Baixando recurso NLTK '{resource}'...") nltk.download(resource, download_dir=str(self.nltk_data_dir), quiet=True) except Exception as e: logger.error(f"Erro ao configurar NLTK: {e}") raise class PDFExtractor: """Classe especializada em extração de texto de PDFs""" def __init__(self): self.temp_dir = ResourceManager.create_temp_directory() def __del__(self): ResourceManager.cleanup_temp_directory(self.temp_dir) def extract_with_pymupdf(self, pdf_path: str) -> Tuple[str, Dict[int, str]]: """Extrai texto usando PyMuPDF""" doc = None try: doc = fitz.open(pdf_path) full_text = "" page_text = {} for page_num in range(len(doc)): page = doc[page_num] text = page.get_text("text") if text.strip(): # Ignorar páginas vazias page_text[page_num] = text full_text += text + "\n" return full_text, page_text except Exception as e: logger.error(f"Erro PyMuPDF: {e}") raise finally: if doc: doc.close() def extract_with_pypdf2(self, pdf_path: str) -> Tuple[str, Dict[int, str]]: """Extrai texto usando PyPDF2 como fallback""" try: with open(pdf_path, "rb") as file: reader = PyPDF2.PdfReader(file) full_text = "" page_text = {} for i, page in enumerate(reader.pages): text = page.extract_text() if text.strip(): # Ignorar páginas vazias page_text[i] = text full_text += text + "\n" return full_text, page_text except Exception as e: logger.error(f"Erro PyPDF2: {e}") raise def extract_text(self, pdf_file: Union[str, Path]) -> Tuple[str, Dict[int, str]]: """Extrai texto do PDF com fallback""" pdf_path = str(pdf_file) try: return self.extract_with_pymupdf(pdf_path) except Exception as e1: logger.warning(f"Falha PyMuPDF, tentando PyPDF2: {e1}") try: return self.extract_with_pypdf2(pdf_path) except Exception as e2: logger.error(f"Falha total na extração: {e2}") raise class TextProcessor: """Processa e prepara texto para análise""" def __init__(self): self.nltk_manager = NLTKManager() self.max_chunk_size = 512 self.overlap = 50 def preprocess_text(self, text: str) -> str: """Pré-processa texto removendo formatação problemática""" try: # Normalizar quebras de linha text = re.sub(r'\r\n', '\n', text) # Remover quebras de linha extras text = re.sub(r'\n+', ' ', text) # Remover espaços múltiplos text = re.sub(r'\s+', ' ', text) # Remover caracteres especiais mantendo pontuação importante text = re.sub(r'[^\w\s.,!?;:()[\]{}\-–—""''´`^~#$%&*+=@/\\áéíóúâêîôûãõçàèìòùäëïöüÁÉÍÓÚÂÊÎÔÛÃÕÇÀÈÌÒÙÄËÏÖÜ]', ' ', text) return text.strip() except Exception as e: logger.warning(f"Erro no pré-processamento: {e}") return text def split_into_chunks(self, text: str) -> List[str]: """Divide texto em chunks com sobreposição""" try: sentences = sent_tokenize(text) chunks = [] current_chunk = [] current_length = 0 for sentence in sentences: sentence_length = len(sentence) if current_length + sentence_length <= self.max_chunk_size: current_chunk.append(sentence) current_length += sentence_length else: if current_chunk: chunks.append(' '.join(current_chunk)) current_chunk = [sentence] current_length = sentence_length if current_chunk: chunks.append(' '.join(current_chunk)) # Adicionar sobreposição overlapped_chunks = [] for i in range(len(chunks)): if i > 0: prefix = chunks[i-1][-self.overlap:] chunks[i] = prefix + ' ' + chunks[i] overlapped_chunks.append(chunks[i]) return overlapped_chunks except Exception as e: logger.error(f"Erro ao dividir texto: {e}") return [text] class ModelManager: """Gerencia o modelo de IA e processamento de perguntas""" def __init__(self): self.model_name = "deepset/roberta-base-squad2" self.device = 0 if torch.cuda.is_available() else -1 self.max_tokens_answer = 50 # Máximo de tokens na resposta self.max_tokens_context = 300 # Máximo de tokens no contexto exibido self.load_model() def load_model(self): """Carrega o modelo com tratamento de erros""" try: self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) self.model = AutoModelForQuestionAnswering.from_pretrained(self.model_name) self.nlp = pipeline('question-answering', model=self.model, tokenizer=self.tokenizer, device=self.device) logger.info(f"Modelo {self.model_name} carregado com sucesso") except Exception as e: logger.error(f"Erro ao carregar modelo: {e}") raise def get_answer(self, question: str, context: str) -> Dict: """Processa uma única pergunta/contexto""" try: return self.nlp( question=question, context=context, max_answer_len=self.max_tokens_answer # Limitar resposta ) except Exception as e: logger.error(f"Erro ao processar resposta: {e}") return { 'answer': '', 'score': 0, 'start': 0, 'end': 0 } def get_best_answer(self, question: str, chunks: List[str]) -> QAResult: """Obtém a melhor resposta de múltiplos chunks""" try: if not chunks: return { 'answer': "Não foi possível processar o documento.", 'score': 0, 'confidence': "0%", 'context': "", 'page_number': None } answers = [] with ThreadPoolExecutor() as executor: futures = [executor.submit(self.get_answer, question, chunk) for chunk in chunks] answers = [future.result() for future in futures] # Filtrar respostas vazias answers = [ans for ans in answers if ans['answer'].strip()] if not answers: return { 'answer': "Não foi possível encontrar uma resposta.", 'score': 0, 'confidence': "0%", 'context': "", 'page_number': None } best_answer = max(answers, key=lambda x: x['score']) # Limitar contexto para o máximo de tokens configurado limited_context = " ".join( self.tokenizer.convert_ids_to_tokens( self.tokenizer.encode(best_answer.get('context', ""), add_special_tokens=False)[:self.max_tokens_context] ) ) return { 'answer': best_answer['answer'], 'score': best_answer['score'], 'confidence': f"{best_answer['score']*100:.2f}%", 'context': limited_context, 'page_number': None # Adapte conforme necessário } except Exception as e: logger.error(f"Erro ao obter melhor resposta: {e}") return { 'answer': "Erro ao processar o documento.", 'score': 0, 'confidence': "0%", 'context': "", 'page_number': None } class PDFQuestionAnswering: """Classe principal que coordena o sistema de QA""" def __init__(self): self.pdf_extractor = PDFExtractor() self.text_processor = TextProcessor() self.model_manager = ModelManager() def answer_question(self, pdf_file: gr.File, question: str) -> QAResult: """Processa PDF e responde pergunta""" try: if not pdf_file or not question: return { 'answer': "Por favor, forneça um arquivo PDF e uma pergunta.", 'score': 0, 'confidence': "0%", 'context': "", 'page_number': None } # Extrair texto do PDF full_text, page_text = self.pdf_extractor.extract_text(pdf_file.name) if not full_text.strip(): return { 'answer': "Não foi possível extrair texto do PDF.", 'score': 0, 'confidence': "0%", 'context': "", 'page_number': None } # Pré-processar texto processed_text = self.text_processor.preprocess_text(full_text) # Dividir em chunks chunks = self.text_processor.split_into_chunks(processed_text) # Obter melhor resposta result = self.model_manager.get_best_answer(question, chunks) # Limpar GPU se necessário ResourceManager.clear_gpu_memory() return result except Exception as e: logger.error(f"Erro ao processar pergunta: {e}") return { 'answer': "Ocorreu um erro ao processar sua pergunta.", 'score': 0, 'confidence': "0%", 'context': "", 'page_number': None } def create_interface() -> gr.Blocks: """Cria interface Gradio""" qa_system = PDFQuestionAnswering() with gr.Blocks(title="Sistema Avançado de QA sobre PDFs") as iface: # Interface HTML/Markdown aqui... with gr.Row(): with gr.Column(): pdf_input = gr.File( label="Carregar PDF", file_types=[".pdf"], type="filepath" ) question_input = gr.Textbox( label="Sua Pergunta", placeholder="Digite sua pergunta aqui...", lines=2 ) submit_btn = gr.Button("Obter Resposta", variant="primary") with gr.Column(): answer_output = gr.Textbox( label="Resposta", lines=3 ) confidence_output = gr.Textbox( label="Confiança da Resposta" ) context_output = gr.Textbox( label="Contexto da Resposta", lines=5 ) page_output = gr.Textbox( label="Página da Resposta", visible=False # TODO: Implementar ) def process_question(pdf, question): result = qa_system.answer_question(pdf, question) return ( result['answer'], result['confidence'], result['context'], f"Página {result['page_number']}" if result['page_number'] else "Página não disponível" ) submit_btn.click( fn=process_question, inputs=[pdf_input, question_input], outputs=[answer_output, confidence_output, context_output, page_output] ) gr.Markdown(""" ### Dicas para Perguntas sobre Metrologia e Normas #### Documentos Recomendados: - Normas técnicas (ABNT, ISO, etc.) - Procedimentos de calibração - Manuais de qualidade - Relatórios técnicos - Documentos do sistema de gestão #### Tipos de Perguntas: **Metrologia e Calibração:** - "Quais são os requisitos de calibração para [instrumento]?" - "Qual é a incerteza de medição especificada para [processo]?" - "Como é feita a rastreabilidade metrológica?" - "Quais são os intervalos de calibração recomendados?" - "Que condições ambientais são especificadas?" **Normas e Requisitos:** - "Quais são os requisitos para [processo específico]?" - "Como a norma define [termo técnico]?" - "Quais são os critérios de aceitação?" - "Que documentação é exigida?" - "Quais são as referências normativas?" **Sistema de Gestão:** - "Como é feito o controle de qualidade?" - "Quais são os requisitos de competência?" - "Como são tratadas as não conformidades?" - "Que registros são obrigatórios?" - "Como é feita a validação?" #### Boas Práticas: 1. Use termos técnicos precisos 2. Referencie seções específicas 3. Pergunte sobre um tópico por vez 4. Verifique o nível de confiança 5. Confirme informações críticas #### Interpretação: - Confiança > 90%: Alta probabilidade de resposta correta - Confiança 70-90%: Resposta provável, verificar contexto - Confiança < 70%: Considerar reformular a pergunta """) return iface def main(): """Função principal""" try: # Configurar logging detalhado logging.basicConfig( level=logging.DEBUG, # Aumentado para DEBUG format='%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('qa_system_detailed.log') ] ) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('qa_system.log') ] ) # Criar e iniciar interface demo = create_interface() # Configurações de lançamento launch_kwargs = { 'server_name': "0.0.0.0", # Permite acesso externo 'server_port': 7860, # Porta padrão 'share': False, # Não criar túnel público 'debug': True, # Modo debug ativado 'show_error': True, # Mostrar erros detalhados 'enable_queue': True # Habilitar fila de requisições } # Tentar lançar com configurações completas try: demo.launch(**launch_kwargs) except TypeError as e: # Se algum parâmetro não for suportado, remover e tentar novamente logger.warning(f"Erro no lançamento completo: {e}") minimal_kwargs = { 'server_name': "0.0.0.0", 'server_port': 7860, 'share': False } demo.launch(**minimal_kwargs) except Exception as e: logger.error(f"Erro fatal: {e}") raise if __name__ == "__main__": main()