|
""" |
|
Security module for the MONA application. |
|
|
|
This module provides security features including client-side encryption, |
|
data anonymization, secure sessions, GDPR compliance, audit logging, |
|
access controls, data validation, and error handling. |
|
""" |
|
|
|
import os |
|
import json |
|
import base64 |
|
import hashlib |
|
import secrets |
|
import time |
|
from datetime import datetime, timedelta |
|
from typing import Dict, List, Any, Optional, Union, Tuple |
|
import re |
|
import uuid |
|
|
|
|
|
from Crypto.Cipher import AES |
|
from Crypto.Random import get_random_bytes |
|
from Crypto.Util.Padding import pad, unpad |
|
|
|
|
|
import jwt |
|
|
|
|
|
from passlib.hash import pbkdf2_sha256 |
|
|
|
|
|
from utils.logging import get_logger, log_error |
|
from utils.error_handling import handle_exceptions, ValidationError, MonaError |
|
from utils.config import DATA_DIR |
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
# Central security configuration read by the functions in this module.
# Flipping a feature flag here enables/disables whole subsystems.
SECURITY_CONFIG = {
    "encryption": {
        "enabled": True,                  # when False, encrypt_data/decrypt_data pass plaintext JSON through
        "algorithm": "AES-256-GCM",       # informational label embedded in encrypted payloads
        "key_derivation": "PBKDF2",       # informational; keys come from PBKDF2-HMAC-SHA256
    },
    "session": {
        "timeout": 30,                    # presumably minutes — TODO confirm; not read in this file
        "jwt_expiry": 24,                 # hours; used by create_jwt_token for the "exp" claim
        "refresh_token_expiry": 7,        # presumably days — TODO confirm; not read in this file
    },
    "password": {
        # Policy enforced by validate_password_strength
        "min_length": 8,
        "require_uppercase": True,
        "require_lowercase": True,
        "require_numbers": True,
        "require_special": True,
    },
    "gdpr": {
        "enabled": True,                  # gates create_data_export / delete_user_data
        "data_retention": 365,            # presumably days — TODO confirm; not read in this file
        "anonymize_on_delete": True,      # delete_user_data anonymizes instead of hard-deleting
    },
    "audit": {
        "enabled": True,                  # gates log_audit_event entirely
        "log_user_actions": True,         # NOTE(review): these three flags are not yet consulted
        "log_admin_actions": True,
        "log_system_actions": True,
    }
}
|
|
|
|
|
# Directory holding this module's persistent artifacts (pseudonym store,
# audit logs, role definitions, GDPR exports); created eagerly at import time.
SECURITY_DIR = DATA_DIR / "security"
os.makedirs(SECURITY_DIR, exist_ok=True)
|
|
|
|
|
class SecurityError(MonaError):
    """Exception raised for security-related errors.

    Raised by the encryption, JWT, and GDPR helpers below so callers can
    catch a single module-specific error type.
    """
    pass
|
|
|
|
|
|
|
|
|
@handle_exceptions
def generate_encryption_key(password: str, salt: Optional[bytes] = None,
                            iterations: int = 100000) -> Tuple[bytes, bytes]:
    """
    Generate a 256-bit encryption key from a password using PBKDF2-HMAC-SHA256.

    Args:
        password: The password to derive the key from
        salt: Optional 16-byte salt; a fresh random salt is generated if omitted
        iterations: PBKDF2 iteration count (default preserves the historical
            value so existing ciphertexts still decrypt)

    Returns:
        Tuple of (key, salt)
    """
    if salt is None:
        # stdlib CSPRNG; equivalent to Crypto's get_random_bytes but removes
        # the third-party dependency from this path
        salt = secrets.token_bytes(16)

    # dklen=32 -> 256-bit key, sized for AES-256
    key = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, iterations, dklen=32)

    return key, salt
