""" Automated Data Quality Monitoring System Monitors data quality metrics, detects anomalies, and ensures data integrity """ import json import sqlite3 import numpy as np import pandas as pd from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass, asdict from enum import Enum import hashlib import re import statistics class QualityMetricType(Enum): COMPLETENESS = "completeness" ACCURACY = "accuracy" CONSISTENCY = "consistency" VALIDITY = "validity" UNIQUENESS = "uniqueness" TIMELINESS = "timeliness" RELEVANCE = "relevance" class AlertSeverity(Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" @dataclass class QualityMetric: """Represents a data quality metric measurement""" metric_id: str dataset_id: str metric_type: QualityMetricType value: float threshold_min: float threshold_max: float measured_at: str passed: bool details: Dict[str, Any] @dataclass class QualityAlert: """Represents a data quality alert""" alert_id: str dataset_id: str metric_type: QualityMetricType severity: AlertSeverity message: str value: float threshold: float created_at: str resolved_at: Optional[str] resolved: bool @dataclass class DatasetProfile: """Statistical profile of a dataset""" dataset_id: str total_records: int total_columns: int null_percentage: float duplicate_percentage: float schema_hash: str last_updated: str column_profiles: Dict[str, Any] class DataQualityMonitor: """Automated data quality monitoring system""" def __init__(self, db_path: str = "data/quality/data_quality.db"): self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) self._init_database() self.quality_thresholds = self._load_default_thresholds() def _init_database(self): """Initialize the quality monitoring database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Quality Metrics table cursor.execute(""" CREATE TABLE IF NOT EXISTS quality_metrics ( metric_id TEXT PRIMARY KEY, dataset_id TEXT NOT NULL, metric_type TEXT NOT NULL, value REAL NOT NULL, threshold_min REAL NOT NULL, threshold_max REAL NOT NULL, measured_at TEXT NOT NULL, passed BOOLEAN NOT NULL, details TEXT NOT NULL ) """) # Quality Alerts table cursor.execute(""" CREATE TABLE IF NOT EXISTS quality_alerts ( alert_id TEXT PRIMARY KEY, dataset_id TEXT NOT NULL, metric_type TEXT NOT NULL, severity TEXT NOT NULL, message TEXT NOT NULL, value REAL NOT NULL, threshold REAL NOT NULL, created_at TEXT NOT NULL, resolved_at TEXT, resolved BOOLEAN DEFAULT FALSE ) """) # Dataset Profiles table cursor.execute(""" CREATE TABLE IF NOT EXISTS dataset_profiles ( dataset_id TEXT PRIMARY KEY, total_records INTEGER NOT NULL, total_columns INTEGER NOT NULL, null_percentage REAL NOT NULL, duplicate_percentage REAL NOT NULL, schema_hash TEXT NOT NULL, last_updated TEXT NOT NULL, column_profiles TEXT NOT NULL ) """) # Quality Rules table cursor.execute(""" CREATE TABLE IF NOT EXISTS quality_rules ( rule_id TEXT PRIMARY KEY, dataset_pattern TEXT NOT NULL, metric_type TEXT NOT NULL, threshold_min REAL, threshold_max REAL, severity TEXT NOT NULL, enabled BOOLEAN DEFAULT TRUE, created_at TEXT NOT NULL ) """) # Create indices cursor.execute("CREATE INDEX IF NOT EXISTS idx_metrics_dataset ON quality_metrics(dataset_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_metrics_type ON quality_metrics(metric_type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_alerts_dataset ON quality_alerts(dataset_id)") cursor.execute("CREATE INDEX IF NOT 

    def _load_default_thresholds(self) -> Dict[str, Dict[str, Dict[str, float]]]:
        """Load default quality thresholds for cybersecurity data.

        The structure is dataset -> metric -> {"min", "max"}, so the
        annotation is three dicts deep.
        """
        return {
            "mitre_attack": {
                "completeness": {"min": 0.95, "max": 1.0},
                "accuracy": {"min": 0.90, "max": 1.0},
                "consistency": {"min": 0.85, "max": 1.0},
                "validity": {"min": 0.95, "max": 1.0},
                "uniqueness": {"min": 0.98, "max": 1.0},
            },
            "cve_data": {
                "completeness": {"min": 0.90, "max": 1.0},
                "accuracy": {"min": 0.95, "max": 1.0},
                "timeliness": {"min": 0.80, "max": 1.0},
                "validity": {"min": 0.95, "max": 1.0},
            },
            "threat_intel": {
                "completeness": {"min": 0.85, "max": 1.0},
                "accuracy": {"min": 0.90, "max": 1.0},
                "timeliness": {"min": 0.90, "max": 1.0},
                "relevance": {"min": 0.80, "max": 1.0},
            },
            "red_team_logs": {
                "completeness": {"min": 0.98, "max": 1.0},
                "consistency": {"min": 0.90, "max": 1.0},
                "validity": {"min": 0.95, "max": 1.0},
            },
        }

    def measure_completeness(self, data: pd.DataFrame) -> float:
        """Measure data completeness (fraction of non-null cells, 0.0-1.0)"""
        if data.empty:
            return 0.0
        total_cells = data.shape[0] * data.shape[1]
        non_null_cells = total_cells - data.isnull().sum().sum()
        return non_null_cells / total_cells if total_cells > 0 else 0.0

    def measure_accuracy(self, data: pd.DataFrame, dataset_type: str) -> float:
        """Measure data accuracy based on format-validation rules"""
        if data.empty:
            return 0.0

        accuracy_score = 1.0
        total_checks = 0
        failed_checks = 0

        # Cybersecurity-specific accuracy checks
        if dataset_type == "mitre_attack":
            # MITRE ATT&CK technique IDs: T1234 or T1234.001
            if 'technique_id' in data.columns:
                technique_pattern = re.compile(r'^T\d{4}(\.\d{3})?$')
                invalid_ids = ~data['technique_id'].str.match(technique_pattern, na=False)
                failed_checks += invalid_ids.sum()
                total_checks += len(data)
        elif dataset_type == "cve_data":
            # CVE IDs: CVE-YYYY-NNNN, with four or more digits in the sequence part
            if 'cve_id' in data.columns:
                cve_pattern = re.compile(r'^CVE-\d{4}-\d{4,}$')
                invalid_cves = ~data['cve_id'].str.match(cve_pattern, na=False)
                failed_checks += invalid_cves.sum()
                total_checks += len(data)
        elif dataset_type == "threat_intel":
            # IPv4 dotted-quad shape (does not validate octet ranges)
            if 'ip_address' in data.columns:
                ip_pattern = re.compile(r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$')
                invalid_ips = ~data['ip_address'].str.match(ip_pattern, na=False)
                failed_checks += invalid_ips.sum()
                total_checks += len(data)

        # General accuracy checks across text columns.
        # NOTE: the original source is truncated at this point; the pattern
        # list and the final score computation below are a plausible,
        # clearly labeled reconstruction, not the original logic.
        suspicious_patterns = ['test', 'dummy', 'placeholder', 'xxx', 'n/a']
        for column in data.select_dtypes(include=['object']).columns:
            # Flag values that look like filler rather than real data
            lowered = data[column].astype(str).str.lower()
            failed_checks += lowered.isin(suspicious_patterns).sum()
            total_checks += len(data)

        # Accuracy is the fraction of checks that passed
        if total_checks > 0:
            accuracy_score = 1.0 - (failed_checks / total_checks)
        return accuracy_score
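
# --- Illustrative usage sketch, not part of the original source ---
# Exercises the completeness and accuracy checks against a tiny in-memory
# CVE dataset. The thresholds referenced are the defaults loaded above;
# the sample records are invented for demonstration only.
if __name__ == "__main__":
    monitor = DataQualityMonitor(db_path="data/quality/data_quality.db")

    sample = pd.DataFrame({
        "cve_id": ["CVE-2021-44228", "CVE-2023-12345", "not-a-cve"],
        "description": ["Log4Shell RCE", None, "placeholder"],
    })

    completeness = monitor.measure_completeness(sample)  # 5 of 6 cells non-null
    accuracy = monitor.measure_accuracy(sample, "cve_data")

    min_completeness = monitor.quality_thresholds["cve_data"]["completeness"]["min"]
    print(f"completeness={completeness:.2f} (threshold >= {min_completeness})")
    print(f"accuracy={accuracy:.2f}")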