|
import redis.asyncio as redis |
|
import os |
|
from dotenv import load_dotenv |
|
import asyncio |
|
from utils.logger import logger |
|
from typing import List, Any |
|
|
|
|
|
client = None |
|
_initialized = False |
|
_init_lock = asyncio.Lock() |
|
|
|
|
|
REDIS_KEY_TTL = 3600 * 24 |
|
|
|
|
|
def initialize(): |
|
"""Initialize Redis connection using environment variables.""" |
|
global client |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
redis_host = os.getenv('REDIS_HOST', 'redis') |
|
redis_port = int(os.getenv('REDIS_PORT', 6379)) |
|
redis_password = os.getenv('REDIS_PASSWORD', '') |
|
|
|
redis_ssl_str = os.getenv('REDIS_SSL', 'False') |
|
redis_ssl = redis_ssl_str.lower() == 'true' |
|
|
|
logger.info(f"Initializing Redis connection to {redis_host}:{redis_port}") |
|
|
|
|
|
client = redis.Redis( |
|
host=redis_host, |
|
port=redis_port, |
|
password=redis_password, |
|
ssl=redis_ssl, |
|
decode_responses=True, |
|
socket_timeout=5.0, |
|
socket_connect_timeout=5.0, |
|
retry_on_timeout=True, |
|
health_check_interval=30 |
|
) |
|
|
|
return client |
|
|
|
|
|
async def initialize_async(): |
|
"""Initialize Redis connection asynchronously.""" |
|
global client, _initialized |
|
|
|
async with _init_lock: |
|
if not _initialized: |
|
logger.info("Initializing Redis connection") |
|
initialize() |
|
|
|
try: |
|
await client.ping() |
|
logger.info("Successfully connected to Redis") |
|
_initialized = True |
|
except Exception as e: |
|
logger.error(f"Failed to connect to Redis: {e}") |
|
client = None |
|
raise |
|
|
|
return client |
|
|
|
|
|
async def close(): |
|
"""Close Redis connection.""" |
|
global client, _initialized |
|
if client: |
|
logger.info("Closing Redis connection") |
|
await client.aclose() |
|
client = None |
|
_initialized = False |
|
logger.info("Redis connection closed") |
|
|
|
|
|
async def get_client(): |
|
"""Get the Redis client, initializing if necessary.""" |
|
global client, _initialized |
|
if client is None or not _initialized: |
|
await initialize_async() |
|
return client |
|
|
|
|
|
|
|
async def set(key: str, value: str, ex: int = None): |
|
"""Set a Redis key.""" |
|
redis_client = await get_client() |
|
return await redis_client.set(key, value, ex=ex) |
|
|
|
|
|
async def get(key: str, default: str = None): |
|
"""Get a Redis key.""" |
|
redis_client = await get_client() |
|
result = await redis_client.get(key) |
|
return result if result is not None else default |
|
|
|
|
|
async def delete(key: str): |
|
"""Delete a Redis key.""" |
|
redis_client = await get_client() |
|
return await redis_client.delete(key) |
|
|
|
|
|
async def publish(channel: str, message: str): |
|
"""Publish a message to a Redis channel.""" |
|
redis_client = await get_client() |
|
return await redis_client.publish(channel, message) |
|
|
|
|
|
async def create_pubsub(): |
|
"""Create a Redis pubsub object.""" |
|
redis_client = await get_client() |
|
return redis_client.pubsub() |
|
|
|
|
|
|
|
async def rpush(key: str, *values: Any): |
|
"""Append one or more values to a list.""" |
|
redis_client = await get_client() |
|
return await redis_client.rpush(key, *values) |
|
|
|
|
|
async def lrange(key: str, start: int, end: int) -> List[str]: |
|
"""Get a range of elements from a list.""" |
|
redis_client = await get_client() |
|
return await redis_client.lrange(key, start, end) |
|
|
|
|
|
async def llen(key: str) -> int: |
|
"""Get the length of a list.""" |
|
redis_client = await get_client() |
|
return await redis_client.llen(key) |
|
|
|
|
|
|
|
async def expire(key: str, time: int): |
|
"""Set a key's time to live in seconds.""" |
|
redis_client = await get_client() |
|
return await redis_client.expire(key, time) |
|
|
|
|
|
async def keys(pattern: str) -> List[str]: |
|
"""Get keys matching a pattern.""" |
|
redis_client = await get_client() |
|
return await redis_client.keys(pattern) |