""" Security module for the MONA application. This module provides security features including client-side encryption, data anonymization, secure sessions, GDPR compliance, audit logging, access controls, data validation, and error handling. """ import os import json import base64 import hashlib import secrets import time from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Union, Tuple import re import uuid # For encryption from Crypto.Cipher import AES from Crypto.Random import get_random_bytes from Crypto.Util.Padding import pad, unpad # For JWT import jwt # For password hashing from passlib.hash import pbkdf2_sha256 # Import utilities from utils.logging import get_logger, log_error from utils.error_handling import handle_exceptions, ValidationError, MonaError from utils.config import DATA_DIR # Initialize logger logger = get_logger(__name__) # Security configuration SECURITY_CONFIG = { "encryption": { "enabled": True, "algorithm": "AES-256-GCM", "key_derivation": "PBKDF2", }, "session": { "timeout": 30, # minutes "jwt_expiry": 24, # hours "refresh_token_expiry": 7, # days }, "password": { "min_length": 8, "require_uppercase": True, "require_lowercase": True, "require_numbers": True, "require_special": True, }, "gdpr": { "enabled": True, "data_retention": 365, # days "anonymize_on_delete": True, }, "audit": { "enabled": True, "log_user_actions": True, "log_admin_actions": True, "log_system_actions": True, } } # Create security directory if it doesn't exist SECURITY_DIR = DATA_DIR / "security" os.makedirs(SECURITY_DIR, exist_ok=True) # Security error class class SecurityError(MonaError): """Exception raised for security-related errors.""" pass # ===== Encryption Functions ===== @handle_exceptions def generate_encryption_key(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]: """ Generate an encryption key from a password using PBKDF2. Args: password: The password to derive the key from salt: Optional salt for key derivation Returns: Tuple of (key, salt) """ if salt is None: salt = get_random_bytes(16) # Use PBKDF2 to derive a key from the password key = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100000, dklen=32) return key, salt @handle_exceptions def encrypt_data(data: Any, password: str) -> Dict[str, str]: """ Encrypt data using AES-GCM with a password-derived key. Args: data: The data to encrypt (will be converted to JSON) password: The password to derive the encryption key from Returns: Dictionary with encrypted data and metadata """ if not SECURITY_CONFIG["encryption"]["enabled"]: logger.warning("Encryption is disabled in configuration") return {"data": json.dumps(data)} try: # Convert data to JSON string data_json = json.dumps(data) # Generate key and salt salt = get_random_bytes(16) key, _ = generate_encryption_key(password, salt) # Generate a random nonce nonce = get_random_bytes(12) # Create cipher cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) # Encrypt the data ciphertext, tag = cipher.encrypt_and_digest(data_json.encode('utf-8')) # Encode binary data as base64 for storage encrypted_data = { "ciphertext": base64.b64encode(ciphertext).decode('utf-8'), "tag": base64.b64encode(tag).decode('utf-8'), "nonce": base64.b64encode(nonce).decode('utf-8'), "salt": base64.b64encode(salt).decode('utf-8'), "algorithm": SECURITY_CONFIG["encryption"]["algorithm"], "timestamp": datetime.now().isoformat() } return encrypted_data except Exception as e: logger.error(f"Encryption error: {str(e)}") raise SecurityError(f"Failed to encrypt data: {str(e)}") @handle_exceptions def decrypt_data(encrypted_data: Dict[str, str], password: str) -> Any: """ Decrypt data that was encrypted with encrypt_data(). Args: encrypted_data: Dictionary with encrypted data and metadata password: The password used for encryption Returns: The decrypted data """ # Check if data is actually encrypted if "data" in encrypted_data and len(encrypted_data) == 1: # Data is not encrypted return json.loads(encrypted_data["data"]) try: # Decode the base64 encoded data ciphertext = base64.b64decode(encrypted_data["ciphertext"]) tag = base64.b64decode(encrypted_data["tag"]) nonce = base64.b64decode(encrypted_data["nonce"]) salt = base64.b64decode(encrypted_data["salt"]) # Derive the key from the password and salt key, _ = generate_encryption_key(password, salt) # Create cipher cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) # Decrypt the data data_json = cipher.decrypt_and_verify(ciphertext, tag).decode('utf-8') # Parse the JSON data return json.loads(data_json) except (ValueError, KeyError) as e: logger.error(f"Decryption error: {str(e)}") raise SecurityError(f"Failed to decrypt data: Invalid password or corrupted data") except Exception as e: logger.error(f"Decryption error: {str(e)}") raise SecurityError(f"Failed to decrypt data: {str(e)}") # ===== Password Management ===== @handle_exceptions def hash_password(password: str) -> str: """ Hash a password using PBKDF2-SHA256. Args: password: The password to hash Returns: The hashed password """ return pbkdf2_sha256.hash(password) @handle_exceptions def verify_password(password: str, password_hash: str) -> bool: """ Verify a password against a hash. Args: password: The password to verify password_hash: The hash to verify against Returns: True if the password matches the hash, False otherwise """ return pbkdf2_sha256.verify(password, password_hash) @handle_exceptions def validate_password_strength(password: str) -> Tuple[bool, str]: """ Validate password strength based on security configuration. Args: password: The password to validate Returns: Tuple of (is_valid, message) """ config = SECURITY_CONFIG["password"] errors = [] # Check length if len(password) < config["min_length"]: errors.append(f"Password must be at least {config['min_length']} characters long") # Check for uppercase letters if config["require_uppercase"] and not any(c.isupper() for c in password): errors.append("Password must contain at least one uppercase letter") # Check for lowercase letters if config["require_lowercase"] and not any(c.islower() for c in password): errors.append("Password must contain at least one lowercase letter") # Check for numbers if config["require_numbers"] and not any(c.isdigit() for c in password): errors.append("Password must contain at least one number") # Check for special characters if config["require_special"] and not any(not c.isalnum() for c in password): errors.append("Password must contain at least one special character") if errors: return False, "\n".join(errors) return True, "Password meets strength requirements" # ===== Session Management ===== @handle_exceptions def generate_session_token() -> str: """ Generate a secure random session token. Returns: A secure random token """ return secrets.token_urlsafe(32) @handle_exceptions def create_jwt_token(user_id: str, additional_data: Optional[Dict[str, Any]] = None) -> str: """ Create a JWT token for user authentication. Args: user_id: The user ID to include in the token additional_data: Additional data to include in the token Returns: A JWT token string """ # Set expiration time based on configuration expiry_hours = SECURITY_CONFIG["session"]["jwt_expiry"] expiry = datetime.now() + timedelta(hours=expiry_hours) # Create the payload payload = { "sub": user_id, "iat": datetime.now(), "exp": expiry, "jti": str(uuid.uuid4()) } # Add additional data if provided if additional_data: payload.update(additional_data) # Create a secret key (in production, this should be stored securely) # For this implementation, we'll derive it from a fixed string # In a real application, use a proper secret key management system secret_key = hashlib.sha256(b"MONA_JWT_SECRET_KEY").digest() # Create the token token = jwt.encode(payload, secret_key, algorithm="HS256") return token @handle_exceptions def verify_jwt_token(token: str) -> Dict[str, Any]: """ Verify a JWT token and return its payload. Args: token: The JWT token to verify Returns: The token payload if valid Raises: SecurityError: If the token is invalid or expired """ try: # Get the secret key (same as in create_jwt_token) secret_key = hashlib.sha256(b"MONA_JWT_SECRET_KEY").digest() # Decode and verify the token payload = jwt.decode(token, secret_key, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise SecurityError("Token has expired") except jwt.InvalidTokenError as e: raise SecurityError(f"Invalid token: {str(e)}") # ===== Data Anonymization ===== @handle_exceptions def anonymize_data(data: Dict[str, Any], fields_to_anonymize: List[str]) -> Dict[str, Any]: """ Anonymize sensitive data fields. Args: data: The data to anonymize fields_to_anonymize: List of field names to anonymize Returns: Anonymized data """ anonymized_data = data.copy() for field in fields_to_anonymize: if field in anonymized_data: # Check if the field is a nested path (e.g., "user.email") if '.' in field: parts = field.split('.') current = anonymized_data for part in parts[:-1]: if part in current and isinstance(current[part], dict): current = current[part] else: break else: # If we got here, we found the parent object last_part = parts[-1] if last_part in current: # Anonymize the field based on its type current[last_part] = _anonymize_value(current[last_part]) else: # Anonymize the field based on its type anonymized_data[field] = _anonymize_value(anonymized_data[field]) return anonymized_data def _anonymize_value(value: Any) -> Any: """ Anonymize a single value based on its type. Args: value: The value to anonymize Returns: Anonymized value """ if isinstance(value, str): # For emails, replace with a hash if '@' in value and '.' in value.split('@')[1]: username, domain = value.split('@') hashed_username = hashlib.sha256(username.encode()).hexdigest()[:8] return f"{hashed_username}@{domain}" # For other strings, replace with a fixed-length hash return hashlib.sha256(value.encode()).hexdigest()[:len(value)] elif isinstance(value, int): # For numbers, replace with a random number of similar magnitude return 0 # For complete anonymization elif isinstance(value, list): # For lists, anonymize each element return [_anonymize_value(item) for item in value] elif isinstance(value, dict): # For dictionaries, anonymize each value return {k: _anonymize_value(v) for k, v in value.items()} # For other types, return None return None @handle_exceptions def pseudonymize_user_data(user_data: Dict[str, Any], user_id: str) -> Dict[str, Any]: """ Pseudonymize user data by replacing identifiers with pseudonyms. Args: user_data: The user data to pseudonymize user_id: The user ID to use for generating pseudonyms Returns: Pseudonymized user data """ # Fields that should be pseudonymized pseudonymize_fields = [ "name", "email", "phone", "address", "ip_address", "user_agent" ] # Create a pseudonym mapping file if it doesn't exist pseudonym_file = SECURITY_DIR / "pseudonyms.json" if not os.path.exists(pseudonym_file): with open(pseudonym_file, 'w') as f: json.dump({}, f) # Load existing pseudonyms with open(pseudonym_file, 'r') as f: pseudonyms = json.load(f) # Create user pseudonyms if they don't exist if user_id not in pseudonyms: pseudonyms[user_id] = {} # Pseudonymize the data pseudonymized_data = user_data.copy() for field in pseudonymize_fields: if field in user_data: # Generate a pseudonym if it doesn't exist if field not in pseudonyms[user_id]: # Create a deterministic but non-reversible pseudonym value = str(user_data[field]) pseudonym = hashlib.sha256(f"{user_id}:{field}:{value}".encode()).hexdigest()[:16] pseudonyms[user_id][field] = pseudonym # Replace the value with the pseudonym pseudonymized_data[field] = pseudonyms[user_id][field] # Save the updated pseudonyms with open(pseudonym_file, 'w') as f: json.dump(pseudonyms, f) return pseudonymized_data # ===== Audit Logging ===== @handle_exceptions def log_audit_event(event_type: str, user_id: Optional[str] = None, details: Optional[Dict[str, Any]] = None, success: bool = True) -> None: """ Log an audit event for security monitoring. Args: event_type: Type of event (e.g., 'login', 'data_access', 'settings_change') user_id: ID of the user performing the action (if applicable) details: Additional details about the event success: Whether the action was successful """ if not SECURITY_CONFIG["audit"]["enabled"]: return # Create audit log directory if it doesn't exist audit_log_dir = SECURITY_DIR / "audit_logs" os.makedirs(audit_log_dir, exist_ok=True) # Get the current date for log file name current_date = datetime.now().strftime("%Y-%m-%d") log_file = audit_log_dir / f"audit_{current_date}.log" # Create the audit event audit_event = { "timestamp": datetime.now().isoformat(), "event_type": event_type, "user_id": user_id, "details": details or {}, "success": success, "ip_address": "127.0.0.1", # In a real app, get the actual IP "user_agent": "MONA App" # In a real app, get the actual user agent } # Log the event with open(log_file, 'a') as f: f.write(json.dumps(audit_event) + "\n") # Also log to the application logger log_level = "info" if success else "warning" log_message = f"AUDIT: {event_type} by {user_id or 'system'} - {'Success' if success else 'Failed'}" if log_level == "info": logger.info(log_message) else: logger.warning(log_message) # ===== Access Control ===== @handle_exceptions def check_permission(user_id: str, resource: str, action: str) -> bool: """ Check if a user has permission to perform an action on a resource. Args: user_id: The ID of the user resource: The resource being accessed (e.g., 'data', 'settings') action: The action being performed (e.g., 'read', 'write', 'delete') Returns: True if the user has permission, False otherwise """ # In a real application, this would check against a permissions database # For this implementation, we'll use a simple role-based approach # Load user roles (in a real app, this would come from a database) roles_file = SECURITY_DIR / "roles.json" # Create default roles if the file doesn't exist if not os.path.exists(roles_file): default_roles = { "admin": { "users": ["admin_user_id"], "permissions": {"*": ["*"]} # All permissions on all resources }, "user": { "users": [], "permissions": { "data": ["read", "write"], "settings": ["read"], "backups": ["read"] } } } with open(roles_file, 'w') as f: json.dump(default_roles, f, indent=4) # Load roles with open(roles_file, 'r') as f: roles = json.load(f) # Check if the user has admin role (all permissions) for role_name, role_data in roles.items(): if user_id in role_data["users"]: permissions = role_data["permissions"] # Check for wildcard permissions if "*" in permissions and "*" in permissions["*"]: return True # Check for resource-specific wildcard if resource in permissions and "*" in permissions[resource]: return True # Check for specific permission if resource in permissions and action in permissions[resource]: return True # Default to denying access return False # ===== Data Validation ===== @handle_exceptions def validate_email(email: str) -> bool: """ Validate an email address format. Args: email: The email address to validate Returns: True if the email is valid, False otherwise """ # Basic email validation regex email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(email_pattern, email)) @handle_exceptions def sanitize_input(input_str: str) -> str: """ Sanitize user input to prevent injection attacks. Args: input_str: The input string to sanitize Returns: Sanitized string """ # Remove potentially dangerous HTML/script tags sanitized = re.sub(r'.*?', '', input_str, flags=re.DOTALL | re.IGNORECASE) sanitized = re.sub(r'<.*?>', '', sanitized) # Escape special characters sanitized = sanitized.replace('&', '&') sanitized = sanitized.replace('<', '<') sanitized = sanitized.replace('>', '>') sanitized = sanitized.replace('"', '"') sanitized = sanitized.replace("'", ''') return sanitized @handle_exceptions def validate_data_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> Tuple[bool, List[str]]: """ Validate data against a schema. Args: data: The data to validate schema: The schema to validate against Returns: Tuple of (is_valid, error_messages) """ errors = [] # Check required fields for field, field_schema in schema.items(): if field_schema.get("required", False) and field not in data: errors.append(f"Missing required field: {field}") # Validate field types and values for field, value in data.items(): if field in schema: field_schema = schema[field] # Check type expected_type = field_schema.get("type") if expected_type: if expected_type == "string" and not isinstance(value, str): errors.append(f"Field {field} must be a string") elif expected_type == "number" and not isinstance(value, (int, float)): errors.append(f"Field {field} must be a number") elif expected_type == "boolean" and not isinstance(value, bool): errors.append(f"Field {field} must be a boolean") elif expected_type == "array" and not isinstance(value, list): errors.append(f"Field {field} must be an array") elif expected_type == "object" and not isinstance(value, dict): errors.append(f"Field {field} must be an object") # Check pattern for strings if isinstance(value, str) and "pattern" in field_schema: pattern = field_schema["pattern"] if not re.match(pattern, value): errors.append(f"Field {field} does not match required pattern") # Check min/max for numbers if isinstance(value, (int, float)): if "minimum" in field_schema and value < field_schema["minimum"]: errors.append(f"Field {field} must be at least {field_schema['minimum']}") if "maximum" in field_schema and value > field_schema["maximum"]: errors.append(f"Field {field} must be at most {field_schema['maximum']}") # Check min/max length for strings and arrays if isinstance(value, (str, list)): if "minLength" in field_schema and len(value) < field_schema["minLength"]: errors.append(f"Field {field} must have at least {field_schema['minLength']} items") if "maxLength" in field_schema and len(value) > field_schema["maxLength"]: errors.append(f"Field {field} must have at most {field_schema['maxLength']} items") # Check enum values if "enum" in field_schema and value not in field_schema["enum"]: errors.append(f"Field {field} must be one of: {', '.join(map(str, field_schema['enum']))}") return len(errors) == 0, errors # ===== GDPR Compliance ===== @handle_exceptions def create_data_export(user_id: str) -> str: """ Create a GDPR-compliant export of all user data. Args: user_id: The ID of the user requesting their data Returns: Path to the exported data file """ if not SECURITY_CONFIG["gdpr"]["enabled"]: raise SecurityError("GDPR features are not enabled") # Create exports directory if it doesn't exist exports_dir = SECURITY_DIR / "exports" os.makedirs(exports_dir, exist_ok=True) # Generate a unique filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") export_file = exports_dir / f"data_export_{user_id}_{timestamp}.json" # In a real application, this would gather all user data from various sources # For this implementation, we'll create a sample export # Log the export for audit purposes log_audit_event("data_export", user_id, {"file": str(export_file)}) # Sample user data (in a real app, this would be fetched from databases) user_data = { "user_id": user_id, "export_date": datetime.now().isoformat(), "data_categories": { "profile": { "description": "Your profile information", "last_updated": datetime.now().isoformat(), "data": {} }, "activity": { "description": "Your activity history", "last_updated": datetime.now().isoformat(), "data": [] }, "settings": { "description": "Your application settings", "last_updated": datetime.now().isoformat(), "data": {} } } } # Write the export file with open(export_file, 'w') as f: json.dump(user_data, f, indent=4) return str(export_file) @handle_exceptions def delete_user_data(user_id: str, delete_type: str = "anonymize") -> bool: """ Delete or anonymize all data for a user (GDPR right to be forgotten). Args: user_id: The ID of the user whose data should be deleted delete_type: Type of deletion ('anonymize' or 'complete') Returns: True if successful """ if not SECURITY_CONFIG["gdpr"]["enabled"]: raise SecurityError("GDPR features are not enabled") # Log the deletion request for audit purposes log_audit_event("data_deletion", user_id, {"delete_type": delete_type}) # In a real application, this would delete or anonymize user data from all sources # For this implementation, we'll just log the action if delete_type == "anonymize" and SECURITY_CONFIG["gdpr"]["anonymize_on_delete"]: logger.info(f"Anonymizing all data for user {user_id}") # In a real app, this would anonymize all user data else: logger.info(f"Completely deleting all data for user {user_id}") # In a real app, this would delete all user data # Remove user from pseudonyms file if it exists pseudonym_file = SECURITY_DIR / "pseudonyms.json" if os.path.exists(pseudonym_file): with open(pseudonym_file, 'r') as f: pseudonyms = json.load(f) if user_id in pseudonyms: del pseudonyms[user_id] with open(pseudonym_file, 'w') as f: json.dump(pseudonyms, f) return True