ciyidogan committed on
Commit
124d8da
·
verified ·
1 Parent(s): ed5b329

Rename llm/llm_manager.py to llm/llm_lifecycle_manager.py

Browse files
llm/{llm_manager.py → llm_lifecycle_manager.py} RENAMED
@@ -1,689 +1,703 @@
1
- """
2
- LLM Manager for Flare
3
- ====================
4
- Manages LLM interactions per session with stateless approach
5
- """
6
- import asyncio
7
- from typing import Dict, Optional, Any, List
8
- from datetime import datetime
9
- import traceback
10
- from dataclasses import dataclass, field
11
- import json
12
-
13
- from chat_session.event_bus import EventBus, Event, EventType, publish_error
14
- from chat_session.resource_manager import ResourceManager, ResourceType
15
- from chat_session.session import Session
16
- from llm.llm_factory import LLMFactory
17
- from llm.llm_interface import LLMInterface
18
- from llm.prompt_builder import build_intent_prompt, build_parameter_prompt
19
- from utils.logger import log_info, log_error, log_debug, log_warning
20
- from config.config_provider import ConfigProvider
21
-
22
-
23
@dataclass
class LLMJob:
    """A single unit of LLM work for one chat session.

    Carries the request text and the kind of processing requested, and is
    later filled in with the outcome: either a response (and optionally a
    detected intent) or an error string, plus completion timestamp.
    """
    job_id: str
    session_id: str
    input_text: str
    job_type: str  # "intent_detection", "parameter_collection", "response_generation"
    created_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    response_text: Optional[str] = None
    detected_intent: Optional[str] = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def complete(self, response_text: str, intent: Optional[str] = None):
        """Record a successful result and stamp the completion time."""
        self.completed_at = datetime.utcnow()
        self.detected_intent = intent
        self.response_text = response_text

    def fail(self, error: str):
        """Record a failure reason and stamp the completion time."""
        self.completed_at = datetime.utcnow()
        self.error = error
47
-
48
-
49
@dataclass
class LLMSession:
    """Per-session LLM state: the chat session, its pooled LLM instance,
    the currently running job and a bounded job history."""
    session_id: str
    session: Session
    llm_instance: LLMInterface
    active_job: Optional[LLMJob] = None
    job_history: List[LLMJob] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_activity: datetime = field(default_factory=datetime.utcnow)
    # BUG FIX: these two were unannotated assignments, so @dataclass treated
    # them as plain class attributes rather than fields — they were missing
    # from __init__/__repr__/dataclasses.asdict. Annotating them makes each
    # instance carry its own counter field (backward compatible: same
    # defaults, appended after the existing fields).
    total_jobs: int = 0
    total_tokens: int = 0

    def update_activity(self):
        """Update last activity timestamp"""
        self.last_activity = datetime.utcnow()
65
-
66
-
67
class LLMManager:
    """Manages LLM interactions with stateless approach.

    Receives LLM_PROCESSING_STARTED events, routes each request to intent
    detection / parameter collection / response generation, and publishes
    LLM_RESPONSE_READY events with the result.
    """

    def __init__(self, event_bus: EventBus, resource_manager: ResourceManager):
        # Bus used both to receive work and to publish results.
        self.event_bus = event_bus
        # Pools and releases per-session LLM instances.
        self.resource_manager = resource_manager
        # session_id -> LLMSession wrapper; populated lazily in _get_or_create_session.
        self.llm_sessions: Dict[str, LLMSession] = {}
        # NOTE(review): config snapshot is captured once at construction time,
        # so later config reloads are not picked up here — confirm intended.
        self.config = ConfigProvider.get()
        self._setup_event_handlers()
        self._setup_resource_pool()

    def _setup_event_handlers(self):
        """Subscribe to LLM-related events"""
        self.event_bus.subscribe(EventType.LLM_PROCESSING_STARTED, self._handle_llm_processing)
        self.event_bus.subscribe(EventType.SESSION_ENDED, self._handle_session_ended)

    def _setup_resource_pool(self):
        """Setup LLM instance pool"""
        self.resource_manager.register_pool(
            resource_type=ResourceType.LLM_CONTEXT,
            factory=self._create_llm_instance,
            max_idle=2,  # Lower pool size for LLM
            max_age_seconds=900  # 15 minutes
        )
91
-
92
- async def _create_llm_instance(self) -> LLMInterface:
93
- """Factory for creating LLM instances"""
94
- try:
95
- llm_instance = LLMFactory.create_provider()
96
- if not llm_instance:
97
- raise ValueError("Failed to create LLM instance")
98
-
99
- log_debug("🤖 Created new LLM instance")
100
- return llm_instance
101
-
102
- except Exception as e:
103
- log_error(f"❌ Failed to create LLM instance", error=str(e))
104
- raise
105
-
106
    async def _handle_llm_processing(self, event: Event):
        """Handle an LLM_PROCESSING_STARTED event.

        Looks up (or creates) the per-session LLM state, classifies the
        request by session state, and dispatches it to the matching
        processor. Any failure is logged and converted into an
        "llm_error" error event; nothing is re-raised to the bus.
        """
        session_id = event.session_id
        input_text = event.data.get("text", "")

        # Nothing to do for empty input — warn and drop silently.
        if not input_text:
            log_warning(f"⚠️ Empty text for LLM", session_id=session_id)
            return

        try:
            log_info(
                f"🤖 Starting LLM processing",
                session_id=session_id,
                text_length=len(input_text)
            )

            # Get or create LLM session
            llm_session = await self._get_or_create_session(session_id)
            if not llm_session:
                raise ValueError("Failed to create LLM session")

            # Determine job type based on session state
            # ("idle" -> intent detection, "collect_params" -> parameter
            # collection, anything else -> plain response generation).
            job_type = self._determine_job_type(llm_session.session)

            # Create job; job ids are sequential per session.
            job_id = f"{session_id}_{llm_session.total_jobs}"
            job = LLMJob(
                job_id=job_id,
                session_id=session_id,
                input_text=input_text,
                job_type=job_type,
                metadata={
                    "session_state": llm_session.session.state,
                    "current_intent": llm_session.session.current_intent
                }
            )

            llm_session.active_job = job
            llm_session.total_jobs += 1
            llm_session.update_activity()

            # Process based on job type
            if job_type == "intent_detection":
                await self._process_intent_detection(llm_session, job)
            elif job_type == "parameter_collection":
                await self._process_parameter_collection(llm_session, job)
            else:
                await self._process_response_generation(llm_session, job)

        except Exception as e:
            log_error(
                f"❌ Failed to process LLM request",
                session_id=session_id,
                error=str(e),
                traceback=traceback.format_exc()
            )

            # Publish error event so downstream consumers can react;
            # the exception itself is swallowed here.
            await publish_error(
                session_id=session_id,
                error_type="llm_error",
                error_message=f"LLM processing failed: {str(e)}"
            )
