|
from __future__ import annotations |
|
from functools import wraps |
|
from typing import TYPE_CHECKING, Any, TypeVar |
|
|
|
from bot.cache.serialization import AbstractSerializer, PickleSerializer |
|
from bot.core.loader import redis_client |
|
|
|
if TYPE_CHECKING: |
|
from collections.abc import Awaitable |
|
from datetime import timedelta |
|
from typing import Callable |
|
|
|
from redis.asyncio import Redis |
|
|
|
|
|
DEFAULT_TTL = 10 |
|
|
|
_Func = TypeVar("_Func") |
|
Args = str | int |
|
Kwargs = Any |
|
|
|
|
|
def build_key(*args: Args, **kwargs: Kwargs) -> str: |
|
"""Build a string key based on provided arguments and keyword arguments.""" |
|
args_str = ":".join(map(str, args)) |
|
kwargs_str = ":".join(f"{key}={value}" for key, value in sorted(kwargs.items())) |
|
return f"{args_str}:{kwargs_str}" |
|
|
|
|
|
async def set_redis_value( |
|
key: bytes | str, |
|
value: bytes | str, |
|
ttl: int | timedelta | None = DEFAULT_TTL, |
|
is_transaction: bool = False, |
|
) -> None: |
|
"""Set a value in Redis with an optional time-to-live (TTL).""" |
|
async with redis_client.pipeline(transaction=is_transaction) as pipeline: |
|
await pipeline.set(key, value) |
|
if ttl: |
|
await pipeline.expire(key, ttl) |
|
|
|
await pipeline.execute() |
|
|
|
|
|
def cached( |
|
ttl: int | timedelta = DEFAULT_TTL, |
|
namespace: str = "main", |
|
cache: Redis = redis_client, |
|
key_builder: Callable[..., str] = build_key, |
|
serializer: AbstractSerializer | None = None, |
|
) -> Callable[[Callable[..., Awaitable[_Func]]], Callable[..., Awaitable[_Func]]]: |
|
"""Caches the function's return value into a key generated with module_name, function_name, and args. |
|
|
|
Args: |
|
ttl (int | timedelta): Time-to-live for the cached value. |
|
namespace (str): Namespace for cache keys. |
|
cache (Redis): Redis instance for storing cached data. |
|
key_builder (Callable[..., str]): Function to build cache keys. |
|
serializer (AbstractSerializer | None): Serializer for cache data. |
|
|
|
Returns: |
|
Callable: A decorator that wraps the original function with caching logic. |
|
|
|
""" |
|
if serializer is None: |
|
serializer = PickleSerializer() |
|
|
|
def decorator(func: Callable[..., Awaitable[_Func]]) -> Callable[..., Awaitable[_Func]]: |
|
@wraps(func) |
|
async def wrapper(*args: Args, **kwargs: Kwargs) -> Any: |
|
key = key_builder(*args, **kwargs) |
|
key = f"{namespace}:{func.__module__}:{func.__name__}:{key}" |
|
|
|
|
|
cached_value = await cache.get(key) |
|
if cached_value is not None: |
|
return serializer.deserialize(cached_value) |
|
|
|
|
|
result = await func(*args, **kwargs) |
|
|
|
|
|
await set_redis_value( |
|
key=key, |
|
value=serializer.serialize(result), |
|
ttl=ttl, |
|
) |
|
|
|
return result |
|
|
|
return wrapper |
|
|
|
return decorator |
|
|
|
|
|
async def clear_cache( |
|
func: Callable[..., Awaitable[Any]], |
|
*args: Args, |
|
**kwargs: Kwargs, |
|
) -> None: |
|
"""Clear the cache for a specific function and arguments. |
|
|
|
Parameters |
|
---------- |
|
- func (Callable): The target function for which the cache needs to be cleared. |
|
- args (Args): Positional arguments passed to the function. |
|
- kwargs (Kwargs): Keyword arguments passed to the function. |
|
|
|
Keyword Arguments: |
|
- namespace (str, optional): A string indicating the namespace for the cache. Defaults to "main". |
|
|
|
""" |
|
namespace = kwargs.get("namespace", "main") |
|
|
|
key = build_key(*args, **kwargs) |
|
key = f"{namespace}:{func.__module__}:{func.__name__}:{key}" |
|
|
|
await redis_client.delete(key) |
|
|