# app/service/stats_service.py import datetime from typing import Union from sqlalchemy import and_, case, func, or_, select from app.database.connection import database from app.database.models import RequestLog from app.log.logger import get_stats_logger logger = get_stats_logger() class StatsService: """Service class for handling statistics related operations.""" async def get_calls_in_last_seconds(self, seconds: int) -> dict[str, int]: """获取过去 N 秒内的调用次数 (总数、成功、失败)""" try: cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=seconds) query = select( func.count(RequestLog.id).label("total"), func.sum( case( ( and_( RequestLog.status_code >= 200, RequestLog.status_code < 300, ), 1, ), else_=0, ) ).label("success"), func.sum( case( ( or_( RequestLog.status_code < 200, RequestLog.status_code >= 300, ), 1, ), (RequestLog.status_code is None, 1), else_=0, ) ).label("failure"), ).where(RequestLog.request_time >= cutoff_time) result = await database.fetch_one(query) if result: return { "total": result["total"] or 0, "success": result["success"] or 0, "failure": result["failure"] or 0, } return {"total": 0, "success": 0, "failure": 0} except Exception as e: logger.error(f"Failed to get calls in last {seconds} seconds: {e}") return {"total": 0, "success": 0, "failure": 0} async def get_calls_in_last_minutes(self, minutes: int) -> dict[str, int]: """获取过去 N 分钟内的调用次数 (总数、成功、失败)""" return await self.get_calls_in_last_seconds(minutes * 60) async def get_calls_in_last_hours(self, hours: int) -> dict[str, int]: """获取过去 N 小时内的调用次数 (总数、成功、失败)""" return await self.get_calls_in_last_seconds(hours * 3600) async def get_calls_in_current_month(self) -> dict[str, int]: """获取当前自然月内的调用次数 (总数、成功、失败)""" try: now = datetime.datetime.now() start_of_month = now.replace( day=1, hour=0, minute=0, second=0, microsecond=0 ) query = select( func.count(RequestLog.id).label("total"), func.sum( case( ( and_( RequestLog.status_code >= 200, RequestLog.status_code < 300, ), 1, ), else_=0, ) ).label("success"), func.sum( case( ( or_( RequestLog.status_code < 200, RequestLog.status_code >= 300, ), 1, ), (RequestLog.status_code is None, 1), else_=0, ) ).label("failure"), ).where(RequestLog.request_time >= start_of_month) result = await database.fetch_one(query) if result: return { "total": result["total"] or 0, "success": result["success"] or 0, "failure": result["failure"] or 0, } return {"total": 0, "success": 0, "failure": 0} except Exception as e: logger.error(f"Failed to get calls in current month: {e}") return {"total": 0, "success": 0, "failure": 0} async def get_api_usage_stats(self) -> dict: """获取所有需要的 API 使用统计数据 (总数、成功、失败)""" try: stats_1m = await self.get_calls_in_last_minutes(1) stats_1h = await self.get_calls_in_last_hours(1) stats_24h = await self.get_calls_in_last_hours(24) stats_month = await self.get_calls_in_current_month() return { "calls_1m": stats_1m, "calls_1h": stats_1h, "calls_24h": stats_24h, "calls_month": stats_month, } except Exception as e: logger.error(f"Failed to get API usage stats: {e}") default_stat = {"total": 0, "success": 0, "failure": 0} return { "calls_1m": default_stat.copy(), "calls_1h": default_stat.copy(), "calls_24h": default_stat.copy(), "calls_month": default_stat.copy(), } async def get_api_call_details(self, period: str) -> list[dict]: """ 获取指定时间段内的 API 调用详情 Args: period: 时间段标识 ('1m', '1h', '24h') Returns: 包含调用详情的字典列表,每个字典包含 timestamp, key, model, status Raises: ValueError: 如果 period 无效 """ now = datetime.datetime.now() if period == "1m": start_time = now - datetime.timedelta(minutes=1) elif period == "1h": start_time = now - datetime.timedelta(hours=1) elif period == "24h": start_time = now - datetime.timedelta(hours=24) else: raise ValueError(f"无效的时间段标识: {period}") try: query = ( select( RequestLog.request_time.label("timestamp"), RequestLog.api_key.label("key"), RequestLog.model_name.label("model"), RequestLog.status_code, ) .where(RequestLog.request_time >= start_time) .order_by(RequestLog.request_time.desc()) ) results = await database.fetch_all(query) details = [] for row in results: status = "failure" if row["status_code"] is not None: status = "success" if 200 <= row["status_code"] < 300 else "failure" details.append( { "timestamp": row[ "timestamp" ].isoformat(), "key": row["key"], "model": row["model"], "status": status, } ) logger.info( f"Retrieved {len(details)} API call details for period '{period}'" ) return details except Exception as e: logger.error( f"Failed to get API call details for period '{period}': {e}") raise async def get_key_usage_details_last_24h(self, key: str) -> Union[dict, None]: """ 获取指定 API 密钥在过去 24 小时内按模型统计的调用次数。 Args: key: 要查询的 API 密钥。 Returns: 一个字典,其中键是模型名称,值是调用次数。 如果查询出错或没有找到记录,可能返回 None 或空字典。 Example: {"gemini-pro": 10, "gemini-1.5-pro-latest": 5} """ logger.info( f"Fetching usage details for key ending in ...{key[-4:]} for the last 24h." ) cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=24) try: query = ( select( RequestLog.model_name, func.count( RequestLog.id).label("call_count") ) .where( RequestLog.api_key == key, RequestLog.request_time >= cutoff_time, RequestLog.model_name.isnot(None), ) .group_by(RequestLog.model_name) .order_by(func.count(RequestLog.id).desc()) ) results = await database.fetch_all(query) if not results: logger.info( f"No usage details found for key ending in ...{key[-4:]} in the last 24h." ) return {} usage_details = {row["model_name"]: row["call_count"] for row in results} logger.info( f"Successfully fetched usage details for key ending in ...{key[-4:]}: {usage_details}" ) return usage_details except Exception as e: logger.error( f"Failed to get key usage details for key ending in ...{key[-4:]}: {e}", exc_info=True, ) raise