169
-
170
    async def _get_or_create_session(self, session_id: str) -> Optional[LLMSession]:
        """Return the cached LLMSession for *session_id*, creating it on demand.

        Creation looks the chat session up in the global session store and
        acquires an LLM instance from the resource pool. Returns None when
        the session id is unknown to the store.
        """
        if session_id in self.llm_sessions:
            return self.llm_sessions[session_id]

        # Get session from store.
        # NOTE(review): imported at call time — presumably to avoid a
        # circular import with chat_session.session; confirm.
        from chat_session.session import session_store
        session = session_store.get_session(session_id)
        if not session:
            log_error(f"❌ Session not found", session_id=session_id)
            return None

        # Acquire LLM instance from pool; the cleanup callback runs when the
        # pool retires the instance.
        resource_id = f"llm_{session_id}"
        llm_instance = await self.resource_manager.acquire(
            resource_id=resource_id,
            session_id=session_id,
            resource_type=ResourceType.LLM_CONTEXT,
            cleanup_callback=self._cleanup_llm_instance
        )

        # Create LLM session
        llm_session = LLMSession(
            session_id=session_id,
            session=session,
            llm_instance=llm_instance
        )

        self.llm_sessions[session_id] = llm_session
        return llm_session
200
-
201
- def _determine_job_type(self, session: Session) -> str:
202
- """Determine job type based on session state"""
203
- if session.state == "idle":
204
- return "intent_detection"
205
- elif session.state == "collect_params":
206
- return "parameter_collection"
207
- else:
208
- return "response_generation"
209
-
210
    async def _process_intent_detection(self, llm_session: LLMSession, job: LLMJob):
        """Run intent detection on the user's input.

        When a known intent is detected, the session moves to
        "collect_params" and either executes the action immediately (all
        required parameters already present) or asks for the missing ones.
        When no intent (or an unknown intent name) is detected, the raw LLM
        reply is cleaned and published as the response.

        Marks the job failed and re-raises on any error.
        """
        try:
            session = llm_session.session

            # Get project and version config
            project = next((p for p in self.config.projects if p.name == session.project_name), None)
            if not project:
                raise ValueError(f"Project not found: {session.project_name}")

            version = session.get_version_config()
            if not version:
                raise ValueError("Version config not found")

            # Build intent detection prompt
            prompt = build_intent_prompt(
                version=version,
                conversation=session.chat_history,
                project_locale=project.default_locale
            )

            log_debug(
                f"📝 Intent detection prompt built",
                session_id=job.session_id,
                prompt_length=len(prompt)
            )

            # Call LLM
            response = await llm_session.llm_instance.generate(
                system_prompt=prompt,
                user_input=job.input_text,
                context=session.chat_history[-10:]  # Last 10 messages
            )

            # Parse intent marker (#DETECTED_INTENT: <name>) out of the reply.
            intent_name, response_text = self._parse_intent_response(response)

            if intent_name:
                # Find intent config
                intent_config = next((i for i in version.intents if i.name == intent_name), None)

                if intent_config:
                    # Update session
                    session.current_intent = intent_name
                    session.set_intent_config(intent_config)
                    session.state = "collect_params"

                    log_info(
                        f"🎯 Intent detected",
                        session_id=job.session_id,
                        intent=intent_name
                    )

                    # Check if we need to collect parameters
                    missing_params = [
                        p.name for p in intent_config.parameters
                        if p.required and p.variable_name not in session.variables
                    ]

                    if not missing_params:
                        # All parameters ready, execute action
                        await self._execute_intent_action(llm_session, intent_config)
                        return
                    else:
                        # Need to collect parameters
                        await self._request_parameter_collection(llm_session, intent_config, missing_params)
                        return

            # No intent detected — or the detected name had no matching
            # config, in which case we fall through here and still pass the
            # (unknown) intent_name along with the cleaned reply.
            response_text = self._clean_response(response)
            job.complete(response_text, intent_name)

            # Publish response
            await self._publish_response(job)

        except Exception as e:
            job.fail(str(e))
            raise
288
-
289
    async def _process_parameter_collection(self, llm_session: LLMSession, job: LLMJob):
        """Extract intent parameters from the user's reply and advance the flow.

        Extracted values are written into session.variables (stringified,
        keyed by the parameter's variable_name). When all required
        parameters are present the intent action runs; otherwise another
        collection question is asked. Note that *job* itself is never
        completed here — the downstream helpers publish their own jobs.

        Marks the job failed and re-raises on any error.
        """
        try:
            session = llm_session.session
            intent_config = session.get_intent_config()

            if not intent_config:
                raise ValueError("No intent config in session")

            # Extract parameters from user input
            extracted_params = await self._extract_parameters(
                llm_session,
                job.input_text,
                intent_config,
                session.variables
            )

            # Update session variables; values are coerced to str.
            for param_name, param_value in extracted_params.items():
                param_config = next(
                    (p for p in intent_config.parameters if p.name == param_name),
                    None
                )
                if param_config:
                    session.variables[param_config.variable_name] = str(param_value)

            # Check what parameters are still missing
            missing_params = [
                p.name for p in intent_config.parameters
                if p.required and p.variable_name not in session.variables
            ]

            if not missing_params:
                # All parameters collected, execute action
                await self._execute_intent_action(llm_session, intent_config)
            else:
                # Still need more parameters
                await self._request_parameter_collection(llm_session, intent_config, missing_params)

        except Exception as e:
            job.fail(str(e))
            raise
