| """ | |
| import logging | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Tuple, Dict, List | |
| logger = logging.getLogger(__name__) | |
class RateLimiter:
    """In-memory, per-user submission rate limiter over a rolling window.

    Each recorded submission is stored as a timezone-aware UTC timestamp in
    ``submission_history``. A user is allowed to submit while the number of
    timestamps newer than ``period_days`` days is below their quota.

    Users fall into three tiers:

    * regular users get ``quota`` submissions per window;
    * users added via ``add_higher_quota_user`` get
      ``quota * higher_quota_multiplier``;
    * users added via ``add_unlimited_user`` are never limited.

    ``check_rate_limit`` only inspects the history; it does not record
    anything. Call ``record_submission`` once a submission is accepted.
    """

    def __init__(self, period_days: int = 7, quota: int = 5,
                 higher_quota_multiplier: int = 2):
        """Create a limiter.

        Args:
            period_days: Length of the rolling window, in days.
            quota: Submissions a regular user may make per window.
            higher_quota_multiplier: Factor applied to ``quota`` for users in
                the higher-quota tier. Defaults to 2.
        """
        self.period_days = period_days
        self.quota = quota
        self.higher_quota_multiplier = higher_quota_multiplier
        # user_id -> UTC timestamps of that user's recorded submissions.
        self.submission_history: Dict[str, List[datetime]] = {}
        self.higher_quota_users = set()  # Users with higher quotas
        self.unlimited_users = set()  # Users with no quota limits

    def add_unlimited_user(self, user_id: str) -> None:
        """Add a user to the unlimited users list."""
        self.unlimited_users.add(user_id)

    def add_higher_quota_user(self, user_id: str) -> None:
        """Add a user to the higher quota users list."""
        self.higher_quota_users.add(user_id)

    def record_submission(self, user_id: str) -> None:
        """Record a new submission for a user, timestamped with the current UTC time."""
        now = datetime.now(timezone.utc)
        self.submission_history.setdefault(user_id, []).append(now)

    def clean_old_submissions(self, user_id: str) -> None:
        """Remove the user's submissions that are older than the period.

        Only timestamps strictly newer than ``now - period_days`` are kept.
        Users with no recorded history are left untouched (no entry is
        created for them).
        """
        history = self.submission_history.get(user_id)
        if history is None:
            return

        cutoff_time = datetime.now(timezone.utc) - timedelta(days=self.period_days)
        self.submission_history[user_id] = [
            submitted_at for submitted_at in history
            if submitted_at > cutoff_time
        ]

    async def check_rate_limit(self, user_id: str) -> Tuple[bool, str]:
        """Check if a user has exceeded their rate limit.

        Declared ``async`` for its callers; the body itself awaits nothing.
        As a side effect, the user's expired submissions are pruned from
        ``submission_history`` (except for unlimited users, who return
        before any pruning happens).

        Returns:
            Tuple[bool, str]: (is_allowed, error_message). ``error_message``
            is the empty string when the submission is allowed.
        """
        # Unlimited users bypass all checks
        if user_id in self.unlimited_users:
            return True, ""

        # Drop expired entries first so that only in-window submissions count.
        self.clean_old_submissions(user_id)
        submission_count = len(self.submission_history.get(user_id, []))

        # Higher-tier users get a multiple of the base quota.
        if user_id in self.higher_quota_users:
            user_quota = self.quota * self.higher_quota_multiplier
        else:
            user_quota = self.quota

        if submission_count >= user_quota:
            error_msg = (
                f"User '{user_id}' has reached the limit of {user_quota} submissions "
                f"in the last {self.period_days} days. Please wait before submitting again."
            )
            return False, error_msg

        return True, ""
| """ |