"""Structured logging with correlation IDs and context management.""" import logging import json import uuid import time from typing import Dict, Any, Optional, Union from datetime import datetime from contextvars import ContextVar from dataclasses import dataclass, asdict from enum import Enum # Context variable for correlation ID correlation_id_context: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None) class LogLevel(Enum): """Log levels.""" DEBUG = "DEBUG" INFO = "INFO" WARNING = "WARNING" ERROR = "ERROR" CRITICAL = "CRITICAL" @dataclass class LogContext: """Structured log context.""" correlation_id: str operation: Optional[str] = None component: Optional[str] = None user_id: Optional[str] = None session_id: Optional[str] = None request_id: Optional[str] = None metadata: Optional[Dict[str, Any]] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return {k: v for k, v in asdict(self).items() if v is not None} class StructuredLogger: """Structured logger with correlation ID support.""" def __init__(self, name: str, enable_json_logging: bool = True): """ Initialize structured logger. Args: name: Logger name enable_json_logging: Whether to use JSON format """ self.logger = logging.getLogger(name) self.enable_json_logging = enable_json_logging self._setup_formatter() def _setup_formatter(self) -> None: """Setup log formatter.""" if self.enable_json_logging: formatter = JsonFormatter() else: formatter = ContextFormatter() # Apply formatter to all handlers for handler in self.logger.handlers: handler.setFormatter(formatter) def _get_log_data(self, message: str, level: str, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None, exception: Optional[Exception] = None) -> Dict[str, Any]: """ Prepare log data structure. Args: message: Log message level: Log level context: Log context extra: Extra data exception: Exception if any Returns: Dict[str, Any]: Structured log data """ # Get correlation ID from context or generate new one correlation_id = correlation_id_context.get() if not correlation_id and context: correlation_id = context.correlation_id if not correlation_id: correlation_id = str(uuid.uuid4()) log_data = { 'timestamp': datetime.utcnow().isoformat() + 'Z', 'level': level, 'message': message, 'correlation_id': correlation_id, 'logger_name': self.logger.name } # Add context information if context: log_data.update(context.to_dict()) # Add extra data if extra: log_data['extra'] = extra # Add exception information if exception: log_data['exception'] = { 'type': type(exception).__name__, 'message': str(exception), 'module': getattr(exception, '__module__', None) } # Add stack trace for debugging import traceback log_data['exception']['traceback'] = traceback.format_exc() return log_data def debug(self, message: str, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None) -> None: """Log debug message.""" if self.logger.isEnabledFor(logging.DEBUG): log_data = self._get_log_data(message, LogLevel.DEBUG.value, context, extra) # Use 'structured_data' to avoid conflicts with LogRecord attributes self.logger.info(message, extra={'structured_data': log_data}) def info(self, message: str, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None) -> None: """Log info message.""" if self.logger.isEnabledFor(logging.INFO): log_data = self._get_log_data(message, LogLevel.INFO.value, context, extra) # Use 'structured_data' to avoid conflicts with LogRecord attributes self.logger.info(message, extra={'structured_data': log_data}) def warning(self, message: str, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None) -> None: """Log warning message.""" if self.logger.isEnabledFor(logging.WARNING): log_data = self._get_log_data(message, LogLevel.WARNING.value, context, extra) # Use 'structured_data' to avoid conflicts with LogRecord attributes self.logger.warning(message, extra={'structured_data': log_data}) def error(self, message: str, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None, exception: Optional[Exception] = None) -> None: """Log error message.""" if self.logger.isEnabledFor(logging.ERROR): log_data = self._get_log_data(message, LogLevel.ERROR.value, context, extra, exception) # Use 'structured_data' to avoid conflicts with LogRecord attributes self.logger.error(message, extra={'structured_data': log_data}) def critical(self, message: str, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None, exception: Optional[Exception] = None) -> None: """Log critical message.""" if self.logger.isEnabledFor(logging.CRITICAL): log_data = self._get_log_data(message, LogLevel.CRITICAL.value, context, extra, exception) # Use 'structured_data' to avoid conflicts with LogRecord attributes self.logger.critical(message, extra={'structured_data': log_data}) def log_operation_start(self, operation: str, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None) -> str: """ Log operation start and return correlation ID. Args: operation: Operation name context: Log context extra: Extra data Returns: str: Correlation ID for the operation """ correlation_id = str(uuid.uuid4()) if context: context.correlation_id = correlation_id context.operation = operation else: context = LogContext(correlation_id=correlation_id, operation=operation) # Set correlation ID in context correlation_id_context.set(correlation_id) self.info(f"Operation started: {operation}", context, extra) return correlation_id def log_operation_end(self, operation: str, correlation_id: str, success: bool = True, duration: Optional[float] = None, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None) -> None: """ Log operation end. Args: operation: Operation name correlation_id: Correlation ID success: Whether operation succeeded duration: Operation duration in seconds context: Log context extra: Extra data """ if context: context.correlation_id = correlation_id context.operation = operation else: context = LogContext(correlation_id=correlation_id, operation=operation) # Add performance data if extra is None: extra = {} extra['success'] = success if duration is not None: extra['duration_seconds'] = duration status = "completed successfully" if success else "failed" message = f"Operation {status}: {operation}" if success: self.info(message, context, extra) else: self.error(message, context, extra) def log_performance_metric(self, metric_name: str, value: Union[int, float], unit: str = None, context: Optional[LogContext] = None, extra: Optional[Dict[str, Any]] = None) -> None: """ Log performance metric. Args: metric_name: Name of the metric value: Metric value unit: Unit of measurement context: Log context extra: Extra data """ if extra is None: extra = {} extra['metric'] = { 'name': metric_name, 'value': value, 'unit': unit, 'timestamp': time.time() } message = f"Performance metric: {metric_name}={value}" if unit: message += f" {unit}" self.info(message, context, extra) class JsonFormatter(logging.Formatter): """JSON log formatter.""" def format(self, record: logging.LogRecord) -> str: """Format log record as JSON.""" try: # Get structured data from extra log_data = getattr(record, 'structured_data', {}) # Ensure basic fields are present if 'timestamp' not in log_data: log_data['timestamp'] = datetime.utcnow().isoformat() + 'Z' if 'level' not in log_data: log_data['level'] = record.levelname if 'message' not in log_data: log_data['message'] = record.getMessage() if 'logger_name' not in log_data: log_data['logger_name'] = record.name # Add standard log record fields log_data.update({ 'module': record.module, 'function': record.funcName, 'line': record.lineno, 'thread': record.thread, 'process': record.process }) return json.dumps(log_data, default=str, ensure_ascii=False) except Exception as e: # Fallback to standard formatting return f"JSON formatting error: {e} | Original: {record.getMessage()}" class ContextFormatter(logging.Formatter): """Context-aware log formatter.""" def __init__(self): """Initialize formatter.""" super().__init__( fmt='%(asctime)s - %(name)s - %(levelname)s - [%(correlation_id)s] - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def format(self, record: logging.LogRecord) -> str: """Format log record with context.""" try: # Get correlation ID from context or record correlation_id = correlation_id_context.get() if not correlation_id: structured_data = getattr(record, 'structured_data', {}) correlation_id = structured_data.get('correlation_id', 'unknown') # Add correlation ID to record record.correlation_id = correlation_id # Add context information if available structured_data = getattr(record, 'structured_data', {}) if 'operation' in structured_data: record.message = f"[{structured_data['operation']}] {record.getMessage()}" return super().format(record) except Exception as e: # Fallback formatting return f"Formatting error: {e} | Original: {record.getMessage()}" def get_structured_logger(name: str, enable_json_logging: bool = True) -> StructuredLogger: """ Get a structured logger instance. Args: name: Logger name enable_json_logging: Whether to use JSON format Returns: StructuredLogger: Configured structured logger """ return StructuredLogger(name, enable_json_logging) def set_correlation_id(correlation_id: str) -> None: """ Set correlation ID in context. Args: correlation_id: Correlation ID to set """ correlation_id_context.set(correlation_id) def get_correlation_id() -> Optional[str]: """ Get current correlation ID from context. Returns: Optional[str]: Current correlation ID """ return correlation_id_context.get() def generate_correlation_id() -> str: """ Generate a new correlation ID. Returns: str: New correlation ID """ return str(uuid.uuid4())