Spaces:
Running
Running
""" | |
日志路由模块 | |
""" | |
from datetime import datetime | |
from typing import Dict, List, Optional | |
from fastapi import ( | |
APIRouter, | |
Body, | |
HTTPException, | |
Path, | |
Query, | |
Request, | |
Response, | |
status, | |
) | |
from pydantic import BaseModel | |
from app.core.security import verify_auth_token | |
from app.log.logger import get_log_routes_logger | |
from app.service.error_log import error_log_service | |
router = APIRouter(prefix="/api/logs", tags=["logs"]) | |
logger = get_log_routes_logger() | |
class ErrorLogListItem(BaseModel): | |
id: int | |
gemini_key: Optional[str] = None | |
error_type: Optional[str] = None | |
error_code: Optional[int] = None | |
model_name: Optional[str] = None | |
request_time: Optional[datetime] = None | |
class ErrorLogListResponse(BaseModel): | |
logs: List[ErrorLogListItem] | |
total: int | |
async def get_error_logs_api( | |
request: Request, | |
limit: int = Query(10, ge=1, le=1000), | |
offset: int = Query(0, ge=0), | |
key_search: Optional[str] = Query( | |
None, description="Search term for Gemini key (partial match)" | |
), | |
error_search: Optional[str] = Query( | |
None, description="Search term for error type or log message" | |
), | |
error_code_search: Optional[str] = Query( | |
None, description="Search term for error code" | |
), | |
start_date: Optional[datetime] = Query( | |
None, description="Start datetime for filtering" | |
), | |
end_date: Optional[datetime] = Query( | |
None, description="End datetime for filtering" | |
), | |
sort_by: str = Query( | |
"id", description="Field to sort by (e.g., 'id', 'request_time')" | |
), | |
sort_order: str = Query("desc", description="Sort order ('asc' or 'desc')"), | |
): | |
""" | |
获取错误日志列表 (返回错误码),支持过滤和排序 | |
Args: | |
request: 请求对象 | |
limit: 限制数量 | |
offset: 偏移量 | |
key_search: 密钥搜索 | |
error_search: 错误搜索 (可能搜索类型或日志内容,由DB层决定) | |
error_code_search: 错误码搜索 | |
start_date: 开始日期 | |
end_date: 结束日期 | |
sort_by: 排序字段 | |
sort_order: 排序顺序 | |
Returns: | |
ErrorLogListResponse: An object containing the list of logs (with error_code) and the total count. | |
""" | |
auth_token = request.cookies.get("auth_token") | |
if not auth_token or not verify_auth_token(auth_token): | |
logger.warning("Unauthorized access attempt to error logs list") | |
raise HTTPException(status_code=401, detail="Not authenticated") | |
try: | |
result = await error_log_service.process_get_error_logs( | |
limit=limit, | |
offset=offset, | |
key_search=key_search, | |
error_search=error_search, | |
error_code_search=error_code_search, | |
start_date=start_date, | |
end_date=end_date, | |
sort_by=sort_by, | |
sort_order=sort_order, | |
) | |
logs_data = result["logs"] | |
total_count = result["total"] | |
validated_logs = [ErrorLogListItem(**log) for log in logs_data] | |
return ErrorLogListResponse(logs=validated_logs, total=total_count) | |
except Exception as e: | |
logger.exception(f"Failed to get error logs list: {str(e)}") | |
raise HTTPException( | |
status_code=500, detail=f"Failed to get error logs list: {str(e)}" | |
) | |
class ErrorLogDetailResponse(BaseModel): | |
id: int | |
gemini_key: Optional[str] = None | |
error_type: Optional[str] = None | |
error_log: Optional[str] = None | |
request_msg: Optional[str] = None | |
model_name: Optional[str] = None | |
request_time: Optional[datetime] = None | |
async def get_error_log_detail_api(request: Request, log_id: int = Path(..., ge=1)): | |
""" | |
根据日志 ID 获取错误日志的详细信息 (包括 error_log 和 request_msg) | |
""" | |
auth_token = request.cookies.get("auth_token") | |
if not auth_token or not verify_auth_token(auth_token): | |
logger.warning( | |
f"Unauthorized access attempt to error log details for ID: {log_id}" | |
) | |
raise HTTPException(status_code=401, detail="Not authenticated") | |
try: | |
log_details = await error_log_service.process_get_error_log_details( | |
log_id=log_id | |
) | |
if not log_details: | |
raise HTTPException(status_code=404, detail="Error log not found") | |
return ErrorLogDetailResponse(**log_details) | |
except HTTPException as http_exc: | |
raise http_exc | |
except Exception as e: | |
logger.exception(f"Failed to get error log details for ID {log_id}: {str(e)}") | |
raise HTTPException( | |
status_code=500, detail=f"Failed to get error log details: {str(e)}" | |
) | |
async def delete_error_logs_bulk_api( | |
request: Request, payload: Dict[str, List[int]] = Body(...) | |
): | |
""" | |
批量删除错误日志 (异步) | |
""" | |
auth_token = request.cookies.get("auth_token") | |
if not auth_token or not verify_auth_token(auth_token): | |
logger.warning("Unauthorized access attempt to bulk delete error logs") | |
raise HTTPException(status_code=401, detail="Not authenticated") | |
log_ids = payload.get("ids") | |
if not log_ids: | |
raise HTTPException(status_code=400, detail="No log IDs provided for deletion.") | |
try: | |
deleted_count = await error_log_service.process_delete_error_logs_by_ids( | |
log_ids | |
) | |
# 注意:异步函数返回的是尝试删除的数量,可能不是精确值 | |
logger.info( | |
f"Attempted bulk deletion for {deleted_count} error logs with IDs: {log_ids}" | |
) | |
return Response(status_code=status.HTTP_204_NO_CONTENT) | |
except Exception as e: | |
logger.exception(f"Error bulk deleting error logs with IDs {log_ids}: {str(e)}") | |
raise HTTPException( | |
status_code=500, detail="Internal server error during bulk deletion" | |
) | |
async def delete_all_error_logs_api(request: Request): | |
""" | |
删除所有错误日志 (异步) | |
""" | |
auth_token = request.cookies.get("auth_token") | |
if not auth_token or not verify_auth_token(auth_token): | |
logger.warning("Unauthorized access attempt to delete all error logs") | |
raise HTTPException(status_code=401, detail="Not authenticated") | |
try: | |
deleted_count = await error_log_service.process_delete_all_error_logs() | |
logger.info(f"Successfully deleted all {deleted_count} error logs.") | |
# No body needed for 204 response | |
return Response(status_code=status.HTTP_204_NO_CONTENT) | |
except Exception as e: | |
logger.exception(f"Error deleting all error logs: {str(e)}") | |
raise HTTPException( | |
status_code=500, detail="Internal server error during deletion of all logs" | |
) | |
async def delete_error_log_api(request: Request, log_id: int = Path(..., ge=1)): | |
""" | |
删除单个错误日志 (异步) | |
""" | |
auth_token = request.cookies.get("auth_token") | |
if not auth_token or not verify_auth_token(auth_token): | |
logger.warning(f"Unauthorized access attempt to delete error log ID: {log_id}") | |
raise HTTPException(status_code=401, detail="Not authenticated") | |
try: | |
success = await error_log_service.process_delete_error_log_by_id(log_id) | |
if not success: | |
# 服务层现在在未找到时返回 False,我们在这里转换为 404 | |
raise HTTPException( | |
status_code=404, detail=f"Error log with ID {log_id} not found" | |
) | |
logger.info(f"Successfully deleted error log with ID: {log_id}") | |
return Response(status_code=status.HTTP_204_NO_CONTENT) | |
except HTTPException as http_exc: | |
raise http_exc | |
except Exception as e: | |
logger.exception(f"Error deleting error log with ID {log_id}: {str(e)}") | |
raise HTTPException( | |
status_code=500, detail="Internal server error during deletion" | |
) | |