Spaces:
Build error
Build error
"""Structured logging with correlation IDs and context management.""" | |
import logging | |
import json | |
import uuid | |
import time | |
from typing import Dict, Any, Optional, Union | |
from datetime import datetime | |
from contextvars import ContextVar | |
from dataclasses import dataclass, asdict | |
from enum import Enum | |
# Context variable for correlation ID | |
correlation_id_context: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None) | |
class LogLevel(Enum): | |
"""Log levels.""" | |
DEBUG = "DEBUG" | |
INFO = "INFO" | |
WARNING = "WARNING" | |
ERROR = "ERROR" | |
CRITICAL = "CRITICAL" | |
class LogContext: | |
"""Structured log context.""" | |
correlation_id: str | |
operation: Optional[str] = None | |
component: Optional[str] = None | |
user_id: Optional[str] = None | |
session_id: Optional[str] = None | |
request_id: Optional[str] = None | |
metadata: Optional[Dict[str, Any]] = None | |
def to_dict(self) -> Dict[str, Any]: | |
"""Convert to dictionary.""" | |
return {k: v for k, v in asdict(self).items() if v is not None} | |
class StructuredLogger: | |
"""Structured logger with correlation ID support.""" | |
def __init__(self, name: str, enable_json_logging: bool = True): | |
""" | |
Initialize structured logger. | |
Args: | |
name: Logger name | |
enable_json_logging: Whether to use JSON format | |
""" | |
self.logger = logging.getLogger(name) | |
self.enable_json_logging = enable_json_logging | |
self._setup_formatter() | |
def _setup_formatter(self) -> None: | |
"""Setup log formatter.""" | |
if self.enable_json_logging: | |
formatter = JsonFormatter() | |
else: | |
formatter = ContextFormatter() | |
# Apply formatter to all handlers | |
for handler in self.logger.handlers: | |
handler.setFormatter(formatter) | |
def _get_log_data(self, message: str, level: str, | |
context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None, | |
exception: Optional[Exception] = None) -> Dict[str, Any]: | |
""" | |
Prepare log data structure. | |
Args: | |
message: Log message | |
level: Log level | |
context: Log context | |
extra: Extra data | |
exception: Exception if any | |
Returns: | |
Dict[str, Any]: Structured log data | |
""" | |
# Get correlation ID from context or generate new one | |
correlation_id = correlation_id_context.get() | |
if not correlation_id and context: | |
correlation_id = context.correlation_id | |
if not correlation_id: | |
correlation_id = str(uuid.uuid4()) | |
log_data = { | |
'timestamp': datetime.utcnow().isoformat() + 'Z', | |
'level': level, | |
'message': message, | |
'correlation_id': correlation_id, | |
'logger_name': self.logger.name | |
} | |
# Add context information | |
if context: | |
log_data.update(context.to_dict()) | |
# Add extra data | |
if extra: | |
log_data['extra'] = extra | |
# Add exception information | |
if exception: | |
log_data['exception'] = { | |
'type': type(exception).__name__, | |
'message': str(exception), | |
'module': getattr(exception, '__module__', None) | |
} | |
# Add stack trace for debugging | |
import traceback | |
log_data['exception']['traceback'] = traceback.format_exc() | |
return log_data | |
def debug(self, message: str, context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None) -> None: | |
"""Log debug message.""" | |
if self.logger.isEnabledFor(logging.DEBUG): | |
log_data = self._get_log_data(message, LogLevel.DEBUG.value, context, extra) | |
# Use 'structured_data' to avoid conflicts with LogRecord attributes | |
self.logger.info(message, extra={'structured_data': log_data}) | |
def info(self, message: str, context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None) -> None: | |
"""Log info message.""" | |
if self.logger.isEnabledFor(logging.INFO): | |
log_data = self._get_log_data(message, LogLevel.INFO.value, context, extra) | |
# Use 'structured_data' to avoid conflicts with LogRecord attributes | |
self.logger.info(message, extra={'structured_data': log_data}) | |
def warning(self, message: str, context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None) -> None: | |
"""Log warning message.""" | |
if self.logger.isEnabledFor(logging.WARNING): | |
log_data = self._get_log_data(message, LogLevel.WARNING.value, context, extra) | |
# Use 'structured_data' to avoid conflicts with LogRecord attributes | |
self.logger.warning(message, extra={'structured_data': log_data}) | |
def error(self, message: str, context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None, | |
exception: Optional[Exception] = None) -> None: | |
"""Log error message.""" | |
if self.logger.isEnabledFor(logging.ERROR): | |
log_data = self._get_log_data(message, LogLevel.ERROR.value, context, extra, exception) | |
# Use 'structured_data' to avoid conflicts with LogRecord attributes | |
self.logger.error(message, extra={'structured_data': log_data}) | |
def critical(self, message: str, context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None, | |
exception: Optional[Exception] = None) -> None: | |
"""Log critical message.""" | |
if self.logger.isEnabledFor(logging.CRITICAL): | |
log_data = self._get_log_data(message, LogLevel.CRITICAL.value, context, extra, exception) | |
# Use 'structured_data' to avoid conflicts with LogRecord attributes | |
self.logger.critical(message, extra={'structured_data': log_data}) | |
def log_operation_start(self, operation: str, context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None) -> str: | |
""" | |
Log operation start and return correlation ID. | |
Args: | |
operation: Operation name | |
context: Log context | |
extra: Extra data | |
Returns: | |
str: Correlation ID for the operation | |
""" | |
correlation_id = str(uuid.uuid4()) | |
if context: | |
context.correlation_id = correlation_id | |
context.operation = operation | |
else: | |
context = LogContext(correlation_id=correlation_id, operation=operation) | |
# Set correlation ID in context | |
correlation_id_context.set(correlation_id) | |
self.info(f"Operation started: {operation}", context, extra) | |
return correlation_id | |
def log_operation_end(self, operation: str, correlation_id: str, | |
success: bool = True, duration: Optional[float] = None, | |
context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None) -> None: | |
""" | |
Log operation end. | |
Args: | |
operation: Operation name | |
correlation_id: Correlation ID | |
success: Whether operation succeeded | |
duration: Operation duration in seconds | |
context: Log context | |
extra: Extra data | |
""" | |
if context: | |
context.correlation_id = correlation_id | |
context.operation = operation | |
else: | |
context = LogContext(correlation_id=correlation_id, operation=operation) | |
# Add performance data | |
if extra is None: | |
extra = {} | |
extra['success'] = success | |
if duration is not None: | |
extra['duration_seconds'] = duration | |
status = "completed successfully" if success else "failed" | |
message = f"Operation {status}: {operation}" | |
if success: | |
self.info(message, context, extra) | |
else: | |
self.error(message, context, extra) | |
def log_performance_metric(self, metric_name: str, value: Union[int, float], | |
unit: str = None, context: Optional[LogContext] = None, | |
extra: Optional[Dict[str, Any]] = None) -> None: | |
""" | |
Log performance metric. | |
Args: | |
metric_name: Name of the metric | |
value: Metric value | |
unit: Unit of measurement | |
context: Log context | |
extra: Extra data | |
""" | |
if extra is None: | |
extra = {} | |
extra['metric'] = { | |
'name': metric_name, | |
'value': value, | |
'unit': unit, | |
'timestamp': time.time() | |
} | |
message = f"Performance metric: {metric_name}={value}" | |
if unit: | |
message += f" {unit}" | |
self.info(message, context, extra) | |
class JsonFormatter(logging.Formatter): | |
"""JSON log formatter.""" | |
def format(self, record: logging.LogRecord) -> str: | |
"""Format log record as JSON.""" | |
try: | |
# Get structured data from extra | |
log_data = getattr(record, 'structured_data', {}) | |
# Ensure basic fields are present | |
if 'timestamp' not in log_data: | |
log_data['timestamp'] = datetime.utcnow().isoformat() + 'Z' | |
if 'level' not in log_data: | |
log_data['level'] = record.levelname | |
if 'message' not in log_data: | |
log_data['message'] = record.getMessage() | |
if 'logger_name' not in log_data: | |
log_data['logger_name'] = record.name | |
# Add standard log record fields | |
log_data.update({ | |
'module': record.module, | |
'function': record.funcName, | |
'line': record.lineno, | |
'thread': record.thread, | |
'process': record.process | |
}) | |
return json.dumps(log_data, default=str, ensure_ascii=False) | |
except Exception as e: | |
# Fallback to standard formatting | |
return f"JSON formatting error: {e} | Original: {record.getMessage()}" | |
class ContextFormatter(logging.Formatter): | |
"""Context-aware log formatter.""" | |
def __init__(self): | |
"""Initialize formatter.""" | |
super().__init__( | |
fmt='%(asctime)s - %(name)s - %(levelname)s - [%(correlation_id)s] - %(message)s', | |
datefmt='%Y-%m-%d %H:%M:%S' | |
) | |
def format(self, record: logging.LogRecord) -> str: | |
"""Format log record with context.""" | |
try: | |
# Get correlation ID from context or record | |
correlation_id = correlation_id_context.get() | |
if not correlation_id: | |
structured_data = getattr(record, 'structured_data', {}) | |
correlation_id = structured_data.get('correlation_id', 'unknown') | |
# Add correlation ID to record | |
record.correlation_id = correlation_id | |
# Add context information if available | |
structured_data = getattr(record, 'structured_data', {}) | |
if 'operation' in structured_data: | |
record.message = f"[{structured_data['operation']}] {record.getMessage()}" | |
return super().format(record) | |
except Exception as e: | |
# Fallback formatting | |
return f"Formatting error: {e} | Original: {record.getMessage()}" | |
def get_structured_logger(name: str, enable_json_logging: bool = True) -> StructuredLogger: | |
""" | |
Get a structured logger instance. | |
Args: | |
name: Logger name | |
enable_json_logging: Whether to use JSON format | |
Returns: | |
StructuredLogger: Configured structured logger | |
""" | |
return StructuredLogger(name, enable_json_logging) | |
def set_correlation_id(correlation_id: str) -> None: | |
""" | |
Set correlation ID in context. | |
Args: | |
correlation_id: Correlation ID to set | |
""" | |
correlation_id_context.set(correlation_id) | |
def get_correlation_id() -> Optional[str]: | |
""" | |
Get current correlation ID from context. | |
Returns: | |
Optional[str]: Current correlation ID | |
""" | |
return correlation_id_context.get() | |
def generate_correlation_id() -> str: | |
""" | |
Generate a new correlation ID. | |
Returns: | |
str: New correlation ID | |
""" | |
return str(uuid.uuid4()) |