import { DurableObject } from 'cloudflare:workers';
import { $data_sources, $ingested_items, DataSourceConfigWrapper, eq } from '@meridian/database';
import { type Result, ResultAsync, err, ok } from 'neverthrow';
import { z } from 'zod';
import type { Env } from '../index';
import { Logger } from '../lib/logger';
import { parseRSSFeed } from '../lib/parsers';
import { tryCatchAsync } from '../lib/tryCatchAsync';
import { getDb } from '../lib/utils';
import { userAgents } from '../lib/utils';

// RSS source config schema (v1); stored configs are validated via the
// DataSourceConfigWrapper schema imported above.
const RssSourceConfigV1 = z.object({
  url: z.string().url(),
  rss_paywall: z.boolean().optional().default(false),
  config_schema_version: z.literal('1.0'),
});

/**
 * Schema for validating DataSourceState
 * Used to ensure state hasn't been corrupted before operating on it
 */
const DataSourceStateSchema = z.object({
  dataSourceId: z.number().int().positive(),
  sourceType: z.string(),
  config: DataSourceConfigWrapper, // Assuming RSS for now
  configVersionHash: z.string().nullable(),
  scrapeFrequencyTier: z.number().int().positive(),
  lastChecked: z.number().nullable(),
});

/**
 * State type for managing data source scraping configuration and status
 */
type DataSourceState = z.infer<typeof DataSourceStateSchema>;

const tierIntervals = {
  1: 60 * 60 * 1000, // Tier 1: Check every hour
  2: 4 * 60 * 60 * 1000, // Tier 2: Check every 4 hours
  3: 6 * 60 * 60 * 1000, // Tier 3: Check every 6 hours
  4: 24 * 60 * 60 * 1000, // Tier 4: Check every 24 hours
};
const DEFAULT_INTERVAL = tierIntervals[2]; // Default to 4 hours if tier is invalid

// --- Retry Configuration ---
const MAX_STEP_RETRIES = 3; // Max retries for *each* step (fetch, parse, insert)
const INITIAL_RETRY_DELAY_MS = 500; // Start delay, doubles each time

/**
 * Executes an operation with exponential backoff retries
 *
 * @param operation Function that returns a Promise to execute with retries
 * @param maxRetries Maximum number of retry attempts
 * @param initialDelayMs Initial delay between retries in milliseconds (doubles each retry)
 * @param logger Logger instance to record retry attempts and failures
 * @returns Result object from either a successful operation or the last failed attempt
 *
 * @template T Success value type
 * @template E Error type, must extend Error
 */
async function attemptWithRetries<T, E extends Error>(
  operation: () => Promise<Result<T, E>>,
  maxRetries: number,
  initialDelayMs: number,
  logger: Logger
): Promise<Result<T, E>> {
  let lastError: E | undefined;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    logger.debug(`Attempt ${attempt}/${maxRetries}...`);
    const result = await operation();

    if (result.isOk()) {
      logger.debug(`Attempt ${attempt} successful.`);
      return ok(result.value); // Return successful result immediately
    }

    lastError = result.error; // Store the error
    logger.warn(
      `Attempt ${attempt} failed.`,
      { error_name: lastError.name, error_message: lastError.message },
      lastError
    );

    // If not the last attempt, wait before retrying
    if (attempt < maxRetries) {
      const delay = initialDelayMs * 2 ** (attempt - 1);
      logger.debug('Waiting before next attempt.', { delay_ms: delay });
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }

  // If the loop finishes, all retries failed
  logger.error('Failed after max attempts.', { max_retries: maxRetries }, lastError);
  // biome-ignore lint/style/noNonNullAssertion: lastError is always set after at least one failed attempt
  return err(lastError!);
}
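
// Illustrative usage of attemptWithRetries (a sketch; `loadSomething` is a
// hypothetical helper, not part of this module):
//
//   const result = await attemptWithRetries<string, Error>(
//     async () => {
//       const r = await tryCatchAsync(loadSomething());
//       return r.isErr() ? err(r.error as Error) : ok(r.value);
//     },
//     MAX_STEP_RETRIES,
//     INITIAL_RETRY_DELAY_MS,
//     logger.child({ step: 'Example' })
//   );
//   if (result.isErr()) return; // failure already logged by attemptWithRetries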

/**
 * Durable Object for periodically scraping RSS feeds from various sources
 *
 * This DO handles:
 * - Scheduled scraping of RSS sources based on frequency tiers
 * - Fetching and parsing RSS content
 * - Extracting and storing new articles
 * - Sending new articles to a processing queue
 * - Managing state across executions
 * - Handling failures with retries
 */
export class DataSourceIngestorDO extends DurableObject<Env> {
  private logger: Logger;

  /**
   * Initializes the DO with logging
   *
   * @param ctx Durable Object state context
   * @param env Application environment
   */
  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    this.logger = new Logger({ durable_object: 'DataSourceIngestorDO', do_id: this.ctx.id.toString() });
    this.logger.info('DO initialized');
  }

  /**
   * Initializes the scraper with data source data and sets up the initial alarm
   *
   * @param dataSourceData Data source configuration including ID, source type, config, config version hash, and scrape frequency tier
   * @throws Error if initialization fails
   */
  async initialize(dataSourceData: {
    id: number;
    source_type: string;
    config: unknown;
    config_version_hash: string | null;
    scrape_frequency_tier: number;
  }): Promise<void> {
    const logger = this.logger.child({
      operation: 'initialize',
      dataSourceId: dataSourceData.id,
      source_type: dataSourceData.source_type,
    });
    logger.info('Initializing with data', { data_source_data: dataSourceData });

    const sourceExistsResult = await ResultAsync.fromPromise(
      getDb(this.env.HYPERDRIVE).query.$data_sources.findFirst({ where: (s, { eq }) => eq(s.id, dataSourceData.id) }),
      e => new Error(`Database query failed: ${e}`)
    );
    if (sourceExistsResult.isErr()) {
      logger.error('Failed to query DB for data source', undefined, sourceExistsResult.error);
      throw sourceExistsResult.error; // Rethrow DB error
    }
    if (!sourceExistsResult.value) {
      logger.warn(
        "Data source doesn't exist in DB. This is likely due to a race condition where the source was deleted after being queued for initialization."
      );
      // Instead of throwing, we'll just return without setting up the DO
      return;
    }

    // Parse the config JSONB using the appropriate Zod schema (assuming RSS for now)
    const parsedConfig = DataSourceConfigWrapper.safeParse(dataSourceData.config);
    if (!parsedConfig.success) {
      logger.error('Failed to parse RSS config', { config: dataSourceData.config, error: parsedConfig.error });
      throw new Error(`Invalid RSS config: ${parsedConfig.error.message}`);
    }

    const state: DataSourceState = {
      dataSourceId: dataSourceData.id,
      sourceType: dataSourceData.source_type,
      config: parsedConfig.data,
      configVersionHash: dataSourceData.config_version_hash,
      scrapeFrequencyTier: dataSourceData.scrape_frequency_tier,
      lastChecked: null,
    };

    // Add retry logic for storage operations
    let putSuccess = false;
    for (let i = 0; i < 3 && !putSuccess; i++) {
      try {
        await this.ctx.storage.put('state', state);
        putSuccess = true;
        logger.info('Initialized state successfully.');
      } catch (storageError) {
        logger.warn(`Attempt ${i + 1} to put state failed`, undefined, storageError as Error);
        if (i < 2) await new Promise(res => setTimeout(res, 200 * (i + 1))); // Linear backoff (200ms, then 400ms)
      }
    }
    if (!putSuccess) {
      logger.error('Failed to put initial state after retries. DO may be unstable.');
      throw new Error('Failed to persist initial DO state.');
    }

    try {
      // Update the data source's do_initialized_at field
      await getDb(this.env.HYPERDRIVE)
        .update($data_sources)
        .set({ do_initialized_at: new Date() })
        .where(eq($data_sources.id, dataSourceData.id));
    } catch (dbError) {
      logger.error('Failed to update data source do_initialized_at', undefined, dbError as Error);
      throw new Error(
        `Failed to update data source initialization status: ${dbError instanceof Error ? dbError.message : String(dbError)}`
      );
    }

    try {
      // Only set the alarm if state was successfully stored
      await this.ctx.storage.setAlarm(Date.now() + 5000);
      logger.info('Initial alarm set.');
    } catch (alarmError) {
      logger.error('Failed to set initial alarm', undefined, alarmError as Error);
      throw new Error(
        `Failed to set initial alarm: ${alarmError instanceof Error ? alarmError.message : String(alarmError)}`
      );
    }
  }
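
  // Example initialize() payload (illustrative values only; the exact `config`
  // shape must satisfy DataSourceConfigWrapper from @meridian/database). It can
  // be invoked via the stub's RPC method or the /initialize HTTP endpoint below:
  //
  //   await stub.initialize({
  //     id: 42,
  //     source_type: 'RSS',
  //     config: rssConfig, // e.g. a wrapped RSS config carrying the feed URL
  //     config_version_hash: 'abc123',
  //     scrape_frequency_tier: 2, // checked every 4 hours
  //   });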

  /**
   * Alarm handler that performs the scheduled data source scraping
   *
   * This method is triggered by the DO alarm and:
   * 1. Retrieves and validates the DataSourceState from storage
   * 2. Checks for config changes by comparing config version hashes
   * 3. Dispatches to the appropriate processing method based on source type
   * 4. Schedules the next alarm
   */
  async alarm(): Promise<void> {
    // Keep a logger instance outside the try block; it gains detail (the data
    // source id) once state has been fetched and validated.
    let alarmLogger = this.logger.child({ operation: 'alarm' }); // Initial logger

    try {
      // 1. Retrieve the DataSourceState from storage
      const state = await this.ctx.storage.get<DataSourceState>('state');
      if (state === undefined) {
        this.logger.error('State not found in alarm. Cannot proceed.');
        // Maybe schedule an alarm far in the future or report to an external system.
        // We cannot proceed without state.
        return;
      }

      // Validate state to protect against corruption
      const validatedState = DataSourceStateSchema.safeParse(state);
      if (validatedState.success === false) {
        const logger = this.logger.child({ operation: 'alarm', validation_error: validatedState.error.format() });
        logger.error('State validation failed. Cannot proceed with corrupted state.');
        // Schedule a far-future alarm to prevent continuous failed attempts
        await this.ctx.storage.setAlarm(Date.now() + 24 * 60 * 60 * 1000); // 24 hours
        return;
      }

      let currentState = validatedState.data;
      const { dataSourceId, scrapeFrequencyTier } = currentState;
      alarmLogger = alarmLogger.child({ dataSourceId });
      alarmLogger.info('Alarm triggered');

      // 2. Check config hash/timestamp logic
      const configCheckLogger = alarmLogger.child({ step: 'Config Check' });
      const dbConfigResult = await ResultAsync.fromPromise(
        getDb(this.env.HYPERDRIVE).query.$data_sources.findFirst({
          where: (ds, { eq }) => eq(ds.id, dataSourceId),
          columns: { config: true, config_version_hash: true },
        }),
        e => new Error(`Database query failed: ${e}`)
      );
      if (dbConfigResult.isErr()) {
        configCheckLogger.error('Failed to query DB for config check', undefined, dbConfigResult.error);
        return;
      }
      if (!dbConfigResult.value) {
        configCheckLogger.warn('Data source no longer exists in DB. Stopping processing.');
        return;
      }

      const dbConfig = dbConfigResult.value;

      // Check if the config has changed
      if (dbConfig.config_version_hash !== currentState.configVersionHash) {
        configCheckLogger.info('Config version hash changed, re-initializing', {
          old_hash: currentState.configVersionHash,
          new_hash: dbConfig.config_version_hash,
        });

        // Parse the new config using the appropriate Zod schema
        const newParsedConfig = DataSourceConfigWrapper.safeParse(dbConfig.config);
        if (!newParsedConfig.success) {
          configCheckLogger.error('Failed to parse new RSS config', {
            config: dbConfig.config,
            error: newParsedConfig.error,
          });
          return;
        }

        // Update internal state with the new config
        currentState = {
          ...currentState,
          config: newParsedConfig.data,
          configVersionHash: dbConfig.config_version_hash,
        };

        // Persist the updated state
        await this.ctx.storage.put('state', currentState);
        configCheckLogger.info('State updated with new config');
      } else {
        configCheckLogger.debug('Config version hash unchanged, proceeding with current config');
      }

      const interval = tierIntervals[scrapeFrequencyTier as keyof typeof tierIntervals] || DEFAULT_INTERVAL;

      // --- Schedule the *next* regular alarm run immediately ---
      // This ensures that even if this current run fails completely after all retries,
      // the process will attempt again later according to its schedule.
      const nextScheduledAlarmTime = Date.now() + interval;
      await this.ctx.storage.setAlarm(nextScheduledAlarmTime);
      alarmLogger.info('Next regular alarm scheduled', { next_alarm: new Date(nextScheduledAlarmTime).toISOString() });

      // 3. Dispatch based on source type
      if (currentState.sourceType === 'RSS') {
        await this._fetchAndProcessRss(currentState, alarmLogger);
      } else {
        alarmLogger.error('Unsupported source type', { sourceType: currentState.sourceType });
        return;
      }
    } catch (error) {
      // Use the most detailed logger available at the point of failure
      alarmLogger.error(
        'Unhandled exception occurred within alarm handler',
        { error_name: error instanceof Error ? error.name : 'UnknownError' },
        error instanceof Error ? error : new Error(String(error)) // Log the error object/stack
      );
    }
  }
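
  // Scheduling note: alarm() books the next run *before* scraping, so a tier-2
  // source that fires at 12:00 already has a 16:00 alarm set; even if the
  // current run fails after that point, the source degrades to "retry next
  // cycle" instead of going silent.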

  /**
   * Private method to fetch and process RSS feeds
   *
   * @param state Current data source state containing RSS config
   * @param logger Logger instance for this operation
   */
  private async _fetchAndProcessRss(state: DataSourceState, logger: Logger): Promise<void> {
    const { dataSourceId, config } = state;

    // --- Workflow Step 1: Fetch Feed with Retries ---
    const fetchLogger = logger.child({ step: 'Fetch' });
    const fetchResult = await attemptWithRetries(
      async () => {
        const respResult = await tryCatchAsync(
          fetch(config.config.url, {
            method: 'GET',
            headers: {
              'User-Agent': userAgents[Math.floor(Math.random() * userAgents.length)],
              Referer: 'https://www.google.com/',
            },
          })
        );
        if (respResult.isErr()) return err(respResult.error as Error);

        // Ensure the response is OK before trying to read the body
        if (respResult.value.ok === false) {
          return err(new Error(`Fetch failed with status: ${respResult.value.status} ${respResult.value.statusText}`));
        }

        // Reading the body can also fail
        const textResult = await tryCatchAsync(respResult.value.text());
        if (textResult.isErr()) return err(textResult.error as Error);

        return ok(textResult.value);
      },
      MAX_STEP_RETRIES,
      INITIAL_RETRY_DELAY_MS,
      fetchLogger
    );
    if (fetchResult.isErr()) {
      // Error already logged by attemptWithRetries
      return;
    }
    const feedText = fetchResult.value;

    // --- Workflow Step 2: Parse Feed with Retries ---
    const parseLogger = logger.child({ step: 'Parse' });
    const parseResult = await attemptWithRetries(
      async () => parseRSSFeed(feedText),
      MAX_STEP_RETRIES,
      INITIAL_RETRY_DELAY_MS,
      parseLogger
    );
    if (parseResult.isErr()) {
      // Error already logged by attemptWithRetries
      return;
    }
    const articles = parseResult.value; // Type: ParsedArticle[]

    // --- Process Articles and Store Raw Data in R2 ---
    const now = Date.now();
    const ageThreshold = now - 48 * 60 * 60 * 1000; // 48 hours ago (currently unused)
    const articlesToInsert: Omit<typeof $ingested_items.$inferInsert, 'id'>[] = [];
    const r2Logger = logger.child({ step: 'R2 Storage' });

    for (const article of articles) {
      // 1. Store the raw RSS item object in R2.
      // parseRSSFeed actually returns items with id, title, link, pubDate, but
      // the declared return type doesn't include 'id', hence the cast below.
      const itemIdFromSource =
        (article as { id?: string; title: string; link: string; pubDate: Date | null }).id || article.link;
      const r2Key = `raw_items/${dataSourceId}/${itemIdFromSource}_${now}.json`;

      try {
        await this.env.ARTICLES_BUCKET.put(r2Key, JSON.stringify(article), {
          httpMetadata: {
            contentType: 'application/json',
          },
        });
        r2Logger.debug('Stored raw RSS item in R2', { r2_key: r2Key, item_id: itemIdFromSource });
      } catch (r2Error) {
        r2Logger.error(
          'Failed to store raw RSS item in R2',
          { r2_key: r2Key, item_id: itemIdFromSource },
          r2Error as Error
        );
        // Continue processing even if R2 storage fails
      }

      // 2. Prepare data for insertion into $ingested_items
      articlesToInsert.push({
        data_source_id: dataSourceId,
        item_id_from_source: itemIdFromSource,
        raw_data_r2_key: r2Key,
        url_to_original: article.link,
        published_at: article.pubDate,
        display_title: article.title,
        status: 'NEW',
        ingested_at: new Date(now),
        // Other fields like content_body_text, content_body_r2_key, embedding, analysis_payload are NULL/undefined
      });
    }
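
    // Dedup note: (data_source_id, item_id_from_source) is the conflict target
    // of the insert below, so re-ingesting a feed is idempotent: previously
    // seen items are skipped and .returning() yields only the fresh rows.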

    if (articlesToInsert.length === 0) {
      logger.info('No articles found in feed');
      // Successfully processed, update lastChecked
      const updatedState = { ...state, lastChecked: now };
      await this.ctx.storage.put('state', updatedState);
      logger.info('Updated lastChecked', { timestamp: new Date(now).toISOString() });

      // Update the data source's lastChecked in the database with retries
      const sourceUpdateLogger = logger.child({ step: 'Source Update' });
      const sourceUpdateResult = await attemptWithRetries(
        async () =>
          ResultAsync.fromPromise(
            getDb(this.env.HYPERDRIVE)
              .update($data_sources)
              .set({ lastChecked: new Date(now) })
              .where(eq($data_sources.id, dataSourceId)),
            e => (e instanceof Error ? e : new Error(`Source update failed: ${String(e)}`))
          ),
        MAX_STEP_RETRIES,
        INITIAL_RETRY_DELAY_MS,
        sourceUpdateLogger
      );
      if (sourceUpdateResult.isErr()) {
        sourceUpdateLogger.error('Failed to update data source lastChecked after all retries');
        return;
      }
      sourceUpdateLogger.info('Updated data source lastChecked in database');
      return;
    }

    logger.info('Processed articles from feed', {
      total_articles: articlesToInsert.length,
    });

    // --- 3. Batch insert into $ingested_items with conflict handling ---
    const dbLogger = logger.child({ step: 'DB Insert' });
    const insertResult = await attemptWithRetries(
      async () =>
        ResultAsync.fromPromise(
          getDb(this.env.HYPERDRIVE)
            .insert($ingested_items)
            .values(articlesToInsert)
            .onConflictDoNothing({ target: [$ingested_items.data_source_id, $ingested_items.item_id_from_source] })
            .returning({ insertedId: $ingested_items.id, itemId: $ingested_items.item_id_from_source }),
          e => (e instanceof Error ? e : new Error(`DB Insert failed: ${String(e)}`)) // Error mapper
        ),
      MAX_STEP_RETRIES,
      INITIAL_RETRY_DELAY_MS,
      dbLogger
    );
    if (insertResult.isErr()) {
      // Error already logged by attemptWithRetries
      return;
    }
    const insertedRows = insertResult.value; // Type: { insertedId: number, itemId: string }[]
    dbLogger.info('DB Insert completed', { affected_rows: insertedRows.length });

    // 4. Retrieve IDs of newly inserted items (onConflictDoNothing means only new items are returned)
    const newlyInsertedIds = insertedRows.map(row => row.insertedId);

    // --- 5. Send only newly inserted item IDs to the queue ---
    if (newlyInsertedIds.length > 0 && this.env.ARTICLE_PROCESSING_QUEUE) {
      const BATCH_SIZE_LIMIT = 100; // Adjust as needed
      const queueLogger = logger.child({ step: 'Queue', total_ids_to_queue: newlyInsertedIds.length });
      queueLogger.info('Sending newly inserted item IDs to queue');

      for (let i = 0; i < newlyInsertedIds.length; i += BATCH_SIZE_LIMIT) {
        const batch = newlyInsertedIds.slice(i, i + BATCH_SIZE_LIMIT);
        queueLogger.debug('Sending batch to queue', { batch_size: batch.length, batch_index: i / BATCH_SIZE_LIMIT });
        this.ctx.waitUntil(
          this.env.ARTICLE_PROCESSING_QUEUE.send({ ingested_item_ids: batch }).catch(queueError => {
            queueLogger.error(
              'Failed to send batch to queue',
              { batch_index: i / BATCH_SIZE_LIMIT, batch_size: batch.length },
              queueError instanceof Error ? queueError : new Error(String(queueError))
            );
          })
        );
      }
    }
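
    // Delivery note: ctx.waitUntil lets the queue sends complete after this
    // handler returns; a failed batch is logged but not retried here, and the
    // affected rows simply remain in the DB with status 'NEW'.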

    // --- Final Step: Update lastChecked only on full success ---
    logger.info('All steps successful. Updating lastChecked');
    const updatedState = { ...state, lastChecked: now };
    await this.ctx.storage.put('state', updatedState);
    logger.info('Updated lastChecked', { timestamp: new Date(now).toISOString() });

    // Update the data source's lastChecked in the database with retries
    const sourceUpdateLogger = logger.child({ step: 'Source Update' });
    const sourceUpdateResult = await attemptWithRetries(
      async () =>
        ResultAsync.fromPromise(
          getDb(this.env.HYPERDRIVE)
            .update($data_sources)
            .set({ lastChecked: new Date(now) })
            .where(eq($data_sources.id, dataSourceId)),
          e => (e instanceof Error ? e : new Error(`Source update failed: ${String(e)}`))
        ),
      MAX_STEP_RETRIES,
      INITIAL_RETRY_DELAY_MS,
      sourceUpdateLogger
    );
    if (sourceUpdateResult.isErr()) {
      sourceUpdateLogger.error('Failed to update data source lastChecked after all retries');
      return;
    }
    sourceUpdateLogger.info('Updated data source lastChecked in database');
  }

  /**
   * Handles HTTP requests to manage the scraper
   *
   * Supported endpoints:
   * - /trigger: Manually triggers an immediate scrape
   * - /status: Returns the current state and next alarm time
   * - /delete: Deletes the DO
   * - /initialize: Sets up the scraper with a new source configuration
   *
   * @param request The incoming HTTP request
   * @returns HTTP response with appropriate status and data
   */
  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);
    const fetchLogger = this.logger.child({ operation: 'fetch', method: request.method, path: url.pathname });
    fetchLogger.info('Received fetch request');

    if (url.pathname === '/trigger') {
      fetchLogger.info('Manual trigger received');
      await this.ctx.storage.setAlarm(Date.now()); // Trigger the alarm immediately
      return new Response('Alarm set');
    }

    if (url.pathname === '/status') {
      fetchLogger.info('Status request received');
      const state = await this.ctx.storage.get<DataSourceState>('state');
      const alarm = await this.ctx.storage.getAlarm();
      return Response.json({
        state: state || { error: 'State not initialized' },
        nextAlarmTimestamp: alarm,
      });
    }

    if (url.pathname === '/delete' && request.method === 'DELETE') {
      fetchLogger.info('Delete request received');
      try {
        await this.destroy();
        fetchLogger.info('DO successfully destroyed');
        return new Response('Deleted', { status: 200 });
      } catch (error) {
        fetchLogger.error(
          'Failed to destroy DO',
          undefined,
          error instanceof Error ? error : new Error(String(error))
        );
        return new Response(`Failed to delete: ${error instanceof Error ? error.message : String(error)}`, {
          status: 500,
        });
      }
    }

    if (url.pathname === '/initialize' && request.method === 'POST') {
      fetchLogger.info('Initialize request received');
      const dataSourceDataResult = await tryCatchAsync(
        request.json<{
          id: number;
          source_type: string;
          config: unknown;
          config_version_hash: string | null;
          scrape_frequency_tier: number;
        }>()
      );
      if (dataSourceDataResult.isErr()) {
        const error =
          dataSourceDataResult.error instanceof Error
            ? dataSourceDataResult.error
            : new Error(String(dataSourceDataResult.error));
        fetchLogger.error('Initialization failed via fetch', undefined, error);
        return new Response(`Initialization failed: ${error.message}`, { status: 500 });
      }

      const dataSourceData = dataSourceDataResult.value;
      if (
        !dataSourceData ||
        typeof dataSourceData.id !== 'number' ||
        typeof dataSourceData.source_type !== 'string' ||
        typeof dataSourceData.scrape_frequency_tier !== 'number'
      ) {
        fetchLogger.warn('Invalid data source data format received', { received_data: dataSourceData });
        return new Response('Invalid data source data format', { status: 400 });
      }

      try {
        await this.initialize(dataSourceData);
        fetchLogger.info('Initialization successful via API');
        return new Response('Initialized');
      } catch (error) {
        fetchLogger.error(
          'Initialization failed',
          undefined,
          error instanceof Error ? error : new Error(String(error))
        );
        return new Response(`Initialization failed: ${error instanceof Error ? error.message : String(error)}`, {
          status: 500,
        });
      }
    }

    fetchLogger.warn('Path not found');
    return new Response('Not found', { status: 404 });
  }
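
  // Caller-side sketch (illustrative; the binding name DATA_SOURCE_INGESTOR is
  // an assumption, and the hostname is a placeholder since DO fetch requires an
  // absolute URL):
  //
  //   const id = env.DATA_SOURCE_INGESTOR.idFromName(`source-${sourceId}`);
  //   const stub = env.DATA_SOURCE_INGESTOR.get(id);
  //   await stub.fetch('http://do/trigger'); // force an immediate scrape
  //   const status = await (await stub.fetch('http://do/status')).json();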

  /**
   * Cleanup method called when the DO is about to be destroyed
   * Removes all stored state
   */
  async destroy() {
    this.logger.info('Destroy called, deleting storage');
    const state = await this.ctx.storage.get<DataSourceState>('state');
    if (state?.dataSourceId) {
      // Clear the do_initialized_at field when the DO is destroyed
      await getDb(this.env.HYPERDRIVE)
        .update($data_sources)
        .set({ do_initialized_at: null })
        .where(eq($data_sources.id, state.dataSourceId));
    }
    await this.ctx.storage.deleteAll();
    this.logger.info('Storage deleted');
  }
}