Spaces:
Sleeping
Sleeping
from gradio_client import Client | |
from langchain_community.document_loaders import PyPDFDirectoryLoader | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from sentence_transformers import SentenceTransformer | |
from rank_bm25 import BM25Okapi | |
import faiss | |
import re | |
import os | |
import sys | |
import time | |
import json | |
import numpy as np | |
import logging | |
from typing import List, Dict, Tuple, Optional | |
from PyPDF2 import PdfReader | |
from colorama import Fore, Style | |
from datetime import datetime | |
from sklearn.metrics.pairwise import cosine_similarity | |
class MetrologyRAGSystem:
    """Hybrid retrieval-augmented assistant for metrology documentation.

    PDF pages are cleaned, split into chunks and indexed twice: densely
    (sentence embeddings in a FAISS HNSW index) and lexically (BM25). Queries
    are answered by fusing both rankings, building a structured prompt and
    sending it to a remote Gradio-hosted LLM endpoint.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Create the embedder and the remote client and set up logging.

        Args:
            config: Optional overrides merged on top of the default settings.
        """
        self.config = self._load_default_config(config)
        self.embedder = SentenceTransformer(self.config['embedding_model'])
        self.client = Client(self.config['api_endpoint'])
        self.documents: List[str] = []
        self.faiss_index = None
        self.bm25 = None
        self._init_logger()

    def _load_default_config(self, config: Optional[Dict] = None) -> Dict:
        """Return the default settings overridden by ``config`` (may be None)."""
        default_config = {
            'embedding_model': 'all-MiniLM-L6-v2',
            'chunk_size': 1600,
            'chunk_overlap': 450,
            'top_k': 7,
            'max_retries': 5,
            'hybrid_ratio': 0.6,
            'allowed_file_types': ['.pdf'],
            'api_endpoint': "yuntian-deng/ChatGPT",
            'required_norms': ['ISO/IEC 17025', 'ABNT NBR ISO 9001'],
            'min_confidence': 0.78,
            'temperature': 0.3
        }
        return {**default_config, **(config or {})}

    def _init_logger(self):
        """Configure the shared 'MetrologyRAG' logger (file + console)."""
        self.logger = logging.getLogger('MetrologyRAG')
        self.logger.setLevel(logging.INFO)
        # getLogger returns the same object for the same name, so attaching
        # handlers on every instantiation would duplicate each log line.
        if self.logger.handlers:
            return
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        # Log messages contain accented characters; pin the file encoding.
        file_handler = logging.FileHandler('metrology_audit.log', encoding='utf-8')
        file_handler.setFormatter(formatter)
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(stream_handler)

    def initialize_system(self, pdf_folder: str):
        """Validate ``pdf_folder``, load its PDFs and build both indexes.

        Any failure is logged as critical and terminates the process with
        exit status 1.
        """
        try:
            self._validate_data_source(pdf_folder)
            start_time = time.time()
            self._load_documents(pdf_folder)
            self._create_vector_index()
            self.logger.info(f"Sistema inicializado em {time.time()-start_time:.2f}s | Documentos: {len(self.documents)}")
        except Exception as e:
            self.logger.critical(f"Falha na inicialização: {str(e)}")
            sys.exit(1)

    def _validate_data_source(self, folder_path: str):
        """Ensure ``folder_path`` is a directory holding at least one allowed file.

        Raises:
            FileNotFoundError: The path is missing or is not a directory.
            ValueError: No file with an allowed extension was found.
        """
        # isdir (not exists): a regular file would make os.listdir fail below.
        if not os.path.isdir(folder_path):
            raise FileNotFoundError(f"Diretório inexistente: {folder_path}")
        allowed = self.config['allowed_file_types']
        has_valid_file = any(
            os.path.splitext(name)[1].lower() in allowed
            for name in os.listdir(folder_path)
        )
        if not has_valid_file:
            raise ValueError("Nenhum documento PDF válido encontrado")

    def _load_documents(self, folder_path: str):
        """Load every PDF page, clean it and split it into text chunks.

        Pages that fail to process are logged and skipped; loader or splitter
        construction errors are logged and re-raised.
        """
        try:
            loader = PyPDFDirectoryLoader(folder_path)
            pages = loader.load()
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.config['chunk_size'],
                chunk_overlap=self.config['chunk_overlap'],
                # Two of these entries are regular expressions, so regex mode
                # must be enabled or they are matched as literal text. The
                # trailing " " and "" are fallbacks so that an over-long piece
                # can still be cut down to chunk_size.
                # NOTE(review): the newline-based separators cannot match
                # because preprocessing collapses all whitespace to spaces.
                separators=["\n\n• ", "\n■ ", r"(?<=\. )", "; ", "► ", r"\|", " ", ""],
                is_separator_regex=True,
            )
            clean_docs = []
            for i, page in enumerate(pages):
                try:
                    text = self._preprocess_technical_text(page.page_content)
                    clean_docs.extend(text_splitter.split_text(text))
                except Exception as e:
                    self.logger.error(f"Erro no documento {i+1}: {str(e)}")
                    continue
            self.documents = clean_docs
            self.logger.info(f"Documentos técnicos carregados: {len(self.documents)} segmentos")
        except Exception as e:
            self.logger.error(f"Falha no carregamento: {str(e)}")
            raise

    def _preprocess_technical_text(self, text: str) -> str:
        """Normalise units, ranges, whitespace and bracketed notes in ``text``."""
        replacements = [
            # Only a "um" glued to / following a number is a micrometre;
            # a bare "um" is the Portuguese article and must stay untouched.
            (r'(\d)\s*um\b', r'\1 µm', 0),
            # The trailing \b keeps "grau comparativo" from becoming "°Comparativo".
            (r'\bgraus?\s*(?:Celsius|C)\b', '°C', re.IGNORECASE),
            (r'(\d)([A-Za-z°µ])', r'\1 \2', 0),
            (r'±\s*(\d)', r'±\1', 0),
            (r'kN/m²', 'kPa', 0),
            (r'(\d+)\s*-\s*(\d+)', r'\1 a \2', 0),
            (r'\s+', ' ', 0),
            (r'\[.*?\]', '', 0),
            # Removing a bracketed note can leave two adjacent spaces behind.
            (r' {2,}', ' ', 0),
        ]
        for pattern, replacement, flags in replacements:
            text = re.sub(pattern, replacement, text, flags=flags)
        return text.strip()

    def _create_vector_index(self):
        """Build the FAISS (dense) and BM25 (lexical) indexes over the chunks.

        Raises:
            ValueError: There are no chunks to index.
        """
        try:
            if not self.documents:
                raise ValueError("Nenhum segmento disponível para indexação")
            dense_vectors = np.asarray(self.embedder.encode(self.documents), dtype='float32')
            self.faiss_index = faiss.IndexHNSWFlat(dense_vectors.shape[1], 32)
            self.faiss_index.add(dense_vectors)
            tokenized_docs = [self._technical_tokenizer(doc) for doc in self.documents]
            self.bm25 = BM25Okapi(tokenized_docs)
            self.logger.info("Índices vetoriais criados com sucesso")
        except Exception as e:
            self.logger.error(f"Erro na criação de índices: {str(e)}")
            raise

    def _technical_tokenizer(self, text: str) -> List[str]:
        """Split ``text`` into lower-cased tokens suitable for BM25.

        Decimal numbers (dot or comma) stay in one piece, e.g. "0.05".
        """
        tokens = re.findall(
            # Decimals must be tried first; otherwise the generic word
            # alternative consumes "0" and "05" separately.
            r'\d+[.,]\d+(?:[eE][+-]?\d+)?|'
            r'\b[\wµ°±]+(?:[/-]\d+)?\b|'
            r'[;:±≤≥]',
            text
        )
        return [t.lower() for t in tokens if t]

    def retrieve_context(self, query: str) -> List[str]:
        """Return up to ``top_k`` chunks ranked by dense + BM25 fusion.

        Returns an empty list (after logging) when retrieval fails or the
        indexes have not been built.
        """
        try:
            if self.faiss_index is None or self.bm25 is None or not self.documents:
                raise RuntimeError("Sistema não inicializado: índices ausentes")
            boosted_query = self._boost_query(query)
            query_embedding = np.asarray(self.embedder.encode([boosted_query]), dtype='float32')
            # Never ask for more neighbours than indexed chunks: FAISS pads
            # missing results with -1.
            candidates = min(50, len(self.documents))
            _, dense_ids = self.faiss_index.search(query_embedding, candidates)
            tokenized_query = self._technical_tokenizer(boosted_query)
            bm25_scores = self.bm25.get_scores(tokenized_query)
            bm25_ids = np.argsort(bm25_scores)[::-1][:candidates]
            ranked = self._reciprocal_rank_fusion(dense_ids[0], bm25_ids)
            return [self.documents[i] for i in ranked[:self.config['top_k']]]
        except Exception as e:
            self.logger.error(f"Falha na recuperação: {str(e)}")
            return []

    def _boost_query(self, query: str) -> str:
        """Append a fixed list of metrology terms to ``query``."""
        terms = [
            'incerteza de medição',
            'calibração rastreável',
            'certificado de calibração',
            'padrão de referência',
            'ISO/IEC 17025'
        ]
        return f"{query} {' '.join(terms)}"

    def _reciprocal_rank_fusion(self, dense_ids: List[int], bm25_ids: List[int]) -> List[int]:
        """Fuse two rankings, scoring each id with sum(1 / (rank + 60)).

        Ids outside ``[0, len(self.documents))`` are ignored. This includes
        the -1 padding FAISS emits, which would otherwise index the last
        chunk through Python's negative indexing.

        Returns:
            Valid chunk ids as plain ints, best first.
        """
        total = len(self.documents)
        combined_scores: Dict[int, float] = {}
        for ranking in (dense_ids, bm25_ids):
            for rank, raw_idx in enumerate(ranking):
                idx = int(raw_idx)
                if 0 <= idx < total:
                    combined_scores[idx] = combined_scores.get(idx, 0) + 1 / (rank + 60)
        ordered = sorted(combined_scores.items(), key=lambda item: item[1], reverse=True)
        return [idx for idx, _ in ordered]

    def generate_technical_response(self, query: str) -> str:
        """Answer ``query`` with the LLM, or with the fallback text on failure."""
        try:
            context = self.retrieve_context(query)
            if not context:
                raise ValueError("Contexto insuficiente")
            prompt = self._build_structured_prompt(query, context)
            if not self._validate_prompt(prompt):
                raise ValueError("Prompt inválido")
            response = self._call_llm_with_retry(prompt)
            return self._postprocess_response(response, context)
        except Exception as e:
            self.logger.error(f"Falha na geração: {str(e)}")
            return self._fallback_procedure(query)

    def _build_structured_prompt(self, query: str, context: List[str]) -> str:
        """Assemble the LLM prompt from guidelines, context excerpts and query.

        Only the first three context chunks are quoted, truncated to 250
        characters each.
        """
        detected_norms = self._detect_norms(context)
        detected_equipment = self._detect_equipment(context)
        context_entries = []
        for i, text in enumerate(context[:3]):
            cleaned_text = text[:250].replace('\n', ' ')
            context_entries.append(f'[Doc {i+1}] {cleaned_text}...')
        context_str = '\n'.join(context_entries)
        template = (
            f"## Diretrizes Técnicas ISO/IEC 17025:2017 ##\n"
            f"1. Formato obrigatório:\n"
            f" - Seção 1: Fundamentação Normativa ({', '.join(detected_norms)})\n"
            f" - Seção 2: Procedimento de Medição\n"
            f" - Seção 3: Análise de Incertezas (k=2)\n"
            f" - Seção 4: Condições Ambientais\n\n"
            f"2. Dados obrigatórios:\n"
            f" - Tolerâncias: ± valores com unidades\n"
            f" - Equipamentos: {', '.join(detected_equipment)}\n"
            f" - Normas: {', '.join(detected_norms)}\n\n"
            f"## Contexto Técnico ##\n"
            f"{context_str}\n\n"
            f"## Consulta ##\n"
            f"{query}\n\n"
            f"## Resposta Estruturada ##"
        )
        return template

    def _detect_norms(self, context: List[str]) -> List[str]:
        """Return up to three standards cited in ``context``, first seen first.

        Falls back to ``config['required_norms']`` when none is found.
        """
        # Non-capturing groups: with a capturing group re.findall would return
        # only the issuer ("ISO/IEC") and drop the standard number.
        pattern = (
            r'\b(?:ABNT\s+NBR(?:\s+ISO(?:/IEC)?)?|ISO(?:/IEC)?|OIML\s+R)'
            r'\s+\d+(?:[.-]\d+)*(?::\d{4})?'
        )
        norms: Dict[str, None] = {}  # dict keeps insertion order => stable output
        for text in context:
            for match in re.findall(pattern, text):
                norms.setdefault(' '.join(match.split()), None)
        return list(norms)[:3] or self.config['required_norms']

    def _detect_equipment(self, context: List[str]) -> List[str]:
        """Return up to five instrument mentions from ``context``, first seen first."""
        pattern = r'\b([A-Z][a-z]*\s+)?(\d+[A-Z]+\b|Micrômetro|Paquímetro|Manômetro|Multímetro)'
        equipment: Dict[str, None] = {}  # ordered de-duplication
        for text in context:
            for prefix, name in re.findall(pattern, text):
                equipment.setdefault(' '.join(f"{prefix}{name}".split()), None)
        return list(equipment)[:5]

    def _validate_prompt(self, prompt: str) -> bool:
        """Return True when ``prompt`` scores at least 3 on the content checks."""
        checks = [
            (r'ISO/IEC 17025', 2),
            # Spacing around ± is optional: preprocessing rewrites "± 2" as "±2".
            (r'\d+\s*±\s*\d+', 1),
            (r'k=\d', 1),
            (r'°C', 1)
        ]
        score = sum(weight for pattern, weight in checks if re.search(pattern, prompt))
        return score >= 3

    def _call_llm_with_retry(self, prompt: str) -> str:
        """Call the remote endpoint, retrying with exponential back-off.

        Raises:
            TimeoutError: Every attempt failed; chained to the last error.
        """
        max_retries = self.config['max_retries']
        last_error = None
        for attempt in range(max_retries):
            try:
                result = self.client.predict(
                    inputs=prompt,
                    top_p=0.9,
                    temperature=self.config['temperature'],
                    chat_counter=0,
                    chatbot=[],
                    api_name="/predict"
                )
                return self._clean_api_response(result)
            except Exception as e:
                last_error = e
                self.logger.warning(f"Tentativa {attempt+1} falhou: {str(e)}")
                # No point in waiting once there is no attempt left.
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)
        raise TimeoutError("Falha após múltiplas tentativas") from last_error

    def _clean_api_response(self, response) -> str:
        """Flatten the endpoint result to text and strip markdown markers."""
        if isinstance(response, (list, tuple)):
            text = ' '.join(str(item) for item in response if item)
        else:
            text = str(response)
        # Apply the same cleaning whatever shape the endpoint returned.
        return text.replace('**', '').replace('```', '').strip()

    def _postprocess_response(self, response: str, context: List[str]) -> str:
        """Clean, enrich, reference and colour-format the raw LLM answer."""
        processed = response.replace('Resposta Estruturada', '').strip()
        processed = self._enhance_technical_terms(processed)
        processed = self._add_references(processed, context)
        return self._format_response(processed)

    def _enhance_technical_terms(self, text: str) -> str:
        """Expand bare technical terms and space out number/unit pairs.

        A term already followed by its qualifier is left alone, and the
        original capitalisation of the matched word is preserved.
        """
        replacements = {
            r'\bincerteza\b(?!\s+de\s+medição)': r'\g<0> de medição',
            r'\bcalibração\b(?!\s+rastreável)': r'\g<0> rastreável',
            r'\bnorma\b(?!\s+técnica)': r'\g<0> técnica',
            r'(\d)([a-zA-Zµ°])': r'\1 \2'
        }
        for pattern, repl in replacements.items():
            text = re.sub(pattern, repl, text, flags=re.IGNORECASE)
        return text

    def _add_references(self, text: str, context: List[str]) -> str:
        """Append a references section quoting the start of the top chunks.

        The first 30 characters of each of the first three chunks are used;
        a leading "[Doc n]" tag, if present, is dropped. Duplicates and empty
        snippets are skipped and the chunk order is preserved.
        """
        refs: List[str] = []
        for doc in context[:3]:
            body = re.sub(r'^\[Doc \d+\]\s*', '', doc.strip())
            snippet = body[:30].strip()
            if not snippet:
                continue
            entry = f"- {snippet}..."
            if entry not in refs:
                refs.append(entry)
        return f"{text}\n\n## Referências Técnicas ##\n" + "\n".join(refs)

    def _format_response(self, text: str) -> str:
        """Wrap ``text`` in a coloured banner and highlight list items and tolerances."""
        border = "="*80
        header = f"{Fore.GREEN}▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓\n RESPOSTA TÉCNICA CERTIFICADA\n▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓{Style.RESET_ALL}"
        formatted = re.sub(r'^(\d+\.)\s+(.+)$',
                           f'{Fore.CYAN}\\1 {Style.RESET_ALL}\\2',
                           text, flags=re.M)
        # Accept "±2", "± 2", "± 0.05" and "± 0,05" alike.
        formatted = re.sub(r'(±\s*\d+(?:[.,]\d+)?)',
                           f'{Fore.YELLOW}\\1{Style.RESET_ALL}',
                           formatted)
        return f"\n{border}\n{header}\n{border}\n{formatted}\n{border}"

    def _fallback_procedure(self, query: str) -> str:
        """Return excerpts of chunks containing upper-case key terms of ``query``.

        Used when the normal pipeline fails; the result carries an explicit
        "not validated" warning.
        """
        try:
            key_terms = re.findall(r'\b[A-Z]{3,}\b|\b\d+[A-Z]+\b', query)
            relevant = [doc for doc in self.documents if any(term in doc for term in key_terms)][:3]
            return (
                f"{Fore.YELLOW}INFORMAÇÃO TÉCNICA PARCIAL:{Style.RESET_ALL}\n" +
                "\n".join([f"• {doc[:300]}..." for doc in relevant]) +
                f"\n\n{Fore.RED}AVISO: Resposta não validada - consulte documentos originais{Style.RESET_ALL}"
            )
        except Exception as e:
            # Exception (not a bare except) so Ctrl-C and SystemExit propagate.
            self.logger.error(f"Falha no procedimento de contingência: {str(e)}")
            return f"{Fore.RED}Erro crítico - sistema necessita re-inicialização{Style.RESET_ALL}"

    def generate_report(self, query: str, response: str, filename: str = "relatorio_tecnico.md"):
        """Write a markdown report with ``query`` and ``response`` to ``filename``.

        Terminal colour codes are removed from ``response`` first. Failures
        are logged, not raised.
        """
        try:
            timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
            # Responses are colourised for the console; ANSI escapes are
            # noise inside a markdown file.
            plain_response = re.sub(r'\x1b\[[0-9;]*m', '', response)
            report = (
                f"# RELATÓRIO TÉCNICO - METROLOGIA\n\n"
                f"**Data:** {timestamp}\n"
                f"**Consulta:** {query}\n\n"
                "## Resposta Técnica\n"
                f"{plain_response}\n\n"
                "**Assinatura Digital:** [Sistema Certificado v2.1]"
            )
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report)
            self.logger.info(f"Relatório gerado: {filename}")
        except Exception as e:
            self.logger.error(f"Falha ao gerar relatório: {str(e)}")

    def analyze_metrology_report(self, pdf_path: str) -> str:
        """Check a PDF report for compliance and return the formatted opinion."""
        try:
            text = self._extract_pdf_text(pdf_path)
            compliance = self._check_compliance(text)
            analysis = self._generate_analysis_report(text, compliance)
            return self._format_compliance_report(analysis, compliance)
        except Exception as e:
            self.logger.error(f"Falha na análise: {str(e)}")
            return self._fallback_procedure("Análise de relatório")

    def _extract_pdf_text(self, path: str) -> str:
        """Return the text of every non-empty page of the PDF, newline-joined."""
        reader = PdfReader(path)
        # Extract each page once; the original called extract_text() twice.
        page_texts = (page.extract_text() for page in reader.pages)
        return '\n'.join(content for content in page_texts if content)

    def _check_compliance(self, text: str) -> Dict:
        """Run the regex compliance checks against ``text``.

        Returns:
            Mapping of check name to ``{'status': 'OK' | 'FALHA' | 'N/A',
            'critical': bool}``; ``critical`` is True only for a required
            check that was not found.
        """
        checks = {
            'rastreabilidade': {'patterns': [r'rastreab[ií]lidade.*INMETRO'], 'required': True},
            'incerteza': {'patterns': [r'incerteza expandida.*≤?\s*\d+'], 'required': True},
            'ambiente': {'patterns': [r'temperatura.*23\s*±\s*2\s*°C'], 'required': False},
            'normas': {'patterns': [r'ISO/IEC\s+17025'], 'required': True}
        }
        results = {}
        for key, config in checks.items():
            # Case-insensitive so capitalised headings ("Rastreabilidade") count.
            found = any(re.search(p, text, flags=re.IGNORECASE) for p in config['patterns'])
            if found:
                status = 'OK'
            elif config['required']:
                status = 'FALHA'
            else:
                status = 'N/A'
            results[key] = {
                'status': status,
                'critical': bool(config['required'] and not found)
            }
        return results

    def _generate_analysis_report(self, text: str, compliance: Dict) -> str:
        """Ask the LLM for a technical opinion based on the compliance results.

        Only the first 2000 characters of ``text`` are included in the prompt.
        """
        critical = sum(1 for v in compliance.values() if v['critical'])
        status = "NÃO CONFORME" if critical else "CONFORME"
        prompt = (
            "## Análise de Conformidade Metrológica ##\n"
            f"Documento analisado: {text[:2000]}...\n"
            "Resultados:\n"
            f"{json.dumps(compliance, indent=2)}\n"
            "## Parecer Técnico ##\n"
            "Emitir parecer considerando:\n"
            f"- Status: {status}\n"
            f"- Itens críticos: {critical}\n"
            "- Recomendações de adequação"
        )
        return self._call_llm_with_retry(prompt)

    def _format_compliance_report(self, text: str, compliance: Dict) -> str:
        """Prefix the opinion ``text`` with a coloured banner and check summary."""
        approved = not any(v['critical'] for v in compliance.values())
        status = "APROVADO" if approved else "REPROVADO"
        color = Fore.GREEN if approved else Fore.RED
        header = (
            f"\n{color}▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓\n"
            f"PARECER TÉCNICO - STATUS: {status}\n"
            f"▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓{Style.RESET_ALL}\n"
        )
        summary = "## Resumo de Conformidade ##\n" + "".join(
            f"• {k.upper()}: {v['status']}\n" for k, v in compliance.items()
        )
        return header + summary + "\n" + text
def main_menu():
    """Display the main menu on stdout and return the raw option typed by the user."""
    banner = Fore.BLUE + "\n🔧 Sistema de Metrologia Inteligente v2.1" + Style.RESET_ALL
    title = Fore.CYAN + "Menu Principal:" + Style.RESET_ALL
    options = (
        "1. Inicializar sistema com documentos PDF",
        "2. Consulta técnica",
        "3. Analisar relatório PDF",
        "4. Gerar relatório completo",
        "5. Sair",
    )
    # One print call per line, in the same order as the menu is meant to read.
    for line in (banner, title, *options):
        print(line)
    prompt = Fore.YELLOW + "> Selecione uma opção: " + Style.RESET_ALL
    return input(prompt)