|
|
|
|
|
@handle_exceptions
def encrypt_data(data: Any, password: str) -> Dict[str, str]:
    """
    Encrypt data using AES-GCM with a password-derived key.

    Args:
        data: The data to encrypt (must be JSON-serializable)
        password: The password to derive the encryption key from

    Returns:
        Dictionary with base64-encoded ciphertext, GCM tag, nonce, salt and
        metadata.  If encryption is disabled in SECURITY_CONFIG, the data is
        returned as plaintext JSON under the single key "data".

    Raises:
        SecurityError: If serialization or encryption fails.
    """
    if not SECURITY_CONFIG["encryption"]["enabled"]:
        # NOTE(review): plaintext fallback — callers receive unencrypted
        # data when the feature flag is off; decrypt_data mirrors this shape.
        logger.warning("Encryption is disabled in configuration")
        return {"data": json.dumps(data)}

    try:
        data_json = json.dumps(data)

        # Fresh salt per message so the same password yields distinct keys
        salt = get_random_bytes(16)
        key, _ = generate_encryption_key(password, salt)

        # 96-bit nonce is the recommended size for GCM
        nonce = get_random_bytes(12)

        cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
        ciphertext, tag = cipher.encrypt_and_digest(data_json.encode('utf-8'))

        encrypted_data = {
            "ciphertext": base64.b64encode(ciphertext).decode('utf-8'),
            "tag": base64.b64encode(tag).decode('utf-8'),
            "nonce": base64.b64encode(nonce).decode('utf-8'),
            "salt": base64.b64encode(salt).decode('utf-8'),
            "algorithm": SECURITY_CONFIG["encryption"]["algorithm"],
            "timestamp": datetime.now().isoformat()
        }

        return encrypted_data

    except Exception as e:
        logger.error(f"Encryption error: {str(e)}")
        # Chain the original exception so the root cause stays debuggable
        raise SecurityError(f"Failed to encrypt data: {str(e)}") from e
|
|
|
|
|
@handle_exceptions
def decrypt_data(encrypted_data: Dict[str, str], password: str) -> Any:
    """
    Decrypt data that was encrypted with encrypt_data().

    Args:
        encrypted_data: Dictionary with encrypted data and metadata
        password: The password used for encryption

    Returns:
        The decrypted data

    Raises:
        SecurityError: If the password is wrong, the payload is corrupted,
            or any other decryption failure occurs.
    """
    # Plaintext fallback produced by encrypt_data when encryption is disabled
    if "data" in encrypted_data and len(encrypted_data) == 1:
        return json.loads(encrypted_data["data"])

    try:
        ciphertext = base64.b64decode(encrypted_data["ciphertext"])
        tag = base64.b64decode(encrypted_data["tag"])
        nonce = base64.b64decode(encrypted_data["nonce"])
        salt = base64.b64decode(encrypted_data["salt"])

        # Re-derive the key with the stored salt
        key, _ = generate_encryption_key(password, salt)

        cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)

        # decrypt_and_verify raises ValueError when the GCM tag check fails
        data_json = cipher.decrypt_and_verify(ciphertext, tag).decode('utf-8')

        return json.loads(data_json)

    except (ValueError, KeyError) as e:
        logger.error(f"Decryption error: {str(e)}")
        # Deliberately vague message: don't reveal whether the password or
        # the payload was at fault.  Chain for internal debugging only.
        raise SecurityError("Failed to decrypt data: Invalid password or corrupted data") from e

    except Exception as e:
        logger.error(f"Decryption error: {str(e)}")
        raise SecurityError(f"Failed to decrypt data: {str(e)}") from e
|
|
|
|
|
|
|
|
|
@handle_exceptions
def hash_password(password: str) -> str:
    """Hash *password* with PBKDF2-SHA256 and return the salted hash string.

    The returned string embeds the salt and iteration count, so it can
    later be checked with verify_password().
    """
    hashed = pbkdf2_sha256.hash(password)
    return hashed
|
|
|
|
|
@handle_exceptions
def verify_password(password: str, password_hash: str) -> bool:
    """Check *password* against a hash produced by hash_password().

    Args:
        password: The candidate password
        password_hash: The stored PBKDF2-SHA256 hash string

    Returns:
        True on a match, False otherwise
    """
    matches = pbkdf2_sha256.verify(password, password_hash)
    return matches
