flare / flare-ui /src /app /services /conversation-manager.service.ts
ciyidogan's picture
Update flare-ui/src/app/services/conversation-manager.service.ts
fed974c verified
raw
history blame
18.5 kB
// conversation-manager.service.ts
// Path: /flare-ui/src/app/services/conversation-manager.service.ts
import { Injectable, OnDestroy } from '@angular/core';
import { Subject, Subscription, BehaviorSubject, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
import { WebSocketService } from './websocket.service';
import { AudioStreamService } from './audio-stream.service';
export type ConversationState =
| 'idle'
| 'listening'
| 'processing_stt'
| 'processing_llm'
| 'processing_tts'
| 'playing_audio'
| 'error';
export interface ConversationMessage {
role: 'user' | 'assistant' | 'system';
text: string;
timestamp: Date;
audioUrl?: string;
error?: boolean;
}
export interface ConversationConfig {
language?: string;
stt_engine?: string;
tts_engine?: string;
enable_barge_in?: boolean;
max_silence_duration?: number;
}
export interface ConversationError {
type: 'websocket' | 'audio' | 'permission' | 'network' | 'unknown';
message: string;
details?: any;
timestamp: Date;
}
@Injectable({
providedIn: 'root'
})
export class ConversationManagerService implements OnDestroy {
private subscriptions = new Subscription();
private audioQueue: string[] = [];
private isInterrupting = false;
private sessionId: string | null = null;
private conversationConfig: ConversationConfig = {
language: 'tr-TR',
stt_engine: 'google',
enable_barge_in: true
};
// State management
private currentStateSubject = new BehaviorSubject<ConversationState>('idle');
public currentState$ = this.currentStateSubject.asObservable();
// Message history
private messagesSubject = new BehaviorSubject<ConversationMessage[]>([]);
public messages$ = this.messagesSubject.asObservable();
// Current transcription
private transcriptionSubject = new BehaviorSubject<string>('');
public transcription$ = this.transcriptionSubject.asObservable();
// Error handling
private errorSubject = new Subject<ConversationError>();
public error$ = this.errorSubject.asObservable();
// Audio player reference
private audioPlayer: HTMLAudioElement | null = null;
private audioPlayerPromise: Promise<void> | null = null;
constructor(
private wsService: WebSocketService,
private audioService: AudioStreamService
) {}
ngOnDestroy(): void {
this.cleanup();
}
async startConversation(sessionId: string, config?: ConversationConfig): Promise<void> {
try {
if (!sessionId) {
throw new Error('Session ID is required');
}
// Update configuration
if (config) {
this.conversationConfig = { ...this.conversationConfig, ...config };
}
this.sessionId = sessionId;
// Reset state
this.clearMessages();
this.currentStateSubject.next('idle');
// Connect WebSocket first
await this.wsService.connect(sessionId).catch(error => {
throw new Error(`WebSocket connection failed: ${error.message}`);
});
// Start audio recording
await this.audioService.startRecording().catch(error => {
// Disconnect WebSocket if audio fails
this.wsService.disconnect();
throw new Error(`Audio recording failed: ${error.message}`);
});
// Set up subscriptions
this.setupSubscriptions();
// Send start signal with configuration
this.wsService.sendControl('start_session', this.conversationConfig);
console.log('✅ Conversation started with config:', this.conversationConfig);
} catch (error: any) {
console.error('Failed to start conversation:', error);
const conversationError: ConversationError = {
type: this.determineErrorType(error),
message: error.message || 'Failed to start conversation',
details: error,
timestamp: new Date()
};
this.errorSubject.next(conversationError);
this.currentStateSubject.next('error');
this.cleanup();
throw error;
}
}
stopConversation(): void {
try {
// Send stop signal if connected
if (this.wsService.isConnected()) {
this.wsService.sendControl('stop_session');
}
this.cleanup();
this.addSystemMessage('Conversation ended');
} catch (error) {
console.error('Error stopping conversation:', error);
this.cleanup();
}
}
private setupSubscriptions(): void {
// Audio chunks from microphone
this.subscriptions.add(
this.audioService.audioChunk$.subscribe({
next: (chunk) => {
if (!this.isInterrupting && this.wsService.isConnected()) {
try {
this.wsService.sendAudioChunk(chunk.data);
} catch (error) {
console.error('Failed to send audio chunk:', error);
}
}
},
error: (error) => {
console.error('Audio stream error:', error);
this.handleAudioError(error);
}
})
);
// Audio stream errors
this.subscriptions.add(
this.audioService.error$.subscribe(error => {
this.handleAudioError(error);
})
);
// WebSocket messages
this.subscriptions.add(
this.wsService.message$.subscribe({
next: (message) => {
this.handleMessage(message);
},
error: (error) => {
console.error('WebSocket message error:', error);
this.handleWebSocketError(error);
}
})
);
// Transcription updates
this.subscriptions.add(
this.wsService.transcription$.subscribe(result => {
if (!result.is_final) {
this.transcriptionSubject.next(result.text);
}
})
);
// State changes
this.subscriptions.add(
this.wsService.stateChange$.subscribe(change => {
this.currentStateSubject.next(change.to as ConversationState);
this.handleStateChange(change.from, change.to);
})
);
// WebSocket errors
this.subscriptions.add(
this.wsService.error$.subscribe(error => {
console.error('WebSocket error:', error);
this.handleWebSocketError({ message: error });
})
);
// WebSocket connection state
this.subscriptions.add(
this.wsService.connection$.subscribe(connected => {
if (!connected && this.currentStateSubject.value !== 'idle') {
this.addSystemMessage('Connection lost. Attempting to reconnect...');
}
})
);
}
private handleMessage(message: any): void {
try {
switch (message.type) {
case 'transcription':
if (message['is_final']) {
this.addMessage('user', message['text']);
this.transcriptionSubject.next('');
}
break;
case 'assistant_response':
this.addMessage('assistant', message['text']);
break;
case 'tts_audio':
this.handleTTSAudio(message);
break;
case 'control':
if (message['action'] === 'stop_playback') {
this.stopAudioPlayback();
}
break;
case 'error':
this.handleServerError(message);
break;
case 'session_config':
// Update configuration from server
if (message['config']) {
this.conversationConfig = { ...this.conversationConfig, ...message['config'] };
}
break;
}
} catch (error) {
console.error('Error handling message:', error);
this.errorSubject.next({
type: 'unknown',
message: 'Failed to process message',
details: error,
timestamp: new Date()
});
}
}
private handleStateChange(from: string, to: string): void {
console.log(`📊 State: ${from}${to}`);
// Handle state-specific logic
if (to === 'listening') {
// Clear transcription when starting to listen
this.transcriptionSubject.next('');
} else if (to === 'playing_audio') {
// Start playing accumulated audio
this.playQueuedAudio();
} else if (to === 'error') {
// Handle error state
this.addSystemMessage('An error occurred in the conversation flow');
}
}
private handleTTSAudio(message: any): void {
try {
// Validate audio data
if (!message['data']) {
console.warn('TTS audio message missing data');
return;
}
// Accumulate audio chunks
this.audioQueue.push(message['data']);
if (message['is_last']) {
// All chunks received, create audio blob
const audioData = this.audioQueue.join('');
const audioBlob = this.base64ToBlob(audioData, message['mime_type'] || 'audio/mpeg');
const audioUrl = URL.createObjectURL(audioBlob);
// Update last message with audio URL
const messages = this.messagesSubject.value;
if (messages.length > 0 && messages[messages.length - 1].role === 'assistant') {
messages[messages.length - 1].audioUrl = audioUrl;
this.messagesSubject.next([...messages]);
}
// Clear queue
this.audioQueue = [];
}
} catch (error) {
console.error('Error handling TTS audio:', error);
this.audioQueue = []; // Clear queue on error
}
}
private playQueuedAudio(): void {
const messages = this.messagesSubject.value;
const lastMessage = messages[messages.length - 1];
if (lastMessage?.audioUrl && lastMessage.role === 'assistant') {
this.playAudio(lastMessage.audioUrl);
}
}
private async playAudio(audioUrl: string): Promise<void> {
try {
if (!this.audioPlayer) {
this.audioPlayer = new Audio();
this.setupAudioPlayerHandlers();
}
this.audioPlayer.src = audioUrl;
// Store the play promise to handle interruptions properly
this.audioPlayerPromise = this.audioPlayer.play();
await this.audioPlayerPromise;
} catch (error: any) {
// Check if error is due to interruption
if (error.name === 'AbortError') {
console.log('Audio playback interrupted');
} else {
console.error('Audio playback error:', error);
this.errorSubject.next({
type: 'audio',
message: 'Failed to play audio response',
details: error,
timestamp: new Date()
});
}
} finally {
this.audioPlayerPromise = null;
}
}
private setupAudioPlayerHandlers(): void {
if (!this.audioPlayer) return;
this.audioPlayer.onended = () => {
// Notify that audio playback ended
if (this.wsService.isConnected()) {
try {
this.wsService.sendControl('audio_ended');
} catch (error) {
console.error('Failed to send audio_ended signal:', error);
}
}
this.currentStateSubject.next('idle');
};
this.audioPlayer.onerror = (error) => {
console.error('Audio player error:', error);
this.currentStateSubject.next('idle');
};
}
private stopAudioPlayback(): void {
try {
if (this.audioPlayer) {
this.audioPlayer.pause();
this.audioPlayer.currentTime = 0;
// Cancel any pending play promise
if (this.audioPlayerPromise) {
this.audioPlayerPromise.catch(() => {
// Ignore abort errors
});
this.audioPlayerPromise = null;
}
}
} catch (error) {
console.error('Error stopping audio playback:', error);
}
}
// Barge-in handling
performBargeIn(): void {
const currentState = this.currentStateSubject.value;
if (currentState !== 'idle' && currentState !== 'listening' && currentState !== 'error') {
console.log('🛑 Performing barge-in');
this.isInterrupting = true;
// Stop audio playback
this.stopAudioPlayback();
// Clear audio queue
this.audioQueue = [];
// Notify server
if (this.wsService.isConnected()) {
try {
this.wsService.sendControl('interrupt', {
at_state: currentState,
timestamp: Date.now()
});
} catch (error) {
console.error('Failed to send interrupt signal:', error);
}
}
// Reset interruption flag after a delay
setTimeout(() => {
this.isInterrupting = false;
}, 500);
}
}
private addMessage(role: 'user' | 'assistant', text: string, error: boolean = false): void {
if (!text || text.trim().length === 0) {
return;
}
const messages = this.messagesSubject.value;
messages.push({
role,
text,
timestamp: new Date(),
error
});
this.messagesSubject.next([...messages]);
}
private addSystemMessage(text: string): void {
console.log(`📢 System: ${text}`);
const messages = this.messagesSubject.value;
messages.push({
role: 'system',
text,
timestamp: new Date()
});
this.messagesSubject.next([...messages]);
}
private base64ToBlob(base64: string, mimeType: string): Blob {
try {
const byteCharacters = atob(base64);
const byteNumbers = new Array(byteCharacters.length);
for (let i = 0; i < byteCharacters.length; i++) {
byteNumbers[i] = byteCharacters.charCodeAt(i);
}
const byteArray = new Uint8Array(byteNumbers);
return new Blob([byteArray], { type: mimeType });
} catch (error) {
console.error('Error converting base64 to blob:', error);
throw new Error('Failed to convert audio data');
}
}
private handleAudioError(error: any): void {
const conversationError: ConversationError = {
type: error.type || 'audio',
message: error.message || 'Audio error occurred',
details: error,
timestamp: new Date()
};
this.errorSubject.next(conversationError);
// Add user-friendly message
if (error.type === 'permission') {
this.addSystemMessage('Microphone permission denied. Please allow microphone access.');
} else if (error.type === 'device') {
this.addSystemMessage('Microphone not found or not accessible.');
} else {
this.addSystemMessage('Audio error occurred. Please check your microphone.');
}
// Update state
this.currentStateSubject.next('error');
}
private handleWebSocketError(error: any): void {
const conversationError: ConversationError = {
type: 'websocket',
message: error.message || 'WebSocket error occurred',
details: error,
timestamp: new Date()
};
this.errorSubject.next(conversationError);
this.addSystemMessage('Connection error. Please check your internet connection.');
// Don't set error state for temporary connection issues
if (this.wsService.getReconnectionInfo().isReconnecting) {
this.addSystemMessage('Attempting to reconnect...');
} else {
this.currentStateSubject.next('error');
}
}
private handleServerError(message: any): void {
const errorType = message['error_type'] || 'unknown';
const errorMessage = message['message'] || 'Server error occurred';
const conversationError: ConversationError = {
type: errorType === 'race_condition' ? 'network' : 'unknown',
message: errorMessage,
details: message,
timestamp: new Date()
};
this.errorSubject.next(conversationError);
// Add user-friendly message based on error type
if (errorType === 'race_condition') {
this.addSystemMessage('Session conflict detected. Please restart the conversation.');
} else if (errorType === 'stt_error') {
this.addSystemMessage('Speech recognition error. Please try speaking again.');
} else if (errorType === 'tts_error') {
this.addSystemMessage('Text-to-speech error. Response will be shown as text only.');
} else {
this.addSystemMessage(`Error: ${errorMessage}`);
}
}
private determineErrorType(error: any): ConversationError['type'] {
if (error.type) {
return error.type;
}
if (error.message?.includes('WebSocket') || error.message?.includes('connection')) {
return 'websocket';
}
if (error.message?.includes('microphone') || error.message?.includes('audio')) {
return 'audio';
}
if (error.message?.includes('permission')) {
return 'permission';
}
if (error.message?.includes('network') || error.status === 0) {
return 'network';
}
return 'unknown';
}
private cleanup(): void {
try {
this.subscriptions.unsubscribe();
this.subscriptions = new Subscription();
this.audioService.stopRecording();
this.wsService.disconnect();
this.stopAudioPlayback();
if (this.audioPlayer) {
this.audioPlayer = null;
}
this.audioQueue = [];
this.isInterrupting = false;
this.currentStateSubject.next('idle');
console.log('🧹 Conversation cleaned up');
} catch (error) {
console.error('Error during cleanup:', error);
}
}
// Public methods for UI
getCurrentState(): ConversationState {
return this.currentStateSubject.value;
}
getMessages(): ConversationMessage[] {
return this.messagesSubject.value;
}
clearMessages(): void {
this.messagesSubject.next([]);
this.transcriptionSubject.next('');
}
updateConfig(config: Partial<ConversationConfig>): void {
this.conversationConfig = { ...this.conversationConfig, ...config };
// Send config update if connected
if (this.wsService.isConnected()) {
try {
this.wsService.sendControl('update_config', config);
} catch (error) {
console.error('Failed to update config:', error);
}
}
}
getConfig(): ConversationConfig {
return { ...this.conversationConfig };
}
isConnected(): boolean {
return this.wsService.isConnected();
}
// Retry connection
async retryConnection(): Promise<void> {
if (!this.sessionId) {
throw new Error('No session ID available for retry');
}
this.currentStateSubject.next('idle');
await this.startConversation(this.sessionId, this.conversationConfig);
}
}