import app from './app';
import { DataSourceIngestorDO } from './durable_objects/dataSourceIngestorDO';
import { Logger } from './lib/logger';
import { type ProcessArticlesParams, startProcessArticleWorkflow } from './workflows/processIngestedItem.workflow';

export type Env = {
  // Bindings
  ARTICLES_BUCKET: R2Bucket;
  ARTICLE_PROCESSING_QUEUE: Queue;
  DATA_SOURCE_INGESTOR: DurableObjectNamespace;
  PROCESS_INGESTED_ITEM: Workflow;
  HYPERDRIVE: Hyperdrive;

  // Secrets
  API_TOKEN: string;
  AXIOM_DATASET: string | undefined; // optional, set if you want to send logs to Axiom
  AXIOM_TOKEN: string | undefined; // optional, set if you want to send logs to Axiom
  CLOUDFLARE_API_TOKEN: string;
  CLOUDFLARE_ACCOUNT_ID: string;
  DATABASE_URL: string;
  GEMINI_API_KEY: string;
  GEMINI_BASE_URL: string;
  MERIDIAN_ML_SERVICE_URL: string;
  MERIDIAN_ML_SERVICE_API_KEY: string;
};

// Create a base logger for the queue handler
const queueLogger = new Logger({ service: 'article-queue-handler' });

export default {
  fetch: app.fetch,

  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const batchLogger = queueLogger.child({ batch_size: batch.messages.length });
    batchLogger.info('Received batch of articles to process');

    // Collect all ingested item ids from every message in the batch
    const articlesToProcess: number[] = [];
    for (const message of batch.messages) {
      const { ingested_item_ids } = message.body as ProcessArticlesParams;
      batchLogger.debug('Processing message', { message_id: message.id, article_count: ingested_item_ids.length });
      for (const id of ingested_item_ids) {
        articlesToProcess.push(id);
      }
    }

    batchLogger.info('Articles extracted from batch', { total_articles: articlesToProcess.length });

    if (articlesToProcess.length === 0) {
      batchLogger.info('Queue batch was empty, nothing to process');
      batch.ackAll(); // Acknowledge the empty batch
      return;
    }

    // Process articles in chunks of 96
    const CHUNK_SIZE = 96;
    const articleChunks: number[][] = [];
    for (let i = 0; i < articlesToProcess.length; i += CHUNK_SIZE) {
      articleChunks.push(articlesToProcess.slice(i, i + CHUNK_SIZE));
    }

    batchLogger.info('Split articles into chunks', { chunk_count: articleChunks.length });

    // Process each chunk sequentially
    for (const chunk of articleChunks) {
      const workflowResult = await startProcessArticleWorkflow(env, { ingested_item_ids: chunk });
      if (workflowResult.isErr()) {
        batchLogger.error(
          'Failed to trigger ProcessArticles Workflow',
          { error_message: workflowResult.error.message, chunk_size: chunk.length },
          workflowResult.error
        );
        // Retry the entire batch if Workflow creation failed
        batch.retryAll({ delaySeconds: 30 }); // Retry after 30 seconds
        return;
      }

      batchLogger.info('Successfully triggered ProcessArticles Workflow for chunk', {
        workflow_id: workflowResult.value.id,
        chunk_size: chunk.length,
      });
    }

    batch.ackAll(); // Acknowledge the entire batch after all chunks are processed
  },
} satisfies ExportedHandler<Env>;

export { DataSourceIngestorDO };
export { ProcessIngestedItemWorkflow } from './workflows/processIngestedItem.workflow';