|
|
|
|
|
@handle_exceptions
def validate_password_strength(password: str) -> Tuple[bool, str]:
    """Check *password* against the policy in SECURITY_CONFIG["password"].

    Args:
        password: The password to validate

    Returns:
        (True, success message) when every enabled rule passes, otherwise
        (False, newline-joined list of rule violations).
    """
    policy = SECURITY_CONFIG["password"]

    # Each rule: (is enforced, rule passes, message when violated).
    # Order matches the message order of the original policy checks.
    checks = [
        (True,
         len(password) >= policy["min_length"],
         f"Password must be at least {policy['min_length']} characters long"),
        (policy["require_uppercase"],
         any(c.isupper() for c in password),
         "Password must contain at least one uppercase letter"),
        (policy["require_lowercase"],
         any(c.islower() for c in password),
         "Password must contain at least one lowercase letter"),
        (policy["require_numbers"],
         any(c.isdigit() for c in password),
         "Password must contain at least one number"),
        (policy["require_special"],
         any(not c.isalnum() for c in password),
         "Password must contain at least one special character"),
    ]

    failures = [msg for enforced, ok, msg in checks if enforced and not ok]

    if failures:
        return False, "\n".join(failures)

    return True, "Password meets strength requirements"
|
|
|
|
|
|
|
|
|
@handle_exceptions
def generate_session_token() -> str:
    """Return a cryptographically secure, URL-safe session token.

    32 random bytes encoded URL-safe (~43 characters).
    """
    token = secrets.token_urlsafe(32)
    return token
|
|
|
|
|
@handle_exceptions
def create_jwt_token(user_id: str, additional_data: Optional[Dict[str, Any]] = None) -> str:
    """
    Create a signed (HS256) JWT token for user authentication.

    Args:
        user_id: The user ID stored in the "sub" claim
        additional_data: Extra claims merged into the payload (can override
            the standard claims on key collision)

    Returns:
        A JWT token string
    """
    expiry_hours = SECURITY_CONFIG["session"]["jwt_expiry"]

    # Use epoch seconds (always UTC) for iat/exp.  The previous naive
    # datetime.now() values were interpreted as UTC by PyJWT, shifting the
    # expiry by the local UTC offset.
    now = int(time.time())

    payload = {
        "sub": user_id,
        "iat": now,
        "exp": now + expiry_hours * 3600,
        "jti": str(uuid.uuid4())  # unique token id; enables revocation lists
    }

    if additional_data:
        payload.update(additional_data)

    # SECURITY(review): the signing secret is derived from a hard-coded,
    # public string, so anyone with the source can forge tokens.  It should
    # be loaded from configuration/environment.  Left as-is here because
    # verify_jwt_token must derive the identical key.
    secret_key = hashlib.sha256(b"MONA_JWT_SECRET_KEY").digest()

    token = jwt.encode(payload, secret_key, algorithm="HS256")

    return token
