Spaces:
Runtime error
Runtime error
| from typing import Dict | |
| from uuid import UUID | |
| import asyncio | |
| from fastapi import WebSocket | |
| from types import SimpleNamespace | |
| from typing import Dict | |
| from typing import Union | |
| UserDataContent = Dict[UUID, Dict[str, Union[WebSocket, asyncio.Queue]]] | |
| class UserData: | |
| def __init__(self): | |
| self.data_content: Dict[UUID, UserDataContent] = {} | |
| async def create_user(self, user_id: UUID, websocket: WebSocket): | |
| self.data_content[user_id] = { | |
| "websocket": websocket, | |
| "queue": asyncio.Queue(), | |
| } | |
| await asyncio.sleep(1) | |
| def check_user(self, user_id: UUID) -> bool: | |
| return user_id in self.data_content | |
| async def update_data(self, user_id: UUID, new_data: SimpleNamespace): | |
| user_session = self.data_content[user_id] | |
| queue = user_session["queue"] | |
| while not queue.empty(): | |
| try: | |
| queue.get_nowait() | |
| except asyncio.QueueEmpty: | |
| continue | |
| await queue.put(new_data) | |
| async def get_latest_data(self, user_id: UUID) -> SimpleNamespace: | |
| user_session = self.data_content[user_id] | |
| queue = user_session["queue"] | |
| try: | |
| return await queue.get() | |
| except asyncio.QueueEmpty: | |
| return None | |
| def delete_user(self, user_id: UUID): | |
| user_session = self.data_content[user_id] | |
| queue = user_session["queue"] | |
| while not queue.empty(): | |
| try: | |
| queue.get_nowait() | |
| except asyncio.QueueEmpty: | |
| continue | |
| if user_id in self.data_content: | |
| del self.data_content[user_id] | |
| def get_user_count(self) -> int: | |
| return len(self.data_content) | |
| def get_websocket(self, user_id: UUID) -> WebSocket: | |
| return self.data_content[user_id]["websocket"] | |
| user_data = UserData() | |