Spaces:
Running
Running
| from dataclasses import dataclass | |
| import asyncio | |
| import numpy as np | |
| class UnlimitedSemaphore: | |
| """A context manager that allows unlimited access.""" | |
| async def __aenter__(self): | |
| pass | |
| async def __aexit__(self, exc_type, exc, tb): | |
| pass | |
| class EmbeddingFunc: | |
| embedding_dim: int | |
| max_token_size: int | |
| func: callable | |
| concurrent_limit: int = 16 | |
| def __post_init__(self): | |
| if self.concurrent_limit != 0: | |
| self._semaphore = asyncio.Semaphore(self.concurrent_limit) | |
| else: | |
| self._semaphore = UnlimitedSemaphore() | |
| async def __call__(self, *args, **kwargs) -> np.ndarray: | |
| async with self._semaphore: | |
| return await self.func(*args, **kwargs) | |