|
|
|
|
|
@handle_exceptions
def verify_jwt_token(token: str) -> Dict[str, Any]:
    """
    Verify a JWT token and return its payload.

    Args:
        token: The JWT token to verify

    Returns:
        The token payload if valid

    Raises:
        SecurityError: If the token is invalid or expired
    """
    # Must derive exactly the same key as create_jwt_token
    secret_key = hashlib.sha256(b"MONA_JWT_SECRET_KEY").digest()

    try:
        return jwt.decode(token, secret_key, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise SecurityError("Token has expired")
    except jwt.InvalidTokenError as e:
        raise SecurityError(f"Invalid token: {str(e)}")
|
|
|
|
|
|
|
|
|
@handle_exceptions
def anonymize_data(data: Dict[str, Any], fields_to_anonymize: List[str]) -> Dict[str, Any]:
    """
    Anonymize sensitive data fields.

    Field names may use dotted paths ("profile.email") to address values
    nested in sub-dictionaries.

    Args:
        data: The data to anonymize (not modified; a deep copy is returned)
        fields_to_anonymize: List of field names / dotted paths to anonymize

    Returns:
        Anonymized copy of the data
    """
    import copy

    # Deep copy so anonymizing nested dicts cannot mutate the caller's data
    # (a shallow .copy() would share the nested containers).
    anonymized_data = copy.deepcopy(data)

    for field in fields_to_anonymize:
        if '.' in field:
            # Dotted path: walk the intermediate dicts.  (Previously this
            # branch was unreachable because it was guarded by a top-level
            # membership test that dotted paths never satisfy.)
            *parents, leaf = field.split('.')
            current = anonymized_data
            for part in parents:
                if isinstance(current, dict) and isinstance(current.get(part), dict):
                    current = current[part]
                else:
                    current = None
                    break
            if isinstance(current, dict) and leaf in current:
                current[leaf] = _anonymize_value(current[leaf])
        elif field in anonymized_data:
            anonymized_data[field] = _anonymize_value(anonymized_data[field])

    return anonymized_data
|
|
|
|
|
def _anonymize_value(value: Any) -> Any: |
|
""" |
|
Anonymize a single value based on its type. |
|
|
|
Args: |
|
value: The value to anonymize |
|
|
|
Returns: |
|
Anonymized value |
|
""" |
|
if isinstance(value, str): |
|
|
|
if '@' in value and '.' in value.split('@')[1]: |
|
username, domain = value.split('@') |
|
hashed_username = hashlib.sha256(username.encode()).hexdigest()[:8] |
|
return f"{hashed_username}@{domain}" |
|
|
|
|
|
return hashlib.sha256(value.encode()).hexdigest()[:len(value)] |
|
|
|
elif isinstance(value, int): |
|
|
|
return 0 |
|
|
|
elif isinstance(value, list): |
|
|
|
return [_anonymize_value(item) for item in value] |
|
|
|
elif isinstance(value, dict): |
|
|
|
return {k: _anonymize_value(v) for k, v in value.items()} |
|
|
|
|
|
return None |
|
|
|
|
|
@handle_exceptions
def pseudonymize_user_data(user_data: Dict[str, Any], user_id: str) -> Dict[str, Any]:
    """
    Pseudonymize user data by replacing identifiers with pseudonyms.

    Pseudonyms are deterministic per (user, field, value) and persisted in
    security/pseudonyms.json, so repeated calls yield the same replacement.

    Args:
        user_data: The user data to pseudonymize
        user_id: The user ID to use for generating pseudonyms

    Returns:
        Pseudonymized copy of the user data
    """
    sensitive_fields = (
        "name", "email", "phone", "address", "ip_address", "user_agent"
    )

    # Ensure the persistent pseudonym store exists, then load it
    pseudonym_file = SECURITY_DIR / "pseudonyms.json"
    if not os.path.exists(pseudonym_file):
        with open(pseudonym_file, 'w') as f:
            json.dump({}, f)

    with open(pseudonym_file, 'r') as f:
        pseudonyms = json.load(f)

    user_map = pseudonyms.setdefault(user_id, {})

    result = user_data.copy()
    for field in sensitive_fields:
        if field not in user_data:
            continue
        if field not in user_map:
            # Deterministic 16-hex-char pseudonym derived from user id,
            # field name, and original value
            value = str(user_data[field])
            user_map[field] = hashlib.sha256(f"{user_id}:{field}:{value}".encode()).hexdigest()[:16]
        result[field] = user_map[field]

    # Persist any pseudonyms created during this call
    with open(pseudonym_file, 'w') as f:
        json.dump(pseudonyms, f)

    return result
|
|
|
|
|
|
|
|
|
@handle_exceptions
def log_audit_event(event_type: str, user_id: Optional[str] = None,
                    details: Optional[Dict[str, Any]] = None,
                    success: bool = True) -> None:
    """
    Log an audit event for security monitoring.

    Events are appended as JSON lines to a per-day file under
    security/audit_logs/ and mirrored to the application logger.

    Args:
        event_type: Type of event (e.g., 'login', 'data_access', 'settings_change')
        user_id: ID of the user performing the action (if applicable)
        details: Additional details about the event
        success: Whether the action was successful
    """
    if not SECURITY_CONFIG["audit"]["enabled"]:
        return

    audit_log_dir = SECURITY_DIR / "audit_logs"
    os.makedirs(audit_log_dir, exist_ok=True)

    # One log file per calendar day
    current_date = datetime.now().strftime("%Y-%m-%d")
    log_file = audit_log_dir / f"audit_{current_date}.log"

    # NOTE(review): ip_address/user_agent are hard-coded placeholders —
    # real request metadata is not available at this layer.
    audit_event = {
        "timestamp": datetime.now().isoformat(),
        "event_type": event_type,
        "user_id": user_id,
        "details": details or {},
        "success": success,
        "ip_address": "127.0.0.1",
        "user_agent": "MONA App"
    }

    # Append as a single JSON line
    with open(log_file, 'a') as f:
        f.write(json.dumps(audit_event) + "\n")

    # Mirror to the application log: info on success, warning on failure
    outcome = 'Success' if success else 'Failed'
    message = f"AUDIT: {event_type} by {user_id or 'system'} - {outcome}"
    (logger.info if success else logger.warning)(message)
|
|
|
|
|
|
|
|
|
@handle_exceptions
def check_permission(user_id: str, resource: str, action: str) -> bool:
    """
    Check if a user has permission to perform an action on a resource.

    Roles are loaded from security/roles.json (created with defaults on
    first use).  A "*" entry acts as a wildcard for resources or actions.

    Args:
        user_id: The ID of the user
        resource: The resource being accessed (e.g., 'data', 'settings')
        action: The action being performed (e.g., 'read', 'write', 'delete')

    Returns:
        True if the user has permission, False otherwise
    """
    roles_file = SECURITY_DIR / "roles.json"

    # Bootstrap a default role definition on first use
    if not os.path.exists(roles_file):
        default_roles = {
            "admin": {
                "users": ["admin_user_id"],
                "permissions": {"*": ["*"]}
            },
            "user": {
                "users": [],
                "permissions": {
                    "data": ["read", "write"],
                    "settings": ["read"],
                    "backups": ["read"]
                }
            }
        }
        with open(roles_file, 'w') as f:
            json.dump(default_roles, f, indent=4)

    with open(roles_file, 'r') as f:
        roles = json.load(f)

    # Grant access if ANY role containing the user allows resource/action
    for role_data in roles.values():
        if user_id not in role_data["users"]:
            continue

        permissions = role_data["permissions"]

        # Global wildcard: every action on every resource
        if "*" in permissions.get("*", []):
            return True

        allowed_actions = permissions.get(resource, [])
        if "*" in allowed_actions or action in allowed_actions:
            return True

    return False
|
|
|
|
|
|
|
|
|
@handle_exceptions
def validate_email(email: str) -> bool:
    """
    Validate an email address format.

    Uses a pragmatic local@domain.tld regex; it does not attempt full
    RFC 5322 coverage.

    Args:
        email: The email address to validate

    Returns:
        True if the email is valid, False otherwise
    """
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    # fullmatch instead of match: with re.match the '$' anchor also accepted
    # a trailing newline (e.g. "user@example.com\n" validated as True).
    return bool(re.fullmatch(email_pattern, email))
|
|
|
|
|
@handle_exceptions
def sanitize_input(input_str: str) -> str:
    """
    Sanitize user input to prevent injection attacks.

    Strips <script> blocks and remaining tags, then HTML-escapes the five
    significant characters.  NOTE(review): regex-based tag stripping is
    best-effort only (it cannot handle malformed/nested markup); untrusted
    HTML should go through a real sanitizer library.

    Args:
        input_str: The input string to sanitize

    Returns:
        Sanitized string
    """
    # Remove script blocks (including their contents) first, then any tag
    sanitized = re.sub(r'<script.*?>.*?</script>', '', input_str, flags=re.DOTALL | re.IGNORECASE)
    sanitized = re.sub(r'<.*?>', '', sanitized)

    # HTML-escape.  '&' must be escaped first so it does not re-escape the
    # entities produced by the later replacements.  (The previous versions
    # of these calls had lost their entity replacement strings.)
    sanitized = sanitized.replace('&', '&amp;')
    sanitized = sanitized.replace('<', '&lt;')
    sanitized = sanitized.replace('>', '&gt;')
    sanitized = sanitized.replace('"', '&quot;')
    sanitized = sanitized.replace("'", '&#x27;')

    return sanitized
|
|
|
|
|
@handle_exceptions
def validate_data_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """
    Validate data against a simple JSON-Schema-like description.

    Supported per-field keys: required, type (string/number/boolean/array/
    object), pattern, minimum/maximum, minLength/maxLength, enum.

    Args:
        data: The data to validate
        schema: The schema to validate against

    Returns:
        Tuple of (is_valid, error_messages)
    """
    errors = []

    # Required fields must be present
    for field, field_schema in schema.items():
        if field_schema.get("required", False) and field not in data:
            errors.append(f"Missing required field: {field}")

    # Per-field constraint checks (fields absent from the schema pass freely)
    for field, value in data.items():
        if field not in schema:
            continue
        field_schema = schema[field]

        expected_type = field_schema.get("type")
        if expected_type:
            if expected_type == "string" and not isinstance(value, str):
                errors.append(f"Field {field} must be a string")
            elif expected_type == "number" and (isinstance(value, bool) or not isinstance(value, (int, float))):
                # bool is a subclass of int, so True/False must be rejected
                # explicitly when a number is expected
                errors.append(f"Field {field} must be a number")
            elif expected_type == "boolean" and not isinstance(value, bool):
                errors.append(f"Field {field} must be a boolean")
            elif expected_type == "array" and not isinstance(value, list):
                errors.append(f"Field {field} must be an array")
            elif expected_type == "object" and not isinstance(value, dict):
                errors.append(f"Field {field} must be an object")

        # Regex pattern (strings only)
        if isinstance(value, str) and "pattern" in field_schema:
            if not re.match(field_schema["pattern"], value):
                errors.append(f"Field {field} does not match required pattern")

        # Numeric bounds
        if isinstance(value, (int, float)):
            if "minimum" in field_schema and value < field_schema["minimum"]:
                errors.append(f"Field {field} must be at least {field_schema['minimum']}")
            if "maximum" in field_schema and value > field_schema["maximum"]:
                errors.append(f"Field {field} must be at most {field_schema['maximum']}")

        # Length bounds for strings and arrays
        if isinstance(value, (str, list)):
            if "minLength" in field_schema and len(value) < field_schema["minLength"]:
                errors.append(f"Field {field} must have at least {field_schema['minLength']} items")
            if "maxLength" in field_schema and len(value) > field_schema["maxLength"]:
                errors.append(f"Field {field} must have at most {field_schema['maxLength']} items")

        # Enumerated values
        if "enum" in field_schema and value not in field_schema["enum"]:
            errors.append(f"Field {field} must be one of: {', '.join(map(str, field_schema['enum']))}")

    return len(errors) == 0, errors
|
|
|
|
|
|
|
|
|
@handle_exceptions
def create_data_export(user_id: str) -> str:
    """
    Create a GDPR-compliant export of all user data.

    Writes a JSON file under security/exports/ and records an audit event.
    NOTE(review): the data categories are placeholders — no real user data
    is collected into the export yet.

    Args:
        user_id: The ID of the user requesting their data

    Returns:
        Path to the exported data file

    Raises:
        SecurityError: If GDPR features are disabled.
    """
    if not SECURITY_CONFIG["gdpr"]["enabled"]:
        raise SecurityError("GDPR features are not enabled")

    exports_dir = SECURITY_DIR / "exports"
    os.makedirs(exports_dir, exist_ok=True)

    # Timestamped filename keeps successive exports distinct
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    export_file = exports_dir / f"data_export_{user_id}_{timestamp}.json"

    log_audit_event("data_export", user_id, {"file": str(export_file)})

    # Assemble the (placeholder) export payload
    categories = {}
    for name, description, empty in (
        ("profile", "Your profile information", {}),
        ("activity", "Your activity history", []),
        ("settings", "Your application settings", {}),
    ):
        categories[name] = {
            "description": description,
            "last_updated": datetime.now().isoformat(),
            "data": empty,
        }

    user_data = {
        "user_id": user_id,
        "export_date": datetime.now().isoformat(),
        "data_categories": categories,
    }

    with open(export_file, 'w') as f:
        json.dump(user_data, f, indent=4)

    return str(export_file)
|
|
|
|
|
@handle_exceptions
def delete_user_data(user_id: str, delete_type: str = "anonymize") -> bool:
    """
    Delete or anonymize all data for a user (GDPR right to be forgotten).

    Currently only the stored pseudonym mapping is removed; removal of the
    remaining data stores is logged but not yet implemented here.

    Args:
        user_id: The ID of the user whose data should be deleted
        delete_type: Type of deletion ('anonymize' or 'complete')

    Returns:
        True if successful

    Raises:
        SecurityError: If GDPR features are disabled.
    """
    if not SECURITY_CONFIG["gdpr"]["enabled"]:
        raise SecurityError("GDPR features are not enabled")

    # Record the request before acting on it
    log_audit_event("data_deletion", user_id, {"delete_type": delete_type})

    should_anonymize = (delete_type == "anonymize"
                        and SECURITY_CONFIG["gdpr"]["anonymize_on_delete"])
    if should_anonymize:
        logger.info(f"Anonymizing all data for user {user_id}")
    else:
        logger.info(f"Completely deleting all data for user {user_id}")

    # Drop the user's pseudonym mapping, if one exists
    pseudonym_file = SECURITY_DIR / "pseudonyms.json"
    if os.path.exists(pseudonym_file):
        with open(pseudonym_file, 'r') as f:
            pseudonyms = json.load(f)

        if pseudonyms.pop(user_id, None) is not None:
            with open(pseudonym_file, 'w') as f:
                json.dump(pseudonyms, f)

    return True