manga-reader / tools /aqueue.py
taslim19
drag
4051191
import asyncio
from typing import List, Tuple, Any, Set
class AQueue:
    """Asyncio queue whose items are tagged with an integer lock key.

    Every queued item carries a lock key.  While a key is held (present in
    the internal mask) items tagged with it are skipped by ``get``; they
    become eligible again once ``release`` is called for that key.  Among
    eligible items, the one that was queued first is returned first.
    """

    def __init__(self, maxsize=None):
        """Create an empty queue.

        Args:
            maxsize: Accepted for interface compatibility; it is not used,
                so the queue is unbounded.
        """
        self._queue = []  # type: List[Tuple[Any, int]]
        self._mask = set()  # type: Set[int]
        self._put_lock = asyncio.Lock()
        self._get_lock = asyncio.Lock()
        # Set while at least one queued item has a key that is not held.
        self._not_empty = asyncio.Event()

    def _first_available(self):
        """Return the index of the oldest item whose key is free, or None."""
        return next(
            (
                index
                for index, (_, i_lock) in enumerate(self._queue)
                if i_lock not in self._mask
            ),
            None,
        )

    def _refresh_signal(self) -> None:
        """Make ``_not_empty`` reflect whether any item can be handed out."""
        if self._first_available() is None:
            self._not_empty.clear()
        else:
            self._not_empty.set()

    async def put(self, item: Any, lock: int) -> None:
        """Append ``item`` tagged with ``lock`` and wake a waiting getter.

        A getter is only signalled when ``lock`` is not currently held,
        because otherwise the new item cannot be handed out yet.
        """
        async with self._put_lock:
            self._queue.append((item, lock))
            if lock not in self._mask:
                self._not_empty.set()

    async def get(self, worker_id) -> Tuple[Any, int]:
        """Wait for an eligible item, remove it and hold its lock key.

        Args:
            worker_id: Accepted for interface compatibility; not used.

        Returns:
            The ``(item, lock)`` pair.  ``lock`` stays held until the caller
            passes it to ``release``.
        """
        async with self._get_lock:
            while True:
                await self._not_empty.wait()
                index = self._first_available()
                if index is not None:
                    break
                # The signal was stale (for example a key was acquired
                # after the item was queued): reset it and wait again
                # instead of indexing into an empty candidate list.
                self._not_empty.clear()
            item, lock = self._queue.pop(index)
            # acquire() also refreshes the signal for the remaining items.
            self.acquire(lock)
            return item, lock

    def acquire(self, lock: int) -> None:
        """Mark ``lock`` as held so items tagged with it are skipped."""
        self._mask.add(lock)
        # Holding a key may leave no eligible item; keep the event honest so
        # a later get() waits rather than failing.
        self._refresh_signal()

    def release(self, lock: int) -> None:
        """Free ``lock`` and wake a getter if an item became eligible.

        Raises:
            KeyError: If ``lock`` is not currently held.
        """
        self._mask.remove(lock)
        self._refresh_signal()

    def qsize(self) -> int:
        """Return the number of queued items, eligible or not."""
        return len(self._queue)

    def empty(self) -> bool:
        """Return True when no items are queued at all."""
        return not self._queue