AshDavid12 commited on
Commit
372483f
·
1 Parent(s): 4c42c49

added send ping in server

Browse files
Files changed (2) hide show
  1. client.py +13 -21
  2. infer.py +14 -1
client.py CHANGED
@@ -2,21 +2,19 @@ import asyncio
2
  import websockets
3
  import requests
4
  import ssl
5
- import logging
6
 
7
  # Parameters for reading and sending the audio
8
- AUDIO_FILE_URL = "https://raw.githubusercontent.com/AshDavid12/runpod-serverless-forked/main/test_hebrew.wav" # Use WAV file
9
-
10
-
11
  async def send_audio(websocket):
12
- buffer_size = 1024 # Buffer audio chunks up to 512KB before sending
13
  audio_buffer = bytearray()
14
 
15
  with requests.get(AUDIO_FILE_URL, stream=True, allow_redirects=False) as response:
16
  if response.status_code == 200:
17
  print("Starting to stream audio file...")
18
 
19
- for chunk in response.iter_content(chunk_size=1024): # Stream in chunks
20
  if chunk:
21
  audio_buffer.extend(chunk)
22
  #print(f"Received audio chunk of size {len(chunk)} bytes.")
@@ -24,30 +22,26 @@ async def send_audio(websocket):
24
  # Send buffered audio data once it's large enough
25
  if len(audio_buffer) >= buffer_size:
26
  await websocket.send(audio_buffer)
27
- #print(f"Sent {len(audio_buffer)} bytes of audio data.")
28
  audio_buffer.clear()
29
- await asyncio.sleep(0.01)
30
 
31
  print("Finished sending audio.")
32
  else:
33
  print(f"Failed to download audio file. Status code: {response.status_code}")
34
 
35
-
36
  async def receive_transcription(websocket):
37
  while True:
38
  try:
39
- transcription = await websocket.recv() # Receive transcription from the server
40
- # new_segments = process_transcription_results(transcription)
41
- # # Now handle only new segments
42
- # if new_segments:
43
- # for segment in new_segments:
44
- # print(f"New Segment: {segment['text']}")
45
  print(f"Transcription: {transcription}")
46
  except Exception as e:
47
  print(f"Error receiving transcription: {e}")
 
48
  break
49
 
50
-
51
  async def send_heartbeat(websocket):
52
  while True:
53
  try:
@@ -56,7 +50,7 @@ async def send_heartbeat(websocket):
56
  except websockets.ConnectionClosed:
57
  print("Connection closed, stopping heartbeat")
58
  break
59
- await asyncio.sleep(30) # Send ping every 30 seconds (adjust as needed)
60
 
61
 
62
  async def run_client():
@@ -65,13 +59,11 @@ async def run_client():
65
  ssl_context.check_hostname = False
66
  ssl_context.verify_mode = ssl.CERT_NONE
67
 
68
- async with websockets.connect(uri, ssl=ssl_context, timeout=120) as websocket:
69
- print(f"here")
70
  await asyncio.gather(
71
  send_audio(websocket),
72
  receive_transcription(websocket),
73
  send_heartbeat(websocket)
74
  )
75
 
76
-
77
- asyncio.run(run_client())
 
2
  import websockets
3
  import requests
4
  import ssl
 
5
 
6
  # Parameters for reading and sending the audio
7
+ #AUDIO_FILE_URL = "https://raw.githubusercontent.com/AshDavid12/runpod-serverless-forked/main/test_hebrew.wav" # Use WAV file
8
+ AUDIO_FILE_URL = "https://raw.githubusercontent.com/AshDavid12/hugging_face_ivrit_streaming/main/long_hebrew.wav"
 
9
  async def send_audio(websocket):
10
+ buffer_size = 512*1024 #HAVE TO HAVE 512!!
11
  audio_buffer = bytearray()
12
 
13
  with requests.get(AUDIO_FILE_URL, stream=True, allow_redirects=False) as response:
14
  if response.status_code == 200:
