| """ | |
| LLM Manager for Flare | |
| ==================== | |
| Manages LLM interactions per session with stateless approach | |
| """ | |
import asyncio
import json
import re
import traceback
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional, Any, List

from chat_session.event_bus import EventBus, Event, EventType, publish_error
from chat_session.resource_manager import ResourceManager, ResourceType
from chat_session.session import Session
from llm.llm_factory import LLMFactory
from llm.llm_interface import LLMInterface
from llm.prompt_builder import build_intent_prompt, build_parameter_prompt
from utils.logger import log_info, log_error, log_debug, log_warning
from config.config_provider import ConfigProvider


@dataclass
class LLMJob:
    """LLM processing job"""
    job_id: str
    session_id: str
    input_text: str
    job_type: str  # "intent_detection", "parameter_collection", "response_generation"
    created_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    response_text: Optional[str] = None
    detected_intent: Optional[str] = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def complete(self, response_text: str, intent: Optional[str] = None):
        """Mark job as completed"""
        self.response_text = response_text
        self.detected_intent = intent
        self.completed_at = datetime.utcnow()

    def fail(self, error: str):
        """Mark job as failed"""
        self.error = error
        self.completed_at = datetime.utcnow()
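

# A job's lifecycle is create -> complete() or fail(); ids follow the
# "{session_id}_{n}" pattern used in _handle_llm_processing below
# (the values here are illustrative):
#   job = LLMJob(job_id="sess1_0", session_id="sess1",
#                input_text="merhaba", job_type="intent_detection")
#   job.complete("Hoş geldiniz!")   # or: job.fail("timeout")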


@dataclass
class LLMSession:
    """LLM session wrapper"""
    session_id: str
    session: Session
    llm_instance: LLMInterface
    active_job: Optional[LLMJob] = None
    job_history: List[LLMJob] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_activity: datetime = field(default_factory=datetime.utcnow)
    total_jobs: int = 0
    total_tokens: int = 0

    def update_activity(self):
        """Update last activity timestamp"""
        self.last_activity = datetime.utcnow()


class LLMManager:
    """Manages LLM interactions with a stateless approach"""

    def __init__(self, event_bus: EventBus, resource_manager: ResourceManager):
        self.event_bus = event_bus
        self.resource_manager = resource_manager
        self.llm_sessions: Dict[str, LLMSession] = {}
        self.config = ConfigProvider.get()
        self._setup_event_handlers()
        self._setup_resource_pool()
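
    # Typical wiring (a sketch based on the handlers below; the component that
    # publishes LLM_PROCESSING_STARTED, e.g. an STT or routing layer, lives
    # outside this file):
    #   manager = LLMManager(event_bus, resource_manager)
    #   await event_bus.publish(Event(
    #       type=EventType.LLM_PROCESSING_STARTED,
    #       session_id="sess1",
    #       data={"text": "merhaba"},
    #   ))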

    def _setup_event_handlers(self):
        """Subscribe to LLM-related events"""
        self.event_bus.subscribe(EventType.LLM_PROCESSING_STARTED, self._handle_llm_processing)
        self.event_bus.subscribe(EventType.SESSION_ENDED, self._handle_session_ended)
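
    # Event flow as wired above: LLM_PROCESSING_STARTED carries {"text": ...}
    # and is answered with an LLM_RESPONSE_READY event (see _publish_response);
    # SESSION_ENDED triggers resource cleanup (_handle_session_ended).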

    def _setup_resource_pool(self):
        """Setup LLM instance pool"""
        self.resource_manager.register_pool(
            resource_type=ResourceType.LLM_CONTEXT,
            factory=self._create_llm_instance,
            max_idle=2,  # Lower pool size for LLM
            max_age_seconds=900  # 15 minutes
        )
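
    # Pool lifecycle: instances are acquired per session in
    # _get_or_create_session (resource id "llm_{session_id}") and released with
    # a 180 s delay in _cleanup_session, presumably so a session that
    # reconnects quickly can reuse its instance before the pool recycles it.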

    async def _create_llm_instance(self) -> LLMInterface:
        """Factory for creating LLM instances"""
        try:
            llm_instance = LLMFactory.create_provider()
            if not llm_instance:
                raise ValueError("Failed to create LLM instance")
            log_debug("🤖 Created new LLM instance")
            return llm_instance
        except Exception as e:
            log_error("❌ Failed to create LLM instance", error=str(e))
            raise

    async def _handle_llm_processing(self, event: Event):
        """Handle LLM processing request"""
        session_id = event.session_id
        input_text = event.data.get("text", "")
        if not input_text:
            log_warning("⚠️ Empty text for LLM", session_id=session_id)
            return

        try:
            log_info(
                "🤖 Starting LLM processing",
                session_id=session_id,
                text_length=len(input_text)
            )

            # Get or create LLM session
            llm_session = await self._get_or_create_session(session_id)
            if not llm_session:
                raise ValueError("Failed to create LLM session")

            # Determine job type based on session state
            job_type = self._determine_job_type(llm_session.session)

            # Create job
            job_id = f"{session_id}_{llm_session.total_jobs}"
            job = LLMJob(
                job_id=job_id,
                session_id=session_id,
                input_text=input_text,
                job_type=job_type,
                metadata={
                    "session_state": llm_session.session.state,
                    "current_intent": llm_session.session.current_intent
                }
            )
            llm_session.active_job = job
            llm_session.total_jobs += 1
            llm_session.update_activity()

            # Process based on job type
            if job_type == "intent_detection":
                await self._process_intent_detection(llm_session, job)
            elif job_type == "parameter_collection":
                await self._process_parameter_collection(llm_session, job)
            else:
                await self._process_response_generation(llm_session, job)

        except Exception as e:
            log_error(
                "❌ Failed to process LLM request",
                session_id=session_id,
                error=str(e),
                traceback=traceback.format_exc()
            )
            # Publish error event
            await publish_error(
                session_id=session_id,
                error_type="llm_error",
                error_message=f"LLM processing failed: {str(e)}"
            )

    async def _get_or_create_session(self, session_id: str) -> Optional[LLMSession]:
        """Get or create LLM session"""
        if session_id in self.llm_sessions:
            return self.llm_sessions[session_id]

        # Get session from store (imported here, presumably to avoid a
        # circular import at module load)
        from chat_session.session import session_store
        session = session_store.get_session(session_id)
        if not session:
            log_error("❌ Session not found", session_id=session_id)
            return None

        # Acquire LLM instance from pool
        resource_id = f"llm_{session_id}"
        llm_instance = await self.resource_manager.acquire(
            resource_id=resource_id,
            session_id=session_id,
            resource_type=ResourceType.LLM_CONTEXT,
            cleanup_callback=self._cleanup_llm_instance
        )

        # Create LLM session
        llm_session = LLMSession(
            session_id=session_id,
            session=session,
            llm_instance=llm_instance
        )
        self.llm_sessions[session_id] = llm_session
        return llm_session

    def _determine_job_type(self, session: Session) -> str:
        """Determine job type based on session state"""
        if session.state == "idle":
            return "intent_detection"
        elif session.state == "collect_params":
            return "parameter_collection"
        else:
            return "response_generation"

    async def _process_intent_detection(self, llm_session: LLMSession, job: LLMJob):
        """Process intent detection"""
        try:
            session = llm_session.session

            # Get project and version config
            project = next((p for p in self.config.projects if p.name == session.project_name), None)
            if not project:
                raise ValueError(f"Project not found: {session.project_name}")

            version = session.get_version_config()
            if not version:
                raise ValueError("Version config not found")

            # Build intent detection prompt
            prompt = build_intent_prompt(
                version=version,
                conversation=session.chat_history,
                project_locale=project.default_locale
            )
            log_debug(
                "📝 Intent detection prompt built",
                session_id=job.session_id,
                prompt_length=len(prompt)
            )

            # Call LLM
            response = await llm_session.llm_instance.generate(
                system_prompt=prompt,
                user_input=job.input_text,
                context=session.chat_history[-10:]  # Last 10 messages
            )

            # Parse intent
            intent_name, response_text = self._parse_intent_response(response)
            if intent_name:
                # Find intent config
                intent_config = next((i for i in version.intents if i.name == intent_name), None)
                if intent_config:
                    # Update session
                    session.current_intent = intent_name
                    session.set_intent_config(intent_config)
                    session.state = "collect_params"
                    log_info(
                        "🎯 Intent detected",
                        session_id=job.session_id,
                        intent=intent_name
                    )

                    # Check if we need to collect parameters
                    missing_params = [
                        p.name for p in intent_config.parameters
                        if p.required and p.variable_name not in session.variables
                    ]
                    if not missing_params:
                        # All parameters ready, execute action
                        await self._execute_intent_action(llm_session, intent_config)
                        return
                    else:
                        # Need to collect parameters
                        await self._request_parameter_collection(llm_session, intent_config, missing_params)
                        return

            # No intent detected (or intent unknown to this version), use the
            # cleaned response as-is
            response_text = self._clean_response(response)
            job.complete(response_text, intent_name)

            # Publish response
            await self._publish_response(job)

        except Exception as e:
            job.fail(str(e))
            raise

    async def _process_parameter_collection(self, llm_session: LLMSession, job: LLMJob):
        """Process parameter collection"""
        try:
            session = llm_session.session
            intent_config = session.get_intent_config()
            if not intent_config:
                raise ValueError("No intent config in session")

            # Extract parameters from user input
            extracted_params = await self._extract_parameters(
                llm_session,
                job.input_text,
                intent_config,
                session.variables
            )

            # Update session variables
            for param_name, param_value in extracted_params.items():
                param_config = next(
                    (p for p in intent_config.parameters if p.name == param_name),
                    None
                )
                if param_config:
                    session.variables[param_config.variable_name] = str(param_value)

            # Check what parameters are still missing
            missing_params = [
                p.name for p in intent_config.parameters
                if p.required and p.variable_name not in session.variables
            ]
            if not missing_params:
                # All parameters collected, execute action
                await self._execute_intent_action(llm_session, intent_config)
            else:
                # Still need more parameters
                await self._request_parameter_collection(llm_session, intent_config, missing_params)

        except Exception as e:
            job.fail(str(e))
            raise

    async def _process_response_generation(self, llm_session: LLMSession, job: LLMJob):
        """Process general response generation"""
        try:
            session = llm_session.session

            # Get version config
            version = session.get_version_config()
            if not version:
                raise ValueError("Version config not found")

            # Use general prompt
            prompt = version.general_prompt

            # Generate response
            response = await llm_session.llm_instance.generate(
                system_prompt=prompt,
                user_input=job.input_text,
                context=session.chat_history[-10:]
            )
            response_text = self._clean_response(response)
            job.complete(response_text)

            # Publish response
            await self._publish_response(job)

        except Exception as e:
            job.fail(str(e))
            raise

    async def _extract_parameters(self,
                                  llm_session: LLMSession,
                                  user_input: str,
                                  intent_config: Any,
                                  existing_params: Dict[str, str]) -> Dict[str, Any]:
        """Extract parameters from user input"""
        # Build extraction prompt
        param_info = []
        for param in intent_config.parameters:
            if param.variable_name not in existing_params:
                param_info.append({
                    'name': param.name,
                    'type': param.type,
                    'required': param.required,
                    'extraction_prompt': param.extraction_prompt
                })

        prompt = f"""
Extract parameters from user message: "{user_input}"
Expected parameters:
{json.dumps(param_info, ensure_ascii=False)}
Return as JSON object with parameter names as keys.
"""

        # Call LLM
        response = await llm_session.llm_instance.generate(
            system_prompt=prompt,
            user_input=user_input,
            context=[]
        )

        # Parse JSON response: prefer a fenced ```json block, then fall back
        # to the first brace-delimited object
        try:
            json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
            if json_match:
                json_str = json_match.group(1)
            else:
                json_match = re.search(r'\{[^}]+\}', response)
                json_str = json_match.group(0) if json_match else None
            if json_str:
                return json.loads(json_str)
        except json.JSONDecodeError:
            log_warning("⚠️ Could not parse parameter extraction response as JSON")
        return {}
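
    # Illustrative round trip (parameter names are hypothetical, not from a
    # real project config):
    #   user says "yarın İstanbul'dan Ankara'ya"
    #   -> LLM returns '```json\n{"origin": "İstanbul", "destination": "Ankara"}\n```'
    #   -> _extract_parameters returns {"origin": "İstanbul", "destination": "Ankara"}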

    async def _request_parameter_collection(self,
                                            llm_session: LLMSession,
                                            intent_config: Any,
                                            missing_params: List[str]):
        """Request parameter collection from user"""
        session = llm_session.session

        # Get project config
        project = next((p for p in self.config.projects if p.name == session.project_name), None)
        if not project:
            return

        version = session.get_version_config()
        if not version:
            return

        # Get parameter collection config
        collection_config = self.config.global_config.llm_provider.settings.get("parameter_collection_config", {})
        max_params = collection_config.get("max_params_per_question", 2)

        # Decide which parameters to ask
        params_to_ask = missing_params[:max_params]

        # Build parameter collection prompt
        prompt = build_parameter_prompt(
            version=version,
            intent_config=intent_config,
            chat_history=session.chat_history,
            collected_params=session.variables,
            missing_params=missing_params,
            params_to_ask=params_to_ask,
            max_params=max_params,
            project_locale=project.default_locale,
            unanswered_params=session.unanswered_parameters
        )

        # Generate question
        response = await llm_session.llm_instance.generate(
            system_prompt=prompt,
            user_input="",
            context=session.chat_history[-5:]
        )
        response_text = self._clean_response(response)

        # Create a job for the response
        job = LLMJob(
            job_id=f"{session.session_id}_param_request",
            session_id=session.session_id,
            input_text="",
            job_type="parameter_request",
            response_text=response_text
        )
        await self._publish_response(job)

    async def _execute_intent_action(self, llm_session: LLMSession, intent_config: Any):
        """Execute intent action (API call)"""
        session = llm_session.session
        # Resolve the API name before the try block so the error path below
        # can always reference it
        api_name = intent_config.action
        try:
            # Get API config
            api_config = self.config.get_api(api_name)
            if not api_config:
                raise ValueError(f"API config not found: {api_name}")

            log_info(
                "📡 Executing intent action",
                session_id=session.session_id,
                api_name=api_name,
                variables=session.variables
            )

            # Execute API call
            from api.api_executor import call_api
            response = call_api(api_config, session)
            api_json = response.json()
            log_info("✅ API response received", session_id=session.session_id)

            # Humanize response if a response prompt exists
            if api_config.response_prompt:
                prompt = api_config.response_prompt.replace(
                    "{{api_response}}",
                    json.dumps(api_json, ensure_ascii=False)
                )
                human_response = await llm_session.llm_instance.generate(
                    system_prompt=prompt,
                    user_input=json.dumps(api_json),
                    context=[]
                )
                response_text = self._clean_response(human_response)
            else:
                response_text = f"İşlem tamamlandı: {api_json}"  # Turkish: "Operation completed: ..."

            # Reset session flow
            session.reset_flow()

            # Create job for response
            job = LLMJob(
                job_id=f"{session.session_id}_action_result",
                session_id=session.session_id,
                input_text="",
                job_type="action_result",
                response_text=response_text
            )
            await self._publish_response(job)

        except Exception as e:
            log_error(
                "❌ API execution failed",
                session_id=session.session_id,
                error=str(e)
            )
            # Reset flow
            session.reset_flow()

            # Send error response
            error_response = self._get_user_friendly_error("api_error", {"api_name": api_name})
            job = LLMJob(
                job_id=f"{session.session_id}_error",
                session_id=session.session_id,
                input_text="",
                job_type="error",
                response_text=error_response
            )
            await self._publish_response(job)
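
    # response_prompt templating as used above: the literal "{{api_response}}"
    # placeholder is replaced with the raw API JSON before the LLM call, e.g.
    # (prompt text illustrative):
    #   "Summarize this for the user: {{api_response}}"
    #   -> 'Summarize this for the user: {"status": "ok", ...}'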

    async def _publish_response(self, job: LLMJob):
        """Publish LLM response"""
        # Update job history
        llm_session = self.llm_sessions.get(job.session_id)
        if llm_session:
            llm_session.job_history.append(job)
            # Keep only last 20 jobs
            if len(llm_session.job_history) > 20:
                llm_session.job_history.pop(0)

        # Publish event
        await self.event_bus.publish(Event(
            type=EventType.LLM_RESPONSE_READY,
            session_id=job.session_id,
            data={
                "text": job.response_text,
                "intent": job.detected_intent,
                "job_type": job.job_type
            }
        ))
        log_info(
            "✅ LLM response published",
            session_id=job.session_id,
            response_length=len(job.response_text) if job.response_text else 0
        )

    def _parse_intent_response(self, response: str) -> tuple[str, str]:
        """Parse intent from LLM response"""
        # Look for intent pattern
        match = re.search(r"#DETECTED_INTENT:\s*([A-Za-z0-9_-]+)", response)
        if not match:
            return "", response

        intent_name = match.group(1)
        # Remove 'assistant' suffix if it exists (some models appear to echo
        # the role tag onto the intent name)
        if intent_name.endswith("assistant"):
            intent_name = intent_name[:-len("assistant")]

        # Get remaining text after intent
        remaining_text = response[match.end():]
        return intent_name, remaining_text
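
    # Example (intent name illustrative):
    #   _parse_intent_response("#DETECTED_INTENT: book_flight ...reply text...")
    #   -> ("book_flight", " ...reply text...")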

    def _clean_response(self, response: str) -> str:
        """Clean LLM response"""
        # Truncate at the first intent tag, warning marker, or echoed
        # assistant role tag
        for stop in ["#DETECTED_INTENT", "⚠️", "\nassistant", "assistant\n", "assistant"]:
            idx = response.find(stop)
            if idx != -1:
                response = response[:idx]

        # Normalize common greetings (Turkish: "welcome")
        response = re.sub(r"Hoş[\s-]?geldin(iz)?", "Hoş geldiniz", response, flags=re.IGNORECASE)
        return response.strip()
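
    # e.g. _clean_response("Hoşgeldiniz! #DETECTED_INTENT: x") -> "Hoş geldiniz!"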

    def _get_user_friendly_error(self, error_type: str, context: Optional[dict] = None) -> str:
        """Get user-friendly error messages (user-facing copy is in Turkish, the project's locale)"""
        error_messages = {
            "session_not_found": "Oturumunuz bulunamadı. Lütfen yeni bir konuşma başlatın.",
            "project_not_found": "Proje konfigürasyonu bulunamadı. Lütfen yönetici ile iletişime geçin.",
            "version_not_found": "Proje versiyonu bulunamadı. Lütfen geçerli bir versiyon seçin.",
            "intent_not_found": "Üzgünüm, ne yapmak istediğinizi anlayamadım. Lütfen daha açık bir şekilde belirtir misiniz?",
            "api_timeout": "İşlem zaman aşımına uğradı. Lütfen tekrar deneyin.",
            "api_error": "İşlem sırasında bir hata oluştu. Lütfen daha sonra tekrar deneyin.",
            "parameter_validation": "Girdiğiniz bilgide bir hata var. Lütfen kontrol edip tekrar deneyin.",
            "llm_error": "Sistem yanıt veremedi. Lütfen biraz sonra tekrar deneyin.",
            "llm_timeout": "Sistem meşgul. Lütfen birkaç saniye bekleyip tekrar deneyin.",
            "session_expired": "Oturumunuz zaman aşımına uğradı. Lütfen yeni bir konuşma başlatın.",
            "rate_limit": "Çok fazla istek gönderdiniz. Lütfen biraz bekleyin.",
            "internal_error": "Beklenmeyen bir hata oluştu. Lütfen yönetici ile iletişime geçin."
        }
        message = error_messages.get(error_type, error_messages["internal_error"])

        # Add context if available
        if context:
            if error_type == "api_error" and "api_name" in context:
                # Turkish word order: "for the <api_name> service, <message>"
                message = f"{context['api_name']} servisi için {message}"
        return message
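
    # e.g. (API name hypothetical):
    #   _get_user_friendly_error("api_error", {"api_name": "flight_api"})
    #   -> "flight_api servisi için İşlem sırasında bir hata oluştu. ..."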

    async def _handle_session_ended(self, event: Event):
        """Clean up LLM resources when session ends"""
        session_id = event.session_id
        await self._cleanup_session(session_id)

    async def _cleanup_session(self, session_id: str):
        """Clean up LLM session"""
        llm_session = self.llm_sessions.pop(session_id, None)
        if not llm_session:
            return

        try:
            # Release resource
            resource_id = f"llm_{session_id}"
            await self.resource_manager.release(resource_id, delay_seconds=180)  # 3 minutes
            log_info(
                "🧹 LLM session cleaned up",
                session_id=session_id,
                total_jobs=llm_session.total_jobs,
                job_history_size=len(llm_session.job_history)
            )
        except Exception as e:
            log_error(
                "❌ Error cleaning up LLM session",
                session_id=session_id,
                error=str(e)
            )

    async def _cleanup_llm_instance(self, llm_instance: LLMInterface):
        """Cleanup callback for LLM instance"""
        try:
            # LLM instances typically don't need special cleanup
            log_debug("🧹 LLM instance cleaned up")
        except Exception as e:
            log_error("❌ Error cleaning up LLM instance", error=str(e))

    def get_stats(self) -> Dict[str, Any]:
        """Get LLM manager statistics"""
        session_stats = {}
        for session_id, llm_session in self.llm_sessions.items():
            session_stats[session_id] = {
                "active_job": llm_session.active_job.job_id if llm_session.active_job else None,
                "total_jobs": llm_session.total_jobs,
                "job_history_size": len(llm_session.job_history),
                "uptime_seconds": (datetime.utcnow() - llm_session.created_at).total_seconds(),
                "last_activity": llm_session.last_activity.isoformat()
            }
        return {
            "active_sessions": len(self.llm_sessions),
            "total_active_jobs": sum(1 for s in self.llm_sessions.values() if s.active_job),
            "sessions": session_stats
        }
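
    # Example get_stats() snapshot (values illustrative):
    # {
    #     "active_sessions": 1,
    #     "total_active_jobs": 0,
    #     "sessions": {
    #         "sess1": {"active_job": None, "total_jobs": 3, "job_history_size": 3,
    #                   "uptime_seconds": 42.0, "last_activity": "2024-01-01T00:00:42"}
    #     }
    # }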