ciyidogan commited on
Commit
615eca3
Β·
verified Β·
1 Parent(s): ef24465

Update flare-ui/src/app/services/conversation-manager.service.ts

Browse files
flare-ui/src/app/services/conversation-manager.service.ts CHANGED
@@ -1,210 +1,290 @@
1
- import { Injectable } from '@angular/core';
2
- import { Subject, Observable, timer } from 'rxjs';
3
- import { retry, tap } from 'rxjs/operators';
 
4
 
5
- export interface WebSocketMessage {
6
- type: string;
7
- [key: string]: any;
8
- }
 
 
 
9
 
10
- export interface TranscriptionResult {
 
11
  text: string;
12
- is_final: boolean;
13
- confidence: number;
14
- }
15
-
16
- export interface StateChangeMessage {
17
- from: string;
18
- to: string;
19
  }
20
 
21
  @Injectable({
22
  providedIn: 'root'
23
  })
24
- export class WebSocketService {
25
- private socket: WebSocket | null = null;
26
- private url: string = '';
27
- private reconnectAttempts = 0;
28
- private maxReconnectAttempts = 5;
29
- private reconnectDelay = 1000;
30
 
31
- // Subjects for different message types
32
- private messageSubject = new Subject<WebSocketMessage>();
33
- private transcriptionSubject = new Subject<TranscriptionResult>();
34
- private stateChangeSubject = new Subject<StateChangeMessage>();
35
- private errorSubject = new Subject<string>();
36
- private connectionSubject = new Subject<boolean>();
37
 
38
- // Public observables
39
- public message$ = this.messageSubject.asObservable();
 
 
 
 
40
  public transcription$ = this.transcriptionSubject.asObservable();
41
- public stateChange$ = this.stateChangeSubject.asObservable();
42
- public error$ = this.errorSubject.asObservable();
43
- public connection$ = this.connectionSubject.asObservable();
44
-
45
- constructor() {}
46
-
47
- connect(sessionId: string): Promise<void> {
48
- return new Promise((resolve, reject) => {
49
- try {
50
- // Construct WebSocket URL
51
- const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
52
- const host = window.location.host;
53
- this.url = `${protocol}//${host}/ws/conversation/${sessionId}`;
54
-
55
- console.log(`πŸ”Œ Connecting to WebSocket: ${this.url}`);
56
-
57
- this.socket = new WebSocket(this.url);
58
-
59
- this.socket.onopen = () => {
60
- console.log('βœ… WebSocket connected');
61
- this.reconnectAttempts = 0;
62
- this.connectionSubject.next(true);
63
-
64
- // Start keep-alive ping
65
- this.startKeepAlive();
66
-
67
- resolve();
68
- };
69
-
70
- this.socket.onmessage = (event) => {
71
- try {
72
- const message: WebSocketMessage = JSON.parse(event.data);
73
- this.handleMessage(message);
74
- } catch (error) {
75
- console.error('Failed to parse WebSocket message:', error);
76
- }
77
- };
78
-
79
- this.socket.onerror = (error) => {
80
- console.error('❌ WebSocket error:', error);
81
- this.errorSubject.next('WebSocket bağlantı hatası');
82
- reject(error);
83
- };
84
-
85
- this.socket.onclose = () => {
86
- console.log('πŸ”Œ WebSocket disconnected');
87
- this.connectionSubject.next(false);
88
- this.stopKeepAlive();
89
-
90
- // Attempt reconnection
91
- if (this.reconnectAttempts < this.maxReconnectAttempts) {
92
- this.attemptReconnect(sessionId);
93
- }
94
- };
95
-
96
- } catch (error) {
97
- console.error('Failed to create WebSocket:', error);
98
- reject(error);
99
- }
100
- });
101
- }
102
 
103
- disconnect(): void {
104
- this.stopKeepAlive();
105
-
106
- if (this.socket) {
107
- this.socket.close();
108
- this.socket = null;
109
- }
110
-
111
- this.connectionSubject.next(false);
112
  }
113
 
114
- send(message: WebSocketMessage): void {
115
- if (this.socket && this.socket.readyState === WebSocket.OPEN) {
116
- this.socket.send(JSON.stringify(message));
117
- } else {
118
- console.warn('WebSocket is not connected');
119
- this.errorSubject.next('WebSocket bağlantısı yok');
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  }
121
  }
122
 
123
- sendAudioChunk(audioData: string): void {
124
- this.send({
125
- type: 'audio_chunk',
126
- data: audioData,
127
- timestamp: Date.now()
128
- });
129
  }
130
 
131
- sendControl(action: string, config?: any): void {
132
- this.send({
133
- type: 'control',
134
- action: action,
135
- config: config
136
- });
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
  }
138
 
139
- private handleMessage(message: WebSocketMessage): void {
140
- // Emit to general message stream
141
- this.messageSubject.next(message);
142
-
143
- // Handle specific message types
144
  switch (message.type) {
145
  case 'transcription':
146
- this.transcriptionSubject.next({
147
- text: message.text,
148
- is_final: message.is_final,
149
- confidence: message.confidence
150
- });
151
- break;
152
-
153
- case 'state_change':
154
- this.stateChangeSubject.next({
155
- from: message['from'],
156
- to: message['to']
157
- });
158
  break;
159
 
160
- case 'error':
161
- this.errorSubject.next(message['message']);
162
  break;
163
 
164
  case 'tts_audio':
165
- // Handle TTS audio chunks
166
- this.messageSubject.next(message);
167
  break;
168
 
169
- case 'assistant_response':
170
- // Handle assistant text response
171
- this.messageSubject.next(message);
 
172
  break;
173
  }
174
  }
175
 
176
- private attemptReconnect(sessionId: string): void {
177
- this.reconnectAttempts++;
178
- const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
 
 
 
 
 
 
 
 
 
 
 
 
 
179
 
180
- console.log(`πŸ”„ Attempting reconnection ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
181
 
182
- setTimeout(() => {
183
- this.connect(sessionId).catch(error => {
184
- console.error('Reconnection failed:', error);
 
 
 
 
 
 
 
185
  });
186
- }, delay);
 
 
 
 
 
187
  }
188
 
189
- // Keep-alive mechanism
190
- private keepAliveInterval: any;
191
-
192
- private startKeepAlive(): void {
193
- this.keepAliveInterval = setInterval(() => {
194
- if (this.socket && this.socket.readyState === WebSocket.OPEN) {
195
- this.send({ type: 'ping' });
196
- }
197
- }, 30000); // Ping every 30 seconds
198
  }
199
-
200
- private stopKeepAlive(): void {
201
- if (this.keepAliveInterval) {
202
- clearInterval(this.keepAliveInterval);
203
- this.keepAliveInterval = null;
 
 
 
 
 
 
204
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
205
  }
206
 
207
- isConnected(): boolean {
208
- return this.socket !== null && this.socket.readyState === WebSocket.OPEN;
209
  }
210
  }
 
1
+ import { Injectable, OnDestroy } from '@angular/core';
2
+ import { Subject, Subscription, BehaviorSubject } from 'rxjs';
3
+ import { WebSocketService } from './websocket.service';
4
+ import { AudioStreamService } from './audio-stream.service';
5
 
6
+ export type ConversationState =
7
+ | 'idle'
8
+ | 'listening'
9
+ | 'processing_stt'
10
+ | 'processing_llm'
11
+ | 'processing_tts'
12
+ | 'playing_audio';
13
 
14
+ export interface ConversationMessage {
15
+ role: 'user' | 'assistant';
16
  text: string;
17
+ timestamp: Date;
18
+ audioUrl?: string;
 
 
 
 
 
19
  }
20
 
21
  @Injectable({
22
  providedIn: 'root'
23
  })
24
+ export class ConversationManagerService implements OnDestroy {
25
+ private subscriptions = new Subscription();
26
+ private audioQueue: string[] = [];
27
+ private isInterrupting = false;
 
 
28
 
29
+ // State management
30
+ private currentStateSubject = new BehaviorSubject<ConversationState>('idle');
31
+ public currentState$ = this.currentStateSubject.asObservable();
 
 
 
32
 
33
+ // Message history
34
+ private messagesSubject = new BehaviorSubject<ConversationMessage[]>([]);
35
+ public messages$ = this.messagesSubject.asObservable();
36
+
37
+ // Current transcription
38
+ private transcriptionSubject = new BehaviorSubject<string>('');
39
  public transcription$ = this.transcriptionSubject.asObservable();
40
+
41
+ // Audio player reference
42
+ private audioPlayer: HTMLAudioElement | null = null;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
 
44
+ constructor(
45
+ private wsService: WebSocketService,
46
+ private audioService: AudioStreamService
47
+ ) {}
48
+
49
+ ngOnDestroy(): void {
50
+ this.cleanup();
 
 
51
  }
52
 
53
+ async startConversation(sessionId: string): Promise<void> {
54
+ try {
55
+ // Connect WebSocket
56
+ await this.wsService.connect(sessionId);
57
+
58
+ // Start audio recording
59
+ await this.audioService.startRecording();
60
+
61
+ // Set up subscriptions
62
+ this.setupSubscriptions();
63
+
64
+ // Send start signal
65
+ this.wsService.sendControl('start_session', {
66
+ language: 'tr-TR',
67
+ stt_engine: 'google',
68
+ enable_barge_in: true
69
+ });
70
+
71
+ console.log('βœ… Conversation started');
72
+
73
+ } catch (error) {
74
+ console.error('Failed to start conversation:', error);
75
+ throw error;
76
  }
77
  }
78
 
79
+ stopConversation(): void {
80
+ this.cleanup();
 
 
 
 
81
  }
82
 
83
+ private setupSubscriptions(): void {
84
+ // Audio chunks from microphone
85
+ this.subscriptions.add(
86
+ this.audioService.audioChunk$.subscribe(chunk => {
87
+ if (!this.isInterrupting) {
88
+ this.wsService.sendAudioChunk(chunk.data);
89
+ }
90
+ })
91
+ );
92
+
93
+ // WebSocket messages
94
+ this.subscriptions.add(
95
+ this.wsService.message$.subscribe(message => {
96
+ this.handleMessage(message);
97
+ })
98
+ );
99
+
100
+ // Transcription updates
101
+ this.subscriptions.add(
102
+ this.wsService.transcription$.subscribe(result => {
103
+ if (!result.is_final) {
104
+ this.transcriptionSubject.next(result.text);
105
+ }
106
+ })
107
+ );
108
+
109
+ // State changes
110
+ this.subscriptions.add(
111
+ this.wsService.stateChange$.subscribe(change => {
112
+ this.currentStateSubject.next(change.to as ConversationState);
113
+ this.handleStateChange(change.from, change.to);
114
+ })
115
+ );
116
+
117
+ // Errors
118
+ this.subscriptions.add(
119
+ this.wsService.error$.subscribe(error => {
120
+ console.error('WebSocket error:', error);
121
+ this.addSystemMessage(`Hata: ${error}`);
122
+ })
123
+ );
124
  }
125
 
126
+ private handleMessage(message: any): void {
 
 
 
 
127
  switch (message.type) {
128
  case 'transcription':
129
+ if (message['is_final']) {
130
+ this.addMessage('user', message['text']);
131
+ this.transcriptionSubject.next('');
132
+ }
 
 
 
 
 
 
 
 
133
  break;
134
 
135
+ case 'assistant_response':
136
+ this.addMessage('assistant', message['text']);
137
  break;
138
 
139
  case 'tts_audio':
140
+ this.handleTTSAudio(message);
 
141
  break;
142
 
143
+ case 'control':
144
+ if (message['action'] === 'stop_playback') {
145
+ this.stopAudioPlayback();
146
+ }
147
  break;
148
  }
149
  }
150
 
151
+ private handleStateChange(from: string, to: string): void {
152
+ console.log(`πŸ“Š State: ${from} β†’ ${to}`);
153
+
154
+ // Handle state-specific logic
155
+ if (to === 'listening') {
156
+ // Clear transcription when starting to listen
157
+ this.transcriptionSubject.next('');
158
+ } else if (to === 'playing_audio') {
159
+ // Start playing accumulated audio
160
+ this.playQueuedAudio();
161
+ }
162
+ }
163
+
164
+ private handleTTSAudio(message: any): void {
165
+ // Accumulate audio chunks
166
+ this.audioQueue.push(message['data']);
167
 
168
+ if (message['is_last']) {
169
+ // All chunks received, create audio blob
170
+ const audioData = this.audioQueue.join('');
171
+ const audioBlob = this.base64ToBlob(audioData, 'audio/mpeg');
172
+ const audioUrl = URL.createObjectURL(audioBlob);
173
+
174
+ // Update last message with audio URL
175
+ const messages = this.messagesSubject.value;
176
+ if (messages.length > 0 && messages[messages.length - 1].role === 'assistant') {
177
+ messages[messages.length - 1].audioUrl = audioUrl;
178
+ this.messagesSubject.next([...messages]);
179
+ }
180
+
181
+ // Clear queue
182
+ this.audioQueue = [];
183
+ }
184
+ }
185
+
186
+ private playQueuedAudio(): void {
187
+ const messages = this.messagesSubject.value;
188
+ const lastMessage = messages[messages.length - 1];
189
+
190
+ if (lastMessage?.audioUrl && lastMessage.role === 'assistant') {
191
+ this.playAudio(lastMessage.audioUrl);
192
+ }
193
+ }
194
+
195
+ private playAudio(audioUrl: string): void {
196
+ if (!this.audioPlayer) {
197
+ this.audioPlayer = new Audio();
198
+ }
199
+
200
+ this.audioPlayer.src = audioUrl;
201
+ this.audioPlayer.play().catch(error => {
202
+ console.error('Audio playback error:', error);
203
+ });
204
+
205
+ this.audioPlayer.onended = () => {
206
+ // Notify that audio playback ended
207
+ this.wsService.sendControl('audio_ended');
208
+ this.currentStateSubject.next('idle');
209
+ };
210
+ }
211
+
212
+ private stopAudioPlayback(): void {
213
+ if (this.audioPlayer) {
214
+ this.audioPlayer.pause();
215
+ this.audioPlayer.currentTime = 0;
216
+ }
217
+ }
218
+
219
+ // Barge-in handling
220
+ performBargeIn(): void {
221
+ const currentState = this.currentStateSubject.value;
222
 
223
+ if (currentState !== 'idle' && currentState !== 'listening') {
224
+ console.log('πŸ›‘ Performing barge-in');
225
+ this.isInterrupting = true;
226
+
227
+ // Stop audio playback
228
+ this.stopAudioPlayback();
229
+
230
+ // Notify server
231
+ this.wsService.sendControl('interrupt', {
232
+ at_state: currentState
233
  });
234
+
235
+ // Reset interruption flag after a delay
236
+ setTimeout(() => {
237
+ this.isInterrupting = false;
238
+ }, 500);
239
+ }
240
  }
241
 
242
+ private addMessage(role: 'user' | 'assistant', text: string): void {
243
+ const messages = this.messagesSubject.value;
244
+ messages.push({
245
+ role,
246
+ text,
247
+ timestamp: new Date()
248
+ });
249
+ this.messagesSubject.next([...messages]);
 
250
  }
251
+
252
+ private addSystemMessage(text: string): void {
253
+ console.log(`πŸ“’ System: ${text}`);
254
+ }
255
+
256
+ private base64ToBlob(base64: string, mimeType: string): Blob {
257
+ const byteCharacters = atob(base64);
258
+ const byteNumbers = new Array(byteCharacters.length);
259
+
260
+ for (let i = 0; i < byteCharacters.length; i++) {
261
+ byteNumbers[i] = byteCharacters.charCodeAt(i);
262
  }
263
+
264
+ const byteArray = new Uint8Array(byteNumbers);
265
+ return new Blob([byteArray], { type: mimeType });
266
+ }
267
+
268
+ private cleanup(): void {
269
+ this.subscriptions.unsubscribe();
270
+ this.audioService.stopRecording();
271
+ this.wsService.disconnect();
272
+ this.stopAudioPlayback();
273
+ this.audioQueue = [];
274
+ this.currentStateSubject.next('idle');
275
+ console.log('🧹 Conversation cleaned up');
276
+ }
277
+
278
+ // Public methods for UI
279
+ getCurrentState(): ConversationState {
280
+ return this.currentStateSubject.value;
281
+ }
282
+
283
+ getMessages(): ConversationMessage[] {
284
+ return this.messagesSubject.value;
285
  }
286
 
287
+ clearMessages(): void {
288
+ this.messagesSubject.next([]);
289
  }
290
  }