|
import os |
|
import sys |
|
import logging |
|
import traceback |
|
import json |
|
import time |
|
from datetime import datetime |
|
import platform |
|
|
|
|
|
try: |
|
import psutil |
|
PSUTIL_AVAILABLE = True |
|
except ImportError: |
|
PSUTIL_AVAILABLE = False |
|
logging.warning("psutil module not available. System monitoring features will be limited.") |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class DebugInfo: |
|
"""Class containing debug information""" |
|
|
|
@staticmethod |
|
def get_system_info(): |
|
"""Get system information""" |
|
try: |
|
info = { |
|
"os": platform.system(), |
|
"os_version": platform.version(), |
|
"python_version": platform.python_version(), |
|
"cpu_count": os.cpu_count(), |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
|
|
|
|
if PSUTIL_AVAILABLE: |
|
info.update({ |
|
"total_memory": round(psutil.virtual_memory().total / (1024 * 1024 * 1024), 2), |
|
"available_memory": round(psutil.virtual_memory().available / (1024 * 1024 * 1024), 2), |
|
"cpu_usage": psutil.cpu_percent(interval=0.1), |
|
"memory_usage": psutil.virtual_memory().percent, |
|
"disk_usage": psutil.disk_usage('/').percent, |
|
}) |
|
else: |
|
info.update({ |
|
"total_memory": "psutil not available", |
|
"available_memory": "psutil not available", |
|
"cpu_usage": "psutil not available", |
|
"memory_usage": "psutil not available", |
|
"disk_usage": "psutil not available", |
|
}) |
|
|
|
return info |
|
except Exception as e: |
|
logger.error(f"Error getting system info: {e}") |
|
return {"error": str(e)} |
|
|
|
@staticmethod |
|
def get_env_info(): |
|
"""Get environment variable information (masking sensitive information)""" |
|
try: |
|
|
|
sensitive_vars = [ |
|
"API_KEY", "SECRET", "PASSWORD", "TOKEN", "AUTH", "MONGODB_URL", |
|
"AIVEN_DB_URL", "PINECONE_API_KEY", "GOOGLE_API_KEY" |
|
] |
|
|
|
env_vars = {} |
|
for key, value in os.environ.items(): |
|
|
|
is_sensitive = any(s in key.upper() for s in sensitive_vars) |
|
|
|
if is_sensitive and value: |
|
|
|
masked_value = value[:4] + "****" if len(value) > 4 else "****" |
|
env_vars[key] = masked_value |
|
else: |
|
env_vars[key] = value |
|
|
|
return env_vars |
|
except Exception as e: |
|
logger.error(f"Error getting environment info: {e}") |
|
return {"error": str(e)} |
|
|
|
@staticmethod |
|
def get_database_status(): |
|
"""Get database connection status""" |
|
try: |
|
from app.database.postgresql import check_db_connection as check_postgresql |
|
from app.database.mongodb import check_db_connection as check_mongodb |
|
from app.database.pinecone import check_db_connection as check_pinecone |
|
|
|
return { |
|
"postgresql": check_postgresql(), |
|
"mongodb": check_mongodb(), |
|
"pinecone": check_pinecone(), |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
except Exception as e: |
|
logger.error(f"Error getting database status: {e}") |
|
return {"error": str(e)} |
|
|
|
class PerformanceMonitor: |
|
"""Performance monitoring class""" |
|
|
|
def __init__(self): |
|
self.start_time = time.time() |
|
self.checkpoints = [] |
|
|
|
def checkpoint(self, name): |
|
"""Mark a checkpoint and record the time""" |
|
current_time = time.time() |
|
elapsed = current_time - self.start_time |
|
self.checkpoints.append({ |
|
"name": name, |
|
"time": current_time, |
|
"elapsed": elapsed |
|
}) |
|
logger.debug(f"Checkpoint '{name}' at {elapsed:.4f}s") |
|
return elapsed |
|
|
|
def get_report(self): |
|
"""Generate performance report""" |
|
if not self.checkpoints: |
|
return {"error": "No checkpoints recorded"} |
|
|
|
total_time = time.time() - self.start_time |
|
|
|
|
|
intervals = [] |
|
prev_time = self.start_time |
|
|
|
for checkpoint in self.checkpoints: |
|
interval = checkpoint["time"] - prev_time |
|
intervals.append({ |
|
"name": checkpoint["name"], |
|
"interval": interval, |
|
"elapsed": checkpoint["elapsed"] |
|
}) |
|
prev_time = checkpoint["time"] |
|
|
|
return { |
|
"total_time": total_time, |
|
"checkpoint_count": len(self.checkpoints), |
|
"intervals": intervals |
|
} |
|
|
|
class ErrorTracker: |
|
"""Class to track and record errors""" |
|
|
|
def __init__(self, max_errors=100): |
|
self.errors = [] |
|
self.max_errors = max_errors |
|
|
|
def track_error(self, error, context=None): |
|
"""Record error information""" |
|
error_info = { |
|
"error_type": type(error).__name__, |
|
"error_message": str(error), |
|
"traceback": traceback.format_exc(), |
|
"timestamp": datetime.now().isoformat(), |
|
"context": context or {} |
|
} |
|
|
|
|
|
self.errors.append(error_info) |
|
|
|
|
|
if len(self.errors) > self.max_errors: |
|
self.errors.pop(0) |
|
|
|
return error_info |
|
|
|
def get_errors(self, limit=None): |
|
"""Get list of recorded errors""" |
|
if limit is None or limit >= len(self.errors): |
|
return self.errors |
|
return self.errors[-limit:] |
|
|
|
|
|
error_tracker = ErrorTracker() |
|
performance_monitor = PerformanceMonitor() |
|
|
|
def debug_view(request=None): |
|
"""Create a full debug report""" |
|
debug_data = { |
|
"system_info": DebugInfo.get_system_info(), |
|
"database_status": DebugInfo.get_database_status(), |
|
"performance": performance_monitor.get_report(), |
|
"recent_errors": error_tracker.get_errors(limit=10), |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
|
|
|
|
if request: |
|
debug_data["request"] = { |
|
"method": request.method, |
|
"url": str(request.url), |
|
"headers": dict(request.headers), |
|
"client": { |
|
"host": request.client.host if request.client else "unknown", |
|
"port": request.client.port if request.client else "unknown" |
|
} |
|
} |
|
|
|
return debug_data |