// conversation-manager.service.ts
// Path: /flare-ui/src/app/services/conversation-manager.service.ts
import { Injectable, OnDestroy } from '@angular/core';
import { Subject, Subscription, BehaviorSubject, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
import { WebSocketService } from './websocket.service';
import { AudioStreamService } from './audio-stream.service';
/**
 * Phases a conversation can be in, as published by the conversation manager.
 *
 * The values are plain string literals so they can be compared directly with
 * the `from` / `to` fields of state-change events coming from the WebSocket
 * layer.
 */
export type ConversationState =
  | 'idle'
  | 'listening'
  | 'processing_stt'
  | 'processing_llm'
  | 'processing_tts'
  | 'playing_audio'
  | 'error';
/** One entry of the conversation transcript shown to the user. */
export interface ConversationMessage {
  /** Who produced the entry: the speaker, the assistant, or the client itself. */
  role: 'user' | 'assistant' | 'system';
  /** Text content of the entry. */
  text: string;
  /** When the entry was added to the history. */
  timestamp: Date;
  /** URL of the audio rendition of this entry, when one is available. */
  audioUrl?: string;
  /** Marks the entry as representing a failure. */
  error?: boolean;
}
/**
 * Optional settings for a conversation session.
 *
 * Every field is optional; omitted fields keep whatever value the consumer
 * already holds.
 */
export interface ConversationConfig {
  /** Language tag for the session, e.g. `'tr-TR'`. */
  language?: string;
  /** Identifier of the speech-to-text engine to use. */
  stt_engine?: string;
  /** Identifier of the text-to-speech engine to use. */
  tts_engine?: string;
  /** Whether the user may interrupt the assistant while it is responding. */
  enable_barge_in?: boolean;
  /** Maximum tolerated silence. NOTE(review): unit is not visible here — confirm with the backend. */
  max_silence_duration?: number;
}
/** Structured description of a failure reported to conversation observers. */
export interface ConversationError {
  /** Coarse category of the failure. */
  type: 'websocket' | 'audio' | 'permission' | 'network' | 'unknown';
  /** Human-readable description of what went wrong. */
  message: string;
  /**
   * Original error object or server payload, kept for diagnostics.
   * Its shape depends on the source, so inspect it defensively.
   */
  details?: any;
  /** When the failure was recorded. */
  timestamp: Date;
}
/**
 * Coordinates a voice conversation between the microphone
 * (`AudioStreamService`) and the backend (`WebSocketService`).
 *
 * Responsibilities visible in this class:
 * - forwards recorded audio chunks to the WebSocket,
 * - turns incoming server messages into a message history, a live
 *   transcription and error notifications,
 * - assembles text-to-speech audio chunks into a playable object URL and
 *   plays it when the state switches to `playing_audio`,
 * - supports barge-in (interrupting the assistant).
 */
@Injectable({
  providedIn: 'root'
})
export class ConversationManagerService implements OnDestroy {
  /** How long microphone chunks are withheld after a barge-in, in milliseconds. */
  private static readonly BARGE_IN_MUTE_MS = 500;

  /** Every value allowed in `ConversationError.type`; used to validate foreign `type` fields. */
  private static readonly ERROR_TYPES: ReadonlyArray<ConversationError['type']> = [
    'websocket',
    'audio',
    'permission',
    'network',
    'unknown'
  ];

  private subscriptions = new Subscription();
  /** Base64 fragments of the TTS clip currently being received. */
  private audioQueue: string[] = [];
  /** While true, microphone chunks are not forwarded to the server. */
  private isInterrupting = false;
  /** Pending timer that ends the barge-in mute window, if any. */
  private interruptResetTimer: ReturnType<typeof setTimeout> | null = null;
  private sessionId: string | null = null;
  private conversationConfig: ConversationConfig = {
    language: 'tr-TR',
    stt_engine: 'google',
    enable_barge_in: true
  };
  /** Object URLs created for TTS clips; revoked when the history is discarded. */
  private objectUrls: string[] = [];

  // State management
  private currentStateSubject = new BehaviorSubject<ConversationState>('idle');
  public currentState$ = this.currentStateSubject.asObservable();

  // Message history
  private messagesSubject = new BehaviorSubject<ConversationMessage[]>([]);
  public messages$ = this.messagesSubject.asObservable();

  // Current transcription
  private transcriptionSubject = new BehaviorSubject<string>('');
  public transcription$ = this.transcriptionSubject.asObservable();

  // Error handling
  private errorSubject = new Subject<ConversationError>();
  public error$ = this.errorSubject.asObservable();

  // Audio player reference
  private audioPlayer: HTMLAudioElement | null = null;
  private audioPlayerPromise: Promise<void> | null = null;

  constructor(
    private wsService: WebSocketService,
    private audioService: AudioStreamService
  ) {}

  /** Tears down the active session and releases every object URL still held. */
  ngOnDestroy(): void {
    this.cleanup();
    this.releaseObjectUrls();
  }

  /**
   * Starts a conversation: connects the WebSocket, starts recording, wires the
   * event streams and sends the `start_session` control message.
   *
   * @param sessionId Identifier of the session to join; must be non-empty.
   * @param config Optional overrides merged into the current configuration.
   * @throws Rethrows the failure after publishing it on `error$`, cleaning up
   *   and switching the state to `'error'`.
   */
  async startConversation(sessionId: string, config?: ConversationConfig): Promise<void> {
    try {
      if (!sessionId) {
        throw new Error('Session ID is required');
      }

      // Update configuration
      if (config) {
        this.conversationConfig = { ...this.conversationConfig, ...config };
      }
      this.sessionId = sessionId;

      // Reset state
      this.clearMessages();
      this.currentStateSubject.next('idle');

      // Connect WebSocket first
      try {
        await this.wsService.connect(sessionId);
      } catch (cause: unknown) {
        throw this.wrapStartupError('WebSocket connection failed', 'websocket', cause);
      }

      // Start audio recording
      try {
        await this.audioService.startRecording();
      } catch (cause: unknown) {
        // Disconnect WebSocket if audio fails
        this.wsService.disconnect();
        throw this.wrapStartupError('Audio recording failed', 'audio', cause);
      }

      // Set up subscriptions
      this.setupSubscriptions();

      // Send start signal with configuration
      this.wsService.sendControl('start_session', this.conversationConfig);
      console.log('✅ Conversation started with config:', this.conversationConfig);
    } catch (error: unknown) {
      console.error('Failed to start conversation:', error);
      const conversationError: ConversationError = {
        type: this.determineErrorType(error),
        message: ConversationManagerService.messageOf(error) || 'Failed to start conversation',
        details: error,
        timestamp: new Date()
      };
      this.errorSubject.next(conversationError);
      // cleanup() resets the state to 'idle', so the error state has to be
      // published afterwards or observers would never see it.
      this.cleanup();
      this.currentStateSubject.next('error');
      throw error;
    }
  }

  /**
   * Ends the conversation: notifies the server when still connected, releases
   * all resources and records a "Conversation ended" system message.
   */
  stopConversation(): void {
    try {
      // Send stop signal if connected
      if (this.wsService.isConnected()) {
        this.wsService.sendControl('stop_session');
      }
    } catch (error: unknown) {
      console.error('Error stopping conversation:', error);
    }
    // Always tear down and tell the user, even if the stop signal failed.
    this.cleanup();
    this.addSystemMessage('Conversation ended');
  }

  /** Subscribes to the audio and WebSocket streams that drive the conversation. */
  private setupSubscriptions(): void {
    // Drop handlers left over from an earlier start so a restart or retry does
    // not process every event twice.
    this.subscriptions.unsubscribe();
    this.subscriptions = new Subscription();

    // Audio chunks from microphone
    this.subscriptions.add(
      this.audioService.audioChunk$.subscribe({
        next: (chunk) => {
          if (!this.isInterrupting && this.wsService.isConnected()) {
            try {
              this.wsService.sendAudioChunk(chunk.data);
            } catch (error: unknown) {
              console.error('Failed to send audio chunk:', error);
            }
          }
        },
        error: (error) => {
          console.error('Audio stream error:', error);
          this.handleAudioError(error);
        }
      })
    );

    // Audio stream errors
    this.subscriptions.add(
      this.audioService.error$.subscribe(error => {
        this.handleAudioError(error);
      })
    );

    // WebSocket messages
    this.subscriptions.add(
      this.wsService.message$.subscribe({
        next: (message) => {
          this.handleMessage(message);
        },
        error: (error) => {
          console.error('WebSocket message error:', error);
          this.handleWebSocketError(error);
        }
      })
    );

    // Transcription updates (final results arrive through handleMessage)
    this.subscriptions.add(
      this.wsService.transcription$.subscribe(result => {
        if (!result.is_final) {
          this.transcriptionSubject.next(result.text);
        }
      })
    );

    // State changes
    this.subscriptions.add(
      this.wsService.stateChange$.subscribe(change => {
        this.currentStateSubject.next(change.to as ConversationState);
        this.handleStateChange(change.from, change.to);
      })
    );

    // WebSocket errors
    this.subscriptions.add(
      this.wsService.error$.subscribe(error => {
        console.error('WebSocket error:', error);
        this.handleWebSocketError({ message: error });
      })
    );

    // WebSocket connection state
    this.subscriptions.add(
      this.wsService.connection$.subscribe(connected => {
        if (!connected && this.currentStateSubject.value !== 'idle') {
          this.addSystemMessage('Connection lost. Attempting to reconnect...');
        }
      })
    );
  }

  /**
   * Dispatches one server message by its `type` field. Failures while handling
   * a message are reported on `error$` instead of propagating.
   */
  private handleMessage(message: any): void {
    try {
      switch (message.type) {
        case 'transcription':
          if (message['is_final']) {
            this.addMessage('user', message['text']);
            this.transcriptionSubject.next('');
          }
          break;
        case 'assistant_response':
          this.addMessage('assistant', message['text']);
          break;
        case 'tts_audio':
          this.handleTTSAudio(message);
          break;
        case 'control':
          if (message['action'] === 'stop_playback') {
            this.stopAudioPlayback();
          }
          break;
        case 'error':
          this.handleServerError(message);
          break;
        case 'session_config':
          // Update configuration from server
          if (message['config']) {
            this.conversationConfig = { ...this.conversationConfig, ...message['config'] };
          }
          break;
      }
    } catch (error: unknown) {
      console.error('Error handling message:', error);
      this.errorSubject.next({
        type: 'unknown',
        message: 'Failed to process message',
        details: error,
        timestamp: new Date()
      });
    }
  }

  /** Runs the side effects tied to entering state `to`. */
  private handleStateChange(from: string, to: string): void {
    console.log(`📊 State: ${from} → ${to}`);
    if (to === 'listening') {
      // Clear transcription when starting to listen
      this.transcriptionSubject.next('');
    } else if (to === 'playing_audio') {
      // Start playing accumulated audio
      this.playQueuedAudio();
    } else if (to === 'error') {
      this.addSystemMessage('An error occurred in the conversation flow');
    }
  }

  /**
   * Accumulates base64 TTS fragments. When the fragment flagged `is_last`
   * arrives, the joined data is converted to a blob and its object URL is
   * attached to the most recent message, provided that message is from the
   * assistant.
   */
  private handleTTSAudio(message: any): void {
    try {
      // Validate audio data
      if (!message['data']) {
        console.warn('TTS audio message missing data');
        return;
      }

      // Accumulate audio chunks
      this.audioQueue.push(message['data']);
      if (!message['is_last']) {
        return;
      }

      // All chunks received: take the data and reset the queue for the next clip.
      const encoded = this.audioQueue.join('');
      this.audioQueue = [];

      const history = this.messagesSubject.value;
      const lastIndex = history.length - 1;
      const target = lastIndex >= 0 ? history[lastIndex] : undefined;
      if (!target || target.role !== 'assistant') {
        // Nothing to attach the clip to; creating an object URL here would leak it.
        return;
      }

      const audioBlob = this.base64ToBlob(encoded, message['mime_type'] || 'audio/mpeg');
      const audioUrl = URL.createObjectURL(audioBlob);
      this.objectUrls.push(audioUrl);

      // Publish a new array with a new message object so snapshots already
      // handed to observers are not modified behind their back.
      const updated = history.slice();
      updated[lastIndex] = { ...target, audioUrl };
      this.messagesSubject.next(updated);
    } catch (error: unknown) {
      console.error('Error handling TTS audio:', error);
      this.audioQueue = []; // Clear queue on error
    }
  }

  /** Plays the audio attached to the latest message when it is an assistant message. */
  private playQueuedAudio(): void {
    const messages = this.messagesSubject.value;
    const lastMessage = messages[messages.length - 1];
    if (lastMessage?.audioUrl && lastMessage.role === 'assistant') {
      // playAudio handles its own failures, so the promise is intentionally not awaited.
      void this.playAudio(lastMessage.audioUrl);
    }
  }

  /**
   * Plays `audioUrl` on the shared audio element, creating the element on
   * first use. An `AbortError` (playback interrupted) is only logged; any other
   * failure is published on `error$`.
   */
  private async playAudio(audioUrl: string): Promise<void> {
    let pending: Promise<void> | null = null;
    try {
      if (!this.audioPlayer) {
        this.audioPlayer = new Audio();
        this.setupAudioPlayerHandlers();
      }
      this.audioPlayer.src = audioUrl;

      // Store the play promise to handle interruptions properly
      pending = this.audioPlayer.play();
      this.audioPlayerPromise = pending;
      await pending;
    } catch (error: unknown) {
      // Check if error is due to interruption
      if (ConversationManagerService.readField(error, 'name') === 'AbortError') {
        console.log('Audio playback interrupted');
      } else {
        console.error('Audio playback error:', error);
        this.errorSubject.next({
          type: 'audio',
          message: 'Failed to play audio response',
          details: error,
          timestamp: new Date()
        });
      }
    } finally {
      // Only clear the slot if it still refers to this call's promise; a newer
      // playback may have replaced it in the meantime.
      if (this.audioPlayerPromise === pending) {
        this.audioPlayerPromise = null;
      }
    }
  }

  /** Installs the `ended` / `error` handlers on the shared audio element. */
  private setupAudioPlayerHandlers(): void {
    if (!this.audioPlayer) return;

    this.audioPlayer.onended = () => {
      // Notify that audio playback ended
      if (this.wsService.isConnected()) {
        try {
          this.wsService.sendControl('audio_ended');
        } catch (error: unknown) {
          console.error('Failed to send audio_ended signal:', error);
        }
      }
      this.currentStateSubject.next('idle');
    };

    this.audioPlayer.onerror = (error) => {
      console.error('Audio player error:', error);
      this.currentStateSubject.next('idle');
    };
  }

  /** Pauses and rewinds the audio element and forgets any pending play promise. */
  private stopAudioPlayback(): void {
    try {
      if (this.audioPlayer) {
        this.audioPlayer.pause();
        this.audioPlayer.currentTime = 0;

        // Cancel any pending play promise
        if (this.audioPlayerPromise) {
          this.audioPlayerPromise.catch(() => {
            // Ignore abort errors
          });
          this.audioPlayerPromise = null;
        }
      }
    } catch (error: unknown) {
      console.error('Error stopping audio playback:', error);
    }
  }

  /**
   * Interrupts the assistant (barge-in). Has no effect while the state is
   * `idle`, `listening` or `error`. Otherwise playback is stopped, queued TTS
   * data is dropped, the server receives an `interrupt` control message and
   * microphone forwarding is muted for a short window.
   */
  performBargeIn(): void {
    const currentState = this.currentStateSubject.value;
    if (currentState === 'idle' || currentState === 'listening' || currentState === 'error') {
      return;
    }

    console.log('🛑 Performing barge-in');
    this.isInterrupting = true;

    // Stop audio playback
    this.stopAudioPlayback();

    // Clear audio queue
    this.audioQueue = [];

    // Notify server
    if (this.wsService.isConnected()) {
      try {
        this.wsService.sendControl('interrupt', {
          at_state: currentState,
          timestamp: Date.now()
        });
      } catch (error: unknown) {
        console.error('Failed to send interrupt signal:', error);
      }
    }

    // Restart the mute window; cancelling the previous timer keeps an earlier
    // barge-in from re-enabling the microphone too early.
    this.clearInterruptTimer();
    this.interruptResetTimer = setTimeout(() => {
      this.isInterrupting = false;
      this.interruptResetTimer = null;
    }, ConversationManagerService.BARGE_IN_MUTE_MS);
  }

  /** Cancels the pending barge-in reset timer, if there is one. */
  private clearInterruptTimer(): void {
    if (this.interruptResetTimer !== null) {
      clearTimeout(this.interruptResetTimer);
      this.interruptResetTimer = null;
    }
  }

  /** Appends a user or assistant message; blank text is ignored. */
  private addMessage(role: 'user' | 'assistant', text: string, error: boolean = false): void {
    if (!text || text.trim().length === 0) {
      return;
    }
    this.appendMessage({
      role,
      text,
      timestamp: new Date(),
      error
    });
  }

  /** Logs `text` and appends it to the history as a system message. */
  private addSystemMessage(text: string): void {
    console.log(`📢 System: ${text}`);
    this.appendMessage({
      role: 'system',
      text,
      timestamp: new Date()
    });
  }

  /** Publishes a new history array ending with `entry`; the previous array is left untouched. */
  private appendMessage(entry: ConversationMessage): void {
    this.messagesSubject.next([...this.messagesSubject.value, entry]);
  }

  /**
   * Decodes a base64 string into a `Blob` of the given MIME type.
   *
   * @throws Error with message `'Failed to convert audio data'` when decoding fails.
   */
  private base64ToBlob(base64: string, mimeType: string): Blob {
    try {
      const binary = atob(base64);
      // Write straight into the typed array instead of going through a boxed Array.
      const bytes = new Uint8Array(binary.length);
      for (let i = 0; i < binary.length; i++) {
        bytes[i] = binary.charCodeAt(i);
      }
      return new Blob([bytes], { type: mimeType });
    } catch (error: unknown) {
      console.error('Error converting base64 to blob:', error);
      throw new Error('Failed to convert audio data');
    }
  }

  /**
   * Publishes an audio failure, adds a user-facing hint that depends on the
   * reported `type`, and switches the state to `'error'`.
   */
  private handleAudioError(error: any): void {
    const reportedType: unknown = error?.type;
    const conversationError: ConversationError = {
      // Types outside the ConversationError union (e.g. 'device') are reported as 'audio'.
      type: this.toErrorType(reportedType) ?? 'audio',
      message: error?.message || 'Audio error occurred',
      details: error,
      timestamp: new Date()
    };
    this.errorSubject.next(conversationError);

    // Add user-friendly message
    if (reportedType === 'permission') {
      this.addSystemMessage('Microphone permission denied. Please allow microphone access.');
    } else if (reportedType === 'device') {
      this.addSystemMessage('Microphone not found or not accessible.');
    } else {
      this.addSystemMessage('Audio error occurred. Please check your microphone.');
    }

    // Update state
    this.currentStateSubject.next('error');
  }

  /**
   * Publishes a WebSocket failure. The state only becomes `'error'` when the
   * WebSocket service reports that it is not reconnecting.
   */
  private handleWebSocketError(error: any): void {
    const conversationError: ConversationError = {
      type: 'websocket',
      message: error?.message || 'WebSocket error occurred',
      details: error,
      timestamp: new Date()
    };
    this.errorSubject.next(conversationError);
    this.addSystemMessage('Connection error. Please check your internet connection.');

    // Don't set error state for temporary connection issues
    if (this.wsService.getReconnectionInfo().isReconnecting) {
      this.addSystemMessage('Attempting to reconnect...');
    } else {
      this.currentStateSubject.next('error');
    }
  }

  /** Publishes an error sent by the server and adds a matching system message. */
  private handleServerError(message: any): void {
    const errorType = message['error_type'] || 'unknown';
    const errorMessage = message['message'] || 'Server error occurred';
    const conversationError: ConversationError = {
      type: errorType === 'race_condition' ? 'network' : 'unknown',
      message: errorMessage,
      details: message,
      timestamp: new Date()
    };
    this.errorSubject.next(conversationError);

    // Add user-friendly message based on error type
    if (errorType === 'race_condition') {
      this.addSystemMessage('Session conflict detected. Please restart the conversation.');
    } else if (errorType === 'stt_error') {
      this.addSystemMessage('Speech recognition error. Please try speaking again.');
    } else if (errorType === 'tts_error') {
      this.addSystemMessage('Text-to-speech error. Response will be shown as text only.');
    } else {
      this.addSystemMessage(`Error: ${errorMessage}`);
    }
  }

  /**
   * Classifies an arbitrary error. A `type` field is used when it is one of the
   * known categories; otherwise the message is matched case-insensitively
   * against keywords.
   */
  private determineErrorType(error: unknown): ConversationError['type'] {
    const explicit = this.toErrorType(ConversationManagerService.readField(error, 'type'));
    if (explicit) {
      return explicit;
    }

    const message = ConversationManagerService.messageOf(error).toLowerCase();
    if (message.includes('websocket') || message.includes('connection')) {
      return 'websocket';
    }
    // Checked before the audio keywords so "microphone permission denied"
    // is classified as a permission problem.
    if (message.includes('permission')) {
      return 'permission';
    }
    if (message.includes('microphone') || message.includes('audio')) {
      return 'audio';
    }
    if (message.includes('network') || ConversationManagerService.readField(error, 'status') === 0) {
      return 'network';
    }
    return 'unknown';
  }

  /** Returns `candidate` when it is a valid `ConversationError` type, otherwise `undefined`. */
  private toErrorType(candidate: unknown): ConversationError['type'] | undefined {
    return ConversationManagerService.ERROR_TYPES.find(known => known === candidate);
  }

  /**
   * Builds the error thrown when a start-up step fails. The message is
   * `"<prefix>: <cause message>"`; the cause's `type` is carried over when it is
   * a known category, otherwise `fallbackType` is used.
   */
  private wrapStartupError(
    prefix: string,
    fallbackType: ConversationError['type'],
    cause: unknown
  ): Error {
    const reason = ConversationManagerService.messageOf(cause) || 'unknown error';
    const wrapped: Error & { type?: ConversationError['type']; cause?: unknown } =
      new Error(`${prefix}: ${reason}`);
    wrapped.type =
      this.toErrorType(ConversationManagerService.readField(cause, 'type')) ?? fallbackType;
    wrapped.cause = cause;
    return wrapped;
  }

  /** Reads `key` from `source` when `source` is a non-null object; otherwise `undefined`. */
  private static readField(source: unknown, key: string): unknown {
    if (typeof source !== 'object' || source === null) {
      return undefined;
    }
    return (source as Record<string, unknown>)[key];
  }

  /** Extracts a message from a thrown value; returns `''` when none is available. */
  private static messageOf(error: unknown): string {
    if (typeof error === 'string') {
      return error;
    }
    const message = ConversationManagerService.readField(error, 'message');
    return typeof message === 'string' ? message : '';
  }

  /** Revokes every object URL created for TTS clips. */
  private releaseObjectUrls(): void {
    for (const url of this.objectUrls) {
      URL.revokeObjectURL(url);
    }
    this.objectUrls = [];
  }

  /**
   * Releases everything tied to the active session and resets the state to
   * `'idle'`. The message history is kept. Never throws; failures are logged.
   */
  private cleanup(): void {
    try {
      this.subscriptions.unsubscribe();
      this.subscriptions = new Subscription();
      this.clearInterruptTimer();
      this.audioService.stopRecording();
      this.wsService.disconnect();
      this.stopAudioPlayback();
      if (this.audioPlayer) {
        // Detach handlers so a discarded element cannot change state later.
        this.audioPlayer.onended = null;
        this.audioPlayer.onerror = null;
        this.audioPlayer = null;
      }
      this.audioQueue = [];
      this.isInterrupting = false;
      this.currentStateSubject.next('idle');
      console.log('🧹 Conversation cleaned up');
    } catch (error: unknown) {
      console.error('Error during cleanup:', error);
    }
  }

  // Public methods for UI

  /** Returns the current conversation state. */
  getCurrentState(): ConversationState {
    return this.currentStateSubject.value;
  }

  /** Returns the current message history snapshot. */
  getMessages(): ConversationMessage[] {
    return this.messagesSubject.value;
  }

  /** Empties the message history and the live transcription, releasing the clips' object URLs. */
  clearMessages(): void {
    this.releaseObjectUrls();
    this.messagesSubject.next([]);
    this.transcriptionSubject.next('');
  }

  /**
   * Merges `config` into the current configuration and, when connected, sends
   * the changed fields to the server as an `update_config` control message.
   */
  updateConfig(config: Partial<ConversationConfig>): void {
    this.conversationConfig = { ...this.conversationConfig, ...config };

    // Send config update if connected
    if (this.wsService.isConnected()) {
      try {
        this.wsService.sendControl('update_config', config);
      } catch (error: unknown) {
        console.error('Failed to update config:', error);
      }
    }
  }

  /** Returns a copy of the current configuration. */
  getConfig(): ConversationConfig {
    return { ...this.conversationConfig };
  }

  /** Reports whether the WebSocket service is currently connected. */
  isConnected(): boolean {
    return this.wsService.isConnected();
  }

  /**
   * Restarts the conversation with the last used session id and configuration.
   *
   * @throws Error when no conversation has been started yet, or whatever
   *   `startConversation` rethrows.
   */
  async retryConnection(): Promise<void> {
    if (!this.sessionId) {
      throw new Error('No session ID available for retry');
    }
    this.currentStateSubject.next('idle');
    await this.startConversation(this.sessionId, this.conversationConfig);
  }
}