Spaces:
Running
Running
from datetime import datetime, timedelta, timezone | |
from typing import Any, Dict, List, Optional | |
from sqlalchemy import delete, func, select | |
from app.config.config import settings | |
from app.database import services as db_services | |
from app.database.connection import database | |
from app.database.models import ErrorLog | |
from app.log.logger import get_error_log_logger | |
# Module-level logger shared by every function in this file; obtained from
# the application's error-log logger factory.
logger = get_error_log_logger()
async def delete_old_error_logs():
    """
    Purge error logs that fall outside the configured retention window.

    Two settings drive the job: ``AUTO_DELETE_ERROR_LOGS_ENABLED`` turns it on
    or off, and ``AUTO_DELETE_ERROR_LOGS_DAYS`` is the number of days of logs
    to keep. Rows whose ``request_time`` is earlier than "now (UTC) minus the
    retention period" are removed.

    Any exception raised while talking to the database is logged and
    swallowed, so this coroutine does not propagate database failures.
    """
    if not settings.AUTO_DELETE_ERROR_LOGS_ENABLED:
        logger.info("Auto-deletion of error logs is disabled. Skipping.")
        return

    retention_days = settings.AUTO_DELETE_ERROR_LOGS_DAYS
    retention_is_valid = isinstance(retention_days, int) and retention_days > 0
    if not retention_is_valid:
        logger.error(
            f"Invalid AUTO_DELETE_ERROR_LOGS_DAYS value: {retention_days}. Must be a positive integer. Skipping deletion."
        )
        return

    threshold = datetime.now(timezone.utc) - timedelta(days=retention_days)
    threshold_text = threshold.strftime("%Y-%m-%d %H:%M:%S %Z")
    logger.info(
        f"Attempting to delete error logs older than {retention_days} days (before {threshold_text})."
    )

    try:
        if not database.is_connected:
            await database.connect()
            logger.info("Database connection established for deleting error logs.")

        # One predicate serves both the count and the delete statement.
        is_expired = ErrorLog.request_time < threshold

        # The count is only used for logging; it is taken before the delete.
        stale_total = await database.fetch_val(
            select(func.count(ErrorLog.id)).where(is_expired)
        )
        if stale_total == 0:
            logger.info(
                "No error logs found older than the specified period. No deletion needed."
            )
            return
        logger.info(f"Found {stale_total} error logs to delete.")

        await database.execute(delete(ErrorLog).where(is_expired))
        logger.info(
            f"Successfully deleted {stale_total} error logs older than {retention_days} days."
        )
    except Exception as exc:
        logger.error(
            f"Error during automatic deletion of error logs: {exc}", exc_info=True
        )
async def process_get_error_logs(
    limit: int,
    offset: int,
    key_search: Optional[str],
    error_search: Optional[str],
    error_code_search: Optional[str],
    start_date: Optional[datetime],
    end_date: Optional[datetime],
    sort_by: str,
    sort_order: str,
) -> Dict[str, Any]:
    """
    Retrieve a page of error logs, with pagination and filtering support.

    The filter arguments (``key_search``, ``error_search``,
    ``error_code_search``, ``start_date``, ``end_date``) are forwarded to both
    ``db_services.get_error_logs`` and ``db_services.get_error_logs_count``;
    ``limit``, ``offset``, ``sort_by`` and ``sort_order`` only go to the former.

    Returns:
        A dict with the fetched rows under ``"logs"`` and the count under
        ``"total"``.

    Raises:
        Exception: whatever the database service raises, after it is logged.
    """
    # The same filter set drives the page query and the count query.
    filters = {
        "key_search": key_search,
        "error_search": error_search,
        "error_code_search": error_code_search,
        "start_date": start_date,
        "end_date": end_date,
    }
    try:
        page_rows = await db_services.get_error_logs(
            limit=limit,
            offset=offset,
            sort_by=sort_by,
            sort_order=sort_order,
            **filters,
        )
        matching_total = await db_services.get_error_logs_count(**filters)
    except Exception as exc:
        logger.error(f"Service error in process_get_error_logs: {exc}", exc_info=True)
        raise
    return {"logs": page_rows, "total": matching_total}
async def process_get_error_log_details(log_id: int) -> Optional[Dict[str, Any]]:
    """
    Retrieve the detailed record of one specific error log.

    Returns the result of ``db_services.get_error_log_details`` unchanged;
    None is returned when the log is not found.

    Raises:
        Exception: whatever the database service raises, after it is logged.
    """
    try:
        return await db_services.get_error_log_details(log_id=log_id)
    except Exception as exc:
        logger.error(
            f"Service error in process_get_error_log_details for ID {log_id}: {exc}",
            exc_info=True,
        )
        raise
async def process_delete_error_logs_by_ids(log_ids: List[int]) -> int:
    """
    Bulk-delete error logs by their IDs.

    An empty ``log_ids`` short-circuits to 0 without calling the database
    service. Otherwise the value returned by
    ``db_services.delete_error_logs_by_ids`` is passed through; it is the
    number of logs the deletion was attempted for.

    Raises:
        Exception: whatever the database service raises, after it is logged.
    """
    if log_ids:
        try:
            return await db_services.delete_error_logs_by_ids(log_ids)
        except Exception as exc:
            logger.error(
                f"Service error in process_delete_error_logs_by_ids for IDs {log_ids}: {exc}",
                exc_info=True,
            )
            raise
    # Nothing requested, nothing deleted.
    return 0
async def process_delete_error_log_by_id(log_id: int) -> bool:
    """
    Delete a single error log by its ID.

    Returns True when the deletion succeeded (or the log was found and a
    deletion was attempted), otherwise False. The value comes straight from
    ``db_services.delete_error_log_by_id``.

    Raises:
        Exception: whatever the database service raises, after it is logged.
    """
    try:
        return await db_services.delete_error_log_by_id(log_id)
    except Exception as exc:
        logger.error(
            f"Service error in process_delete_error_log_by_id for ID {log_id}: {exc}",
            exc_info=True,
        )
        raise
async def process_delete_all_error_logs() -> int:
    """
    Handle a request to delete every error log.

    Connects the shared ``database`` object first if it is not already
    connected, then delegates to ``db_services.delete_all_error_logs``.

    Returns:
        The number of deleted logs, as reported by the database service.

    Raises:
        Exception: whatever the connection or deletion raises, after it is
        logged.
    """
    try:
        if not database.is_connected:
            await database.connect()
            logger.info("Database connection established for deleting all error logs.")
        removed_total = await db_services.delete_all_error_logs()
        logger.info(
            f"Successfully processed request to delete all error logs. Count: {removed_total}"
        )
    except Exception as exc:
        logger.error(
            f"Service error in process_delete_all_error_logs: {exc}",
            exc_info=True,
        )
        raise
    return removed_total