331
-
332
- async def _process_response_generation(self, llm_session: LLMSession, job: LLMJob):
333
- """Process general response generation"""
334
- try:
335
- session = llm_session.session
336
-
337
- # Get version config
338
- version = session.get_version_config()
339
- if not version:
340
- raise ValueError("Version config not found")
341
-
342
- # Use general prompt
343
- prompt = version.general_prompt
344
-
345
- # Generate response
346
- response = await llm_session.llm_instance.generate(
347
- system_prompt=prompt,
348
- user_input=job.input_text,
349
- context=session.chat_history[-10:]
350
- )
351
-
352
- response_text = self._clean_response(response)
353
- job.complete(response_text)
354
-
355
- # Publish response
356
- await self._publish_response(job)
357
-
358
- except Exception as e:
359
- job.fail(str(e))
360
- raise
361
-
362
- async def _extract_parameters(self,
363
- llm_session: LLMSession,
364
- user_input: str,
365
- intent_config: Any,
366
- existing_params: Dict[str, str]) -> Dict[str, Any]:
367
- """Extract parameters from user input"""
368
- # Build extraction prompt
369
- param_info = []
370
- for param in intent_config.parameters:
371
- if param.variable_name not in existing_params:
372
- param_info.append({
373
- 'name': param.name,
374
- 'type': param.type,
375
- 'required': param.required,
376
- 'extraction_prompt': param.extraction_prompt
377
- })
378
-
379
- prompt = f"""
380
- Extract parameters from user message: "{user_input}"
381
-
382
- Expected parameters:
383
- {json.dumps(param_info, ensure_ascii=False)}
384
-
385
- Return as JSON object with parameter names as keys.
386
- """
387
-
388
- # Call LLM
389
- response = await llm_session.llm_instance.generate(
390
- system_prompt=prompt,
391
- user_input=user_input,
392
- context=[]
393
- )
394
-
395
- # Parse JSON response
396
- try:
397
- # Look for JSON block in response
398
- import re
399
- json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
400
- if not json_match:
401
- json_match = re.search(r'\{[^}]+\}', response)
402
-
403
- if json_match:
404
- json_str = json_match.group(1) if '```' in response else json_match.group(0)
405
- return json.loads(json_str)
406
- except:
407
- pass
408
-
409
- return {}
410
-
411
    async def _request_parameter_collection(self,
                                            llm_session: LLMSession,
                                            intent_config: Any,
                                            missing_params: List[str]):
        """Ask the user for missing intent parameters.

        Builds a collection prompt (limited to max_params_per_question
        parameters, default 2), generates the question with the LLM, and
        publishes it as a "parameter_request" job. Returns silently when
        the project or version config cannot be resolved.
        """
        session = llm_session.session

        # Get project config; bail out quietly if unresolved.
        project = next((p for p in self.config.projects if p.name == session.project_name), None)
        if not project:
            return

        version = session.get_version_config()
        if not version:
            return

        # Get parameter collection config from the LLM provider settings.
        collection_config = self.config.global_config.llm_provider.settings.get("parameter_collection_config", {})
        max_params = collection_config.get("max_params_per_question", 2)

        # Decide which parameters to ask (first max_params missing ones).
        params_to_ask = missing_params[:max_params]

        # Build parameter collection prompt.
        # NOTE(review): assumes session exposes `unanswered_parameters` —
        # not visible in this chunk; confirm against the Session model.
        prompt = build_parameter_prompt(
            version=version,
            intent_config=intent_config,
            chat_history=session.chat_history,
            collected_params=session.variables,
            missing_params=missing_params,
            params_to_ask=params_to_ask,
            max_params=max_params,
            project_locale=project.default_locale,
            unanswered_params=session.unanswered_parameters
        )

        # Generate question
        response = await llm_session.llm_instance.generate(
            system_prompt=prompt,
            user_input="",
            context=session.chat_history[-5:]
        )

        response_text = self._clean_response(response)

        # Create a job for the response
        job = LLMJob(
            job_id=f"{session.session_id}_param_request",
            session_id=session.session_id,
            input_text="",
            job_type="parameter_request",
            response_text=response_text
        )

        await self._publish_response(job)
