File size: 9,099 Bytes
1b44660
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import type { WorkflowStep } from 'cloudflare:workers';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { DomainRateLimiter } from '../src/lib/rateLimiter';

/**
 * Minimal item shape used as a fixture throughout this spec.
 * Only the two fields the tests read are declared.
 */
type BatchItem = {
  /** Address of the page; the tests use the host part to tell domains apart. */
  url: string;
  /** Identifier echoed back by the mocked processor as `processed-<id>`. */
  id: string | number;
};

describe('DomainRateLimiter', () => {
  // Shared fixtures; every one of them is rebuilt in beforeEach so tests cannot leak state into each other.
  let rateLimiter: DomainRateLimiter<BatchItem>;
  let step: WorkflowStep; // Only `sleep` is stubbed; the double cast hides the rest of the interface
  let mockSleep: ReturnType<typeof vi.fn>; // Stand-in for step.sleep, records (reason, ms) per call
  let processItem: ReturnType<typeof vi.fn>; // Per-item worker handed to processBatch

  beforeEach(() => {
    // Sleeping never blocks: the stub moves the fake clock forward by the requested
    // duration and hands back an already-resolved promise.
    mockSleep = vi.fn().mockImplementation((_reason, durationMs) => {
      vi.advanceTimersByTime(durationMs);
      return Promise.resolve();
    });
    step = { sleep: mockSleep } as unknown as WorkflowStep;

    // Small limits so a handful of items is enough to hit each of them.
    rateLimiter = new DomainRateLimiter({
      maxConcurrent: 2,
      globalCooldownMs: 100,
      domainCooldownMs: 200,
    });
    processItem = vi.fn().mockImplementation(async (item: BatchItem) => {
      return `processed-${item.id}`;
    });

    // Pin the clock to a fixed instant so Date.now() is under the tests' control.
    vi.useFakeTimers();
    vi.setSystemTime(new Date(2023, 1, 1, 0, 0, 0));
  });

  afterEach(() => {
    // Undo the fake clock installed in beforeEach.
    vi.useRealTimers();
  });

  it('should process all items concurrently if limits are not hit', async () => {
    // Two items on two different hosts: within maxConcurrent (2) and no shared domain.
    const firstPage = { id: 1, url: 'https://domain1.com/page1' };
    const secondPage = { id: 2, url: 'https://domain2.com/page1' };

    const outcome = await rateLimiter.processBatch([firstPage, secondPage], step, processItem);

    // Both items come back, in input order.
    expect(outcome).toHaveLength(2);
    expect(outcome).toEqual(['processed-1', 'processed-2']);
    expect(processItem).toHaveBeenCalledTimes(2);
    // No cooldown of any kind should have been requested.
    expect(mockSleep).not.toHaveBeenCalled();
  });

  it('should not exceed maxConcurrent processing simultaneously', async () => {
    // Three distinct hosts, so only the concurrency cap (maxConcurrent: 2) can hold an item back.
    const items = [
      { id: 1, url: 'https://domain1.com/page1' },
      { id: 2, url: 'https://domain2.com/page1' },
      { id: 3, url: 'https://domain3.com/page1' },
    ];

    // Count how many processItem calls are running at the same moment, so the cap is
    // actually measured rather than inferred from the cooldown sleep alone.
    let inFlight = 0;
    let peakInFlight = 0;
    processItem.mockImplementation(async (item: BatchItem) => {
      inFlight += 1;
      peakInFlight = Math.max(peakInFlight, inFlight);
      // Yield once so calls started together overlap before any of them completes.
      await Promise.resolve();
      inFlight -= 1;
      return `processed-${item.id}`;
    });

    await rateLimiter.processBatch(items, step, processItem);

    // Should process first two items concurrently (maxConcurrent: 2), then apply global cooldown
    expect(mockSleep).toHaveBeenCalledWith(expect.any(String), 100);
    expect(processItem).toHaveBeenCalledTimes(3);
    // The number of simultaneous processItem calls must never go above the configured limit.
    expect(peakInFlight).toBeLessThanOrEqual(2);
  });

  it('should call step.sleep for globalCooldownMs between batches if needed', async () => {
    // Five distinct hosts with maxConcurrent = 2, so the expected batches are 2, 2 and 1 items.
    const fiveHosts = [
      { id: 1, url: 'https://domain1.com/page1' },
      { id: 2, url: 'https://domain2.com/page1' },
      { id: 3, url: 'https://domain3.com/page1' },
      { id: 4, url: 'https://domain4.com/page1' },
      { id: 5, url: 'https://domain5.com/page1' },
    ];

    await rateLimiter.processBatch(fiveHosts, step, processItem);

    // One global cooldown after each of the first two batches, none after the last one.
    expect(mockSleep).toHaveBeenCalledTimes(2);
    const globalCooldownReason = expect.stringContaining('global rate limit');
    expect(mockSleep).toHaveBeenCalledWith(globalCooldownReason, 100);
  });

  it('should call step.sleep for domainCooldownMs if processing the same domain twice quickly', async () => {
    const firstVisit = { id: 1, url: 'https://domain1.com/page1' };
    const secondVisit = { id: 2, url: 'https://domain1.com/page2' }; // same host as firstVisit

    // Prime the limiter with one access to domain1.com.
    await rateLimiter.processBatch([firstVisit], step, processItem);

    // Drop the calls recorded while priming so only the second run is inspected.
    mockSleep.mockClear();
    processItem.mockClear();

    // 100ms later is still inside the 200ms domain cooldown.
    vi.advanceTimersByTime(100);

    await rateLimiter.processBatch([secondVisit], step, processItem);

    // The limiter must have slept on behalf of the domain; the duration itself is not pinned here.
    const domainWaitReason = expect.stringContaining('waiting for domain cooldown');
    expect(mockSleep).toHaveBeenCalledWith(domainWaitReason, expect.any(Number));
  });

  it('should allow different domains to be processed concurrently without domain cooldown', async () => {
    // Same path on two different hosts, so the per-domain cooldown has nothing to act on.
    const onDomain1 = { id: 1, url: 'https://domain1.com/page1' };
    const onDomain2 = { id: 2, url: 'https://domain2.com/page1' };

    await rateLimiter.processBatch([onDomain1, onDomain2], step, processItem);

    // Both items were handed to the worker and no sleep was requested.
    expect(processItem).toHaveBeenCalledTimes(2);
    expect(mockSleep).not.toHaveBeenCalled();
  });

  it('should skip items with invalid URLs without throwing an error', async () => {
    const wellFormed = { id: 1, url: 'https://domain1.com/page1' };
    const malformed = { id: 2, url: 'invalid-url' }; // not a parseable absolute URL

    const outcome = await rateLimiter.processBatch([wellFormed, malformed], step, processItem);

    // Only the well-formed item reaches the worker and appears in the results;
    // the malformed one is dropped without rejecting the whole batch.
    expect(outcome).toHaveLength(1);
    expect(outcome).toEqual(['processed-1']);
    expect(processItem).toHaveBeenCalledTimes(1);
  });

  it('should call step.sleep with calculated wait time if all available items are domain-limited', async () => {
    const primer = { id: 1, url: 'https://domain1.com/page1' };
    const followUp = { id: 2, url: 'https://domain1.com/page2' };

    // First access to domain1.com starts its cooldown.
    await rateLimiter.processBatch([primer], step, processItem);

    // Start the second run with clean call histories.
    mockSleep.mockClear();
    processItem.mockClear();

    // Halfway through the 200ms domain cooldown.
    vi.advanceTimersByTime(100);

    // The only pending item is on the still-cooling domain, so the limiter has nothing else to run.
    await rateLimiter.processBatch([followUp], step, processItem);

    // A domain-cooldown sleep must have been requested. Only the type of the duration is
    // checked, not its value.
    const domainWaitReason = expect.stringContaining('waiting for domain cooldown');
    expect(mockSleep).toHaveBeenCalledWith(domainWaitReason, expect.any(Number));

    // After waiting, the item is still processed exactly once.
    expect(processItem).toHaveBeenCalledTimes(1);
  });

  it('should call the processItem function with the correct item and extracted domain', async () => {
    const page = { id: 1, url: 'https://example.com/page1' };

    await rateLimiter.processBatch([page], step, processItem);

    // The worker receives the untouched item plus the host taken from its URL.
    const expectedDomain = 'example.com';
    expect(processItem).toHaveBeenCalledWith(page, expectedDomain);
  });

  it('should return results for all successfully processed items', async () => {
    const onDomain1 = { id: 1, url: 'https://domain1.com/page1' };
    const onDomain2 = { id: 2, url: 'https://domain2.com/page1' };

    const outcome = await rateLimiter.processBatch([onDomain1, onDomain2], step, processItem);

    // One worker return value per item, in input order.
    expect(outcome).toEqual(['processed-1', 'processed-2']);
  });

  it('should handle errors during processItem gracefully and continue processing others', async () => {
    const doomed = { id: 1, url: 'https://domain1.com/page1' };
    const healthy = { id: 2, url: 'https://domain2.com/page1' };

    // Replace the default worker with one that rejects for the item whose id is 1.
    processItem.mockImplementation(async (candidate: BatchItem) => {
      if (candidate.id !== 1) {
        return `processed-${candidate.id}`;
      }
      throw new Error('Processing failed');
    });

    const outcome = await rateLimiter.processBatch([doomed, healthy], step, processItem);

    // The rejection is absorbed: both items were attempted, only the successful one is reported.
    expect(outcome).toEqual(['processed-2']);
    expect(processItem).toHaveBeenCalledTimes(2);
  });

  it('should update internal lastDomainAccess times correctly', async () => {
    const firstVisit = { id: 1, url: 'https://domain1.com/page1' };
    const secondVisit = { id: 2, url: 'https://domain1.com/page2' }; // same host as firstVisit

    // First access to domain1.com.
    await rateLimiter.processBatch([firstVisit], step, processItem);

    // 250ms is longer than the 200ms domain cooldown, so it should have expired by now.
    vi.advanceTimersByTime(250);

    // Only the second run's calls are of interest.
    mockSleep.mockClear();
    processItem.mockClear();

    await rateLimiter.processBatch([secondVisit], step, processItem);

    // With the cooldown elapsed the item runs straight away, without any sleep.
    expect(mockSleep).not.toHaveBeenCalled();
    expect(processItem).toHaveBeenCalledTimes(1);
  });

  it('should only wait for cooldowns of domains with pending items', async () => {
    // Touch two different domains so that both have a recorded last-access time.
    const warmUp = [
      { id: 1, url: 'https://domain1.com/page1' },
      { id: 2, url: 'https://domain2.com/page1' },
    ];
    await rateLimiter.processBatch(warmUp, step, processItem);

    // Inspect only what happens from here on.
    mockSleep.mockClear();
    processItem.mockClear();

    // 50ms into the 200ms domain cooldown, which leaves 150ms for domain2.
    vi.advanceTimersByTime(50);

    // Restart domain1's cooldown from "now", so it has the full 200ms left, more than domain2.
    // @ts-expect-error accessing private property for testing
    rateLimiter.lastDomainAccess.set('domain1.com', Date.now());

    // Only a domain2 item is pending; domain1's later expiry should play no part in the wait.
    const pending = { id: 3, url: 'https://domain2.com/page2' };
    await rateLimiter.processBatch([pending], step, processItem);

    const domainWaitReason = expect.stringContaining('waiting for domain cooldown');
    expect(mockSleep).toHaveBeenCalledWith(domainWaitReason, expect.any(Number));

    // The duration of the first sleep is expected to be 500, which the original author attributes
    // to a minimum 500ms wait enforced in the code.
    // NOTE(review): both 150ms (domain2) and 200ms (domain1) fall under that floor, so this value
    // alone cannot tell which domain's cooldown was used — confirm against the implementation.
    const [, waitedMs] = mockSleep.mock.calls[0];
    expect(waitedMs).toBe(500);

    // After the wait the pending item is processed exactly once.
    expect(processItem).toHaveBeenCalledTimes(1);
  });
});