Spaces:
Runtime error
Runtime error
| from gradio_client import Client | |
| from langchain_community.document_loaders import PyPDFDirectoryLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from sentence_transformers import SentenceTransformer | |
| from rank_bm25 import BM25Okapi | |
| import faiss | |
| import re | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import numpy as np | |
| import logging | |
| from typing import List, Dict, Tuple, Optional | |
| from PyPDF2 import PdfReader | |
| from colorama import Fore, Style | |
| from datetime import datetime | |
| from sklearn.metrics.pairwise import cosine_similarity | |
class MetrologyRAGSystem:
    """Retrieval-augmented assistant for metrology documents.

    PDF pages are cleaned, split into chunks and indexed twice: densely
    (SentenceTransformer embeddings in a FAISS HNSW index) and lexically
    (BM25). Queries are answered by fusing both rankings, building a
    structured prompt from the best chunks and sending it to a remote
    Gradio endpoint. A separate entry point checks a single PDF report
    against a small set of regex-based compliance rules.
    """

    # ANSI SGR colour sequences (the kind colorama's Fore/Style emit).
    _ANSI_ESCAPE = re.compile(r'\x1b\[[0-9;]*m')

    def __init__(self, config: Optional[Dict] = None):
        """Create the embedder and API client; indices stay empty until
        ``initialize_system`` is called.

        Args:
            config: Optional overrides merged on top of the default settings.
        """
        self.config = self._load_default_config(config)
        # Set the logger up before the heavier model/client construction.
        self._init_logger()
        self.embedder = SentenceTransformer(self.config['embedding_model'])
        self.client = Client(self.config['api_endpoint'])
        self.documents = []
        self.faiss_index = None
        self.bm25 = None

    def _load_default_config(self, config: Optional[Dict]) -> Dict:
        """Return the default settings overlaid with ``config`` (if any).

        NOTE: 'hybrid_ratio' and 'min_confidence' are defined here but are
        not read by any method of this class.
        """
        default_config = {
            'embedding_model': 'all-MiniLM-L6-v2',
            'chunk_size': 1600,
            'chunk_overlap': 450,
            'top_k': 7,
            'max_retries': 5,
            'hybrid_ratio': 0.6,
            'allowed_file_types': ['.pdf'],
            'api_endpoint': "yuntian-deng/ChatGPT",
            'required_norms': ['ISO/IEC 17025', 'ABNT NBR ISO 9001'],
            'min_confidence': 0.78,
            'temperature': 0.3
        }
        return {**default_config, **(config or {})}

    def _init_logger(self):
        """Attach file and console handlers to the shared 'MetrologyRAG' logger."""
        self.logger = logging.getLogger('MetrologyRAG')
        self.logger.setLevel(logging.INFO)
        # logging.getLogger returns the same object for the same name, so a
        # second instance must not add a second pair of handlers (every
        # message would then be emitted twice).
        if self.logger.handlers:
            return
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler = logging.FileHandler('metrology_audit.log', encoding='utf-8')
        file_handler.setFormatter(formatter)
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(stream_handler)

    def initialize_system(self, pdf_folder: str):
        """Validate ``pdf_folder``, load its PDFs and build both indices.

        Any failure is logged at CRITICAL level and terminates the process
        with exit status 1.
        """
        try:
            self._validate_data_source(pdf_folder)
            start_time = time.time()
            self._load_documents(pdf_folder)
            self._create_vector_index()
            self.logger.info(f"Sistema inicializado em {time.time()-start_time:.2f}s | Documentos: {len(self.documents)}")
        except Exception as e:
            self.logger.critical(f"Falha na inicialização: {str(e)}")
            sys.exit(1)

    def _validate_data_source(self, folder_path: str):
        """Ensure ``folder_path`` is a directory holding at least one allowed file.

        Raises:
            FileNotFoundError: the path is missing or is not a directory.
            ValueError: no file has an extension in 'allowed_file_types'.
        """
        # isdir (not exists): a regular file would make os.listdir fail below.
        if not os.path.isdir(folder_path):
            raise FileNotFoundError(f"Diretório inexistente: {folder_path}")
        allowed = self.config['allowed_file_types']
        has_valid_file = any(
            os.path.splitext(name)[1].lower() in allowed
            for name in os.listdir(folder_path)
        )
        if not has_valid_file:
            raise ValueError("Nenhum documento PDF válido encontrado")

    def _load_documents(self, folder_path: str):
        """Load every PDF page, normalise its text and split it into chunks.

        A page that fails to process is logged and skipped; a failure of the
        loader itself is logged and re-raised.
        """
        try:
            loader = PyPDFDirectoryLoader(folder_path)
            pages = loader.load()
            # Two of the separators are regular expressions (a look-behind and
            # an escaped pipe), so the splitter must be told to treat the list
            # as regex; otherwise they are matched literally and never hit.
            # NOTE: page text has all whitespace collapsed to single spaces
            # before splitting, so the newline-based separators cannot match.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.config['chunk_size'],
                chunk_overlap=self.config['chunk_overlap'],
                separators=["\n\n• ", "\n■ ", r"(?<=\. )", "; ", "► ", r"\|"],
                is_separator_regex=True
            )
            clean_docs = []
            for i, page in enumerate(pages):
                try:
                    text = self._preprocess_technical_text(page.page_content)
                    clean_docs.extend(text_splitter.split_text(text))
                except Exception as e:
                    self.logger.error(f"Erro no documento {i+1}: {str(e)}")
                    continue
            self.documents = clean_docs
            self.logger.info(f"Documentos técnicos carregados: {len(self.documents)} segmentos")
        except Exception as e:
            self.logger.error(f"Falha no carregamento: {str(e)}")
            raise

    def _preprocess_technical_text(self, text: str) -> str:
        """Normalise units, ranges, whitespace and bracketed remarks.

        The rules are applied in order; later rules see the output of
        earlier ones.
        """
        replacements = [
            # "um" is also a common Portuguese word, so only treat it as the
            # micrometre unit when it directly follows a number.
            (r'(?<=\d)\s?um\b', 'µm'),
            # Consume the whole word "Celsius" so it does not become "°Celsius".
            (r'(?i)\bgraus?\s*(?:celsius|c)\b', '°C'),
            (r'(\d)([A-Za-z°µ])', r'\1 \2'),
            (r'±\s*(\d)', r'±\1'),
            (r'kN/m²', 'kPa'),
            # NOTE(review): also rewrites hyphenated numbers that are not
            # ranges (e.g. identifiers such as "9001-2015") — confirm intent.
            (r'(\d+)\s*-\s*(\d+)', r'\1 a \2'),
            (r'\s+', ' '),
            # Drop bracketed remarks together with the space before them so
            # no double space is left behind.
            (r'\s*\[.*?\]', '')
        ]
        for pattern, replacement in replacements:
            text = re.sub(pattern, replacement, text)
        return text.strip()

    def _create_vector_index(self):
        """Build the FAISS HNSW index and the BM25 index over ``self.documents``.

        Raises:
            ValueError: there are no document chunks to index.
        """
        try:
            if not self.documents:
                raise ValueError("Nenhum segmento de documento disponível para indexação")
            dense_vectors = np.asarray(self.embedder.encode(self.documents), dtype='float32')
            self.faiss_index = faiss.IndexHNSWFlat(dense_vectors.shape[1], 32)
            self.faiss_index.add(dense_vectors)
            tokenized_docs = [self._technical_tokenizer(doc) for doc in self.documents]
            self.bm25 = BM25Okapi(tokenized_docs)
            self.logger.info("Índices vetoriais criados com sucesso")
        except Exception as e:
            self.logger.error(f"Erro na criação de índices: {str(e)}")
            raise

    def _technical_tokenizer(self, text: str) -> List[str]:
        """Split ``text`` into lower-cased tokens for BM25.

        Alternatives are tried left to right, so the specific shapes
        (decimal numbers, "ACRONYM 1234" designations) must come before the
        generic word pattern or they would never be reached.
        """
        tokens = re.findall(
            r'\d+[.,]\d+(?:[eE][+-]?\d+)?|'      # decimals, '.' or ',' separator
            r'\b[A-Z]{2,}\s+\d+[A-Z]*\b|'        # e.g. "ISO 9001"
            r'\b[\wµ°±]+(?:[/-]\d+)?\b|'         # generic word / identifier
            r'[;:±≤≥]',                          # stand-alone symbols
            text
        )
        return [t.lower() for t in tokens if t]

    def retrieve_context(self, query: str) -> List[str]:
        """Return up to 'top_k' chunks ranked by fused dense + BM25 relevance.

        Returns an empty list (after logging) when retrieval fails or the
        indices have not been built.
        """
        try:
            if self.faiss_index is None or self.bm25 is None or not self.documents:
                raise RuntimeError("Índices não inicializados")
            boosted_query = self._boost_query(query)
            # Never ask for more neighbours than there are chunks.
            n_candidates = min(50, len(self.documents))
            query_embedding = self.embedder.encode([boosted_query])
            _, dense_ids = self.faiss_index.search(query_embedding.astype('float32'), n_candidates)
            tokenized_query = self._technical_tokenizer(boosted_query)
            bm25_scores = self.bm25.get_scores(tokenized_query)
            bm25_ids = np.argsort(bm25_scores)[::-1][:n_candidates]
            ranked_ids = self._reciprocal_rank_fusion(dense_ids[0], bm25_ids)
            return [self.documents[i] for i in ranked_ids[:self.config['top_k']]]
        except Exception as e:
            self.logger.error(f"Falha na recuperação: {str(e)}")
            return []

    def _boost_query(self, query: str) -> str:
        """Append a fixed list of domain terms to ``query``."""
        terms = [
            'incerteza de medição',
            'calibração rastreável',
            'certificado de calibração',
            'padrão de referência',
            'ISO/IEC 17025'
        ]
        return f"{query} {' '.join(terms)}"

    def _reciprocal_rank_fusion(self, dense_ids: List[int], bm25_ids: List[int]) -> List[int]:
        """Merge two ranked id lists; each hit contributes 1 / (rank + 60).

        Ids outside ``[0, len(self.documents))`` are dropped. That includes
        the -1 padding FAISS returns when it finds fewer neighbours than
        requested, which would otherwise index the last document.

        Returns:
            Document ids as plain ints, best first.
        """
        n_docs = len(self.documents)
        combined_scores = {}
        for ranking in (dense_ids, bm25_ids):
            for rank, raw_idx in enumerate(ranking):
                idx = int(raw_idx)
                if 0 <= idx < n_docs:
                    combined_scores[idx] = combined_scores.get(idx, 0) + 1/(rank + 60)
        ordered = sorted(combined_scores.items(), key=lambda item: item[1], reverse=True)
        return [idx for idx, _ in ordered]

    def generate_technical_response(self, query: str) -> str:
        """Answer ``query`` from the indexed documents.

        Falls back to ``_fallback_procedure`` when retrieval, prompt
        validation or the remote call fails.
        """
        try:
            context = self.retrieve_context(query)
            if not context:
                raise ValueError("Contexto insuficiente")
            prompt = self._build_structured_prompt(query, context)
            if not self._validate_prompt(prompt):
                raise ValueError("Prompt inválido")
            response = self._call_llm_with_retry(prompt)
            return self._postprocess_response(response, context)
        except Exception as e:
            self.logger.error(f"Falha na geração: {str(e)}")
            return self._fallback_procedure(query)

    def _build_structured_prompt(self, query: str, context: List[str]) -> str:
        """Assemble the prompt: formatting rules, the first three context
        chunks (truncated to 250 characters each) and the query."""
        detected_norms = self._detect_norms(context)
        detected_equipment = self._detect_equipment(context)
        context_entries = []
        for i, text in enumerate(context[:3]):
            cleaned_text = text[:250].replace('\n', ' ')
            context_entries.append(f'[Doc {i+1}] {cleaned_text}...')
        context_str = '\n'.join(context_entries)
        template = (
            f"## Diretrizes Técnicas ISO/IEC 17025:2017 ##\n"
            f"1. Formato obrigatório:\n"
            f" - Seção 1: Fundamentação Normativa ({', '.join(detected_norms)})\n"
            f" - Seção 2: Procedimento de Medição\n"
            f" - Seção 3: Análise de Incertezas (k=2)\n"
            f" - Seção 4: Condições Ambientais\n\n"
            f"2. Dados obrigatórios:\n"
            f" - Tolerâncias: ± valores com unidades\n"
            f" - Equipamentos: {', '.join(detected_equipment)}\n"
            f" - Normas: {', '.join(detected_norms)}\n\n"
            f"## Contexto Técnico ##\n"
            f"{context_str}\n\n"
            f"## Consulta ##\n"
            f"{query}\n\n"
            f"## Resposta Estruturada ##"
        )
        return template

    def _detect_norms(self, context: List[str]) -> List[str]:
        """Return up to three norm designations found in ``context``.

        Falls back to the configured 'required_norms' when none is found.
        The result is sorted so the prompt is deterministic.
        """
        # Non-capturing groups: with a capturing group re.findall would return
        # only the prefix ("ISO/IEC") and lose the norm number.
        pattern = r'\b(?:ISO/IEC|ABNT NBR(?:\s+ISO(?:/IEC)?)?|OIML R)\s+\d+(?:\.\d+)*'
        norms = set()
        for text in context:
            norms.update(re.findall(pattern, text))
        return sorted(norms)[:3] or self.config['required_norms']

    def _detect_equipment(self, context: List[str]) -> List[str]:
        """Return up to five equipment mentions found in ``context`` (sorted)."""
        equipment = set()
        pattern = r'\b([A-Z][a-z]*\s+)?(\d+[A-Z]+\b|Micrômetro|Paquímetro|Manômetro|Multímetro)'
        for text in context:
            # Each match is (optional leading word, equipment token).
            equipment.update(f"{prefix}{name}" for prefix, name in re.findall(pattern, text))
        return sorted(equipment)[:5]

    def _validate_prompt(self, prompt: str) -> bool:
        """Score the prompt against weighted patterns; pass at 3 points or more.

        NOTE: the template built by ``_build_structured_prompt`` always
        contains "ISO/IEC 17025" and "k=2", which already add up to 3.
        """
        checks = [
            (r'ISO/IEC 17025', 2),
            (r'\d+\s*±\s*\d+', 1),
            (r'k=\d', 1),
            (r'°C', 1)
        ]
        score = sum(weight for pattern, weight in checks if re.search(pattern, prompt))
        return score >= 3

    def _call_llm_with_retry(self, prompt: str) -> str:
        """Send ``prompt`` to the remote endpoint with exponential back-off.

        Raises:
            TimeoutError: every attempt failed; the last error is chained.
        """
        max_retries = self.config['max_retries']
        last_error = None
        for attempt in range(max_retries):
            try:
                # NOTE(review): argument names presumably mirror the remote
                # Space's "/predict" signature — confirm against the endpoint.
                result = self.client.predict(
                    inputs=prompt,
                    top_p=0.9,
                    temperature=self.config['temperature'],
                    chat_counter=0,
                    chatbot=[],
                    api_name="/predict"
                )
                return self._clean_api_response(result)
            except Exception as e:
                last_error = e
                self.logger.warning(f"Tentativa {attempt+1} falhou: {str(e)}")
                # No point in waiting after the final attempt.
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)
        raise TimeoutError("Falha após múltiplas tentativas") from last_error

    def _clean_api_response(self, response) -> str:
        """Flatten the API result to text and strip ``**`` / code-fence markers.

        List/tuple results are joined with spaces (falsy items skipped) and
        then cleaned in the same way as plain strings.
        """
        if isinstance(response, (list, tuple)):
            # NOTE(review): the exact structure returned by the endpoint is
            # not visible here; items are simply stringified.
            response = ' '.join(str(item) for item in response if item)
        return str(response).replace('**', '').replace('```', '').strip()

    def _postprocess_response(self, response: str, context: List[str]) -> str:
        """Clean the model output, append references and colourise it."""
        processed = response.replace('Resposta Estruturada', '').strip()
        processed = self._enhance_technical_terms(processed)
        processed = self._add_references(processed, context)
        return self._format_response(processed)

    def _enhance_technical_terms(self, text: str) -> str:
        """Expand bare technical terms and put a space between numbers and units.

        The look-aheads skip terms that already carry the qualifier, and the
        back-reference keeps the original capitalisation.
        """
        replacements = [
            (r'\b(incerteza)\b(?!\s+de\s+medição)', r'\1 de medição'),
            (r'\b(calibração)\b(?!\s+rastreável)', r'\1 rastreável'),
            (r'\b(norma)\b(?!\s+técnica)', r'\1 técnica'),
            (r'(\d)([a-zA-Zµ°])', r'\1 \2')
        ]
        for pattern, repl in replacements:
            text = re.sub(pattern, repl, text, flags=re.IGNORECASE)
        return text

    def _add_references(self, text: str, context: List[str]) -> str:
        """Append a reference section listing the opening of up to three chunks.

        The chunks in ``context`` are raw document text (the "[Doc n]" label
        exists only inside the prompt), so the snippet is taken straight
        from the start of each chunk.
        """
        refs = []
        for doc in context[:3]:
            snippet = ' '.join(doc.split())[:30]
            entry = f"- {snippet}..."
            if snippet and entry not in refs:
                refs.append(entry)
        return f"{text}\n\n## Referências Técnicas ##\n" + "\n".join(refs)

    def _format_response(self, text: str) -> str:
        """Wrap ``text`` in a banner and colour list numbers and ± tolerances."""
        border = "="*80
        header = f"{Fore.GREEN}▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓\n RESPOSTA TÉCNICA CERTIFICADA\n▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓{Style.RESET_ALL}"
        formatted = re.sub(r'^(\d+\.)\s+(.+)$',
                           f'{Fore.CYAN}\\1 {Style.RESET_ALL}\\2',
                           text, flags=re.M)
        # Accept "±0.5", "± 0.5" and comma decimals alike.
        formatted = re.sub(r'(±\s*\d+(?:[.,]\d+)?)',
                           f'{Fore.YELLOW}\\1{Style.RESET_ALL}',
                           formatted)
        return f"\n{border}\n{header}\n{border}\n{formatted}\n{border}"

    def _fallback_procedure(self, query: str) -> str:
        """Return up to three chunks that contain an upper-case/alphanumeric
        key term from ``query``, flagged as unvalidated information."""
        try:
            key_terms = re.findall(r'\b[A-Z]{3,}\b|\b\d+[A-Z]+\b', query)
            relevant = [doc for doc in self.documents if any(term in doc for term in key_terms)][:3]
            return (
                f"{Fore.YELLOW}INFORMAÇÃO TÉCNICA PARCIAL:{Style.RESET_ALL}\n" +
                "\n".join([f"• {doc[:300]}..." for doc in relevant]) +
                f"\n\n{Fore.RED}AVISO: Resposta não validada - consulte documentos originais{Style.RESET_ALL}"
            )
        except Exception:
            # Last line of defence; deliberately broad, but no longer swallows
            # KeyboardInterrupt/SystemExit the way a bare ``except:`` did.
            return f"{Fore.RED}Erro crítico - sistema necessita re-inicialização{Style.RESET_ALL}"

    @classmethod
    def _strip_ansi(cls, text: str) -> str:
        """Remove ANSI colour escape sequences from ``text``."""
        return cls._ANSI_ESCAPE.sub('', text)

    def generate_report(self, query: str, response: str, filename: str = "relatorio_tecnico.md"):
        """Write ``query`` and ``response`` to a Markdown report file.

        Terminal colour codes are stripped from ``response`` first, since
        they would show up as garbage in the Markdown file. Failures are
        logged, not raised.
        """
        try:
            timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
            report = (
                f"# RELATÓRIO TÉCNICO - METROLOGIA\n\n"
                f"**Data:** {timestamp}\n"
                f"**Consulta:** {query}\n\n"
                "## Resposta Técnica\n"
                f"{self._strip_ansi(response)}\n\n"
                "**Assinatura Digital:** [Sistema Certificado v2.1]"
            )
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report)
            self.logger.info(f"Relatório gerado: {filename}")
        except Exception as e:
            self.logger.error(f"Falha ao gerar relatório: {str(e)}")

    def analyze_metrology_report(self, pdf_path: str) -> str:
        """Run the compliance checks on one PDF and return a formatted opinion.

        On any failure the error is logged and the fallback text is returned.
        """
        try:
            text = self._extract_pdf_text(pdf_path)
            compliance = self._check_compliance(text)
            analysis = self._generate_analysis_report(text, compliance)
            return self._format_compliance_report(analysis, compliance)
        except Exception as e:
            self.logger.error(f"Falha na análise: {str(e)}")
            return self._fallback_procedure("Análise de relatório")

    def _extract_pdf_text(self, path: str) -> str:
        """Return the text of all pages that yield any, joined by newlines."""
        reader = PdfReader(path)
        # Extract each page once; the result is reused for the emptiness test.
        page_texts = (page.extract_text() for page in reader.pages)
        return '\n'.join(page_text for page_text in page_texts if page_text)

    def _check_compliance(self, text: str) -> Dict:
        """Evaluate the regex-based compliance rules against ``text``.

        Returns:
            ``{rule: {'status': 'OK' | 'FALHA' | 'N/A', 'critical': bool}}``;
            'critical' is true for a required rule that did not match.
        """
        checks = {
            'rastreabilidade': {'patterns': [r'rastreab[ií]lidade.*INMETRO'], 'required': True},
            'incerteza': {'patterns': [r'incerteza expandida.*≤?\s*\d+'], 'required': True},
            'ambiente': {'patterns': [r'temperatura.*23\s*±\s*2\s*°C'], 'required': False},
            'normas': {'patterns': [r'ISO/IEC\s+17025'], 'required': True}
        }
        results = {}
        for key, rule in checks.items():
            # Case-insensitive so sentence-initial capitals still match.
            found = any(re.search(p, text, flags=re.IGNORECASE) for p in rule['patterns'])
            results[key] = {
                'status': 'OK' if found else 'FALHA' if rule['required'] else 'N/A',
                'critical': rule['required'] and not found
            }
        return results

    def _generate_analysis_report(self, text: str, compliance: Dict) -> str:
        """Ask the remote model for an opinion on the compliance results.

        Only the first 2000 characters of ``text`` are included in the prompt.
        """
        critical = sum(1 for v in compliance.values() if v['critical'])
        status = "NÃO CONFORME" if critical else "CONFORME"
        prompt = (
            "## Análise de Conformidade Metrológica ##\n"
            f"Documento analisado: {text[:2000]}...\n"
            "Resultados:\n"
            f"{json.dumps(compliance, indent=2)}\n"
            "## Parecer Técnico ##\n"
            "Emitir parecer considerando:\n"
            f"- Status: {status}\n"
            f"- Itens críticos: {critical}\n"
            "- Recomendações de adequação"
        )
        return self._call_llm_with_retry(prompt)

    def _format_compliance_report(self, text: str, compliance: Dict) -> str:
        """Prefix ``text`` with a coloured status banner and a per-rule summary."""
        approved = not any(v['critical'] for v in compliance.values())
        status = "APROVADO" if approved else "REPROVADO"
        color = Fore.GREEN if approved else Fore.RED
        header = (
            "\n"
            f"{color}▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓\n"
            f"PARECER TÉCNICO - STATUS: {status}\n"
            f"▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓{Style.RESET_ALL}\n"
        )
        summary = "## Resumo de Conformidade ##\n" + "".join(
            f"• {k.upper()}: {v['status']}\n" for k, v in compliance.items()
        )
        return header + summary + "\n" + text
def main_menu():
    """Print the main menu and return the option string typed by the user."""
    banner = Fore.BLUE + "\n🔧 Sistema de Metrologia Inteligente v2.1" + Style.RESET_ALL
    title = Fore.CYAN + "Menu Principal:" + Style.RESET_ALL
    options = (
        "1. Inicializar sistema com documentos PDF",
        "2. Consulta técnica",
        "3. Analisar relatório PDF",
        "4. Gerar relatório completo",
        "5. Sair",
    )
    # One print call per line, in the same order as the menu is laid out.
    for line in (banner, title, *options):
        print(line)
    prompt = Fore.YELLOW + "> Selecione uma opção: " + Style.RESET_ALL
    return input(prompt)