Spaces:
Running
Running
import asyncio | |
from datetime import datetime, timedelta | |
from app.utils.logging import log | |
import app.config.settings as settings | |
from collections import defaultdict, Counter | |
import time | |
import threading | |
import queue | |
import functools | |
class ApiStatsManager: | |
"""API调用统计管理器,优化性能的新实现""" | |
def __init__(self, enable_background=True, batch_interval=1.0): | |
# 使用Counter记录API密钥和模型的调用次数 | |
self.api_key_counts = Counter() # 记录每个API密钥的调用次数 | |
self.model_counts = Counter() # 记录每个模型的调用次数 | |
self.api_model_counts = defaultdict(Counter) # 记录每个API密钥对每个模型的调用次数 | |
# 记录token使用量 | |
self.api_key_tokens = Counter() # 记录每个API密钥的token使用量 | |
self.model_tokens = Counter() # 记录每个模型的token使用量 | |
self.api_model_tokens = defaultdict(Counter) # 记录每个API密钥对每个模型的token使用量 | |
# 用于时间序列分析的数据结构(最近24小时,按分钟分组) | |
self.time_buckets = {} # 格式: {timestamp_minute: {"calls": count, "tokens": count}} | |
# 保存与兼容格式相关的调用日志(最小化存储) | |
self.recent_calls = [] # 仅保存最近的少量调用,用于前端展示 | |
self.max_recent_calls = 100 # 最大保存的最近调用记录数 | |
# 当前时间分钟桶的时间戳(分钟级别) | |
self.current_minute = self._get_minute_timestamp(datetime.now()) | |
# 清理间隔(小时) | |
self.cleanup_interval = 1 | |
self.last_cleanup = time.time() | |
# 使用线程锁而不是asyncio锁 | |
self._counters_lock = threading.Lock() | |
self._time_series_lock = threading.Lock() | |
self._recent_calls_lock = threading.Lock() | |
# 后台处理相关 | |
self.enable_background = enable_background | |
self.batch_interval = batch_interval | |
self._update_queue = queue.Queue() | |
self._worker_thread = None | |
self._stop_event = threading.Event() | |
if enable_background: | |
self._start_worker() | |
def _start_worker(self): | |
"""启动后台工作线程""" | |
if self._worker_thread is None or not self._worker_thread.is_alive(): | |
self._stop_event.clear() | |
self._worker_thread = threading.Thread( | |
target=self._worker_loop, | |
daemon=True | |
) | |
self._worker_thread.start() | |
def _worker_loop(self): | |
"""后台工作线程的主循环""" | |
batch = [] | |
last_process = time.time() | |
while not self._stop_event.is_set(): | |
try: | |
# 非阻塞获取更新 | |
try: | |
update = self._update_queue.get_nowait() | |
batch.append(update) | |
except queue.Empty: | |
pass | |
# 处理批次或超时 | |
current_time = time.time() | |
if batch and (current_time - last_process >= self.batch_interval): | |
self._process_batch(batch) | |
batch = [] | |
last_process = current_time | |
# 短暂休眠以避免CPU占用过高 | |
time.sleep(0.01) | |
except Exception as e: | |
log('error', f"后台处理线程错误: {str(e)}") | |
time.sleep(1) # 发生错误时短暂休眠 | |
def _process_batch(self, batch): | |
"""处理一批更新""" | |
with self._counters_lock: | |
for api_key, model, tokens in batch: | |
self.api_key_counts[api_key] += 1 | |
self.model_counts[model] += 1 | |
self.api_model_counts[api_key][model] += 1 | |
self.api_key_tokens[api_key] += tokens | |
self.model_tokens[model] += tokens | |
self.api_model_tokens[api_key][model] += tokens | |
async def update_stats(self, api_key, model, tokens=0): | |
"""更新API调用统计""" | |
if self.enable_background: | |
# 将更新放入队列 | |
self._update_queue.put((api_key, model, tokens)) | |
else: | |
# 同步更新 | |
with self._counters_lock: | |
self.api_key_counts[api_key] += 1 | |
self.model_counts[model] += 1 | |
self.api_model_counts[api_key][model] += 1 | |
self.api_key_tokens[api_key] += tokens | |
self.model_tokens[model] += tokens | |
self.api_model_tokens[api_key][model] += tokens | |
# 更新时间序列数据 | |
now = datetime.now() | |
minute_ts = self._get_minute_timestamp(now) | |
with self._time_series_lock: | |
if minute_ts not in self.time_buckets: | |
self.time_buckets[minute_ts] = {"calls": 0, "tokens": 0} | |
self.time_buckets[minute_ts]["calls"] += 1 | |
self.time_buckets[minute_ts]["tokens"] += tokens | |
self.current_minute = minute_ts | |
# 更新最近调用记录 | |
with self._recent_calls_lock: | |
compact_call = { | |
'api_key': api_key, | |
'model': model, | |
'timestamp': now, | |
'tokens': tokens | |
} | |
self.recent_calls.append(compact_call) | |
if len(self.recent_calls) > self.max_recent_calls: | |
self.recent_calls.pop(0) | |
# 记录日志 | |
log_message = f"API调用已记录: 秘钥 '{api_key[:8]}', 模型 '{model}', 令牌: {tokens if tokens is not None else 0}" | |
log('info', log_message) | |
async def cleanup(self): | |
"""清理超过24小时的时间桶数据""" | |
now = datetime.now() | |
day_ago_ts = self._get_minute_timestamp(now - timedelta(days=1)) | |
with self._time_series_lock: | |
# 直接删除旧的时间桶 | |
for ts in list(self.time_buckets.keys()): | |
if ts < day_ago_ts: | |
del self.time_buckets[ts] | |
self.last_cleanup = time.time() | |
async def maybe_cleanup(self, force=False): | |
"""根据需要清理旧数据""" | |
now = time.time() | |
if force or (now - self.last_cleanup > self.cleanup_interval * 3600): | |
await self.cleanup() | |
self.last_cleanup = now | |
async def get_api_key_usage(self, api_key, model=None): | |
"""获取API密钥的使用统计""" | |
with self._counters_lock: | |
if model: | |
return self.api_model_counts[api_key][model] | |
else: | |
return self.api_key_counts[api_key] | |
def get_calls_last_24h(self): | |
"""获取过去24小时的总调用次数""" | |
with self._counters_lock: | |
return sum(self.api_key_counts.values()) | |
def get_calls_last_hour(self, now=None): | |
"""获取过去一小时的总调用次数""" | |
if now is None: | |
now = datetime.now() | |
hour_ago_ts = self._get_minute_timestamp(now - timedelta(hours=1)) | |
with self._time_series_lock: | |
return sum(data["calls"] for ts, data in self.time_buckets.items() | |
if ts >= hour_ago_ts) | |
def get_calls_last_minute(self, now=None): | |
"""获取过去一分钟的总调用次数""" | |
if now is None: | |
now = datetime.now() | |
minute_ago_ts = self._get_minute_timestamp(now - timedelta(minutes=1)) | |
with self._time_series_lock: | |
return sum(data["calls"] for ts, data in self.time_buckets.items() | |
if ts >= minute_ago_ts) | |
def get_time_series_data(self, minutes=30, now=None): | |
"""获取过去N分钟的时间序列数据""" | |
if now is None: | |
now = datetime.now() | |
calls_series = [] | |
tokens_series = [] | |
with self._time_series_lock: | |
for i in range(minutes, -1, -1): | |
minute_dt = now - timedelta(minutes=i) | |
minute_ts = self._get_minute_timestamp(minute_dt) | |
bucket = self.time_buckets.get(minute_ts, {"calls": 0, "tokens": 0}) | |
calls_series.append({ | |
'time': minute_dt.strftime('%H:%M'), | |
'value': bucket["calls"] | |
}) | |
tokens_series.append({ | |
'time': minute_dt.strftime('%H:%M'), | |
'value': bucket["tokens"] | |
}) | |
return calls_series, tokens_series | |
def get_api_key_stats(self, api_keys): | |
"""获取API密钥的详细统计信息""" | |
stats = [] | |
with self._counters_lock: | |
for api_key in api_keys: | |
api_key_id = api_key[:8] | |
calls_24h = self.api_key_counts[api_key] | |
total_tokens = self.api_key_tokens[api_key] | |
model_stats = {} | |
for model, count in self.api_model_counts[api_key].items(): | |
tokens = self.api_model_tokens[api_key][model] | |
model_stats[model] = { | |
'calls': count, | |
'tokens': tokens | |
} | |
usage_percent = (calls_24h / settings.API_KEY_DAILY_LIMIT) * 100 if settings.API_KEY_DAILY_LIMIT > 0 else 0 | |
stats.append({ | |
'api_key': api_key_id, | |
'calls_24h': calls_24h, | |
'total_tokens': total_tokens, | |
'limit': settings.API_KEY_DAILY_LIMIT, | |
'usage_percent': round(usage_percent, 2), | |
'model_stats': model_stats | |
}) | |
stats.sort(key=lambda x: x['usage_percent'], reverse=True) | |
return stats | |
async def reset(self): | |
"""重置所有统计数据""" | |
with self._counters_lock: | |
self.api_key_counts.clear() | |
self.model_counts.clear() | |
self.api_model_counts.clear() | |
self.api_key_tokens.clear() | |
self.model_tokens.clear() | |
self.api_model_tokens.clear() | |
with self._time_series_lock: | |
self.time_buckets.clear() | |
with self._recent_calls_lock: | |
self.recent_calls.clear() | |
self.current_minute = self._get_minute_timestamp(datetime.now()) | |
self.last_cleanup = time.time() | |
def _get_minute_timestamp(self, dt): | |
"""将时间戳转换为分钟级别的时间戳(按分钟取整)""" | |
return int(dt.timestamp() // 60 * 60) | |
# 创建全局单例实例 | |
api_stats_manager = ApiStatsManager() | |
# 兼容现有代码的函数 | |
def clean_expired_stats(api_call_stats): | |
"""清理过期统计数据的函数 (兼容旧接口)""" | |
asyncio.create_task(api_stats_manager.cleanup()) | |
async def update_api_call_stats(api_call_stats, endpoint=None, model=None, token=None): | |
"""更新API调用统计的函数 (兼容旧接口)""" | |
if endpoint and model: | |
await api_stats_manager.update_stats(endpoint, model, token if token is not None else 0) | |
async def get_api_key_usage(api_call_stats, api_key, model=None): | |
"""获取API密钥的调用次数 (兼容旧接口)""" | |
return await api_stats_manager.get_api_key_usage(api_key, model) |