File size: 4,111 Bytes
ae5dc27 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import redis.asyncio as redis
import os
from dotenv import load_dotenv
import asyncio
from utils.logger import logger
from typing import List, Any
# Redis client
client = None
_initialized = False
_init_lock = asyncio.Lock()
# Constants
REDIS_KEY_TTL = 3600 * 24 # 24 hour TTL as safety mechanism
def initialize():
"""Initialize Redis connection using environment variables."""
global client
# Load environment variables if not already loaded
load_dotenv()
# Get Redis configuration
redis_host = os.getenv('REDIS_HOST', 'redis')
redis_port = int(os.getenv('REDIS_PORT', 6379))
redis_password = os.getenv('REDIS_PASSWORD', '')
# Convert string 'True'/'False' to boolean
redis_ssl_str = os.getenv('REDIS_SSL', 'False')
redis_ssl = redis_ssl_str.lower() == 'true'
logger.info(f"Initializing Redis connection to {redis_host}:{redis_port}")
# Create Redis client with basic configuration
client = redis.Redis(
host=redis_host,
port=redis_port,
password=redis_password,
ssl=redis_ssl,
decode_responses=True,
socket_timeout=5.0,
socket_connect_timeout=5.0,
retry_on_timeout=True,
health_check_interval=30
)
return client
async def initialize_async():
"""Initialize Redis connection asynchronously."""
global client, _initialized
async with _init_lock:
if not _initialized:
logger.info("Initializing Redis connection")
initialize()
try:
await client.ping()
logger.info("Successfully connected to Redis")
_initialized = True
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
client = None
raise
return client
async def close():
"""Close Redis connection."""
global client, _initialized
if client:
logger.info("Closing Redis connection")
await client.aclose()
client = None
_initialized = False
logger.info("Redis connection closed")
async def get_client():
"""Get the Redis client, initializing if necessary."""
global client, _initialized
if client is None or not _initialized:
await initialize_async()
return client
# Basic Redis operations
async def set(key: str, value: str, ex: int = None):
"""Set a Redis key."""
redis_client = await get_client()
return await redis_client.set(key, value, ex=ex)
async def get(key: str, default: str = None):
"""Get a Redis key."""
redis_client = await get_client()
result = await redis_client.get(key)
return result if result is not None else default
async def delete(key: str):
"""Delete a Redis key."""
redis_client = await get_client()
return await redis_client.delete(key)
async def publish(channel: str, message: str):
"""Publish a message to a Redis channel."""
redis_client = await get_client()
return await redis_client.publish(channel, message)
async def create_pubsub():
"""Create a Redis pubsub object."""
redis_client = await get_client()
return redis_client.pubsub()
# List operations
async def rpush(key: str, *values: Any):
"""Append one or more values to a list."""
redis_client = await get_client()
return await redis_client.rpush(key, *values)
async def lrange(key: str, start: int, end: int) -> List[str]:
"""Get a range of elements from a list."""
redis_client = await get_client()
return await redis_client.lrange(key, start, end)
async def llen(key: str) -> int:
"""Get the length of a list."""
redis_client = await get_client()
return await redis_client.llen(key)
# Key management
async def expire(key: str, time: int):
"""Set a key's time to live in seconds."""
redis_client = await get_client()
return await redis_client.expire(key, time)
async def keys(pattern: str) -> List[str]:
"""Get keys matching a pattern."""
redis_client = await get_client()
return await redis_client.keys(pattern) |