// conversation-manager.service.ts // Path: /flare-ui/src/app/services/conversation-manager.service.ts import { Injectable, OnDestroy } from '@angular/core'; import { Subject, Subscription, BehaviorSubject, throwError } from 'rxjs'; import { catchError, retry } from 'rxjs/operators'; import { WebSocketService } from './websocket.service'; import { AudioStreamService } from './audio-stream.service'; export type ConversationState = | 'idle' | 'listening' | 'processing_stt' | 'processing_llm' | 'processing_tts' | 'playing_audio' | 'error'; export interface ConversationMessage { role: 'user' | 'assistant' | 'system'; text: string; timestamp: Date; audioUrl?: string; error?: boolean; } export interface ConversationConfig { language?: string; stt_engine?: string; tts_engine?: string; enable_barge_in?: boolean; max_silence_duration?: number; } export interface ConversationError { type: 'websocket' | 'audio' | 'permission' | 'network' | 'unknown'; message: string; details?: any; timestamp: Date; } @Injectable({ providedIn: 'root' }) export class ConversationManagerService implements OnDestroy { private subscriptions = new Subscription(); private audioQueue: string[] = []; private isInterrupting = false; private sessionId: string | null = null; private conversationConfig: ConversationConfig = { language: 'tr-TR', stt_engine: 'google', enable_barge_in: true }; // State management private currentStateSubject = new BehaviorSubject('idle'); public currentState$ = this.currentStateSubject.asObservable(); // Message history private messagesSubject = new BehaviorSubject([]); public messages$ = this.messagesSubject.asObservable(); // Current transcription private transcriptionSubject = new BehaviorSubject(''); public transcription$ = this.transcriptionSubject.asObservable(); // Error handling private errorSubject = new Subject(); public error$ = this.errorSubject.asObservable(); private sttReadySubject = new Subject(); // Audio player reference private audioPlayer: HTMLAudioElement | null = null; private audioPlayerPromise: Promise | null = null; constructor( private wsService: WebSocketService, private audioService: AudioStreamService ) { this.audioService.setUseLinear16(true); } ngOnDestroy(): void { this.cleanup(); } async startConversation(sessionId: string, config?: ConversationConfig): Promise { try { if (!sessionId) { throw new Error('Session ID is required'); } // Update configuration if (config) { this.conversationConfig = { ...this.conversationConfig, ...config }; } this.sessionId = sessionId; // Start in listening state this.currentStateSubject.next('listening'); console.log('🎤 Starting conversation in continuous listening mode'); // Connect WebSocket first await this.wsService.connect(sessionId).catch(error => { throw new Error(`WebSocket connection failed: ${error.message}`); }); // Set up subscriptions BEFORE sending any messages this.setupSubscriptions(); // Send start signal with configuration this.wsService.sendControl('start_conversation', { ...this.conversationConfig, continuous_listening: true }); console.log('✅ [ConversationManager] Conversation started - waiting for welcome TTS'); } catch (error: any) { console.error('Failed to start conversation:', error); const conversationError: ConversationError = { type: this.determineErrorType(error), message: error.message || 'Failed to start conversation', details: error, timestamp: new Date() }; this.errorSubject.next(conversationError); this.currentStateSubject.next('error'); this.cleanup(); throw error; } } stopConversation(): void { try { // First stop audio recording this.audioService.stopRecording(); // Send conversation end signal if (this.wsService.isConnected()) { this.wsService.sendControl('stop_conversation'); // stop_session yerine } // Small delay before disconnecting setTimeout(() => { this.cleanup(); this.addSystemMessage('Conversation ended'); }, 100); } catch (error) { console.error('Error stopping conversation:', error); this.cleanup(); } } private setupSubscriptions(): void { // Audio chunks from microphone this.subscriptions.add( this.audioService.audioChunk$.subscribe({ next: (chunk) => { if (!this.isInterrupting && this.wsService.isConnected()) { try { this.wsService.sendAudioChunk(chunk.data); } catch (error) { console.error('Failed to send audio chunk:', error); } } }, error: (error) => { console.error('Audio stream error:', error); this.handleAudioError(error); } }) ); // Audio stream errors this.subscriptions.add( this.audioService.error$.subscribe(error => { this.handleAudioError(error); }) ); // WebSocket messages this.subscriptions.add( this.wsService.message$.subscribe({ next: (message) => { this.handleMessage(message); }, error: (error) => { console.error('WebSocket message error:', error); this.handleWebSocketError(error); } }) ); // WebSocket transcription handling this.subscriptions.add( this.wsService.transcription$.subscribe(result => { // Batch mode'da her zaman final result gelir console.log('📝 Transcription received:', result); const messages = this.messagesSubject.value; const lastMessage = messages[messages.length - 1]; if (!lastMessage || lastMessage.role !== 'user' || lastMessage.text !== result.text) { this.addMessage('user', result.text); } }) ); // State changes this.subscriptions.add( this.wsService.stateChange$.subscribe(change => { this.currentStateSubject.next(change.to as ConversationState); this.handleStateChange(change.from, change.to); }) ); // WebSocket errors this.subscriptions.add( this.wsService.error$.subscribe(error => { console.error('WebSocket error:', error); this.handleWebSocketError({ message: error }); }) ); // WebSocket connection state this.subscriptions.add( this.wsService.connection$.subscribe(connected => { if (!connected && this.currentStateSubject.value !== 'idle') { this.addSystemMessage('Connection lost. Attempting to reconnect...'); } }) ); } private handleMessage(message: any): void { try { switch (message.type) { case 'transcription': // SADECE final transcription'ları işle. Interim transcription'ları işlemiyoruz if (message['is_final']) { const messages = this.messagesSubject.value; const lastMessage = messages[messages.length - 1]; if (!lastMessage || lastMessage.role !== 'user' || lastMessage.text !== message['text']) { this.addMessage('user', message['text']); } } break; case 'assistant_response': // Welcome mesajı veya normal yanıt const isWelcome = message['is_welcome'] || false; this.addMessage('assistant', message['text']); if (isWelcome) { console.log('📢 Welcome message received:', message['text']); } break; case 'tts_audio': this.handleTTSAudio(message); break; case 'tts_error': // TTS hatası durumunda kullanıcıya bilgi ver console.error('TTS Error:', message['message']); this.addSystemMessage(message['message']); break; case 'control': if (message['action'] === 'stop_playback') { this.stopAudioPlayback(); } break; case 'error': this.handleServerError(message); break; case 'session_config': // Update configuration from server if (message['config']) { this.conversationConfig = { ...this.conversationConfig, ...message['config'] }; } break; case 'session_started': // Session başladı, STT durumunu kontrol et console.log('📢 Session started:', message); if (!message['stt_initialized']) { this.addSystemMessage('Speech recognition failed to initialize. Voice input will not be available.'); } break; case 'stt_ready': console.log('✅ [ConversationManager] STT ready signal received', { hasReadyFlag: !!message['ready_for_audio'], isRecording: this.audioService.isRecording(), currentState: this.currentStateSubject.value }); // ✅ STT hazır, recording'i başlat if (!this.audioService.isRecording()) { console.log('🎤 [ConversationManager] Starting audio recording (STT ready)'); this.audioService.startRecording().then(() => { console.log('✅ [ConversationManager] Audio recording started successfully'); }).catch(error => { console.error('❌ Failed to start recording:', error); this.handleAudioError(error); }); } else { console.log('⚠️ [ConversationManager] Recording already active'); } break; case 'stt_stopped': console.log('🛑 [ConversationManager] STT stopped signal received'); // ✅ Backend STT durduğunu bildirdi, recording'i derhal durdur if (message['stop_recording'] && this.audioService.isRecording()) { console.log('🛑 [ConversationManager] Stopping audio recording (backend request)'); this.audioService.stopRecording(); } break; case 'state_change': // Backend'den gelen state'i frontend state'ine map et const backendState = message['to'] || message['state']; const mappedState = this.mapBackendStateToFrontend(backendState); if (mappedState) { this.currentStateSubject.next(mappedState); // Log state changes with better format console.log(`📊 Backend state: ${backendState} → Frontend state: ${mappedState}`); } else { console.warn(`⚠️ Unknown backend state: ${backendState}`); } break; case 'conversation_started': // Conversation başladığında log at console.log('📢 Conversation started:', message); break; } } catch (error) { console.error('Error handling message:', error); this.errorSubject.next({ type: 'unknown', message: 'Failed to process message', details: error, timestamp: new Date() }); } } private mapBackendStateToFrontend(backendState: string): ConversationState | null { const stateMap: { [key: string]: ConversationState } = { 'idle': 'idle', 'initializing': 'idle', 'preparing_welcome': 'processing_tts', 'playing_welcome': 'playing_audio', 'listening': 'listening', 'processing_speech': 'processing_stt', 'preparing_response': 'processing_llm', 'playing_response': 'playing_audio', 'error': 'error', 'ended': 'idle' }; return stateMap[backendState] || null; } private handleStateChange(from: string, to: string): void { console.log(`📊 State: ${from} → ${to}`); // State değişimlerinde transcription'ı temizleme // Sadece error durumunda temizle if (to === 'error') { this.transcriptionSubject.next(''); } // Log state changes for debugging console.log(`🎤 Continuous listening mode - state: ${to}`); } private playQueuedAudio(): void { const messages = this.messagesSubject.value; const lastMessage = messages[messages.length - 1]; if (lastMessage?.audioUrl && lastMessage.role === 'assistant') { this.playAudio(lastMessage.audioUrl); } } private async playAudio(audioUrl: string): Promise { try { console.log('🎵 [ConversationManager] playAudio called', { hasAudioPlayer: !!this.audioPlayer, audioUrl: audioUrl, timestamp: new Date().toISOString() }); // Her seferinde yeni audio player oluştur ve handler'ları set et if (this.audioPlayer) { // Eski player'ı temizle this.audioPlayer.pause(); this.audioPlayer.src = ''; this.audioPlayer = null; } // Yeni player oluştur this.audioPlayer = new Audio(); this.setupAudioPlayerHandlers(); // HER SEFERINDE handler'ları set et this.audioPlayer.src = audioUrl; // Store the play promise to handle interruptions properly this.audioPlayerPromise = this.audioPlayer.play(); await this.audioPlayerPromise; } catch (error: any) { // Check if error is due to interruption if (error.name === 'AbortError') { console.log('Audio playback interrupted'); } else { console.error('Audio playback error:', error); this.errorSubject.next({ type: 'audio', message: 'Failed to play audio response', details: error, timestamp: new Date() }); } } finally { this.audioPlayerPromise = null; } } private setupAudioPlayerHandlers(): void { if (!this.audioPlayer) return; this.audioPlayer.onended = async () => { console.log('🎵 [ConversationManager] Audio playback ended', { currentState: this.currentStateSubject.value, isRecording: this.audioService.isRecording(), timestamp: new Date().toISOString() }); try { // Backend'e audio bittiğini bildir if (this.wsService.isConnected()) { console.log('📤 [ConversationManager] Sending audio_ended to backend'); this.wsService.sendControl('audio_ended'); // ✅ Backend STT başlatacak ve bize stt_ready sinyali gönderecek // ✅ Recording'i burada başlatmıyoruz, handleMessage'da stt_ready gelince başlatacağız console.log('⏳ [ConversationManager] Waiting for STT ready signal from backend...'); } } catch (error) { console.error('❌ [ConversationManager] Failed to handle audio end:', error); this.handleAudioError(error); } }; this.audioPlayer.onerror = (error) => { console.error('Audio player error:', error); this.errorSubject.next({ type: 'audio', message: 'Audio playback error occurred', details: error, timestamp: new Date() }); }; this.audioPlayer.onplay = () => { console.log('▶️ [ConversationManager] Audio playback started'); }; this.audioPlayer.onpause = () => { console.log('⏸️ [ConversationManager] Audio playback paused'); }; } private stopAudioPlayback(): void { try { if (this.audioPlayer) { this.audioPlayer.pause(); this.audioPlayer.currentTime = 0; // Cancel any pending play promise if (this.audioPlayerPromise) { this.audioPlayerPromise.catch(() => { // Ignore abort errors }); this.audioPlayerPromise = null; } } } catch (error) { console.error('Error stopping audio playback:', error); } } // Barge-in handling - DEVRE DIŞI performBargeIn(): void { // Barge-in özelliği devre dışı bırakıldı console.log('⚠️ Barge-in is currently disabled'); // Kullanıcıya bilgi ver this.addSystemMessage('Barge-in feature is currently disabled.'); } private addMessage(role: 'user' | 'assistant', text: string, error: boolean = false): void { if (!text || text.trim().length === 0) { return; } const messages = this.messagesSubject.value; messages.push({ role, text, timestamp: new Date(), error }); this.messagesSubject.next([...messages]); } private addSystemMessage(text: string): void { console.log(`📢 System: ${text}`); const messages = this.messagesSubject.value; messages.push({ role: 'system', text, timestamp: new Date() }); this.messagesSubject.next([...messages]); } private handleTTSAudio(message: any): void { try { // Validate audio data if (!message['data']) { console.warn('❌ TTS audio message missing data'); return; } // Detailed log console.log('🎵 TTS chunk received:', { chunkIndex: message['chunk_index'], totalChunks: message['total_chunks'], dataLength: message['data'].length, dataPreview: message['data'].substring(0, 50) + '...', isLast: message['is_last'], mimeType: message['mime_type'] }); // Accumulate audio chunks (already base64) this.audioQueue.push(message['data']); console.log(`📦 Audio queue size: ${this.audioQueue.length} chunks`); if (message['is_last']) { console.log('🔧 Processing final audio chunk...'); try { // All chunks received, combine and create audio blob const combinedBase64 = this.audioQueue.join(''); console.log('✅ Combined audio data:', { totalLength: combinedBase64.length, queueSize: this.audioQueue.length, preview: combinedBase64.substring(0, 100) + '...' }); // Validate base64 console.log('🔍 Validating base64...'); if (!this.isValidBase64(combinedBase64)) { throw new Error('Invalid base64 data received'); } console.log('✅ Base64 validation passed'); const audioBlob = this.base64ToBlob(combinedBase64, message['mime_type'] || 'audio/mpeg'); const audioUrl = URL.createObjectURL(audioBlob); console.log('🎧 Audio URL created:', audioUrl); // Update last message with audio URL const messages = this.messagesSubject.value; if (messages.length > 0) { const lastAssistantMessageIndex = this.findLastAssistantMessageIndex(messages); if (lastAssistantMessageIndex >= 0) { messages[lastAssistantMessageIndex].audioUrl = audioUrl; this.messagesSubject.next([...messages]); console.log('✅ Audio URL attached to assistant message at index:', lastAssistantMessageIndex); // Auto-play if it's welcome message or if in playing_audio state const isWelcomeMessage = messages[lastAssistantMessageIndex].text && messages[lastAssistantMessageIndex].timestamp && (new Date().getTime() - messages[lastAssistantMessageIndex].timestamp.getTime()) < 10000; // 10 saniye içinde if (isWelcomeMessage || this.currentStateSubject.value === 'playing_audio') { setTimeout(() => { console.log('🎵 Auto-playing audio for welcome message'); this.playAudio(audioUrl); }, 500); } } else { console.warn('⚠️ No assistant message found to attach audio'); } } // Clear queue this.audioQueue = []; console.log('🧹 Audio queue cleared'); console.log('✅ Audio processing completed successfully'); } catch (error) { console.error('❌ Error creating audio blob:', error); console.error('Queue size was:', this.audioQueue.length); this.audioQueue = []; } } } catch (error) { console.error('❌ Error handling TTS audio:', error); this.audioQueue = []; // Clear queue on error } } private findLastAssistantMessageIndex(messages: ConversationMessage[]): number { for (let i = messages.length - 1; i >= 0; i--) { if (messages[i].role === 'assistant') { return i; } } return -1; } private isValidBase64(str: string): boolean { try { console.log(`🔍 Checking base64 validity for ${str.length} chars`); // Check if string contains only valid base64 characters const base64Regex = /^[A-Za-z0-9+/]*={0,2}$/; if (!base64Regex.test(str)) { console.error('❌ Base64 regex test failed'); return false; } // Try to decode to verify const decoded = atob(str); console.log(`✅ Base64 decode successful, decoded length: ${decoded.length}`); return true; } catch (e) { console.error('❌ Base64 validation error:', e); return false; } } private base64ToBlob(base64: string, mimeType: string): Blob { try { console.log('🔄 Converting base64 to blob:', { base64Length: base64.length, mimeType: mimeType }); const byteCharacters = atob(base64); console.log(`📊 Decoded to ${byteCharacters.length} bytes`); const byteNumbers = new Array(byteCharacters.length); for (let i = 0; i < byteCharacters.length; i++) { byteNumbers[i] = byteCharacters.charCodeAt(i); } const byteArray = new Uint8Array(byteNumbers); const blob = new Blob([byteArray], { type: mimeType }); console.log('✅ Blob created:', { size: blob.size, type: blob.type, sizeKB: (blob.size / 1024).toFixed(2) + ' KB' }); return blob; } catch (error) { console.error('❌ Error converting base64 to blob:', error); console.error('Input details:', { base64Length: base64.length, base64Preview: base64.substring(0, 100) + '...', mimeType: mimeType }); throw new Error('Failed to convert audio data'); } } private handleAudioError(error: any): void { const conversationError: ConversationError = { type: error.type || 'audio', message: error.message || 'Audio error occurred', details: error, timestamp: new Date() }; this.errorSubject.next(conversationError); // Add user-friendly message if (error.type === 'permission') { this.addSystemMessage('Microphone permission denied. Please allow microphone access.'); } else if (error.type === 'device') { this.addSystemMessage('Microphone not found or not accessible.'); } else { this.addSystemMessage('Audio error occurred. Please check your microphone.'); } // Update state this.currentStateSubject.next('error'); } private handleWebSocketError(error: any): void { const conversationError: ConversationError = { type: 'websocket', message: error.message || 'WebSocket error occurred', details: error, timestamp: new Date() }; this.errorSubject.next(conversationError); this.addSystemMessage('Connection error. Please check your internet connection.'); // Don't set error state for temporary connection issues if (this.wsService.getReconnectionInfo().isReconnecting) { this.addSystemMessage('Attempting to reconnect...'); } else { this.currentStateSubject.next('error'); } } private handleServerError(message: any): void { const errorType = message['error_type'] || 'unknown'; const errorMessage = message['message'] || 'Server error occurred'; const conversationError: ConversationError = { type: errorType === 'race_condition' ? 'network' : 'unknown', message: errorMessage, details: message, timestamp: new Date() }; this.errorSubject.next(conversationError); // STT initialization hatası için özel handling if (errorType === 'stt_init_failed') { this.addSystemMessage('Speech recognition service failed to initialize. Please check your configuration.'); // Konuşmayı durdur this.stopConversation(); } else if (errorType === 'race_condition') { this.addSystemMessage('Session conflict detected. Please restart the conversation.'); } else if (errorType === 'stt_error') { this.addSystemMessage('Speech recognition error. Please try speaking again.'); // STT hatası durumunda yeniden başlatmayı dene if (errorMessage.includes('Streaming not started')) { this.addSystemMessage('Restarting speech recognition...'); // WebSocket'e restart sinyali gönder if (this.wsService.isConnected()) { this.wsService.sendControl('restart_stt'); } } } else if (errorType === 'tts_error') { this.addSystemMessage('Text-to-speech error. Response will be shown as text only.'); } else { this.addSystemMessage(`Error: ${errorMessage}`); } } private determineErrorType(error: any): ConversationError['type'] { if (error.type) { return error.type; } if (error.message?.includes('WebSocket') || error.message?.includes('connection')) { return 'websocket'; } if (error.message?.includes('microphone') || error.message?.includes('audio')) { return 'audio'; } if (error.message?.includes('permission')) { return 'permission'; } if (error.message?.includes('network') || error.status === 0) { return 'network'; } return 'unknown'; } private cleanup(): void { try { this.subscriptions.unsubscribe(); this.subscriptions = new Subscription(); // Audio recording'i kesinlikle durdur if (this.audioService.isRecording()) { this.audioService.stopRecording(); } this.wsService.disconnect(); this.stopAudioPlayback(); if (this.audioPlayer) { this.audioPlayer = null; } this.audioQueue = []; this.isInterrupting = false; this.currentStateSubject.next('idle'); this.sttReadySubject.complete(); console.log('🧹 Conversation cleaned up'); } catch (error) { console.error('Error during cleanup:', error); } } // Public methods for UI getCurrentState(): ConversationState { return this.currentStateSubject.value; } getMessages(): ConversationMessage[] { return this.messagesSubject.value; } clearMessages(): void { this.messagesSubject.next([]); this.transcriptionSubject.next(''); } updateConfig(config: Partial): void { this.conversationConfig = { ...this.conversationConfig, ...config }; // Send config update if connected if (this.wsService.isConnected()) { try { this.wsService.sendControl('update_config', config); } catch (error) { console.error('Failed to update config:', error); } } } getConfig(): ConversationConfig { return { ...this.conversationConfig }; } isConnected(): boolean { return this.wsService.isConnected(); } // Retry connection async retryConnection(): Promise { if (!this.sessionId) { throw new Error('No session ID available for retry'); } this.currentStateSubject.next('idle'); await this.startConversation(this.sessionId, this.conversationConfig); } }