Spaces:
Running
Running
File size: 5,787 Bytes
76b9762 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from sqlalchemy import delete, func, select
from app.config.config import settings
from app.database import services as db_services
from app.database.connection import database
from app.database.models import ErrorLog
from app.log.logger import get_error_log_logger
logger = get_error_log_logger()
async def delete_old_error_logs():
"""
Deletes error logs older than a specified number of days,
based on the AUTO_DELETE_ERROR_LOGS_ENABLED and AUTO_DELETE_ERROR_LOGS_DAYS settings.
"""
if not settings.AUTO_DELETE_ERROR_LOGS_ENABLED:
logger.info("Auto-deletion of error logs is disabled. Skipping.")
return
days_to_keep = settings.AUTO_DELETE_ERROR_LOGS_DAYS
if not isinstance(days_to_keep, int) or days_to_keep <= 0:
logger.error(
f"Invalid AUTO_DELETE_ERROR_LOGS_DAYS value: {days_to_keep}. Must be a positive integer. Skipping deletion."
)
return
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
logger.info(
f"Attempting to delete error logs older than {days_to_keep} days (before {cutoff_date.strftime('%Y-%m-%d %H:%M:%S %Z')})."
)
try:
if not database.is_connected:
await database.connect()
logger.info("Database connection established for deleting error logs.")
# First, count how many logs will be deleted (optional, for logging)
count_query = select(func.count(ErrorLog.id)).where(
ErrorLog.request_time < cutoff_date
)
num_logs_to_delete = await database.fetch_val(count_query)
if num_logs_to_delete == 0:
logger.info(
"No error logs found older than the specified period. No deletion needed."
)
return
logger.info(f"Found {num_logs_to_delete} error logs to delete.")
# Perform the deletion
query = delete(ErrorLog).where(ErrorLog.request_time < cutoff_date)
await database.execute(query)
logger.info(
f"Successfully deleted {num_logs_to_delete} error logs older than {days_to_keep} days."
)
except Exception as e:
logger.error(
f"Error during automatic deletion of error logs: {e}", exc_info=True
)
async def process_get_error_logs(
limit: int,
offset: int,
key_search: Optional[str],
error_search: Optional[str],
error_code_search: Optional[str],
start_date: Optional[datetime],
end_date: Optional[datetime],
sort_by: str,
sort_order: str,
) -> Dict[str, Any]:
"""
处理错误日志的检索,支持分页和过滤。
"""
try:
logs_data = await db_services.get_error_logs(
limit=limit,
offset=offset,
key_search=key_search,
error_search=error_search,
error_code_search=error_code_search,
start_date=start_date,
end_date=end_date,
sort_by=sort_by,
sort_order=sort_order,
)
total_count = await db_services.get_error_logs_count(
key_search=key_search,
error_search=error_search,
error_code_search=error_code_search,
start_date=start_date,
end_date=end_date,
)
return {"logs": logs_data, "total": total_count}
except Exception as e:
logger.error(f"Service error in process_get_error_logs: {e}", exc_info=True)
raise
async def process_get_error_log_details(log_id: int) -> Optional[Dict[str, Any]]:
"""
处理特定错误日志详细信息的检索。
如果未找到,则返回 None。
"""
try:
log_details = await db_services.get_error_log_details(log_id=log_id)
return log_details
except Exception as e:
logger.error(
f"Service error in process_get_error_log_details for ID {log_id}: {e}",
exc_info=True,
)
raise
async def process_delete_error_logs_by_ids(log_ids: List[int]) -> int:
"""
按 ID 批量删除错误日志。
返回尝试删除的日志数量。
"""
if not log_ids:
return 0
try:
deleted_count = await db_services.delete_error_logs_by_ids(log_ids)
return deleted_count
except Exception as e:
logger.error(
f"Service error in process_delete_error_logs_by_ids for IDs {log_ids}: {e}",
exc_info=True,
)
raise
async def process_delete_error_log_by_id(log_id: int) -> bool:
"""
按 ID 删除单个错误日志。
如果删除成功(或找到日志并尝试删除),则返回 True,否则返回 False。
"""
try:
success = await db_services.delete_error_log_by_id(log_id)
return success
except Exception as e:
logger.error(
f"Service error in process_delete_error_log_by_id for ID {log_id}: {e}",
exc_info=True,
)
raise
async def process_delete_all_error_logs() -> int:
"""
处理删除所有错误日志的请求。
返回删除的日志数量。
"""
try:
if not database.is_connected:
await database.connect()
logger.info("Database connection established for deleting all error logs.")
deleted_count = await db_services.delete_all_error_logs()
logger.info(
f"Successfully processed request to delete all error logs. Count: {deleted_count}"
)
return deleted_count
except Exception as e:
logger.error(
f"Service error in process_delete_all_error_logs: {e}",
exc_info=True,
)
raise
|