import { zValidator } from '@hono/zod-validator';
import { $data_sources, $ingested_items, eq, isNull } from '@meridian/database';
import { Hono } from 'hono';
import { z } from 'zod';
import type { HonoEnv } from '../app';
import { Logger } from '../lib/logger';
import { tryCatchAsync } from '../lib/tryCatchAsync';
import { getDb, hasValidAuthToken } from '../lib/utils';

const logger = new Logger({ router: 'durable-objects' });

/** Batch size used by `/admin/initialize-dos` when the query parameter is missing or unusable. */
const DEFAULT_BATCH_SIZE = 100;

/**
 * Normalizes an arbitrary thrown/rejected value into an `Error` so it can be
 * handed to the logger uniformly.
 *
 * @param value - Whatever was carried in the error branch of a result.
 * @returns `value` itself when it already is an `Error`, otherwise a new
 *   `Error` whose message is `String(value)`.
 */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/**
 * Parses the `batchSize` query parameter.
 *
 * Only positive integers are accepted. Anything else (missing, empty,
 * non-numeric, zero, negative, fractional, infinite) falls back to
 * `DEFAULT_BATCH_SIZE`. Rejecting negative values matters: the batching loop
 * advances its offset by this number, so a negative step would never terminate.
 *
 * @param raw - Raw query string value, if present.
 * @returns A positive integer batch size.
 */
function parseBatchSize(raw: string | undefined): number {
  const parsed = Number(raw);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : DEFAULT_BATCH_SIZE;
}

/** Shared validator for routes that carry a `:sourceId` path parameter. */
const sourceIdParamSchema = z.object({
  sourceId: z.string().min(1, 'Source ID is required'),
});

