Michael Hu
initial check in
05b45a5
import threading
import time
from datetime import datetime
import psutil
import torch
from fastapi import APIRouter
try:
import GPUtil
GPU_AVAILABLE = True
except ImportError:
GPU_AVAILABLE = False
router = APIRouter(tags=["debug"])
@router.get("/debug/threads")
async def get_thread_info():
process = psutil.Process()
current_threads = threading.enumerate()
# Get per-thread CPU times
thread_details = []
for thread in current_threads:
thread_info = {
"name": thread.name,
"id": thread.ident,
"alive": thread.is_alive(),
"daemon": thread.daemon,
}
thread_details.append(thread_info)
return {
"total_threads": process.num_threads(),
"active_threads": len(current_threads),
"thread_names": [t.name for t in current_threads],
"thread_details": thread_details,
"memory_mb": process.memory_info().rss / 1024 / 1024,
}
@router.get("/debug/storage")
async def get_storage_info():
# Get disk partitions
partitions = psutil.disk_partitions()
storage_info = []
for partition in partitions:
try:
usage = psutil.disk_usage(partition.mountpoint)
storage_info.append(
{
"device": partition.device,
"mountpoint": partition.mountpoint,
"fstype": partition.fstype,
"total_gb": usage.total / (1024**3),
"used_gb": usage.used / (1024**3),
"free_gb": usage.free / (1024**3),
"percent_used": usage.percent,
}
)
except PermissionError:
continue
return {"storage_info": storage_info}
@router.get("/debug/system")
async def get_system_info():
process = psutil.Process()
# CPU Info
cpu_info = {
"cpu_count": psutil.cpu_count(),
"cpu_percent": psutil.cpu_percent(interval=1),
"per_cpu_percent": psutil.cpu_percent(interval=1, percpu=True),
"load_avg": psutil.getloadavg(),
}
# Memory Info
virtual_memory = psutil.virtual_memory()
swap_memory = psutil.swap_memory()
memory_info = {
"virtual": {
"total_gb": virtual_memory.total / (1024**3),
"available_gb": virtual_memory.available / (1024**3),
"used_gb": virtual_memory.used / (1024**3),
"percent": virtual_memory.percent,
},
"swap": {
"total_gb": swap_memory.total / (1024**3),
"used_gb": swap_memory.used / (1024**3),
"free_gb": swap_memory.free / (1024**3),
"percent": swap_memory.percent,
},
}
# Process Info
process_info = {
"pid": process.pid,
"status": process.status(),
"create_time": datetime.fromtimestamp(process.create_time()).isoformat(),
"cpu_percent": process.cpu_percent(),
"memory_percent": process.memory_percent(),
}
# Network Info
network_info = {
"connections": len(process.net_connections()),
"network_io": psutil.net_io_counters()._asdict(),
}
# GPU Info if available
gpu_info = None
if torch.backends.mps.is_available():
gpu_info = {
"type": "MPS",
"available": True,
"device": "Apple Silicon",
"backend": "Metal",
}
elif GPU_AVAILABLE:
try:
gpus = GPUtil.getGPUs()
gpu_info = [
{
"id": gpu.id,
"name": gpu.name,
"load": gpu.load,
"memory": {
"total": gpu.memoryTotal,
"used": gpu.memoryUsed,
"free": gpu.memoryFree,
"percent": (gpu.memoryUsed / gpu.memoryTotal) * 100,
},
"temperature": gpu.temperature,
}
for gpu in gpus
]
except Exception:
gpu_info = "GPU information unavailable"
return {
"cpu": cpu_info,
"memory": memory_info,
"process": process_info,
"network": network_info,
"gpu": gpu_info,
}
@router.get("/debug/session_pools")
async def get_session_pool_info():
"""Get information about ONNX session pools."""
from ..inference.model_manager import get_manager
manager = await get_manager()
pools = manager._session_pools
current_time = time.time()
pool_info = {}
# Get CPU pool info
if "onnx_cpu" in pools:
cpu_pool = pools["onnx_cpu"]
pool_info["cpu"] = {
"active_sessions": len(cpu_pool._sessions),
"max_sessions": cpu_pool._max_size,
"sessions": [
{"model": path, "age_seconds": current_time - info.last_used}
for path, info in cpu_pool._sessions.items()
],
}
# Get GPU pool info
if "onnx_gpu" in pools:
gpu_pool = pools["onnx_gpu"]
pool_info["gpu"] = {
"active_sessions": len(gpu_pool._sessions),
"max_streams": gpu_pool._max_size,
"available_streams": len(gpu_pool._available_streams),
"sessions": [
{
"model": path,
"age_seconds": current_time - info.last_used,
"stream_id": info.stream_id,
}
for path, info in gpu_pool._sessions.items()
],
}
# Add GPU memory info if available
if GPU_AVAILABLE:
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu = gpus[0] # Assume first GPU
pool_info["gpu"]["memory"] = {
"total_mb": gpu.memoryTotal,
"used_mb": gpu.memoryUsed,
"free_mb": gpu.memoryFree,
"percent_used": (gpu.memoryUsed / gpu.memoryTotal) * 100,
}
except Exception:
pass
return pool_info