Daniel Marques commited on
Commit
3532e4c
·
1 Parent(s): ef11c5a

fix: add websocket in handlerToken

Browse files
Files changed (1) hide show
  1. main.py +11 -23
main.py CHANGED
@@ -54,8 +54,6 @@ class MyCustomSyncHandler(BaseCallbackHandler):
54
  self.message += token
55
  redisClient.publish(f'{kwargs["tags"][0]}', self.message)
56
 
57
- print(self.message)
58
-
59
  LLM = load_model(device_type=DEVICE_TYPE, model_id=MODEL_ID, model_basename=MODEL_BASENAME, stream=True)
60
 
61
  prompt, memory = get_prompt_template(promptTemplate_type="llama", history=True)
@@ -71,6 +69,14 @@ QA = RetrievalQA.from_chain_type(
71
  },
72
  )
73
 
 
 
 
 
 
 
 
 
74
  app = FastAPI(title="homepage-app")
75
  api_app = FastAPI(title="api app")
76
 
@@ -123,7 +129,6 @@ def run_ingest_route():
123
  },
124
  )
125
 
126
-
127
  return {"response": "The training was successfully completed"}
128
  except Exception as e:
129
  raise HTTPException(status_code=500, detail=f"Error occurred: {str(e)}")
@@ -226,27 +231,10 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
226
  while True:
227
  prompt = await websocket.receive_text()
228
 
229
- pubsub = redisClient.pubsub()
230
- pubsub.subscribe(f'{client_id}')
231
-
232
- for item in pubsub.listen():
233
- if item['type'] == 'message':
234
- await websocket.send_text(f'{item["data"]}')
235
- if item['type'] == 'subscribe':
236
- QA(
237
- inputs=prompt,
238
- return_only_outputs=True,
239
- callbacks=[MyCustomSyncHandler()],
240
- tags=f'{client_id}',
241
- include_run_info=True
242
- )
243
-
244
-
245
-
246
-
247
-
248
-
249
 
 
250
 
251
  except WebSocketDisconnect:
252
  print('disconnect')
 
54
  self.message += token
55
  redisClient.publish(f'{kwargs["tags"][0]}', self.message)
56
 
 
 
57
  LLM = load_model(device_type=DEVICE_TYPE, model_id=MODEL_ID, model_basename=MODEL_BASENAME, stream=True)
58
 
59
  prompt, memory = get_prompt_template(promptTemplate_type="llama", history=True)
 
69
  },
70
  )
71
 
72
+ async def pubsub_listener(websocket, client_id):
73
+ pubsub = redisClient.pubsub()
74
+ pubsub.subscribe(f'{client_id}')
75
+
76
+ while True:
77
+ message = pubsub.listen()
78
+ await websocket.send_text(f'{message}')
79
+
80
  app = FastAPI(title="homepage-app")
81
  api_app = FastAPI(title="api app")
82
 
 
129
  },
130
  )
131
 
 
132
  return {"response": "The training was successfully completed"}
133
  except Exception as e:
134
  raise HTTPException(status_code=500, detail=f"Error occurred: {str(e)}")
 
231
  while True:
232
  prompt = await websocket.receive_text()
233
 
234
+ pubsub_thread = threading.Thread(target=pubsub_listener(websocket, client_id))
235
+ pubsub_thread.start()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
236
 
237
+ QA(inputs=prompt, return_only_outputs=True, callbacks=[MyCustomSyncHandler()], tags=f'{client_id}', include_run_info=True)
238
 
239
  except WebSocketDisconnect:
240
  print('disconnect')