Spaces:
Running
Running
import json | |
from typing import Optional | |
from redis import StrictRedis | |
from .config import get_settings | |
from .models import DeployStatus | |
_settings = get_settings() | |
redis_client = StrictRedis.from_url( | |
_settings.redis_url, | |
decode_responses=True, | |
# 连接池配置 | |
max_connections=50, # 最大连接数 | |
retry_on_timeout=True, # 超时重试 | |
socket_timeout=15, # socket 超时 | |
socket_connect_timeout=15, # 连接超时 | |
health_check_interval=300, # 健康检查间隔 | |
) | |
class TaskStore: | |
prefix = "task:" | |
def _key(cls, task_id: str) -> str: | |
return f"{cls.prefix}{task_id}" | |
def save(cls, status: DeployStatus) -> None: | |
redis_client.hset( | |
cls._key(status.task_id), | |
mapping={ | |
"status": status.status, | |
"detail": json.dumps(status.detail) if status.detail else "", | |
}, | |
) | |
redis_client.expire(cls._key(status.task_id), 24 * 3600) | |
def load(cls, task_id: str) -> Optional[DeployStatus]: | |
data = redis_client.hgetall(cls._key(task_id)) | |
if not data: | |
return None | |
detail = json.loads(data.get("detail", "null")) if data.get("detail") else None | |
return DeployStatus(task_id=task_id, status=data.get("status", "UNKNOWN"), detail=detail) | |