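/**
 * Worker entry point: exposes the HTTP handler from `./app`, a queue consumer
 * that fans ingested item IDs out to the ProcessIngestedItem Workflow, and
 * re-exports the Durable Object and Workflow classes so the runtime can bind
 * them.
 */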
import app from './app';
import { DataSourceIngestorDO } from './durable_objects/dataSourceIngestorDO';
import { Logger } from './lib/logger';
import { type ProcessArticlesParams, startProcessArticleWorkflow } from './workflows/processIngestedItem.workflow';

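/** Bindings and secrets available to the Worker at runtime. */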
export type Env = {
  // Bindings
  ARTICLES_BUCKET: R2Bucket;
  ARTICLE_PROCESSING_QUEUE: Queue<ProcessArticlesParams>;
  DATA_SOURCE_INGESTOR: DurableObjectNamespace<DataSourceIngestorDO>;
  PROCESS_INGESTED_ITEM: Workflow<ProcessArticlesParams>;
  HYPERDRIVE: Hyperdrive;

  // Secrets
  API_TOKEN: string;

  AXIOM_DATASET: string | undefined; // optional, set if you want to ship logs to Axiom
  AXIOM_TOKEN: string | undefined; // optional, set if you want to ship logs to Axiom

  CLOUDFLARE_API_TOKEN: string;
  CLOUDFLARE_ACCOUNT_ID: string;

  DATABASE_URL: string;

  GEMINI_API_KEY: string;
  GEMINI_BASE_URL: string;

  MERIDIAN_ML_SERVICE_URL: string;
  MERIDIAN_ML_SERVICE_API_KEY: string;
};

// Create a base logger for the queue handler
const queueLogger = new Logger({ service: 'article-queue-handler' });

export default {
  fetch: app.fetch,
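  /**
   * Queue consumer: collects the ingested item IDs from every message in the
   * batch, splits them into chunks of CHUNK_SIZE, and starts one
   * ProcessIngestedItem Workflow per chunk. The batch is acknowledged only
   * after every chunk has a Workflow running; if starting any Workflow fails,
   * the whole batch is retried after a delay.
   */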
  async queue(batch: MessageBatch<unknown>, env: Env): Promise<void> {
    const batchLogger = queueLogger.child({ batch_size: batch.messages.length });
    batchLogger.info('Received batch of articles to process');

    const articlesToProcess: number[] = [];
    for (const message of batch.messages) {
      const { ingested_item_ids } = message.body as ProcessArticlesParams;
      batchLogger.debug('Processing message', { message_id: message.id, article_count: ingested_item_ids.length });

      for (const id of ingested_item_ids) {
        articlesToProcess.push(id);
      }
    }

    batchLogger.info('Articles extracted from batch', { total_articles: articlesToProcess.length });

    if (articlesToProcess.length === 0) {
      batchLogger.info('Queue batch was empty, nothing to process');
      batch.ackAll(); // Acknowledge the empty batch
      return;
    }

    // Process articles in chunks of 96
    const CHUNK_SIZE = 96;
    const articleChunks: number[][] = [];
    for (let i = 0; i < articlesToProcess.length; i += CHUNK_SIZE) {
      articleChunks.push(articlesToProcess.slice(i, i + CHUNK_SIZE));
    }

    batchLogger.info('Split articles into chunks', { chunk_count: articleChunks.length });

    // Process each chunk sequentially
    for (const chunk of articleChunks) {
      const workflowResult = await startProcessArticleWorkflow(env, { ingested_item_ids: chunk });
      if (workflowResult.isErr()) {
        batchLogger.error(
          'Failed to trigger ProcessArticles Workflow',
          { error_message: workflowResult.error.message, chunk_size: chunk.length },
          workflowResult.error
        );
        // Retry the entire batch if Workflow creation failed
        batch.retryAll({ delaySeconds: 30 }); // Retry after 30 seconds
        return;
      }

      batchLogger.info('Successfully triggered ProcessArticles Workflow for chunk', {
        workflow_id: workflowResult.value.id,
        chunk_size: chunk.length,
      });
    }

    batch.ackAll(); // Acknowledge the entire batch after all chunks are processed
  },
} satisfies ExportedHandler<Env>;

export { DataSourceIngestorDO };
export { ProcessIngestedItemWorkflow } from './workflows/processIngestedItem.workflow';