""" | |
Performance optimization module for DigiPal application. | |
This module provides: | |
- Lazy model loading with optimization | |
- Background task system for attribute decay and evolution checks | |
- Resource cleanup and garbage collection | |
- Database query optimization | |
- Memory usage monitoring | |
""" | |
import logging
import time
import threading
import gc
import psutil
import weakref
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable, Tuple
from dataclasses import dataclass
from collections import defaultdict
import asyncio
import sqlite3

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

from .models import DigiPal
from .enums import LifeStage
from ..storage.storage_manager import StorageManager

logger = logging.getLogger(__name__)


@dataclass
class ModelLoadingConfig:
    """Configuration for model loading optimization."""
    lazy_loading: bool = True
    quantization: bool = True
    model_cache_size: int = 2  # Maximum models to keep in memory
    unload_after_idle_minutes: int = 30
    preload_on_startup: bool = False
    use_cpu_offload: bool = True


@dataclass
class BackgroundTaskConfig:
    """Configuration for background tasks."""
    attribute_decay_interval: int = 300  # 5 minutes
    evolution_check_interval: int = 600  # 10 minutes
    memory_cleanup_interval: int = 1800  # 30 minutes
    database_optimization_interval: int = 3600  # 1 hour
    performance_monitoring_interval: int = 60  # 1 minute


@dataclass
class PerformanceMetrics:
    """Snapshot of performance metrics at a point in time."""
    timestamp: datetime
    cpu_usage: float
    memory_usage: float
    gpu_memory_usage: float
    active_pets: int
    cached_models: int
    database_connections: int
    response_time_avg: float
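
# Usage sketch for the config dataclasses (hypothetical values; any field can
# be overridden per instance since these are dataclasses):
#
#     model_config = ModelLoadingConfig(model_cache_size=1, quantization=False)
#     task_config = BackgroundTaskConfig(attribute_decay_interval=600)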


class LazyModelLoader:
    """Lazy loading system for AI models with optimization."""

    def __init__(self, config: ModelLoadingConfig):
        """Initialize lazy model loader."""
        self.config = config
        self.loaded_models: Dict[str, Any] = {}
        self.model_last_used: Dict[str, datetime] = {}
        self.model_loading_locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
        self._cleanup_thread: Optional[threading.Thread] = None
        # Event rather than a bare flag so the cleanup loop can be woken
        # promptly when shutdown is requested
        self._stop_cleanup = threading.Event()

        logger.info("Lazy model loader initialized")

    def get_language_model(self, model_name: str = "Qwen/Qwen3-0.6B") -> Tuple[Any, Any]:
        """
        Get language model with lazy loading.

        Args:
            model_name: Model identifier

        Returns:
            Tuple of (model, tokenizer)
        """
        model_key = f"language_{model_name}"

        with self.model_loading_locks[model_key]:
            # Return the cached model if it is already loaded
            if model_key in self.loaded_models:
                self.model_last_used[model_key] = datetime.now()
                model_data = self.loaded_models[model_key]
                return model_data['model'], model_data['tokenizer']

            # Load model if not in cache
            if self.config.lazy_loading:
                logger.info(f"Lazy loading language model: {model_name}")
                model, tokenizer = self._load_language_model(model_name)

                # Cache the model
                self.loaded_models[model_key] = {
                    'model': model,
                    'tokenizer': tokenizer,
                    'type': 'language',
                    'size_mb': self._estimate_model_size(model)
                }
                self.model_last_used[model_key] = datetime.now()

                # Evict least recently used models if over the cache limit
                self._manage_model_cache()

                return model, tokenizer
            else:
                # Direct loading without caching
                return self._load_language_model(model_name)

    def get_speech_model(self, model_name: str = "kyutai/stt-2.6b-en-trfs") -> Tuple[Any, Any]:
        """
        Get speech model with lazy loading.

        Args:
            model_name: Model identifier

        Returns:
            Tuple of (model, processor)
        """
        model_key = f"speech_{model_name}"

        with self.model_loading_locks[model_key]:
            # Return the cached model if it is already loaded
            if model_key in self.loaded_models:
                self.model_last_used[model_key] = datetime.now()
                model_data = self.loaded_models[model_key]
                return model_data['model'], model_data['processor']

            # Load model if not in cache
            if self.config.lazy_loading:
                logger.info(f"Lazy loading speech model: {model_name}")
                model, processor = self._load_speech_model(model_name)

                # Cache the model
                self.loaded_models[model_key] = {
                    'model': model,
                    'processor': processor,
                    'type': 'speech',
                    'size_mb': self._estimate_model_size(model)
                }
                self.model_last_used[model_key] = datetime.now()

                # Evict least recently used models if over the cache limit
                self._manage_model_cache()

                return model, processor
            else:
                # Direct loading without caching
                return self._load_speech_model(model_name)

    def _load_language_model(self, model_name: str) -> Tuple[Any, Any]:
        """Load language model with optimization."""
        try:
            # Load tokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)

            # Configure model loading
            model_kwargs = {
                "torch_dtype": "auto",
                "device_map": "auto" if torch.cuda.is_available() else None
            }

            # Add 4-bit quantization if enabled and a GPU is available
            if self.config.quantization and torch.cuda.is_available():
                from transformers import BitsAndBytesConfig
                quantization_config = BitsAndBytesConfig(
                    load_in_4bit=True,
                    bnb_4bit_compute_dtype=torch.float16,
                    bnb_4bit_use_double_quant=True,
                    bnb_4bit_quant_type="nf4"
                )
                model_kwargs["quantization_config"] = quantization_config

            # Load model
            model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)

            # Enable CPU offload if configured and supported by the model
            if self.config.use_cpu_offload and hasattr(model, 'enable_model_cpu_offload'):
                model.enable_model_cpu_offload()

            logger.info(f"Successfully loaded language model: {model_name}")
            return model, tokenizer

        except Exception as e:
            logger.error(f"Failed to load language model {model_name}: {e}")
            raise

    def _load_speech_model(self, model_name: str) -> Tuple[Any, Any]:
        """Load speech model with optimization."""
        try:
            from transformers import KyutaiSpeechToTextProcessor, KyutaiSpeechToTextForConditionalGeneration

            device = "cuda" if torch.cuda.is_available() else "cpu"

            processor = KyutaiSpeechToTextProcessor.from_pretrained(model_name)
            model = KyutaiSpeechToTextForConditionalGeneration.from_pretrained(
                model_name,
                device_map=device,
                torch_dtype="auto"
            )

            logger.info(f"Successfully loaded speech model: {model_name}")
            return model, processor

        except Exception as e:
            logger.error(f"Failed to load speech model {model_name}: {e}")
            raise

    def _estimate_model_size(self, model) -> float:
        """Estimate model size in MB."""
        try:
            if hasattr(model, 'get_memory_footprint'):
                return model.get_memory_footprint() / (1024 * 1024)
            else:
                # Rough estimate from the parameter count: assume 4 bytes per
                # parameter (float32), halved when quantization is enabled
                total_params = sum(p.numel() for p in model.parameters())
                bytes_per_param = 2 if self.config.quantization else 4
                return (total_params * bytes_per_param) / (1024 * 1024)
        except Exception:
            return 1000.0  # Default estimate

    def _manage_model_cache(self) -> None:
        """Manage model cache size by unloading least recently used models."""
        if len(self.loaded_models) <= self.config.model_cache_size:
            return

        # Sort models by last used time, oldest first
        models_by_usage = sorted(
            self.loaded_models.items(),
            key=lambda x: self.model_last_used.get(x[0], datetime.min)
        )

        # Unload the oldest models until the cache is back under its limit
        excess = len(self.loaded_models) - self.config.model_cache_size
        for model_key, model_data in models_by_usage[:excess]:
            self._unload_model(model_key)
            logger.info(f"Unloaded model {model_key} due to cache size limit")

    def _unload_model(self, model_key: str) -> None:
        """Unload a specific model from memory."""
        if model_key in self.loaded_models:
            model_data = self.loaded_models[model_key]

            # Drop model references so they can be garbage collected
            if 'model' in model_data:
                del model_data['model']
            if 'tokenizer' in model_data:
                del model_data['tokenizer']
            if 'processor' in model_data:
                del model_data['processor']

            # Remove from cache
            del self.loaded_models[model_key]
            if model_key in self.model_last_used:
                del self.model_last_used[model_key]

            # Force garbage collection
            gc.collect()

            # Clear CUDA cache if available
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    def start_cleanup_thread(self) -> None:
        """Start background cleanup thread for idle models."""
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            return

        self._stop_cleanup.clear()
        self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self._cleanup_thread.start()
        logger.info("Started model cleanup thread")

    def stop_cleanup_thread(self) -> None:
        """Stop background cleanup thread."""
        self._stop_cleanup.set()
        if self._cleanup_thread:
            self._cleanup_thread.join(timeout=5)

    def _cleanup_loop(self) -> None:
        """Background cleanup loop for idle models."""
        while not self._stop_cleanup.is_set():
            try:
                current_time = datetime.now()
                idle_threshold = timedelta(minutes=self.config.unload_after_idle_minutes)

                models_to_unload = [
                    model_key
                    for model_key, last_used in self.model_last_used.items()
                    if current_time - last_used > idle_threshold
                ]

                for model_key in models_to_unload:
                    self._unload_model(model_key)
                    logger.info(f"Unloaded idle model: {model_key}")

                # Wait for the next cleanup pass; wakes early if stop is signalled
                self._stop_cleanup.wait(timeout=300)  # 5 minutes

            except Exception as e:
                logger.error(f"Error in model cleanup loop: {e}")
                self._stop_cleanup.wait(timeout=60)

    def get_cache_info(self) -> Dict[str, Any]:
        """Get information about model cache."""
        total_size_mb = sum(
            model_data.get('size_mb', 0)
            for model_data in self.loaded_models.values()
        )

        return {
            'loaded_models': len(self.loaded_models),
            'cache_limit': self.config.model_cache_size,
            'total_size_mb': total_size_mb,
            'models': {
                key: {
                    'type': data.get('type', 'unknown'),
                    'size_mb': data.get('size_mb', 0),
                    'last_used': self.model_last_used.get(key, datetime.min).isoformat()
                }
                for key, data in self.loaded_models.items()
            }
        }

    def shutdown(self) -> None:
        """Shutdown the model loader and cleanup resources."""
        logger.info("Shutting down lazy model loader")

        # Stop cleanup thread
        self.stop_cleanup_thread()

        # Unload all models
        for model_key in list(self.loaded_models.keys()):
            self._unload_model(model_key)

        logger.info("Lazy model loader shutdown complete")


class BackgroundTaskManager:
    """Manages background tasks for attribute decay, evolution checks, etc."""

    def __init__(self, config: BackgroundTaskConfig, storage_manager: StorageManager):
        """Initialize background task manager."""
        self.config = config
        self.storage_manager = storage_manager
        self.active_tasks: Dict[str, threading.Thread] = {}
        self.task_stop_events: Dict[str, threading.Event] = {}
        self.task_callbacks: Dict[str, Callable] = {}

        # Performance tracking
        self.task_performance: Dict[str, List[float]] = defaultdict(list)

        logger.info("Background task manager initialized")

    def register_task(self, task_name: str, callback: Callable, interval_seconds: int) -> None:
        """
        Register a background task.

        Args:
            task_name: Unique task name
            callback: Function to call periodically
            interval_seconds: Interval between calls in seconds
        """
        self.task_callbacks[task_name] = callback
        self.task_stop_events[task_name] = threading.Event()

        # Create and start task thread
        task_thread = threading.Thread(
            target=self._task_loop,
            args=(task_name, callback, interval_seconds),
            daemon=True,
            name=f"BackgroundTask-{task_name}"
        )
        self.active_tasks[task_name] = task_thread
        task_thread.start()

        logger.info(f"Registered background task: {task_name} (interval: {interval_seconds}s)")

    def _task_loop(self, task_name: str, callback: Callable, interval_seconds: int) -> None:
        """Background task execution loop."""
        stop_event = self.task_stop_events[task_name]

        while not stop_event.is_set():
            try:
                start_time = time.time()

                # Execute task callback
                callback()

                # Track performance
                execution_time = time.time() - start_time
                self.task_performance[task_name].append(execution_time)

                # Keep only recent performance data
                if len(self.task_performance[task_name]) > 100:
                    self.task_performance[task_name] = self.task_performance[task_name][-50:]

                # Wait for next execution; wakes early if the task is stopped
                stop_event.wait(timeout=interval_seconds)

            except Exception as e:
                logger.error(f"Error in background task {task_name}: {e}")
                # Wait before retrying
                stop_event.wait(timeout=min(60, interval_seconds))

    def stop_task(self, task_name: str) -> None:
        """Stop a specific background task."""
        if task_name in self.task_stop_events:
            self.task_stop_events[task_name].set()

        if task_name in self.active_tasks:
            thread = self.active_tasks[task_name]
            thread.join(timeout=5)
            del self.active_tasks[task_name]

        logger.info(f"Stopped background task: {task_name}")

    def stop_all_tasks(self) -> None:
        """Stop all background tasks."""
        logger.info("Stopping all background tasks")

        # Signal all tasks to stop
        for stop_event in self.task_stop_events.values():
            stop_event.set()

        # Wait for all threads to finish
        for task_name, thread in self.active_tasks.items():
            thread.join(timeout=5)
            logger.debug(f"Stopped task: {task_name}")

        # Clear task data
        self.active_tasks.clear()
        self.task_stop_events.clear()

        logger.info("All background tasks stopped")

    def get_task_performance(self) -> Dict[str, Dict[str, float]]:
        """Get performance statistics for all tasks."""
        performance_stats = {}

        for task_name, execution_times in self.task_performance.items():
            if execution_times:
                performance_stats[task_name] = {
                    'avg_execution_time': sum(execution_times) / len(execution_times),
                    'max_execution_time': max(execution_times),
                    'min_execution_time': min(execution_times),
                    'total_executions': len(execution_times)
                }
            else:
                performance_stats[task_name] = {
                    'avg_execution_time': 0.0,
                    'max_execution_time': 0.0,
                    'min_execution_time': 0.0,
                    'total_executions': 0
                }

        return performance_stats
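
# Usage sketch (hedged; wires the manager to the intervals defined in
# BackgroundTaskConfig, with some_callback standing in for real task code):
#
#     manager = BackgroundTaskManager(BackgroundTaskConfig(), storage_manager)
#     manager.register_task("evolution_check", some_callback,
#                           manager.config.evolution_check_interval)
#     print(manager.get_task_performance())
#     manager.stop_all_tasks()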


class DatabaseOptimizer:
    """Database query optimization and maintenance."""

    def __init__(self, storage_manager: StorageManager):
        """Initialize database optimizer."""
        self.storage_manager = storage_manager
        self.query_cache: Dict[str, Tuple[Any, datetime]] = {}
        self.cache_ttl_seconds = 300  # 5 minutes

        logger.info("Database optimizer initialized")

    def optimize_database(self) -> Dict[str, Any]:
        """Perform database optimization tasks."""
        results = {}

        try:
            # Analyze database
            results['analyze'] = self._analyze_database()

            # Vacuum database
            results['vacuum'] = self._vacuum_database()

            # Update statistics
            results['statistics'] = self._update_statistics()

            # Check indexes
            results['indexes'] = self._check_indexes()

            logger.info("Database optimization completed")

        except Exception as e:
            logger.error(f"Database optimization failed: {e}")
            results['error'] = str(e)

        return results

    def _analyze_database(self) -> Dict[str, Any]:
        """Analyze database for optimization opportunities."""
        try:
            db = self.storage_manager.db

            # List user tables (sqlite_master does not store row counts, so
            # exact sizes would require a per-table COUNT(*) query)
            table_info = db.execute_query("""
                SELECT name FROM sqlite_master WHERE type='table'
            """)

            # Get index information
            index_info = db.execute_query("""
                SELECT name, tbl_name, sql
                FROM sqlite_master
                WHERE type='index' AND sql IS NOT NULL
            """)

            return {
                'tables': table_info,
                'indexes': index_info,
                'database_size': self._get_database_size()
            }
        except Exception as e:
            logger.error(f"Database analysis failed: {e}")
            return {'error': str(e)}

    def _vacuum_database(self) -> Dict[str, Any]:
        """Vacuum database to reclaim space."""
        try:
            db = self.storage_manager.db

            # Get size before vacuum
            size_before = self._get_database_size()

            # Perform vacuum
            db.execute_update("VACUUM")

            # Get size after vacuum
            size_after = self._get_database_size()
            space_reclaimed = size_before - size_after

            return {
                'size_before_mb': size_before / (1024 * 1024),
                'size_after_mb': size_after / (1024 * 1024),
                'space_reclaimed_mb': space_reclaimed / (1024 * 1024)
            }
        except Exception as e:
            logger.error(f"Database vacuum failed: {e}")
            return {'error': str(e)}

    def _update_statistics(self) -> Dict[str, Any]:
        """Update database statistics."""
        try:
            db = self.storage_manager.db

            # Analyze all tables
            db.execute_update("ANALYZE")

            return {'status': 'completed'}
        except Exception as e:
            logger.error(f"Statistics update failed: {e}")
            return {'error': str(e)}

    def _check_indexes(self) -> Dict[str, Any]:
        """Check and suggest index optimizations."""
        try:
            db = self.storage_manager.db

            # Check for missing indexes on frequently queried columns
            suggestions = []

            # Check digipals table
            digipals_count = db.execute_query("SELECT COUNT(*) as count FROM digipals")[0]['count']
            if digipals_count > 1000:
                # Suggest index on user_id if one does not already exist
                existing_indexes = db.execute_query("""
                    SELECT name FROM sqlite_master
                    WHERE type='index' AND tbl_name='digipals' AND sql LIKE '%user_id%'
                """)
                if not existing_indexes:
                    suggestions.append("CREATE INDEX idx_digipals_user_id ON digipals(user_id)")

            # Check interactions table
            interactions_count = db.execute_query("SELECT COUNT(*) as count FROM interactions")[0]['count']
            if interactions_count > 5000:
                # Suggest composite index on digipal_id and timestamp
                existing_indexes = db.execute_query("""
                    SELECT name FROM sqlite_master
                    WHERE type='index' AND tbl_name='interactions'
                    AND sql LIKE '%digipal_id%' AND sql LIKE '%timestamp%'
                """)
                if not existing_indexes:
                    suggestions.append("CREATE INDEX idx_interactions_digipal_timestamp ON interactions(digipal_id, timestamp)")

            return {
                'suggestions': suggestions,
                'digipals_count': digipals_count,
                'interactions_count': interactions_count
            }
        except Exception as e:
            logger.error(f"Index check failed: {e}")
            return {'error': str(e)}

    def _get_database_size(self) -> int:
        """Get database file size in bytes."""
        try:
            import os
            return os.path.getsize(self.storage_manager.db_path)
        except OSError:
            return 0

    def create_suggested_indexes(self) -> Dict[str, Any]:
        """Create suggested indexes for better performance."""
        try:
            db = self.storage_manager.db
            results = []

            # Essential indexes for the DigiPal application
            indexes_to_create = [
                "CREATE INDEX IF NOT EXISTS idx_digipals_user_id ON digipals(user_id)",
                "CREATE INDEX IF NOT EXISTS idx_digipals_active ON digipals(is_active)",
                "CREATE INDEX IF NOT EXISTS idx_interactions_digipal ON interactions(digipal_id)",
                "CREATE INDEX IF NOT EXISTS idx_interactions_timestamp ON interactions(timestamp)",
                "CREATE INDEX IF NOT EXISTS idx_care_actions_digipal ON care_actions(digipal_id)",
                "CREATE INDEX IF NOT EXISTS idx_users_id ON users(id)"
            ]

            for index_sql in indexes_to_create:
                try:
                    db.execute_update(index_sql)
                    results.append(f"Created: {index_sql}")
                except Exception as e:
                    results.append(f"Failed: {index_sql} - {e}")

            return {'results': results}
        except Exception as e:
            logger.error(f"Index creation failed: {e}")
            return {'error': str(e)}
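
# Usage sketch (hedged; typically driven from the background task manager at
# BackgroundTaskConfig.database_optimization_interval):
#
#     optimizer = DatabaseOptimizer(storage_manager)
#     optimizer.create_suggested_indexes()
#     report = optimizer.optimize_database()
#     print(report['vacuum'].get('space_reclaimed_mb'))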


class PerformanceMonitor:
    """System performance monitoring and optimization."""

    def __init__(self):
        """Initialize performance monitor."""
        self.metrics_history: List[PerformanceMetrics] = []
        self.max_history_size = 1440  # 24 hours of minute-by-minute data

        logger.info("Performance monitor initialized")

    def collect_metrics(self, active_pets: int = 0, cached_models: int = 0,
                        response_time_avg: float = 0.0) -> PerformanceMetrics:
        """Collect current performance metrics."""
        try:
            # System metrics (cpu_percent blocks for the 1s sampling interval)
            cpu_usage = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            memory_usage = memory.percent

            # GPU memory as a percentage of total device memory
            gpu_memory_usage = 0.0
            if torch.cuda.is_available():
                total_gpu_memory = torch.cuda.get_device_properties(0).total_memory
                if total_gpu_memory > 0:
                    gpu_memory_usage = (torch.cuda.memory_allocated() / total_gpu_memory) * 100

            # Database connections (approximate)
            database_connections = 1  # Simplified for SQLite

            metrics = PerformanceMetrics(
                timestamp=datetime.now(),
                cpu_usage=cpu_usage,
                memory_usage=memory_usage,
                gpu_memory_usage=gpu_memory_usage,
                active_pets=active_pets,
                cached_models=cached_models,
                database_connections=database_connections,
                response_time_avg=response_time_avg
            )

            # Add to history
            self.metrics_history.append(metrics)

            # Manage history size
            if len(self.metrics_history) > self.max_history_size:
                self.metrics_history = self.metrics_history[-self.max_history_size:]

            return metrics

        except Exception as e:
            logger.error(f"Failed to collect performance metrics: {e}")
            return PerformanceMetrics(
                timestamp=datetime.now(),
                cpu_usage=0.0,
                memory_usage=0.0,
                gpu_memory_usage=0.0,
                active_pets=active_pets,
                cached_models=cached_models,
                database_connections=0,
                response_time_avg=response_time_avg
            )

    def get_performance_summary(self, hours: int = 1) -> Dict[str, Any]:
        """Get performance summary for the last N hours."""
        if not self.metrics_history:
            return {}

        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [m for m in self.metrics_history if m.timestamp > cutoff_time]

        if not recent_metrics:
            recent_metrics = self.metrics_history[-60:]  # Fall back to the last 60 data points

        # Calculate averages
        avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
        avg_gpu_memory = sum(m.gpu_memory_usage for m in recent_metrics) / len(recent_metrics)
        avg_response_time = sum(m.response_time_avg for m in recent_metrics) / len(recent_metrics)

        # Calculate peaks
        max_cpu = max(m.cpu_usage for m in recent_metrics)
        max_memory = max(m.memory_usage for m in recent_metrics)
        max_gpu_memory = max(m.gpu_memory_usage for m in recent_metrics)

        return {
            'time_period_hours': hours,
            'data_points': len(recent_metrics),
            'averages': {
                'cpu_usage': avg_cpu,
                'memory_usage': avg_memory,
                'gpu_memory_usage': avg_gpu_memory,
                'response_time': avg_response_time
            },
            'peaks': {
                'cpu_usage': max_cpu,
                'memory_usage': max_memory,
                'gpu_memory_usage': max_gpu_memory
            },
            'current': {
                'active_pets': recent_metrics[-1].active_pets,
                'cached_models': recent_metrics[-1].cached_models
            }
        }

    def check_performance_alerts(self) -> List[Dict[str, Any]]:
        """Check for performance issues and return alerts."""
        alerts = []

        if not self.metrics_history:
            return alerts

        latest = self.metrics_history[-1]

        # CPU usage alert
        if latest.cpu_usage > 80:
            alerts.append({
                'type': 'high_cpu',
                'severity': 'warning' if latest.cpu_usage < 90 else 'critical',
                'message': f"High CPU usage: {latest.cpu_usage:.1f}%",
                'value': latest.cpu_usage
            })

        # Memory usage alert
        if latest.memory_usage > 85:
            alerts.append({
                'type': 'high_memory',
                'severity': 'warning' if latest.memory_usage < 95 else 'critical',
                'message': f"High memory usage: {latest.memory_usage:.1f}%",
                'value': latest.memory_usage
            })

        # GPU memory alert
        if latest.gpu_memory_usage > 90:
            alerts.append({
                'type': 'high_gpu_memory',
                'severity': 'warning',
                'message': f"High GPU memory usage: {latest.gpu_memory_usage:.1f}%",
                'value': latest.gpu_memory_usage
            })

        # Response time alert
        if latest.response_time_avg > 5.0:
            alerts.append({
                'type': 'slow_response',
                'severity': 'warning',
                'message': f"Slow response time: {latest.response_time_avg:.2f}s",
                'value': latest.response_time_avg
            })

        return alerts

    def suggest_optimizations(self) -> List[str]:
        """Suggest performance optimizations based on metrics."""
        suggestions = []

        if not self.metrics_history:
            return suggestions

        # Analyze recent performance (up to the last hour of data)
        recent_metrics = self.metrics_history[-60:]
        avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
        avg_gpu_memory = sum(m.gpu_memory_usage for m in recent_metrics) / len(recent_metrics)

        # CPU optimization suggestions
        if avg_cpu > 70:
            suggestions.append("Consider reducing background task frequency")
            suggestions.append("Enable model CPU offloading to reduce CPU load")

        # Memory optimization suggestions
        if avg_memory > 80:
            suggestions.append("Reduce model cache size to free memory")
            suggestions.append("Enable more aggressive memory cleanup")
            suggestions.append("Consider using smaller quantized models")

        # GPU memory optimization suggestions
        if avg_gpu_memory > 80:
            suggestions.append("Enable 4-bit quantization for models")
            suggestions.append("Reduce maximum concurrent model loading")
            suggestions.append("Use CPU offloading for less frequently used models")

        # General suggestions
        max_active_pets = max(m.active_pets for m in recent_metrics)
        if max_active_pets > 100:
            suggestions.append("Consider implementing pet data pagination")
            suggestions.append("Optimize database queries with better indexing")

        return suggestions
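
# Usage sketch (hedged; collect_metrics blocks roughly 1s for CPU sampling, so
# it is best driven from the performance_monitoring_interval background task):
#
#     monitor = PerformanceMonitor()
#     monitor.collect_metrics(active_pets=3, cached_models=2)
#     for alert in monitor.check_performance_alerts():
#         logger.warning(alert['message'])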


class ResourceCleanupManager:
    """Manages resource cleanup and garbage collection."""

    def __init__(self):
        """Initialize resource cleanup manager."""
        self.cleanup_callbacks: List[Callable] = []
        self.last_cleanup = datetime.now()

        logger.info("Resource cleanup manager initialized")

    def register_cleanup_callback(self, callback: Callable) -> None:
        """Register a cleanup callback function."""
        self.cleanup_callbacks.append(callback)

    def perform_cleanup(self, force_gc: bool = True) -> Dict[str, Any]:
        """Perform comprehensive resource cleanup."""
        cleanup_results = {}

        try:
            # Execute registered cleanup callbacks
            for i, callback in enumerate(self.cleanup_callbacks):
                try:
                    callback()
                    cleanup_results[f'callback_{i}'] = 'success'
                except Exception as e:
                    cleanup_results[f'callback_{i}'] = f'error: {e}'

            # Python garbage collection
            if force_gc:
                collected = gc.collect()
                cleanup_results['gc_collected'] = collected

            # PyTorch cleanup
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                cleanup_results['cuda_cache_cleared'] = True

            # Update last cleanup time
            self.last_cleanup = datetime.now()
            cleanup_results['cleanup_time'] = self.last_cleanup.isoformat()

            logger.info("Resource cleanup completed")

        except Exception as e:
            logger.error(f"Resource cleanup failed: {e}")
            cleanup_results['error'] = str(e)

        return cleanup_results

    def get_memory_info(self) -> Dict[str, Any]:
        """Get current memory usage information."""
        try:
            # System memory
            memory = psutil.virtual_memory()

            # Tracked Python object count (a rough proxy for interpreter memory)
            python_objects = len(gc.get_objects())

            # PyTorch memory
            torch_memory = {}
            if torch.cuda.is_available():
                torch_memory = {
                    'allocated_mb': torch.cuda.memory_allocated() / (1024 * 1024),
                    'cached_mb': torch.cuda.memory_reserved() / (1024 * 1024),
                    'max_allocated_mb': torch.cuda.max_memory_allocated() / (1024 * 1024)
                }

            return {
                'system_memory': {
                    'total_mb': memory.total / (1024 * 1024),
                    'available_mb': memory.available / (1024 * 1024),
                    'used_mb': memory.used / (1024 * 1024),
                    'percent': memory.percent
                },
                'python_objects': python_objects,
                'torch_memory': torch_memory,
                'last_cleanup': self.last_cleanup.isoformat()
            }
        except Exception as e:
            logger.error(f"Failed to get memory info: {e}")
            return {'error': str(e)}
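
# End-to-end wiring sketch (hedged; "digipal.db" and the StorageManager
# constructor signature are assumptions, not confirmed by this module):
#
#     storage = StorageManager("digipal.db")
#     loader = LazyModelLoader(ModelLoadingConfig())
#     loader.start_cleanup_thread()
#     cleanup = ResourceCleanupManager()
#     tasks = BackgroundTaskManager(BackgroundTaskConfig(), storage)
#     tasks.register_task("resource_cleanup", cleanup.perform_cleanup,
#                         tasks.config.memory_cleanup_interval)
#     optimizer = DatabaseOptimizer(storage)
#     tasks.register_task("db_optimization", lambda: optimizer.optimize_database(),
#                         tasks.config.database_optimization_interval)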