ciyidogan commited on
Commit
c44ad84
Β·
verified Β·
1 Parent(s): c95cb32

Update websocket_handler.py

Browse files
Files changed (1) hide show
  1. websocket_handler.py +86 -48
websocket_handler.py CHANGED
@@ -620,6 +620,10 @@ async def handle_control_message(websocket: WebSocket, session: RealtimeSession,
620
  async def process_user_input(websocket: WebSocket, session: RealtimeSession):
621
  """Process complete user input"""
622
  try:
 
 
 
 
623
  user_text = session.current_transcription
624
  if not user_text:
625
  log_warning(f"⚠️ Empty transcription, resetting", session_id=session.session.session_id)
@@ -632,20 +636,22 @@ async def process_user_input(websocket: WebSocket, session: RealtimeSession):
632
  # State zaten PROCESSING_STT olarak set edildi, direkt devam et
633
 
634
  # Send final transcription
635
- await websocket.send_json({
636
- "type": "transcription",
637
- "text": user_text,
638
- "is_final": True,
639
- "confidence": 0.95
640
- })
 
641
 
642
  # State: LLM Processing
643
  await session.change_state(ConversationState.PROCESSING_LLM)
644
- await websocket.send_json({
645
- "type": "state_change",
646
- "from": "processing_stt",
647
- "to": "processing_llm"
648
- })
 
649
 
650
  # Add to chat history
651
  session.session.add_message("user", user_text)
@@ -664,20 +670,26 @@ async def process_user_input(websocket: WebSocket, session: RealtimeSession):
664
  session.session.add_message("assistant", response_text)
665
 
666
  # Send text response
667
- await websocket.send_json({
668
- "type": "assistant_response",
669
- "text": response_text
670
- })
 
671
 
672
  # Generate TTS if enabled
673
  tts_provider = TTSFactory.create_provider()
674
- if tts_provider:
 
 
675
  await session.change_state(ConversationState.PROCESSING_TTS)
676
- await websocket.send_json({
677
- "type": "state_change",
678
- "from": "processing_llm",
679
- "to": "processing_tts"
680
- })
 
 
 
681
 
682
  # Generate TTS with barge-in support
683
  tts_task = await session.barge_in_handler.start_tts_task(
@@ -689,13 +701,15 @@ async def process_user_input(websocket: WebSocket, session: RealtimeSession):
689
  except asyncio.CancelledError:
690
  log_info("⚑ TTS cancelled due to barge-in", session_id=session.session.session_id)
691
  else:
 
692
  # No TTS, go back to idle
693
  await session.change_state(ConversationState.IDLE)
694
- await websocket.send_json({
695
- "type": "state_change",
696
- "from": "processing_llm",
697
- "to": "idle"
698
- })
 
699
 
700
  except Exception as e:
701
  log_error(
@@ -704,10 +718,11 @@ async def process_user_input(websocket: WebSocket, session: RealtimeSession):
704
  traceback=traceback.format_exc(),
705
  session_id=session.session.session_id
706
  )
707
- await websocket.send_json({
708
- "type": "error",
709
- "message": f"Processing error: {str(e)}"
710
- })
 
711
  await session.reset_for_new_utterance()
712
  await session.change_state(ConversationState.IDLE)
713
 
@@ -721,17 +736,33 @@ async def generate_and_stream_tts(
721
  try:
722
  log_info(f"🎀 Starting TTS generation for text: '{text[:50]}...'", session_id=session.session.session_id)
723
 
 
 
 
 
 
 
 
 
 
 
724
  # Generate audio
725
- audio_data = await tts_provider.synthesize(text)
726
  log_info(f"βœ… TTS generated: {len(audio_data)} bytes, type: {type(audio_data)}", session_id=session.session.session_id)
727
 
 
 
 
 
 
728
  # Change state to playing
729
  await session.change_state(ConversationState.PLAYING_AUDIO)
730
- await websocket.send_json({
731
- "type": "state_change",
732
- "from": "processing_tts",
733
- "to": "playing_audio"
734
- })
 
735
 
736
  # Convert entire audio to base64 for transmission
737
  import base64
@@ -754,6 +785,11 @@ async def generate_and_stream_tts(
754
  if asyncio.current_task().cancelled():
755
  log_info(f"⚑ Streaming cancelled at chunk {i//chunk_size}", session_id=session.session.session_id)
756
  break
 
 
 
 
 
757
 
758
  chunk = audio_base64[i:i + chunk_size]
759
  chunk_index = i // chunk_size
@@ -761,14 +797,15 @@ async def generate_and_stream_tts(
761
 
762
  log_debug(f"πŸ“¨ Sending chunk {chunk_index}/{total_chunks}, size: {len(chunk)}, is_last: {is_last}")
763
 
764
- await websocket.send_json({
765
- "type": "tts_audio",
766
- "data": chunk,
767
- "chunk_index": chunk_index,
768
- "total_chunks": total_chunks,
769
- "is_last": is_last,
770
- "mime_type": "audio/mpeg"
771
- })
 
772
 
773
  # Small delay to prevent overwhelming the client
774
  await asyncio.sleep(0.01)
@@ -791,7 +828,8 @@ async def generate_and_stream_tts(
791
  traceback=traceback.format_exc(),
792
  session_id=session.session.session_id
793
  )
794
- await websocket.send_json({
795
- "type": "error",
796
- "message": f"TTS error: {str(e)}"
797
- })
 
 
620
  async def process_user_input(websocket: WebSocket, session: RealtimeSession):
621
  """Process complete user input"""
622
  try:
623
+ # WebSocket aktif mi kontrol et
624
+ if not session.is_websocket_active:
625
+ return
626
+
627
  user_text = session.current_transcription
628
  if not user_text:
629
  log_warning(f"⚠️ Empty transcription, resetting", session_id=session.session.session_id)
 
636
  # State zaten PROCESSING_STT olarak set edildi, direkt devam et
637
 
638
  # Send final transcription
639
+ if session.is_websocket_active:
640
+ await websocket.send_json({
641
+ "type": "transcription",
642
+ "text": user_text,
643
+ "is_final": True,
644
+ "confidence": 0.95
645
+ })
646
 
647
  # State: LLM Processing
648
  await session.change_state(ConversationState.PROCESSING_LLM)
649
+ if session.is_websocket_active:
650
+ await websocket.send_json({
651
+ "type": "state_change",
652
+ "from": "processing_stt",
653
+ "to": "processing_llm"
654
+ })
655
 
656
  # Add to chat history
657
  session.session.add_message("user", user_text)
 
670
  session.session.add_message("assistant", response_text)
671
 
672
  # Send text response
673
+ if session.is_websocket_active:
674
+ await websocket.send_json({
675
+ "type": "assistant_response",
676
+ "text": response_text
677
+ })
678
 
679
  # Generate TTS if enabled
680
  tts_provider = TTSFactory.create_provider()
681
+ log_info(f"πŸ” TTS provider check: {tts_provider is not None}", session_id=session.session.session_id)
682
+
683
+ if tts_provider and session.is_websocket_active:
684
  await session.change_state(ConversationState.PROCESSING_TTS)
685
+ if session.is_websocket_active:
686
+ await websocket.send_json({
687
+ "type": "state_change",
688
+ "from": "processing_llm",
689
+ "to": "processing_tts"
690
+ })
691
+
692
+ log_info(f"🎡 Starting TTS generation for response", session_id=session.session.session_id)
693
 
694
  # Generate TTS with barge-in support
695
  tts_task = await session.barge_in_handler.start_tts_task(
 
701
  except asyncio.CancelledError:
702
  log_info("⚑ TTS cancelled due to barge-in", session_id=session.session.session_id)
703
  else:
704
+ log_info(f"⚠️ No TTS provider or WebSocket inactive, skipping TTS", session_id=session.session.session_id)
705
  # No TTS, go back to idle
706
  await session.change_state(ConversationState.IDLE)
707
+ if session.is_websocket_active:
708
+ await websocket.send_json({
709
+ "type": "state_change",
710
+ "from": "processing_llm",
711
+ "to": "idle"
712
+ })
713
 
714
  except Exception as e:
715
  log_error(
 
718
  traceback=traceback.format_exc(),
719
  session_id=session.session.session_id
720
  )
721
+ if session.is_websocket_active:
722
+ await websocket.send_json({
723
+ "type": "error",
724
+ "message": f"Processing error: {str(e)}"
725
+ })
726
  await session.reset_for_new_utterance()
727
  await session.change_state(ConversationState.IDLE)
728
 
 
736
  try:
737
  log_info(f"🎀 Starting TTS generation for text: '{text[:50]}...'", session_id=session.session.session_id)
738
 
739
+ # TTS preprocessor kullan
740
+ from tts_preprocessor import TTSPreprocessor
741
+ preprocessor = TTSPreprocessor(language=session.session.locale)
742
+ processed_text = preprocessor.preprocess(
743
+ text,
744
+ tts_provider.get_preprocessing_flags()
745
+ )
746
+
747
+ log_debug(f"πŸ“ Preprocessed text: '{processed_text[:50]}...'", session_id=session.session.session_id)
748
+
749
  # Generate audio
750
+ audio_data = await tts_provider.synthesize(processed_text)
751
  log_info(f"βœ… TTS generated: {len(audio_data)} bytes, type: {type(audio_data)}", session_id=session.session.session_id)
752
 
753
+ # WebSocket aktif mi kontrol et
754
+ if not session.is_websocket_active:
755
+ log_warning(f"⚠️ WebSocket inactive, skipping TTS streaming", session_id=session.session.session_id)
756
+ return
757
+
758
  # Change state to playing
759
  await session.change_state(ConversationState.PLAYING_AUDIO)
760
+ if session.is_websocket_active:
761
+ await websocket.send_json({
762
+ "type": "state_change",
763
+ "from": "processing_tts",
764
+ "to": "playing_audio"
765
+ })
766
 
767
  # Convert entire audio to base64 for transmission
768
  import base64
 
785
  if asyncio.current_task().cancelled():
786
  log_info(f"⚑ Streaming cancelled at chunk {i//chunk_size}", session_id=session.session.session_id)
787
  break
788
+
789
+ # WebSocket aktif mi kontrol et
790
+ if not session.is_websocket_active:
791
+ log_warning(f"⚠️ WebSocket inactive during streaming, stopping", session_id=session.session.session_id)
792
+ break
793
 
794
  chunk = audio_base64[i:i + chunk_size]
795
  chunk_index = i // chunk_size
 
797
 
798
  log_debug(f"πŸ“¨ Sending chunk {chunk_index}/{total_chunks}, size: {len(chunk)}, is_last: {is_last}")
799
 
800
+ if session.is_websocket_active:
801
+ await websocket.send_json({
802
+ "type": "tts_audio",
803
+ "data": chunk,
804
+ "chunk_index": chunk_index,
805
+ "total_chunks": total_chunks,
806
+ "is_last": is_last,
807
+ "mime_type": "audio/mpeg"
808
+ })
809
 
810
  # Small delay to prevent overwhelming the client
811
  await asyncio.sleep(0.01)
 
828
  traceback=traceback.format_exc(),
829
  session_id=session.session.session_id
830
  )
831
+ if session.is_websocket_active:
832
+ await websocket.send_json({
833
+ "type": "error",
834
+ "message": f"TTS error: {str(e)}"
835
+ })