Spaces:
Running
Running
import asyncio | |
from typing import List, Tuple, Any, Set | |
class AQueue: | |
def __init__(self, maxsize=None): | |
self._queue = [] # type: List[Tuple[Any, int]] | |
self._mask = set() # type: Set[int] | |
self._put_lock = asyncio.Lock() | |
self._get_lock = asyncio.Lock() | |
self._not_empty = asyncio.Event() | |
async def put(self, item: Any, lock: int): | |
async with self._put_lock: | |
self._queue.append((item, lock)) | |
if lock not in self._mask: | |
self._not_empty.set() | |
async def get(self, worker_id): | |
async with self._get_lock: | |
await self._not_empty.wait() | |
available = [i for i, (_, i_lock) in enumerate(self._queue) if i_lock not in self._mask] | |
item, lock = self._queue.pop(available[0]) | |
self.acquire(lock) | |
available = [i for i, (_, i_lock) in enumerate(self._queue) if i_lock not in self._mask] | |
if not available: | |
self._not_empty.clear() | |
return item, lock | |
def acquire(self, lock: int): | |
self._mask.add(lock) | |
def release(self, lock: int): | |
self._mask.remove(lock) | |
for _, i_lock in self._queue: | |
if i_lock == lock: | |
self._not_empty.set() | |
break | |
def qsize(self): | |
return len(self._queue) | |
def empty(self): | |
return not self._queue | |