15
  print("Starting to stream audio file...")
16
 
17
+ for chunk in response.iter_content(chunk_size=512): # Stream in chunks
18
  if chunk:
19
  audio_buffer.extend(chunk)
20
  #print(f"Received audio chunk of size {len(chunk)} bytes.")
 
22
  # Send buffered audio data once it's large enough
23
  if len(audio_buffer) >= buffer_size:
24
  await websocket.send(audio_buffer)
25
+ print(f"Sent {len(audio_buffer)} bytes of audio data.")
26
  audio_buffer.clear()
27
+ await asyncio.sleep(0.001)
28
 
29
  print("Finished sending audio.")
30
  else:
31
  print(f"Failed to download audio file. Status code: {response.status_code}")
32
 
 
33
  async def receive_transcription(websocket):
34
  while True:
35
  try:
36
+
37
+ transcription = await asyncio.wait_for(websocket.recv(),timeout=300)
38
+ # Receive transcription from the server
 
 
 
39
  print(f"Transcription: {transcription}")
40
  except Exception as e:
41
  print(f"Error receiving transcription: {e}")
42
+ await asyncio.sleep(30)
43
  break
44
 
 
45
  async def send_heartbeat(websocket):
46
  while True:
47
  try:
 
50
  except websockets.ConnectionClosed:
51
  print("Connection closed, stopping heartbeat")
52
  break
53
+ await asyncio.sleep(600) # Send ping every 30 seconds (adjust as needed)
54
 
55
 
56
  async def run_client():
 
59
  ssl_context.check_hostname = False
60
  ssl_context.verify_mode = ssl.CERT_NONE
61
 
62
+ async with websockets.connect(uri, ssl=ssl_context, ping_timeout=120,ping_interval=10) as websocket:
 
63
  await asyncio.gather(
64
  send_audio(websocket),
65
  receive_transcription(websocket),
66
  send_heartbeat(websocket)
67
  )
68
 
69
+ asyncio.run(run_client())
 
infer.py CHANGED
@@ -176,7 +176,7 @@ def transcribe_core_ws(audio_file, last_transcribed_time):
176
  ret['new_segments'].append(seg)
177
 
178
  # Update the last transcribed time to the end of the current segment
179
- new_last_transcribed_time = max(new_last_transcribed_time, s.end)
180
  logging.debug(f"Updated last transcribed time to: {new_last_transcribed_time} seconds")
181
 
182
  #logging.info(f"Returning {len(ret['new_segments'])} new segments and updated last transcribed time.")
@@ -192,6 +192,19 @@ async def websocket_transcribe(websocket: WebSocket):
192
  await websocket.accept()
193
  logging.info("WebSocket connection established successfully.")
194
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
  try:
196
  processed_segments = [] # Keeps track of the segments already transcribed
197
  accumulated_audio_size = 0 # Track how much audio data has been buffered
 
176
  ret['new_segments'].append(seg)
177
 
178
  # Update the last transcribed time to the end of the current segment
179
+ new_last_transcribed_time = s.end
180
  logging.debug(f"Updated last transcribed time to: {new_last_transcribed_time} seconds")
181
 
182
  #logging.info(f"Returning {len(ret['new_segments'])} new segments and updated last transcribed time.")
 
192
  await websocket.accept()
193
  logging.info("WebSocket connection established successfully.")
194
 
195
+ async def send_ping():
196
+ """Function to send periodic ping to keep the connection alive."""
197
+ while True:
198
+ try:
199
+ await websocket.ping()
200
+ logging.info("Sent keepalive ping to client.")
201
+ await asyncio.sleep(10) # Ping every 10 seconds (adjust the interval as needed)
202
+ except Exception as e:
203
+ logging.error(f"Error sending ping: {e}")
204
+ break
205
+
206
+ ping_task = asyncio.create_task(send_ping())
207
+
208
  try:
209
  processed_segments = [] # Keeps track of the segments already transcribed
210
  accumulated_audio_size = 0 # Track how much audio data has been buffered