yunlonggong's picture
Initial project upload
1b44660
import type { WorkflowStep } from 'cloudflare:workers';
import { Logger } from './logger';
/**
* Configuration options for the rate limiter
*/
type RateLimiterOptions = {
maxConcurrent: number;
globalCooldownMs: number;
domainCooldownMs: number;
};
/**
* Represents a batch item with an ID and URL
*/
type BatchItem<IdType = number | string> = {
id: IdType;
url: string;
};
/**
* Rate limiter that respects per-domain cooldowns to prevent overloading specific domains
* when making HTTP requests. Handles batching and throttling of requests.
*
* @template T Type of the batch items, must extend BatchItem
* @template I Type of the ID field, defaults to number | string
*/
export class DomainRateLimiter<T extends BatchItem<I>, I = number | string> {
private lastDomainAccess = new Map<string, number>();
private options: RateLimiterOptions;
private logger: Logger;
/**
* Creates a new DomainRateLimiter instance
*
* @param options Configuration options for throttling
*/
constructor(options: RateLimiterOptions) {
this.options = options;
this.logger = new Logger({ service: 'DomainRateLimiter' });
}
/**
* Processes a batch of items with domain-aware rate limiting
*
* @param items Array of items to process
* @param step Workflow step instance for handling sleeps/delays
* @param processItem Function that processes a single item and returns a result
* @returns Promise resolving to an array of results in the same order as input items
*
* @template R The return type of the processItem function
*/
async processBatch<R>(
items: T[],
step: WorkflowStep,
processItem: (item: T, domain: string) => Promise<R>
): Promise<R[]> {
const batchLogger = this.logger.child({ batch_size: items.length });
batchLogger.info('Starting batch processing');
const results: R[] = [];
const remainingItems = [...items];
while (remainingItems.length > 0) {
const currentBatch: T[] = [];
const currentTime = Date.now();
// Select items for current batch based on domain cooldown
for (const item of [...remainingItems]) {
if (currentBatch.length >= this.options.maxConcurrent) break;
try {
const domain = new URL(item.url).hostname;
const lastAccess = this.lastDomainAccess.get(domain) || 0;
if (currentTime - lastAccess >= this.options.domainCooldownMs) {
currentBatch.push(item);
// Remove from remaining items
const idx = remainingItems.findIndex(i => i.id === item.id);
if (idx >= 0) remainingItems.splice(idx, 1);
}
} catch (e) {
// Skip invalid URLs
const idx = remainingItems.findIndex(i => i.id === item.id);
if (idx >= 0) remainingItems.splice(idx, 1);
}
}
if (currentBatch.length === 0) {
// Nothing ready yet, wait for next domain to be ready
const nextReady = Math.min(
...remainingItems
.map(item => {
try {
const domain = new URL(item.url).hostname;
const lastAccess = this.lastDomainAccess.get(domain) || 0;
return this.options.domainCooldownMs - (currentTime - lastAccess);
} catch {
return Number.POSITIVE_INFINITY; // Skip invalid URLs
}
})
.filter(time => time > 0) // Only consider positive wait times
);
batchLogger.debug('Waiting for domain cooldown', { wait_time_ms: Math.max(500, nextReady) });
await step.sleep(`waiting for domain cooldown (${Math.round(nextReady / 1000)}s)`, Math.max(500, nextReady));
continue;
}
batchLogger.debug('Processing batch', { batch_size: currentBatch.length, remaining: remainingItems.length });
// Process current batch in parallel
const batchResults = await Promise.allSettled(
currentBatch.map(async item => {
try {
const domain = new URL(item.url).hostname;
this.lastDomainAccess.set(domain, Date.now());
return await processItem(item, domain);
} catch (error) {
const itemLogger = batchLogger.child({ item_id: item.id });
itemLogger.error(
'Error processing item',
undefined,
error instanceof Error ? error : new Error(String(error))
);
throw error;
}
})
);
// Add results
for (const result of batchResults) {
if (result.status === 'fulfilled') {
results.push(result.value);
}
}
// Apply global cooldown between batches if we have more items to process
if (remainingItems.length > 0) {
batchLogger.debug('Applying global rate limit', { cooldown_ms: this.options.globalCooldownMs });
await step.sleep(
`global rate limit (${Math.round(this.options.globalCooldownMs / 1000)}s)`,
this.options.globalCooldownMs
);
}
}
batchLogger.info('Batch processing complete', { processed_count: results.length });
return results;
}
}