466
-
467
- async def _execute_intent_action(self, llm_session: LLMSession, intent_config: Any):
468
- """Execute intent action (API call)"""
469
- session = llm_session.session
470
-
471
- try:
472
- # Get API config
473
- api_name = intent_config.action
474
- api_config = self.config.get_api(api_name)
475
-
476
- if not api_config:
477
- raise ValueError(f"API config not found: {api_name}")
478
-
479
- log_info(
480
- f"📡 Executing intent action",
481
- session_id=session.session_id,
482
- api_name=api_name,
483
- variables=session.variables
484
- )
485
-
486
- # Execute API call
487
- from api.api_executor import call_api
488
- response = call_api(api_config, session)
489
- api_json = response.json()
490
-
491
- log_info(f"✅ API response received", session_id=session.session_id)
492
-
493
- # Humanize response if prompt exists
494
- if api_config.response_prompt:
495
- prompt = api_config.response_prompt.replace(
496
- "{{api_response}}",
497
- json.dumps(api_json, ensure_ascii=False)
498
- )
499
-
500
- human_response = await llm_session.llm_instance.generate(
501
- system_prompt=prompt,
502
- user_input=json.dumps(api_json),
503
- context=[]
504
- )
505
-
506
- response_text = self._clean_response(human_response)
507
- else:
508
- response_text = f"İşlem tamamlandı: {api_json}"
509
-
510
- # Reset session flow
511
- session.reset_flow()
512
-
513
- # Create job for response
514
- job = LLMJob(
515
- job_id=f"{session.session_id}_action_result",
516
- session_id=session.session_id,
517
- input_text="",
518
- job_type="action_result",
519
- response_text=response_text
520
- )
521
-
522
- await self._publish_response(job)
523
-
524
- except Exception as e:
525
- log_error(
526
- f"❌ API execution failed",
527
- session_id=session.session_id,
528
- error=str(e)
529
- )
530
-
531
- # Reset flow
532
- session.reset_flow()
533
-
534
- # Send error response
535
- error_response = self._get_user_friendly_error("api_error", {"api_name": api_name})
536
-
537
- job = LLMJob(
538
- job_id=f"{session.session_id}_error",
539
- session_id=session.session_id,
540
- input_text="",
541
- job_type="error",
542
- response_text=error_response
543
- )
544
-
545
- await self._publish_response(job)
546
-
547
- async def _publish_response(self, job: LLMJob):
548
- """Publish LLM response"""
549
- # Update job history
550
- llm_session = self.llm_sessions.get(job.session_id)
551
- if llm_session:
552
- llm_session.job_history.append(job)
553
- # Keep only last 20 jobs
554
- if len(llm_session.job_history) > 20:
555
- llm_session.job_history.pop(0)
556
-
557
- # Publish event
558
- await self.event_bus.publish(Event(
559
- type=EventType.LLM_RESPONSE_READY,
560
- session_id=job.session_id,
561
- data={
562
- "text": job.response_text,
563
- "intent": job.detected_intent,
564
- "job_type": job.job_type
565
- }
566
- ))
567
-
568
- log_info(
569
- f"✅ LLM response published",
570
- session_id=job.session_id,
571
- response_length=len(job.response_text) if job.response_text else 0
572
- )
573
-
574
- def _parse_intent_response(self, response: str) -> tuple[str, str]:
575
- """Parse intent from LLM response"""
576
- import re
577
-
578
- # Look for intent pattern
579
- match = re.search(r"#DETECTED_INTENT:\s*([A-Za-z0-9_-]+)", response)
580
- if not match:
581
- return "", response
582
-
583
- intent_name = match.group(1)
584
-
585
- # Remove 'assistant' suffix if exists
586
- if intent_name.endswith("assistant"):
587
- intent_name = intent_name[:-9]
588
-
589
- # Get remaining text after intent
590
- remaining_text = response[match.end():]
591
-
592
- return intent_name, remaining_text
593
-
594
- def _clean_response(self, response: str) -> str:
595
- """Clean LLM response"""
596
- # Remove everything after the first logical assistant block or intent tag
597
- for stop in ["#DETECTED_INTENT", "⚠️", "\nassistant", "assistant\n", "assistant"]:
598
- idx = response.find(stop)
599
- if idx != -1:
600
- response = response[:idx]
601
-
602
- # Normalize common greetings
603
- import re
604
- response = re.sub(r"Hoş[\s-]?geldin(iz)?", "Hoş geldiniz", response, flags=re.IGNORECASE)
605
-
606
- return response.strip()
607
-
608
- def _get_user_friendly_error(self, error_type: str, context: dict = None) -> str:
609
- """Get user-friendly error messages"""
610
- error_messages = {
611
- "session_not_found": "Oturumunuz bulunamadı. Lütfen yeni bir konuşma başlatın.",
612
- "project_not_found": "Proje konfigürasyonu bulunamadı. Lütfen yönetici ile iletişime geçin.",
613
- "version_not_found": "Proje versiyonu bulunamadı. Lütfen geçerli bir versiyon seçin.",
614
- "intent_not_found": "Üzgünüm, ne yapmak istediğinizi anlayamadım. Lütfen daha açık bir şekilde belirtir misiniz?",
615
- "api_timeout": "İşlem zaman aşımına uğradı. Lütfen tekrar deneyin.",
616
- "api_error": "İşlem sırasında bir hata oluştu. Lütfen daha sonra tekrar deneyin.",
617
- "parameter_validation": "Girdiğiniz bilgide bir hata var. Lütfen kontrol edip tekrar deneyin.",
618
- "llm_error": "Sistem yanıt veremedi. Lütfen biraz sonra tekrar deneyin.",
619
- "llm_timeout": "Sistem meşgul. Lütfen birkaç saniye bekleyip tekrar deneyin.",
620
- "session_expired": "Oturumunuz zaman aşımına uğradı. Lütfen yeni bir konuşma başlatın.",
621
- "rate_limit": "Çok fazla istek gönderdiniz. Lütfen biraz bekleyin.",
622
- "internal_error": "Beklenmeyen bir hata oluştu. Lütfen yönetici ile iletişime geçin."
623
- }
624
-
625
- message = error_messages.get(error_type, error_messages["internal_error"])
626
-
627
- # Add context if available
628
- if context:
629
- if error_type == "api_error" and "api_name" in context:
630
- message = f"{context['api_name']} servisi için {message}"
631
-
632
- return message
633
-
634
- async def _handle_session_ended(self, event: Event):
635
- """Clean up LLM resources when session ends"""
636
- session_id = event.session_id
637
- await self._cleanup_session(session_id)
638
-
639
- async def _cleanup_session(self, session_id: str):
640
- """Clean up LLM session"""
641
- llm_session = self.llm_sessions.pop(session_id, None)
642
- if not llm_session:
643
- return
644
-
645
- try:
646
- # Release resource
647
- resource_id = f"llm_{session_id}"
648
- await self.resource_manager.release(resource_id, delay_seconds=180) # 3 minutes
649
-
650
- log_info(
651
- f"🧹 LLM session cleaned up",
652
- session_id=session_id,
653
- total_jobs=llm_session.total_jobs,
654
- job_history_size=len(llm_session.job_history)
655
- )
656
-
657
- except Exception as e:
658
- log_error(
659
- f"❌ Error cleaning up LLM session",
660
- session_id=session_id,
661
- error=str(e)
662
- )
663
-
664
- async def _cleanup_llm_instance(self, llm_instance: LLMInterface):
665
- """Cleanup callback for LLM instance"""
666
- try:
667
- # LLM instances typically don't need special cleanup
668
- log_debug("🧹 LLM instance cleaned up")
669
-
670
- except Exception as e:
671
- log_error(f"❌ Error cleaning up LLM instance", error=str(e))
672
-
673
- def get_stats(self) -> Dict[str, Any]:
674
- """Get LLM manager statistics"""
675
- session_stats = {}
676
- for session_id, llm_session in self.llm_sessions.items():
677
- session_stats[session_id] = {
678
- "active_job": llm_session.active_job.job_id if llm_session.active_job else None,
679
- "total_jobs": llm_session.total_jobs,
680
- "job_history_size": len(llm_session.job_history),
681
- "uptime_seconds": (datetime.utcnow() - llm_session.created_at).total_seconds(),
682
- "last_activity": llm_session.last_activity.isoformat()
683
- }
684
-
685
- return {
686
- "active_sessions": len(self.llm_sessions),
687
- "total_active_jobs": sum(1 for s in self.llm_sessions.values() if s.active_job),
688
- "sessions": session_stats
689
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ LLM Manager for Flare
3
+ ====================
4
+ Manages LLM interactions per session with stateless approach
5
+ """
6
+ import asyncio
7
+ from typing import Dict, Optional, Any, List
8
+ from datetime import datetime
9
+ import traceback
10
+ from dataclasses import dataclass, field
11
+ import json
12
+
13
+ from chat_session.event_bus import EventBus, Event, EventType, publish_error
14
+ from chat_session.resource_manager import ResourceManager, ResourceType
15
+ from chat_session.session import Session
16
+ from llm.llm_factory import LLMFactory
17
+ from llm.llm_interface import LLMInterface
18
+ from llm.prompt_builder import build_intent_prompt, build_parameter_prompt
19
+ from utils.logger import log_info, log_error, log_debug, log_warning
20
+ from config.config_provider import ConfigProvider
21
+
22
+
23
+ @dataclass
24
+ class LLMJob:
25
+ """LLM processing job"""
26
+ job_id: str
27
+ session_id: str
28
+ input_text: str
29
+ job_type: str # "intent_detection", "parameter_collection", "response_generation"
30
+ created_at: datetime = field(default_factory=datetime.utcnow)
31
+ completed_at: Optional[datetime] = None
32
+ response_text: Optional[str] = None
33
+ detected_intent: Optional[str] = None
34
+ error: Optional[str] = None
35
+ metadata: Dict[str, Any] = field(default_factory=dict)
36
+
37
+ def complete(self, response_text: str, intent: Optional[str] = None):
38
+ """Mark job as completed"""
39
+ self.response_text = response_text
40
+ self.detected_intent = intent
41
+ self.completed_at = datetime.utcnow()
42
+
43
+ def fail(self, error: str):
44
+ """Mark job as failed"""
45
+ self.error = error
46
+ self.completed_at = datetime.utcnow()
47
+
48
+
49
@dataclass
class LLMSession:
    """Per-session LLM state: the chat session, its pooled LLM instance,
    the currently running job and a bounded job history."""
    session_id: str
    session: Session
    llm_instance: LLMInterface
    active_job: Optional[LLMJob] = None
    job_history: List[LLMJob] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_activity: datetime = field(default_factory=datetime.utcnow)
    # BUG FIX: these two were unannotated assignments, so @dataclass treated
    # them as plain class attributes rather than fields — they were missing
    # from __init__/__repr__/dataclasses.asdict. Annotating them makes each
    # instance carry its own counter field (backward compatible: same
    # defaults, appended after the existing fields).
    total_jobs: int = 0
    total_tokens: int = 0

    def update_activity(self):
        """Update last activity timestamp"""
        self.last_activity = datetime.utcnow()
65
+
66
+
67
class LLMManager:
    """Manages LLM interactions with stateless approach.

    Receives LLM_PROCESSING_STARTED events, routes each request to intent
    detection / parameter collection / response generation, and publishes
    LLM_RESPONSE_READY events with the result.
    """

    def __init__(self, event_bus: EventBus, resource_manager: ResourceManager):
        # Bus used both to receive work and to publish results.
        self.event_bus = event_bus
        # Pools and releases per-session LLM instances.
        self.resource_manager = resource_manager
        # session_id -> LLMSession wrapper; populated lazily on first request.
        self.llm_sessions: Dict[str, LLMSession] = {}
        # NOTE(review): config snapshot is captured once at construction time,
        # so later config reloads are not picked up here — confirm intended.
        self.config = ConfigProvider.get()
        self._setup_event_handlers()
        self._setup_resource_pool()

    def _setup_event_handlers(self):
        """Subscribe to LLM-related events"""
        self.event_bus.subscribe(EventType.LLM_PROCESSING_STARTED, self._handle_llm_processing)
        # NOTE(review): _handle_session_started is not defined anywhere in
        # the visible portion of this file — confirm the handler exists
        # further down, otherwise this line raises AttributeError at
        # construction time.
        self.event_bus.subscribe(EventType.SESSION_STARTED, self._handle_session_started)
        self.event_bus.subscribe(EventType.SESSION_ENDED, self._handle_session_ended)

    def _setup_resource_pool(self):
        """Setup LLM instance pool"""
        self.resource_manager.register_pool(
            resource_type=ResourceType.LLM_CONTEXT,
            factory=self._create_llm_instance,
            max_idle=2,  # Lower pool size for LLM
            max_age_seconds=900  # 15 minutes
        )
92
+
93
+ async def _create_llm_instance(self) -> LLMInterface:
94
+ """Factory for creating LLM instances"""
95
+ try:
96
+ llm_instance = LLMFactory.create_provider()
97
+ if not llm_instance:
98
+ raise ValueError("Failed to create LLM instance")
99
+
100
+ log_debug("🤖 Created new LLM instance")
101
+ return llm_instance
102
+
103
+ except Exception as e:
104
+ log_error(f"❌ Failed to create LLM instance", error=str(e))
105
+ raise
106
+
107
+ async def _handle_llm_processing(self, event: Event):
108
+ """Handle LLM processing request"""
109
+ session_id = event.session_id
110
+ input_text = event.data.get("text", "")
111
+
112
+ if not input_text:
113
+ log_warning(f"⚠️ Empty text for LLM", session_id=session_id)
114
+ return
115
+
116
+ try:
117
+ log_info(
118
+ f"🤖 Starting LLM processing",
119
+ session_id=session_id,
120
+ text_length=len(input_text)
121
+ )
122
+
123
+ # Get or create LLM session
124
+ llm_session = await self._get_or_create_session(session_id)
125
+ if not llm_session:
126
+ raise ValueError("Failed to create LLM session")
127
+
128
+ # Determine job type based on session state
129
+ job_type = self._determine_job_type(llm_session.session)
130
+
131
+ # Create job
132
+ job_id = f"{session_id}_{llm_session.total_jobs}"
133
+ job = LLMJob(
134
+ job_id=job_id,
135
+ session_id=session_id,
136
+ input_text=input_text,
137
+ job_type=job_type,
138
+ metadata={
139
+ "session_state": llm_session.session.state,
140
+ "current_intent": llm_session.session.current_intent
141
+ }
142
+ )
143
+
144
+ llm_session.active_job = job
145
+ llm_session.total_jobs += 1
146
+ llm_session.update_activity()
147
+
148
+ # Process based on job type
149
+ if job_type == "intent_detection":
150
+ await self._process_intent_detection(llm_session, job)
151
+ elif job_type == "parameter_collection":
152
+ await self._process_parameter_collection(llm_session, job)
153
+ else:
154
+ await self._process_response_generation(llm_session, job)
155
+
156
+ except Exception as e:
157
+ log_error(
158
+ f"❌ Failed to process LLM request",
159
+ session_id=session_id,
160
+ error=str(e),
161
+ traceback=traceback.format_exc()
162
+ )
163
+
164
+ # Publish error event
165
+ await publish_error(
166
+ session_id=session_id,
167
+ error_type="llm_error",
168
+ error_message=f"LLM processing failed: {str(e)}"
169
+ )
170
+
171
+ async def _get_or_create_session(self, session_id: str) -> Optional[LLMSession]:
172
+ """Get or create LLM session"""
173
+ if session_id in self.llm_sessions:
174
+ return self.llm_sessions[session_id]
175
+
176
+ # Get session from store
177
+ from chat_session.session import session_store
178
+ session = session_store.get_session(session_id)
179
+ if not session:
180
+ log_error(f"❌ Session not found", session_id=session_id)
181
+ return None
182
+
183
+ # Acquire LLM instance from pool
184
+ resource_id = f"llm_{session_id}"
185
+ llm_instance = await self.resource_manager.acquire(
186
+ resource_id=resource_id,
187
+ session_id=session_id,
188
+ resource_type=ResourceType.LLM_CONTEXT,
189
+ cleanup_callback=self._cleanup_llm_instance
190
+ )
191
+
192
+ # Create LLM session
193
+ llm_session = LLMSession(
194
+ session_id=session_id,
195
+ session=session,
196
+ llm_instance=llm_instance
197
+ )
198
+
199
+ self.llm_sessions[session_id] = llm_session
200
+ return llm_session
201
+
202
+ def _determine_job_type(self, session: Session) -> str:
203
+ """Determine job type based on session state"""
204
+ if session.state == "idle":
205
+ return "intent_detection"
206
+ elif session.state == "collect_params":
207
+ return "parameter_collection"
208
+ else:
209
+ return "response_generation"
210
+
211
+ async def _process_intent_detection(self, llm_session: LLMSession, job: LLMJob):
212
+ """Process intent detection"""
213
+ try:
214
+ session = llm_session.session
215
+
216
+ # Get project and version config
217
+ project = next((p for p in self.config.projects if p.name == session.project_name), None)
218
+ if not project:
219
+ raise ValueError(f"Project not found: {session.project_name}")
220
+
221
+ version = session.get_version_config()
222
+ if not version:
223
+ raise ValueError("Version config not found")
224
+
225
+ # Build intent detection prompt
226
+ prompt = build_intent_prompt(
227
+ version=version,
228
+ conversation=session.chat_history,
229
+ project_locale=project.default_locale
230
+ )
231
+
232
+ log_debug(
233
+ f"📝 Intent detection prompt built",
234
+ session_id=job.session_id,
235
+ prompt_length=len(prompt)
236
+ )
237
+
238
+ # Call LLM
239
+ response = await llm_session.llm_instance.generate(
240
+ system_prompt=prompt,
241
+ user_input=job.input_text,
242
+ context=session.chat_history[-10:] # Last 10 messages
243
+ )
244
+
245
+ # Parse intent
246
+ intent_name, response_text = self._parse_intent_response(response)
247
+
248
+ if intent_name:
249
+ # Find intent config
250
+ intent_config = next((i for i in version.intents if i.name == intent_name), None)
251
+
252
+ if intent_config:
253
+ # Update session
254
+ session.current_intent = intent_name
255
+ session.set_intent_config(intent_config)
256
+ session.state = "collect_params"
257
+
258
+ log_info(
259
+ f"🎯 Intent detected",
260
+ session_id=job.session_id,
261
+ intent=intent_name
262
+ )
263
+
264
+ # Check if we need to collect parameters
265
+ missing_params = [
266
+ p.name for p in intent_config.parameters
267
+ if p.required and p.variable_name not in session.variables
268
+ ]
269
+
270
+ if not missing_params:
271
+ # All parameters ready, execute action
272
+ await self._execute_intent_action(llm_session, intent_config)
273
+ return
274
+ else:
275
+ # Need to collect parameters
276
+ await self._request_parameter_collection(llm_session, intent_config, missing_params)
277
+ return
278
+
279
+ # No intent detected, use response as is
280
+ response_text = self._clean_response(response)
281
+ job.complete(response_text, intent_name)
282
+
283
+ # Publish response
284
+ await self._publish_response(job)
285
+
286
+ except Exception as e:
287
+ job.fail(str(e))
288
+ raise
289
+
290
async def _process_parameter_collection(self, llm_session: "LLMSession", job: "LLMJob"):
    """Extract parameters from the user's message and advance the intent flow.

    Newly extracted values are written into session.variables. When every
    required parameter is present the intent action is executed; otherwise
    another collection question is requested. Any failure is recorded on
    the job and re-raised to the caller.
    """
    try:
        session = llm_session.session

        active_intent = session.get_intent_config()
        if not active_intent:
            raise ValueError("No intent config in session")

        # Let the LLM pull whatever parameters it can find in this message.
        extracted = await self._extract_parameters(
            llm_session,
            job.input_text,
            active_intent,
            session.variables
        )

        # Store each extracted value under its parameter's variable name
        # (first matching parameter config wins; unknown names are ignored).
        for name, value in extracted.items():
            for param in active_intent.parameters:
                if param.name == name:
                    session.variables[param.variable_name] = str(value)
                    break

        # Required parameters whose variables are still unset.
        still_missing = [
            p.name
            for p in active_intent.parameters
            if p.required and p.variable_name not in session.variables
        ]

        if still_missing:
            # Keep asking until every required parameter is collected.
            await self._request_parameter_collection(llm_session, active_intent, still_missing)
        else:
            # Everything collected — run the intent's action.
            await self._execute_intent_action(llm_session, active_intent)

    except Exception as e:
        job.fail(str(e))
        raise
async def _process_response_generation(self, llm_session: "LLMSession", job: "LLMJob"):
    """Generate a free-form reply using the version's general prompt.

    The cleaned LLM output is stored on the job and published; failures are
    recorded on the job and re-raised.
    """
    try:
        chat_session = llm_session.session

        version_cfg = chat_session.get_version_config()
        if not version_cfg:
            raise ValueError("Version config not found")

        # The last 10 messages give the model short-term conversational context.
        raw_reply = await llm_session.llm_instance.generate(
            system_prompt=version_cfg.general_prompt,
            user_input=job.input_text,
            context=chat_session.chat_history[-10:]
        )

        job.complete(self._clean_response(raw_reply))
        await self._publish_response(job)

    except Exception as e:
        job.fail(str(e))
        raise
+ async def _extract_parameters(self,
364
+ llm_session: LLMSession,
365
+ user_input: str,
366
+ intent_config: Any,
367
+ existing_params: Dict[str, str]) -> Dict[str, Any]:
368
+ """Extract parameters from user input"""
369
+ # Build extraction prompt
370
+ param_info = []
371
+ for param in intent_config.parameters:
372
+ if param.variable_name not in existing_params:
373
+ param_info.append({
374
+ 'name': param.name,
375
+ 'type': param.type,
376
+ 'required': param.required,
377
+ 'extraction_prompt': param.extraction_prompt
378
+ })
379
+
380
+ prompt = f"""
381
+ Extract parameters from user message: "{user_input}"
382
+
383
+ Expected parameters:
384
+ {json.dumps(param_info, ensure_ascii=False)}
385
+
386
+ Return as JSON object with parameter names as keys.
387
+ """
388
+
389
+ # Call LLM
390
+ response = await llm_session.llm_instance.generate(
391
+ system_prompt=prompt,
392
+ user_input=user_input,
393
+ context=[]
394
+ )
395
+
396
+ # Parse JSON response
397
+ try:
398
+ # Look for JSON block in response
399
+ import re
400
+ json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
401
+ if not json_match:
402
+ json_match = re.search(r'\{[^}]+\}', response)
403
+
404
+ if json_match:
405
+ json_str = json_match.group(1) if '```' in response else json_match.group(0)
406
+ return json.loads(json_str)
407
+ except:
408
+ pass
409
+
410
+ return {}
411
+
412
async def _request_parameter_collection(self,
                                        llm_session: "LLMSession",
                                        intent_config: Any,
                                        missing_params: List[str]):
    """Ask the user for missing intent parameters.

    Builds a parameter-collection prompt limited to the configured number
    of parameters per question, generates the question with the LLM, and
    publishes it as a "parameter_request" job. Returns silently when the
    project or version config cannot be resolved.
    """
    session = llm_session.session

    # Resolve the project config; bail out quietly when unavailable.
    project = None
    for candidate in self.config.projects:
        if candidate.name == session.project_name:
            project = candidate
            break
    if not project:
        return

    version = session.get_version_config()
    if not version:
        return

    # How many parameters may be asked for in a single question.
    provider_settings = self.config.global_config.llm_provider.settings
    collection_cfg = provider_settings.get("parameter_collection_config", {})
    batch_size = collection_cfg.get("max_params_per_question", 2)

    question_prompt = build_parameter_prompt(
        version=version,
        intent_config=intent_config,
        chat_history=session.chat_history,
        collected_params=session.variables,
        missing_params=missing_params,
        params_to_ask=missing_params[:batch_size],
        max_params=batch_size,
        project_locale=project.default_locale,
        unanswered_params=session.unanswered_parameters
    )

    raw_question = await llm_session.llm_instance.generate(
        system_prompt=question_prompt,
        user_input="",
        context=session.chat_history[-5:]
    )

    await self._publish_response(LLMJob(
        job_id=f"{session.session_id}_param_request",
        session_id=session.session_id,
        input_text="",
        job_type="parameter_request",
        response_text=self._clean_response(raw_question)
    ))
async def _execute_intent_action(self, llm_session: "LLMSession", intent_config: Any):
    """Execute the intent's configured API call and publish the outcome.

    On success the raw API JSON is optionally "humanized" through the LLM
    (when the API config has a response_prompt). On failure the session
    flow is reset and a user-friendly error message is published instead.
    """
    session = llm_session.session

    # Bind api_name BEFORE the try block: the except handler references it,
    # and binding it inside the try (as before) risked a NameError masking
    # the real failure when the exception fired before the assignment.
    api_name = getattr(intent_config, "action", None)

    try:
        api_config = self.config.get_api(api_name)
        if not api_config:
            raise ValueError(f"API config not found: {api_name}")

        log_info(
            f"📡 Executing intent action",
            session_id=session.session_id,
            api_name=api_name,
            variables=session.variables
        )

        # NOTE(review): call_api is synchronous and blocks the event loop
        # while the HTTP call runs — consider loop.run_in_executor here.
        from api.api_executor import call_api
        response = call_api(api_config, session)
        api_json = response.json()

        log_info(f"✅ API response received", session_id=session.session_id)

        if api_config.response_prompt:
            # Humanize the raw API payload through the LLM.
            prompt = api_config.response_prompt.replace(
                "{{api_response}}",
                json.dumps(api_json, ensure_ascii=False)
            )
            human_response = await llm_session.llm_instance.generate(
                system_prompt=prompt,
                user_input=json.dumps(api_json),
                context=[]
            )
            response_text = self._clean_response(human_response)
        else:
            response_text = f"İşlem tamamlandı: {api_json}"

        # The intent is finished; clear the flow before replying.
        session.reset_flow()

        job = LLMJob(
            job_id=f"{session.session_id}_action_result",
            session_id=session.session_id,
            input_text="",
            job_type="action_result",
            response_text=response_text
        )
        await self._publish_response(job)

    except Exception as e:
        log_error(
            f"❌ API execution failed",
            session_id=session.session_id,
            error=str(e)
        )

        # Reset the flow so the user can start over cleanly.
        session.reset_flow()

        job = LLMJob(
            job_id=f"{session.session_id}_error",
            session_id=session.session_id,
            input_text="",
            job_type="error",
            response_text=self._get_user_friendly_error("api_error", {"api_name": api_name})
        )
        await self._publish_response(job)
async def _publish_response(self, job: "LLMJob"):
    """Record the job in its session's history and emit LLM_RESPONSE_READY."""
    owner = self.llm_sessions.get(job.session_id)
    if owner is not None:
        history = owner.job_history
        history.append(job)
        # Bound the history to the 20 most recent jobs.
        if len(history) > 20:
            del history[0]

    await self.event_bus.publish(Event(
        type=EventType.LLM_RESPONSE_READY,
        session_id=job.session_id,
        data={
            "text": job.response_text,
            "intent": job.detected_intent,
            "job_type": job.job_type
        }
    ))

    log_info(
        f"✅ LLM response published",
        session_id=job.session_id,
        response_length=len(job.response_text) if job.response_text else 0
    )
+ def _parse_intent_response(self, response: str) -> tuple[str, str]:
576
+ """Parse intent from LLM response"""
577
+ import re
578
+
579
+ # Look for intent pattern
580
+ match = re.search(r"#DETECTED_INTENT:\s*([A-Za-z0-9_-]+)", response)
581
+ if not match:
582
+ return "", response
583
+
584
+ intent_name = match.group(1)
585
+
586
+ # Remove 'assistant' suffix if exists
587
+ if intent_name.endswith("assistant"):
588
+ intent_name = intent_name[:-9]
589
+
590
+ # Get remaining text after intent
591
+ remaining_text = response[match.end():]
592
+
593
+ return intent_name, remaining_text
594
+
595
+ def _clean_response(self, response: str) -> str:
596
+ """Clean LLM response"""
597
+ # Remove everything after the first logical assistant block or intent tag
598
+ for stop in ["#DETECTED_INTENT", "⚠️", "\nassistant", "assistant\n", "assistant"]:
599
+ idx = response.find(stop)
600
+ if idx != -1:
601
+ response = response[:idx]
602
+
603
+ # Normalize common greetings
604
+ import re
605
+ response = re.sub(r"Hoş[\s-]?geldin(iz)?", "Hoş geldiniz", response, flags=re.IGNORECASE)
606
+
607
+ return response.strip()
608
+
609
+ def _get_user_friendly_error(self, error_type: str, context: dict = None) -> str:
610
+ """Get user-friendly error messages"""
611
+ error_messages = {
612
+ "session_not_found": "Oturumunuz bulunamadı. Lütfen yeni bir konuşma başlatın.",
613
+ "project_not_found": "Proje konfigürasyonu bulunamadı. Lütfen yönetici ile iletişime geçin.",
614
+ "version_not_found": "Proje versiyonu bulunamadı. Lütfen geçerli bir versiyon seçin.",
615
+ "intent_not_found": "Üzgünüm, ne yapmak istediğinizi anlayamadım. Lütfen daha açık bir şekilde belirtir misiniz?",
616
+ "api_timeout": "İşlem zaman aşımına uğradı. Lütfen tekrar deneyin.",
617
+ "api_error": "İşlem sırasında bir hata oluştu. Lütfen daha sonra tekrar deneyin.",
618
+ "parameter_validation": "Girdiğiniz bilgide bir hata var. Lütfen kontrol edip tekrar deneyin.",
619
+ "llm_error": "Sistem yanıt veremedi. Lütfen biraz sonra tekrar deneyin.",
620
+ "llm_timeout": "Sistem meşgul. Lütfen birkaç saniye bekleyip tekrar deneyin.",
621
+ "session_expired": "Oturumunuz zaman aşımına uğradı. Lütfen yeni bir konuşma başlatın.",
622
+ "rate_limit": "Çok fazla istek gönderdiniz. Lütfen biraz bekleyin.",
623
+ "internal_error": "Beklenmeyen bir hata oluştu. Lütfen yönetici ile iletişime geçin."
624
+ }
625
+
626
+ message = error_messages.get(error_type, error_messages["internal_error"])
627
+
628
+ # Add context if available
629
+ if context:
630
+ if error_type == "api_error" and "api_name" in context:
631
+ message = f"{context['api_name']} servisi için {message}"
632
+
633
+ return message
634
+
635
async def _handle_session_started(self, event: "Event"):
    """Eagerly create the session's LLM instance when a session starts."""
    sid = event.session_id
    try:
        # Warm up the LLM so the first user message isn't delayed by setup.
        await self._get_or_create_session(sid)
        log_info(f"✅ LLM initialized for session", session_id=sid)
    except Exception as e:
        # Initialization failures are logged, not propagated to the event bus.
        log_error(f"❌ Failed to initialize LLM", session_id=sid, error=str(e))
async def _handle_session_ended(self, event: "Event"):
    """Release this session's LLM resources once the session is over."""
    await self._cleanup_session(event.session_id)
async def _cleanup_session(self, session_id: str):
    """Drop the LLM session and schedule its resource for delayed release."""
    departing = self.llm_sessions.pop(session_id, None)
    if departing is None:
        # Nothing tracked for this session — already cleaned up.
        return

    try:
        # Delay release by 3 minutes so a quickly-returning user reuses
        # the warm LLM instance.
        await self.resource_manager.release(f"llm_{session_id}", delay_seconds=180)

        log_info(
            f"🧹 LLM session cleaned up",
            session_id=session_id,
            total_jobs=departing.total_jobs,
            job_history_size=len(departing.job_history)
        )

    except Exception as e:
        log_error(
            f"❌ Error cleaning up LLM session",
            session_id=session_id,
            error=str(e)
        )
async def _cleanup_llm_instance(self, llm_instance: "LLMInterface"):
    """Resource-manager callback; LLM instances need no special teardown."""
    try:
        log_debug("🧹 LLM instance cleaned up")
    except Exception as e:
        log_error(f"❌ Error cleaning up LLM instance", error=str(e))
def get_stats(self) -> Dict[str, Any]:
    """Snapshot per-session and aggregate LLM manager statistics."""
    now = datetime.utcnow()

    def _summarize(ls: "LLMSession") -> Dict[str, Any]:
        # One flat stats record per tracked session.
        return {
            "active_job": ls.active_job.job_id if ls.active_job else None,
            "total_jobs": ls.total_jobs,
            "job_history_size": len(ls.job_history),
            "uptime_seconds": (now - ls.created_at).total_seconds(),
            "last_activity": ls.last_activity.isoformat()
        }

    per_session = {
        sid: _summarize(ls) for sid, ls in self.llm_sessions.items()
    }

    return {
        "active_sessions": len(self.llm_sessions),
        "total_active_jobs": sum(1 for s in self.llm_sessions.values() if s.active_job),
        "sessions": per_session
    }