// apps/backend/src/workflows/processIngestedItem.workflow.ts
import { WorkflowEntrypoint, type WorkflowEvent, type WorkflowStep, type WorkflowStepConfig } from 'cloudflare:workers';
import {
$data_sources,
$ingested_items,
and,
eq,
gte,
inArray,
isNull,
type DataSourceConfigWrapper,
} from '@meridian/database';
import { err, ok, ResultAsync } from 'neverthrow';
import type { Env } from '../index';
import { getArticleWithBrowser, getArticleWithFetch } from '../lib/articleFetchers';
import { createEmbeddings } from '../lib/embeddings';
import { Logger } from '../lib/logger';
import { DomainRateLimiter } from '../lib/rateLimiter';
import { getDb } from '../lib/utils';
import type { z } from 'zod';
import { getArticleRepresentationPrompt } from '../prompts/articleRepresentation.prompt';
import { createGoogleGenerativeAI } from '@ai-sdk/google';
import { generateText } from 'ai';
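
// Shared retry/timeout policy for the short database steps in this workflow.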
const dbStepConfig: WorkflowStepConfig = {
retries: { limit: 3, delay: '1 second', backoff: 'linear' },
timeout: '5 seconds',
};
/**
* Parameters for the ProcessArticles workflow
*/
export type ProcessArticlesParams = { ingested_item_ids: number[] };
const workflowLogger = new Logger({ workflow: 'ProcessArticles' });
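
/*
 * A minimal wrangler.toml sketch for wiring this workflow up. The binding and
 * class names come from this file; the workflow name itself is an assumption,
 * not necessarily the project's actual config:
 *
 *   [[workflows]]
 *   name = "process-ingested-item"            # assumed name
 *   binding = "PROCESS_INGESTED_ITEM"
 *   class_name = "ProcessIngestedItemWorkflow"
 */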
/**
* Workflow that processes articles by fetching their content, extracting text with Readability,
* generating embeddings, and storing the results.
*
* This workflow handles:
* - Source type dispatching (RSS, etc.)
* - Fetching article content with appropriate rate limiting
* - Domain-specific fetching strategies (browser vs. simple fetch)
* - Mozilla Readability-based content extraction
* - 10KB threshold for content storage (DB vs R2)
* - Embedding generation for search
* - Persistent storage in database and object storage
* - Error handling and status tracking
*/
export class ProcessIngestedItemWorkflow extends WorkflowEntrypoint<Env, ProcessArticlesParams> {
/**
* Main workflow execution method that processes a batch of articles
*
   * @param event Workflow event containing article IDs to process
* @param step Workflow step context for creating durable operations
*/
  async run(event: WorkflowEvent<ProcessArticlesParams>, step: WorkflowStep) {
const env = this.env;
const db = getDb(env.HYPERDRIVE);
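    // Per-run Gemini client; GEMINI_BASE_URL lets a deployment point at a proxy,
    // falling back to Google's public Generative Language endpoint.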
const google = createGoogleGenerativeAI({
apiKey: env.GEMINI_API_KEY,
baseURL: env.GEMINI_BASE_URL || 'https://generativelanguage.googleapis.com/v1beta',
});
    const logger = workflowLogger.child({
      workflow_id: event.instanceId,
      initial_article_count: event.payload.ingested_item_ids.length,
    });
logger.info('Starting workflow run');
const articles = await step.do('get articles', dbStepConfig, async () =>
db
.select({
id: $ingested_items.id,
url: $ingested_items.url_to_original,
title: $ingested_items.display_title,
publishedAt: $ingested_items.published_at,
sourceType: $data_sources.source_type,
config: $data_sources.config,
})
.from($ingested_items)
.innerJoin($data_sources, eq($ingested_items.data_source_id, $data_sources.id))
.where(
and(
// only process articles that haven't been processed yet
isNull($ingested_items.processed_at),
// only process articles that have a publish date in the last 48 hours
gte($ingested_items.published_at, new Date(new Date().getTime() - 48 * 60 * 60 * 1000)),
// only articles that have not failed
isNull($ingested_items.fail_reason),
// MAIN FILTER: only articles that need to be processed
            inArray($ingested_items.id, event.payload.ingested_item_ids)
)
)
);
const fetchLogger = logger.child({ articles_count: articles.length });
fetchLogger.info('Fetching article contents');
// Create rate limiter with article processing specific settings
const rateLimiter = new DomainRateLimiter<{
id: number;
url: string;
title: string | null;
publishedAt: Date | null;
sourceType: 'RSS';
config: z.infer<typeof DataSourceConfigWrapper>;
}>({ maxConcurrent: 8, globalCooldownMs: 1_000, domainCooldownMs: 5_000 });
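    // processBatch invokes the callback below once per article while honoring the
    // per-domain and global cooldowns configured above, so requests to a single
    // host are spaced out even with up to 8 articles in flight at once.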
// Process articles with rate limiting and source type dispatcher
const articlesToProcess: Array<{
id: number;
title: string;
url: string;
contentBodyText: string;
contentBodyR2Key: string | null;
wordCount: number;
publishedTime?: string;
}> = [];
const articleResults = await rateLimiter.processBatch(articles, step, async article => {
const scrapeLogger = fetchLogger.child({ article_id: article.id, source_type: article.sourceType });
// Skip PDFs immediately
if (article.url.toLowerCase().endsWith('.pdf')) {
scrapeLogger.info('Skipping PDF article');
// Update the article status to mark it as skipped PDF
await step.do(`mark PDF article ${article.id} as skipped`, dbStepConfig, async () => {
return db
.update($ingested_items)
.set({
status: 'SKIPPED_PDF',
processed_at: new Date(),
fail_reason: 'PDF article - cannot process',
})
.where(eq($ingested_items.id, article.id));
});
return { id: article.id, success: false, error: 'pdf' };
}
// Dispatcher based on source type
if (article.sourceType === 'RSS') {
return await this._processRSSArticle(article, scrapeLogger, step, env);
}
scrapeLogger.error('Unsupported source type', { source_type: article.sourceType });
return { id: article.id, success: false, error: `Unsupported source type: ${article.sourceType}` };
});
// Handle results
let successCount = 0;
let failCount = 0;
const dbUpdateLogger = fetchLogger.child({ results_count: articleResults.length });
for (const result of articleResults) {
const articleLogger = dbUpdateLogger.child({ article_id: result.id });
if (result.success && 'processedContent' in result) {
successCount++;
articlesToProcess.push({
id: result.id,
title: result.processedContent.title,
url: result.processedContent.url,
contentBodyText: result.processedContent.contentBodyText,
contentBodyR2Key: result.processedContent.contentBodyR2Key,
wordCount: result.processedContent.wordCount,
publishedTime: result.processedContent.publishedTime,
});
await step.do(`update db for successful article ${result.id}`, dbStepConfig, async () => {
          articleLogger.debug('Updating article status to PROCESSED');
return db
.update($ingested_items)
.set({
status: 'PROCESSED',
usedBrowser: result.used_browser,
})
.where(eq($ingested_items.id, result.id));
});
} else {
failCount++;
// update failed articles in DB with the fail reason
await step.do(`update db for failed article ${result.id}`, dbStepConfig, async () => {
const failReason = result.error ? String(result.error) : 'Unknown error';
const status = result.error?.includes('render') ? 'FAILED_RENDER' : 'FAILED_FETCH';
articleLogger.warn('Marking article as failed during content fetch', {
fail_reason: failReason,
status,
});
return db
.update($ingested_items)
.set({
processed_at: new Date(),
fail_reason: failReason,
status: status,
})
.where(eq($ingested_items.id, result.id));
});
}
}
const processingLogger = logger.child({
processing_batch_size: articlesToProcess.length,
fetch_success_count: successCount,
fetch_fail_count: failCount,
});
processingLogger.info('Processing articles with content extraction and embeddings');
// process articles for embeddings
const analysisResults = await Promise.allSettled(
articlesToProcess.map(async article => {
const articleLogger = processingLogger.child({ article_id: article.id });
articleLogger.info('Generating article representation');
// Analyze article
const articleRepresentation = await step.do(
`analyze article ${article.id}`,
{ retries: { limit: 3, delay: '2 seconds', backoff: 'exponential' }, timeout: '1 minute' },
async () => {
const response = await generateText({
model: google('gemini-2.0-flash-001'),
temperature: 0,
prompt: getArticleRepresentationPrompt(article.title, article.url, article.contentBodyText),
});
return response.text;
}
);
articleLogger.info('Embedding article representation');
        // Generate embeddings (no R2 upload needed here; it was already handled during content fetching)
const embeddingResult = await step.do(`generate embeddings for article ${article.id}`, async () => {
articleLogger.info('Generating embeddings');
const embeddings = await createEmbeddings(env, [articleRepresentation]);
if (embeddings.isErr()) throw embeddings.error;
return embeddings.value[0];
});
// handle results in a separate step
await step.do(`update article ${article.id} status`, async () =>
db
.update($ingested_items)
.set({
processed_at: new Date(),
display_title: article.title,
content_body_text: article.contentBodyText,
content_body_r2_key: article.contentBodyR2Key,
embedding: embeddingResult,
embedding_text: articleRepresentation,
status: 'PROCESSED',
word_count: article.wordCount,
})
.where(eq($ingested_items.id, article.id))
);
articleLogger.info('Article processed successfully');
return { id: article.id, success: true };
})
);
const successfulAnalyses = analysisResults.filter(
(result): result is PromiseFulfilledResult<{ id: number; success: true }> =>
result.status === 'fulfilled' && result.value.success
).length;
const failedAnalyses = analysisResults.filter(
result => result.status === 'rejected' || (result.status === 'fulfilled' && !result.value.success)
).length;
logger.info('Workflow completed', {
total_articles: articlesToProcess.length,
successful_analyses: successfulAnalyses,
failed_analyses: failedAnalyses,
});
}
/**
* Processes RSS articles by fetching HTML content and using Readability for extraction
*/
private async _processRSSArticle(
article: {
id: number;
url: string;
title: string | null;
publishedAt: Date | null;
sourceType: 'RSS';
config: z.infer<typeof DataSourceConfigWrapper>;
},
scrapeLogger: Logger,
step: WorkflowStep,
env: Env
) {
scrapeLogger.info('Processing RSS article');
// This will contain either a successful result or a controlled error
    // biome-ignore lint/suspicious/noImplicitAnyLet: result's type is inferred from the step.do return value below
let result;
try {
result = await step.do(
`scrape RSS article ${article.id}`,
        { retries: { limit: 3, delay: '2 seconds', backoff: 'exponential' }, timeout: '2 minutes' },
async () => {
// During retries, let errors bubble up naturally
if (article.config.config.rss_paywall === true) {
scrapeLogger.info('Using browser to fetch article (tricky domain)');
const browserResult = await getArticleWithBrowser(env, article.url);
if (browserResult.isErr()) throw browserResult.error.error;
return {
id: article.id,
success: true,
parsedContent: browserResult.value,
used_browser: true,
};
}
scrapeLogger.info('Attempting fetch-first approach');
const fetchResult = await getArticleWithFetch(article.url);
if (!fetchResult.isErr()) {
return {
id: article.id,
success: true,
parsedContent: fetchResult.value,
used_browser: false,
};
}
// Fetch failed, try browser with jitter
scrapeLogger.info('Fetch failed, falling back to browser');
          // step.sleep cannot be called from inside step.do (steps must not nest),
          // so apply the jitter with a plain timer instead
          const jitterTime = Math.random() * 2500 + 500;
          await new Promise(resolve => setTimeout(resolve, jitterTime));
const browserResult = await getArticleWithBrowser(env, article.url);
if (browserResult.isErr()) throw browserResult.error.error;
return {
id: article.id,
success: true,
parsedContent: browserResult.value,
used_browser: true,
};
}
);
} catch (error) {
scrapeLogger.error(
'Failed to scrape RSS article',
{ error: error instanceof Error ? error.message : String(error) },
error instanceof Error ? error : new Error(String(error))
);
// After all retries failed, return a structured error
return {
id: article.id,
success: false,
        error: (error instanceof Error ? error.message : String(error)) || 'exhausted all retries',
};
}
// Apply 10KB threshold logic
const CONTENT_SIZE_THRESHOLD = 10240; // 10KB
const fullText = result.parsedContent.text;
    const wordCount = fullText.split(/\s+/).filter(Boolean).length;
let contentBodyText: string;
let contentBodyR2Key: string | null = null;
if (fullText.length <= CONTENT_SIZE_THRESHOLD) {
// Store full text in DB
contentBodyText = fullText;
} else {
      // Store roughly the first 10KB (by character count) in DB with a truncation indicator, full text in R2
contentBodyText = `${fullText.substring(0, CONTENT_SIZE_THRESHOLD)}...`;
// Store full content in R2
const date = result.parsedContent.publishedTime ? new Date(result.parsedContent.publishedTime) : new Date();
const r2Key = `processed_content/${date.getUTCFullYear()}/${date.getUTCMonth() + 1}/${date.getUTCDate()}/${article.id}.txt`;
try {
await env.ARTICLES_BUCKET.put(r2Key, fullText);
contentBodyR2Key = r2Key;
scrapeLogger.info('Stored full content in R2', { r2_key: r2Key, content_length: fullText.length });
} catch (r2Error) {
scrapeLogger.error('Failed to store content in R2', { r2_key: r2Key }, r2Error as Error);
// Continue with truncated content in DB only
}
}
return {
id: article.id,
success: true,
processedContent: {
title: result.parsedContent.title,
contentBodyText,
contentBodyR2Key,
url: article.url,
wordCount,
publishedTime: result.parsedContent.publishedTime,
},
used_browser: result.used_browser,
};
}
}
/**
* Starts a new ProcessArticles workflow instance with the provided article IDs
*
* @param env Application environment
* @param params Parameters containing the list of article IDs to process
* @returns Result containing either the created workflow or an error
*/
export async function startProcessArticleWorkflow(env: Env, params: ProcessArticlesParams) {
const workflow = await ResultAsync.fromPromise(
env.PROCESS_INGESTED_ITEM.create({ id: crypto.randomUUID(), params }),
e => (e instanceof Error ? e : new Error(String(e)))
);
if (workflow.isErr()) return err(workflow.error);
return ok(workflow.value);
}
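
/*
 * Example usage (a sketch, not code from this project; assumes a caller such as
 * a queue consumer or HTTP handler with access to the worker Env):
 *
 *   const result = await startProcessArticleWorkflow(env, { ingested_item_ids: [101, 102, 103] });
 *   if (result.isErr()) {
 *     console.error('Failed to start workflow', result.error);
 *   } else {
 *     console.log('Started workflow instance', result.value.id);
 *   }
 */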