const route = new Hono()
  // handle DO-specific routes
  //
  // Forwards the request to the DATA_SOURCE_INGESTOR durable object whose name
  // is the URI-decoded `sourceId`.
  .get('/source/:sourceId/*', zValidator('param', sourceIdParamSchema), async c => {
    const { sourceId } = c.req.valid('param');
    const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(decodeURIComponent(sourceId));
    const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

    // reconstruct path for the DO: drop the first four elements of the split
    // pathname (the leading empty string plus three segments) and keep the rest.
    // NOTE(review): this presumably assumes the router is mounted under a
    // single-segment prefix (e.g. `/<prefix>/source/<id>/...`) - confirm.
    const url = new URL(c.req.url);
    const pathParts = url.pathname.split('/');
    const doPath = `/${pathParts.slice(4).join('/')}`;
    const doUrl = new URL(doPath + url.search, 'http://do');
    const doRequest = new Request(doUrl.toString(), c.req.raw);

    return stub.fetch(doRequest);
  })
  // admin endpoints
  //
  // Initializes the durable object for a single source looked up by numeric id.
  // Responds 401 without a valid auth token, 400 for a non-numeric id, 404 when
  // the source does not exist, 500 on lookup/initialization failure.
  .post('/admin/source/:sourceId/init', zValidator('param', sourceIdParamSchema), async c => {
    // auth check
    if (!hasValidAuthToken(c)) {
      return c.json({ error: 'Unauthorized' }, 401);
    }

    const initLogger = logger.child({ operation: 'init-source' });
    const { sourceId } = c.req.valid('param');

    // Reject ids that cannot be converted to a number instead of sending NaN to the database.
    const numericSourceId = Number(sourceId);
    if (!Number.isFinite(numericSourceId)) {
      return c.json({ error: 'Invalid source ID' }, 400);
    }

    const db = getDb(c.env.HYPERDRIVE);

    // Get the source first
    const sourceResult = await tryCatchAsync(
      db.query.$data_sources.findFirst({
        where: eq($data_sources.id, numericSourceId),
      })
    );
    if (sourceResult.isErr()) {
      initLogger.error('Failed to fetch source', { sourceId }, toError(sourceResult.error));
      return c.json({ error: 'Failed to fetch source' }, 500);
    }

    const source = sourceResult.value;
    if (!source) {
      return c.json({ error: 'Source not found' }, 404);
    }

    // Initialize the DO; durable objects are named after the source's configured URL.
    const sourceUrl = source.config.config.url;
    const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(sourceUrl);
    const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

    const initResult = await tryCatchAsync(
      stub.initialize({
        id: source.id,
        source_type: source.source_type,
        config: source.config,
        config_version_hash: source.config_version_hash,
        scrape_frequency_tier: source.scrape_frequency_minutes,
      })
    );
    if (initResult.isErr()) {
      initLogger.error('Failed to initialize source DO', { sourceId, url: sourceUrl }, toError(initResult.error));
      return c.json({ error: 'Failed to initialize source DO' }, 500);
    }

    initLogger.info('Successfully initialized source DO', { sourceId, url: sourceUrl });
    return c.json({ success: true });
  })
  // Initializes durable objects for every source whose `do_initialized_at` is
  // null. Sources are processed in sequential batches; the members of a batch
  // are initialized concurrently. Individual failures are logged and counted,
  // they do not abort the run.
  .post('/admin/initialize-dos', async c => {
    // auth check
    if (!hasValidAuthToken(c)) {
      return c.json({ error: 'Unauthorized' }, 401);
    }

    const initLogger = logger.child({ operation: 'initialize-dos' });
    initLogger.info('Initializing SourceScraperDOs from database');

    const db = getDb(c.env.HYPERDRIVE);

    // Get batch size from query params, default to 100
    const batchSize = parseBatchSize(c.req.query('batchSize'));
    initLogger.info('Using batch size', { batchSize });

    const allSourcesResult = await tryCatchAsync(
      db
        .select({
          id: $data_sources.id,
          source_type: $data_sources.source_type,
          config: $data_sources.config,
          config_version_hash: $data_sources.config_version_hash,
          scrape_frequency_tier: $data_sources.scrape_frequency_minutes,
        })
        .from($data_sources)
        .where(isNull($data_sources.do_initialized_at))
    );
    if (allSourcesResult.isErr()) {
      initLogger.error('Failed to fetch sources from database', undefined, toError(allSourcesResult.error));
      return c.json({ error: 'Failed to fetch sources from database' }, 500);
    }

    const allSources = allSourcesResult.value;
    initLogger.info('Sources fetched from database', { source_count: allSources.length });

    // Process sources in batches
    let processedCount = 0;
    let successCount = 0;

    // Process each batch sequentially; `batchSize` is a positive integer, so the offset always advances.
    for (let offset = 0, batchNumber = 1; offset < allSources.length; offset += batchSize, batchNumber++) {
      const batch = allSources.slice(offset, offset + batchSize);
      initLogger.info('Processing batch', { batchIndex: batchNumber, batchSize: batch.length });

      const batchResults = await Promise.all(
        batch.map(async source => {
          const sourceUrl = source.config.config.url;
          const sourceLogger = initLogger.child({ source_id: source.id, url: sourceUrl });
          const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(sourceUrl);
          const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

          sourceLogger.debug('Initializing DO');
          const result = await tryCatchAsync(stub.initialize(source));
          if (result.isErr()) {
            sourceLogger.error('Failed to initialize DO', undefined, toError(result.error));
            return false;
          }

          sourceLogger.debug('Successfully initialized DO');
          return true;
        })
      );

      const batchSuccessful = batchResults.filter(success => success).length;
      processedCount += batch.length;
      successCount += batchSuccessful;

      initLogger.info('Batch completed', {
        batchIndex: batchNumber,
        batchSuccessful,
        totalProcessed: processedCount,
        totalSuccessful: successCount,
      });
    }

    initLogger.info('Initialization process complete', { total: allSources.length, successful: successCount });
    return c.json({ initialized: successCount, total: allSources.length });
  })
  // Deletes a source: first asks its durable object to delete itself, then
  // removes the source's ingested items, then the source row. Each step that
  // fails stops the sequence with a 500; earlier steps are not rolled back.
  .delete('/admin/source/:sourceId', zValidator('param', sourceIdParamSchema), async c => {
    // auth check
    if (!hasValidAuthToken(c)) {
      return c.json({ error: 'Unauthorized' }, 401);
    }

    const deleteLogger = logger.child({ operation: 'delete-source' });
    const { sourceId } = c.req.valid('param');

    // Reject ids that cannot be converted to a number instead of sending NaN to the database.
    const numericSourceId = Number(sourceId);
    if (!Number.isFinite(numericSourceId)) {
      return c.json({ error: 'Invalid source ID' }, 400);
    }

    const db = getDb(c.env.HYPERDRIVE);

    // Get the source first to get its URL
    const sourceResult = await tryCatchAsync(
      db.query.$data_sources.findFirst({
        where: eq($data_sources.id, numericSourceId),
      })
    );
    if (sourceResult.isErr()) {
      deleteLogger.error('Failed to fetch source', { sourceId }, toError(sourceResult.error));
      return c.json({ error: 'Failed to fetch source' }, 500);
    }

    const source = sourceResult.value;
    if (!source) {
      return c.json({ error: 'Source not found' }, 404);
    }

    // Delete the durable object first
    const sourceUrl = source.config.config.url;
    const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(sourceUrl);
    const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

    // NOTE(review): only an Err result aborts here; the Response returned by the
    // DO is not inspected, so a non-2xx status would still proceed to the
    // database deletes - confirm whether that is intended.
    const deleteResult = await tryCatchAsync(
      stub.fetch('http://do/delete', {
        method: 'DELETE',
      })
    );
    if (deleteResult.isErr()) {
      deleteLogger.error('Failed to delete source DO', { sourceId, url: sourceUrl }, toError(deleteResult.error));
      return c.json({ error: 'Failed to delete source DO' }, 500);
    }

    // Then delete from database
    // delete the articles first
    const articlesResult = await tryCatchAsync(
      db.delete($ingested_items).where(eq($ingested_items.data_source_id, numericSourceId))
    );
    if (articlesResult.isErr()) {
      deleteLogger.error('Failed to delete articles', { sourceId }, toError(articlesResult.error));
      return c.json({ error: 'Failed to delete articles' }, 500);
    }

    const dbDeleteResult = await tryCatchAsync(
      db.delete($data_sources).where(eq($data_sources.id, numericSourceId))
    );
    if (dbDeleteResult.isErr()) {
      deleteLogger.error('Failed to delete source from database', { sourceId }, toError(dbDeleteResult.error));
      return c.json({ error: 'Failed to delete source from database' }, 500);
    }

    deleteLogger.info('Successfully deleted source', { sourceId, url: sourceUrl });
    return c.json({ success: true });
  });

export default route;