Spaces:
Paused
Paused
| """ | |
| Resource Manager for Flare | |
| ========================== | |
| Manages lifecycle of all session resources | |
| """ | |
| import asyncio | |
| from typing import Dict, Any, Optional, Callable, Set | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass, field | |
| import traceback | |
| from enum import Enum | |
| from event_bus import EventBus, Event, EventType | |
| from utils.logger import log_info, log_error, log_debug, log_warning | |
| class ResourceType(Enum): | |
| """Types of resources managed""" | |
| STT_INSTANCE = "stt_instance" | |
| TTS_INSTANCE = "tts_instance" | |
| LLM_CONTEXT = "llm_context" | |
| AUDIO_BUFFER = "audio_buffer" | |
| WEBSOCKET = "websocket" | |
| GENERIC = "generic" | |
| class Resource: | |
| """Resource wrapper with metadata""" | |
| resource_id: str | |
| resource_type: ResourceType | |
| session_id: str | |
| instance: Any | |
| created_at: datetime = field(default_factory=datetime.utcnow) | |
| last_accessed: datetime = field(default_factory=datetime.utcnow) | |
| disposal_task: Optional[asyncio.Task] = None | |
| cleanup_callback: Optional[Callable] = None | |
| def touch(self): | |
| """Update last accessed time""" | |
| self.last_accessed = datetime.utcnow() | |
| async def cleanup(self): | |
| """Cleanup the resource""" | |
| try: | |
| if self.cleanup_callback: | |
| if asyncio.iscoroutinefunction(self.cleanup_callback): | |
| await self.cleanup_callback(self.instance) | |
| else: | |
| await asyncio.to_thread(self.cleanup_callback, self.instance) | |
| log_debug( | |
| f"🧹 Resource cleaned up", | |
| resource_id=self.resource_id, | |
| resource_type=self.resource_type.value | |
| ) | |
| except Exception as e: | |
| log_error( | |
| f"❌ Error cleaning up resource", | |
| resource_id=self.resource_id, | |
| error=str(e) | |
| ) | |
| class ResourcePool: | |
| """Pool for reusable resources""" | |
| def __init__(self, | |
| resource_type: ResourceType, | |
| factory: Callable, | |
| max_idle: int = 10, | |
| max_age_seconds: int = 300): | |
| self.resource_type = resource_type | |
| self.factory = factory | |
| self.max_idle = max_idle | |
| self.max_age_seconds = max_age_seconds | |
| self.idle_resources: List[Resource] = [] | |
| self.lock = asyncio.Lock() | |
| async def acquire(self, session_id: str) -> Any: | |
| """Get resource from pool or create new""" | |
| async with self.lock: | |
| # Try to get from pool | |
| now = datetime.utcnow() | |
| while self.idle_resources: | |
| resource = self.idle_resources.pop(0) | |
| age = (now - resource.created_at).total_seconds() | |
| if age < self.max_age_seconds: | |
| # Reuse this resource | |
| resource.session_id = session_id | |
| resource.touch() | |
| log_debug( | |
| f"♻️ Reused pooled resource", | |
| resource_type=self.resource_type.value, | |
| age_seconds=age | |
| ) | |
| return resource.instance | |
| else: | |
| # Too old, cleanup | |
| await resource.cleanup() | |
| # Create new resource | |
| if asyncio.iscoroutinefunction(self.factory): | |
| instance = await self.factory() | |
| else: | |
| instance = await asyncio.to_thread(self.factory) | |
| log_debug( | |
| f"🏗️ Created new resource", | |
| resource_type=self.resource_type.value | |
| ) | |
| return instance | |
| async def release(self, resource: Resource): | |
| """Return resource to pool""" | |
| async with self.lock: | |
| if len(self.idle_resources) < self.max_idle: | |
| resource.session_id = "" # Clear session | |
| self.idle_resources.append(resource) | |
| log_debug( | |
| f"📥 Resource returned to pool", | |
| resource_type=self.resource_type.value, | |
| pool_size=len(self.idle_resources) | |
| ) | |
| else: | |
| # Pool full, cleanup | |
| await resource.cleanup() | |
| async def cleanup_old(self): | |
| """Cleanup old resources in pool""" | |
| async with self.lock: | |
| now = datetime.utcnow() | |
| active_resources = [] | |
| for resource in self.idle_resources: | |
| age = (now - resource.created_at).total_seconds() | |
| if age < self.max_age_seconds: | |
| active_resources.append(resource) | |
| else: | |
| await resource.cleanup() | |
| self.idle_resources = active_resources | |
| class ResourceManager: | |
| """Manages all resources with lifecycle and pooling""" | |
| def __init__(self, event_bus: EventBus): | |
| self.event_bus = event_bus | |
| self.resources: Dict[str, Resource] = {} | |
| self.session_resources: Dict[str, Set[str]] = {} | |
| self.pools: Dict[ResourceType, ResourcePool] = {} | |
| self.disposal_delay_seconds = 60 # Default disposal delay | |
| self._cleanup_task: Optional[asyncio.Task] = None | |
| self._running = False | |
| self._setup_event_handlers() | |
| def _setup_event_handlers(self): | |
| """Subscribe to lifecycle events""" | |
| self.event_bus.subscribe(EventType.SESSION_STARTED, self._handle_session_started) | |
| self.event_bus.subscribe(EventType.SESSION_ENDED, self._handle_session_ended) | |
| async def start(self): | |
| """Start resource manager""" | |
| if self._running: | |
| return | |
| self._running = True | |
| self._cleanup_task = asyncio.create_task(self._periodic_cleanup()) | |
| log_info("✅ Resource manager started") | |
| async def stop(self): | |
| """Stop resource manager""" | |
| self._running = False | |
| if self._cleanup_task: | |
| self._cleanup_task.cancel() | |
| try: | |
| await self._cleanup_task | |
| except asyncio.CancelledError: | |
| pass | |
| # Cleanup all resources | |
| for resource_id in list(self.resources.keys()): | |
| await self.release(resource_id, immediate=True) | |
| log_info("✅ Resource manager stopped") | |
| def register_pool(self, | |
| resource_type: ResourceType, | |
| factory: Callable, | |
| max_idle: int = 10, | |
| max_age_seconds: int = 300): | |
| """Register a resource pool""" | |
| self.pools[resource_type] = ResourcePool( | |
| resource_type=resource_type, | |
| factory=factory, | |
| max_idle=max_idle, | |
| max_age_seconds=max_age_seconds | |
| ) | |
| log_info( | |
| f"📊 Resource pool registered", | |
| resource_type=resource_type.value, | |
| max_idle=max_idle | |
| ) | |
| async def acquire(self, | |
| resource_id: str, | |
| session_id: str, | |
| resource_type: ResourceType, | |
| factory: Optional[Callable] = None, | |
| cleanup_callback: Optional[Callable] = None) -> Any: | |
| """Acquire a resource""" | |
| # Check if already exists | |
| if resource_id in self.resources: | |
| resource = self.resources[resource_id] | |
| resource.touch() | |
| # Cancel any pending disposal | |
| if resource.disposal_task: | |
| resource.disposal_task.cancel() | |
| resource.disposal_task = None | |
| return resource.instance | |
| # Try to get from pool | |
| instance = None | |
| if resource_type in self.pools: | |
| instance = await self.pools[resource_type].acquire(session_id) | |
| elif factory: | |
| # Create new resource | |
| if asyncio.iscoroutinefunction(factory): | |
| instance = await factory() | |
| else: | |
| instance = await asyncio.to_thread(factory) | |
| else: | |
| raise ValueError(f"No factory or pool for resource type: {resource_type}") | |
| # Create resource wrapper | |
| resource = Resource( | |
| resource_id=resource_id, | |
| resource_type=resource_type, | |
| session_id=session_id, | |
| instance=instance, | |
| cleanup_callback=cleanup_callback | |
| ) | |
| # Track resource | |
| self.resources[resource_id] = resource | |
| if session_id not in self.session_resources: | |
| self.session_resources[session_id] = set() | |
| self.session_resources[session_id].add(resource_id) | |
| log_info( | |
| f"📌 Resource acquired", | |
| resource_id=resource_id, | |
| resource_type=resource_type.value, | |
| session_id=session_id | |
| ) | |
| return instance | |
| async def release(self, | |
| resource_id: str, | |
| delay_seconds: Optional[int] = None, | |
| immediate: bool = False): | |
| """Release a resource with optional delay""" | |
| resource = self.resources.get(resource_id) | |
| if not resource: | |
| return | |
| if immediate: | |
| # Immediate cleanup | |
| await self._dispose_resource(resource_id) | |
| else: | |
| # Schedule disposal | |
| delay = delay_seconds or self.disposal_delay_seconds | |
| resource.disposal_task = asyncio.create_task( | |
| self._delayed_disposal(resource_id, delay) | |
| ) | |
| log_debug( | |
| f"⏱️ Resource disposal scheduled", | |
| resource_id=resource_id, | |
| delay_seconds=delay | |
| ) | |
| async def _delayed_disposal(self, resource_id: str, delay_seconds: int): | |
| """Dispose resource after delay""" | |
| try: | |
| await asyncio.sleep(delay_seconds) | |
| await self._dispose_resource(resource_id) | |
| except asyncio.CancelledError: | |
| log_debug(f"🚫 Disposal cancelled", resource_id=resource_id) | |
| async def _dispose_resource(self, resource_id: str): | |
| """Actually dispose of a resource""" | |
| resource = self.resources.pop(resource_id, None) | |
| if not resource: | |
| return | |
| # Remove from session tracking | |
| if resource.session_id in self.session_resources: | |
| self.session_resources[resource.session_id].discard(resource_id) | |
| # Return to pool or cleanup | |
| if resource.resource_type in self.pools: | |
| await self.pools[resource.resource_type].release(resource) | |
| else: | |
| await resource.cleanup() | |
| log_info( | |
| f"♻️ Resource disposed", | |
| resource_id=resource_id, | |
| resource_type=resource.resource_type.value | |
| ) | |
| async def release_session_resources(self, session_id: str): | |
| """Release all resources for a session""" | |
| resource_ids = self.session_resources.get(session_id, set()).copy() | |
| for resource_id in resource_ids: | |
| await self.release(resource_id, immediate=True) | |
| # Remove session tracking | |
| self.session_resources.pop(session_id, None) | |
| log_info( | |
| f"🧹 Session resources released", | |
| session_id=session_id, | |
| count=len(resource_ids) | |
| ) | |
| async def _handle_session_started(self, event: Event): | |
| """Initialize session resource tracking""" | |
| session_id = event.session_id | |
| self.session_resources[session_id] = set() | |
| async def _handle_session_ended(self, event: Event): | |
| """Cleanup session resources""" | |
| session_id = event.session_id | |
| await self.release_session_resources(session_id) | |
| async def _periodic_cleanup(self): | |
| """Periodic cleanup of old resources""" | |
| while self._running: | |
| try: | |
| await asyncio.sleep(60) # Check every minute | |
| # Cleanup old pooled resources | |
| for pool in self.pools.values(): | |
| await pool.cleanup_old() | |
| # Check for orphaned resources | |
| now = datetime.utcnow() | |
| for resource_id, resource in list(self.resources.items()): | |
| age = (now - resource.last_accessed).total_seconds() | |
| # If not accessed for 5 minutes and no disposal scheduled | |
| if age > 300 and not resource.disposal_task: | |
| log_warning( | |
| f"⚠️ Orphaned resource detected", | |
| resource_id=resource_id, | |
| age_seconds=age | |
| ) | |
| await self.release(resource_id, delay_seconds=30) | |
| except Exception as e: | |
| log_error( | |
| f"❌ Error in periodic cleanup", | |
| error=str(e), | |
| traceback=traceback.format_exc() | |
| ) | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get resource manager statistics""" | |
| pool_stats = {} | |
| for resource_type, pool in self.pools.items(): | |
| pool_stats[resource_type.value] = { | |
| "idle_count": len(pool.idle_resources), | |
| "max_idle": pool.max_idle | |
| } | |
| return { | |
| "active_resources": len(self.resources), | |
| "sessions": len(self.session_resources), | |
| "pools": pool_stats, | |
| "total_resources_by_type": self._count_by_type() | |
| } | |
| def _count_by_type(self) -> Dict[str, int]: | |
| """Count resources by type""" | |
| counts = {} | |
| for resource in self.resources.values(): | |
| type_name = resource.resource_type.value | |
| counts[type_name] = counts.get(type_name, 0) + 1 | |
| return counts |