flare / flare-ui /src /app /services /websocket.service.ts
ciyidogan's picture
Upload 118 files
9f79da5 verified
// websocket.service.ts
// Path: /flare-ui/src/app/services/websocket.service.ts
import { Injectable } from '@angular/core';
import { Subject, Observable, timer, throwError } from 'rxjs';
import { retry, tap, catchError } from 'rxjs/operators';
export interface WebSocketMessage {
type: string;
[key: string]: any;
}
export interface TranscriptionResult {
text: string;
is_final: boolean;
confidence: number;
}
export interface StateChangeMessage {
from: string;
to: string;
}
@Injectable({
providedIn: 'root'
})
export class WebSocketService {
private socket: WebSocket | null = null;
private url: string = '';
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000;
private keepAliveInterval: any;
private connectionTimeout: any;
private readonly CONNECTION_TIMEOUT = 30000; // 30 seconds
// Subjects for different message types
private messageSubject = new Subject<WebSocketMessage>();
private transcriptionSubject = new Subject<TranscriptionResult>();
private stateChangeSubject = new Subject<StateChangeMessage>();
private errorSubject = new Subject<string>();
private connectionSubject = new Subject<boolean>();
// Public observables
public message$ = this.messageSubject.asObservable();
public transcription$ = this.transcriptionSubject.asObservable();
public stateChange$ = this.stateChangeSubject.asObservable();
public error$ = this.errorSubject.asObservable();
public connection$ = this.connectionSubject.asObservable();
constructor() {}
connect(sessionId: string): Promise<void> {
return new Promise((resolve, reject) => {
try {
if (!sessionId) {
const error = new Error('Session ID is required');
reject(error);
return;
}
// Close existing connection if any
if (this.socket) {
this.disconnect();
}
// Construct WebSocket URL
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const host = window.location.host;
this.url = `${protocol}//${host}/ws/conversation/${sessionId}`;
console.log(`πŸ”Œ Connecting to WebSocket: ${this.url}`);
// Set connection timeout
this.connectionTimeout = setTimeout(() => {
const error = new Error('WebSocket connection timeout');
console.error('❌ WebSocket connection timeout');
this.handleConnectionError(error);
reject(error);
}, this.CONNECTION_TIMEOUT);
this.socket = new WebSocket(this.url);
this.socket.onopen = () => {
clearTimeout(this.connectionTimeout);
console.log('βœ… WebSocket connected');
this.reconnectAttempts = 0;
this.connectionSubject.next(true);
// Start keep-alive ping
this.startKeepAlive();
resolve();
};
this.socket.onmessage = (event) => {
try {
const message: WebSocketMessage = JSON.parse(event.data);
this.handleMessage(message);
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
this.errorSubject.next('Invalid message format received');
}
};
this.socket.onerror = (error) => {
clearTimeout(this.connectionTimeout);
console.error('❌ WebSocket error:', error);
this.handleConnectionError(error);
reject(error);
};
this.socket.onclose = (event) => {
clearTimeout(this.connectionTimeout);
console.log('πŸ”Œ WebSocket disconnected', {
code: event.code,
reason: event.reason,
wasClean: event.wasClean
});
this.connectionSubject.next(false);
this.stopKeepAlive();
// Handle different close codes
if (event.code === 1006) {
// Abnormal closure
this.errorSubject.next('WebSocket connection lost unexpectedly');
} else if (event.code === 1000) {
// Normal closure
console.log('WebSocket closed normally');
} else {
// Other closure codes
this.errorSubject.next(`WebSocket closed: ${event.reason || 'Unknown reason'}`);
}
// Attempt reconnection for non-normal closures
if (event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts) {
this.attemptReconnect(sessionId);
}
};
} catch (error) {
console.error('Failed to create WebSocket:', error);
this.handleConnectionError(error);
reject(error);
}
});
}
disconnect(): void {
try {
this.stopKeepAlive();
clearTimeout(this.connectionTimeout);
if (this.socket) {
// Close with normal closure code
this.socket.close(1000, 'Client disconnect');
this.socket = null;
}
this.connectionSubject.next(false);
this.reconnectAttempts = 0;
} catch (error) {
console.error('Error during disconnect:', error);
}
}
send(message: WebSocketMessage): void {
try {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
const messageStr = JSON.stringify(message);
this.socket.send(messageStr);
} else {
const errorMsg = 'WebSocket is not connected';
console.warn(errorMsg);
this.errorSubject.next(errorMsg);
// Optionally throw error for caller to handle
throw new Error(errorMsg);
}
} catch (error) {
console.error('Error sending message:', error);
this.errorSubject.next('Failed to send message');
throw error;
}
}
sendAudioChunk(audioData: string): void {
try {
this.send({
type: 'audio_chunk',
data: audioData,
timestamp: Date.now()
});
} catch (error) {
console.error('Failed to send audio chunk:', error);
// Don't re-throw for audio chunks to avoid interrupting stream
}
}
sendControl(action: string, config?: any): void {
try {
this.send({
type: 'control',
action: action,
config: config,
timestamp: Date.now()
});
} catch (error) {
console.error(`Failed to send control action '${action}':`, error);
throw error;
}
}
private handleMessage(message: WebSocketMessage): void {
try {
// Emit to general message stream
this.messageSubject.next(message);
// Handle specific message types
switch (message.type) {
case 'transcription':
this.transcriptionSubject.next({
text: message['text'] || '',
is_final: message['is_final'] || false,
confidence: message['confidence'] || 0
});
break;
case 'state_change':
this.stateChangeSubject.next({
from: message['from'] || 'unknown',
to: message['to'] || 'unknown'
});
break;
case 'error':
const errorMessage = message['message'] || 'Unknown error';
this.errorSubject.next(errorMessage);
// Handle specific error types
if (message['error_type'] === 'race_condition') {
console.warn('Race condition detected in WebSocket message');
}
break;
case 'tts_audio':
// Log TTS audio for debugging
console.log('TTS audio message received', {
has_data: !!message['data'],
is_last: message['is_last'],
chunk_index: message['chunk_index'],
mime_type: message['mime_type']
});
break;
case 'assistant_response':
case 'pong':
case 'session_started':
case 'session_config':
case 'stt_ready':
case 'control':
// These are handled by general message stream
break;
default:
console.log('Unknown message type:', message.type);
}
} catch (error) {
console.error('Error handling message:', error);
this.errorSubject.next('Error processing message');
}
}
private attemptReconnect(sessionId: string): void {
this.reconnectAttempts++;
const delay = Math.min(
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
30000 // Max 30 seconds
);
console.log(`πŸ”„ Attempting reconnection ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`);
setTimeout(() => {
this.connect(sessionId).catch(error => {
console.error('Reconnection failed:', error);
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.errorSubject.next('Maximum reconnection attempts reached');
}
});
}, delay);
}
private handleConnectionError(error: any): void {
const errorMessage = error?.message || 'WebSocket connection error';
this.errorSubject.next(errorMessage);
// Log additional error details
console.error('WebSocket connection error details:', {
message: errorMessage,
type: error?.type,
target: error?.target,
timestamp: new Date().toISOString()
});
}
// Keep-alive mechanism
private startKeepAlive(): void {
this.stopKeepAlive(); // Clear any existing interval
this.keepAliveInterval = setInterval(() => {
try {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.send({ type: 'ping', timestamp: Date.now() });
} else {
console.warn('Keep-alive: WebSocket not open');
this.stopKeepAlive();
}
} catch (error) {
console.error('Keep-alive error:', error);
this.stopKeepAlive();
}
}, 30000); // Ping every 30 seconds
}
private stopKeepAlive(): void {
if (this.keepAliveInterval) {
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = null;
}
}
isConnected(): boolean {
return this.socket !== null && this.socket.readyState === WebSocket.OPEN;
}
getConnectionState(): string {
if (!this.socket) return 'DISCONNECTED';
switch (this.socket.readyState) {
case WebSocket.CONNECTING:
return 'CONNECTING';
case WebSocket.OPEN:
return 'CONNECTED';
case WebSocket.CLOSING:
return 'CLOSING';
case WebSocket.CLOSED:
return 'CLOSED';
default:
return 'UNKNOWN';
}
}
// Get reconnection status
getReconnectionInfo(): { attempts: number, maxAttempts: number, isReconnecting: boolean } {
return {
attempts: this.reconnectAttempts,
maxAttempts: this.maxReconnectAttempts,
isReconnecting: this.reconnectAttempts > 0 && this.reconnectAttempts < this.maxReconnectAttempts
};
}
// Force reconnect
forceReconnect(sessionId: string): Promise<void> {
this.reconnectAttempts = 0;
this.disconnect();
return this.connect(sessionId);
}
}