import { zValidator } from '@hono/zod-validator';
import { $data_sources, $ingested_items, eq, isNull } from '@meridian/database';
import { Hono } from 'hono';
import { z } from 'zod';
import type { HonoEnv } from '../app';
import { Logger } from '../lib/logger';
import { tryCatchAsync } from '../lib/tryCatchAsync';
import { getDb, hasValidAuthToken } from '../lib/utils';

const logger = new Logger({ router: 'durable-objects' });
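
// This router exposes the per-source DataSourceIngestor Durable Objects: a
// catch-all proxy under /source/:sourceId/*, plus admin endpoints for
// initializing and deleting the objects.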
const route = new Hono<HonoEnv>()
  // handle DO-specific routes
  .get(
    '/source/:sourceId/*',
    zValidator(
      'param',
      z.object({
        sourceId: z.string().min(1, 'Source ID is required'),
      })
    ),
    async c => {
      const { sourceId } = c.req.valid('param');
      const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(decodeURIComponent(sourceId));
      const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

      // reconstruct path for the DO
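      // url.pathname.split('/') yields a leading empty string followed by the
      // path segments; slice(4) drops that empty element plus three segments
      // (presumably a one-segment mount prefix, 'source', and the sourceId),
      // keeping only the sub-path the DO should handle. 'http://do' is a dummy
      // origin used to build a well-formed URL; the DO is expected to route on
      // the pathname only.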
      const url = new URL(c.req.url);
      const pathParts = url.pathname.split('/');
      const doPath = `/${pathParts.slice(4).join('/')}`;
      const doUrl = new URL(doPath + url.search, 'http://do');
      const doRequest = new Request(doUrl.toString(), c.req.raw);

      return stub.fetch(doRequest);
    }
  )
  // admin endpoints
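  // The admin endpoints below require a valid auth token (checked by
  // hasValidAuthToken). Example call, assuming the router is mounted at /do
  // and the token is sent as a bearer token (both are assumptions):
  //   curl -X POST "https://<worker-host>/do/admin/source/42/init" \
  //     -H "Authorization: Bearer $API_TOKEN"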
  .post(
    '/admin/source/:sourceId/init',
    zValidator(
      'param',
      z.object({
        sourceId: z.string().min(1, 'Source ID is required'),
      })
    ),
    async c => {
      // auth check
      if (!hasValidAuthToken(c)) {
        return c.json({ error: 'Unauthorized' }, 401);
      }

      const initLogger = logger.child({ operation: 'init-source' });
      const { sourceId } = c.req.valid('param');
      const db = getDb(c.env.HYPERDRIVE);

      // Get the source first
      const sourceResult = await tryCatchAsync(
        db.query.$data_sources.findFirst({
          where: eq($data_sources.id, Number(sourceId)),
        })
      );
      if (sourceResult.isErr()) {
        const error = sourceResult.error instanceof Error ? sourceResult.error : new Error(String(sourceResult.error));
        initLogger.error('Failed to fetch source', { sourceId }, error);
        return c.json({ error: 'Failed to fetch source' }, 500);
      }

      const source = sourceResult.value;
      if (!source) {
        return c.json({ error: 'Source not found' }, 404);
      }

      // Initialize the DO
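      // The ingestor DO is addressed by the source's configured URL; idFromName
      // is deterministic, so re-running this endpoint reaches the same object.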
      const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(source.config.config.url);
      const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

      const initResult = await tryCatchAsync(
        stub.initialize({
          id: source.id,
          source_type: source.source_type,
          config: source.config,
          config_version_hash: source.config_version_hash,
          scrape_frequency_tier: source.scrape_frequency_minutes,
        })
      );
      if (initResult.isErr()) {
        const error = initResult.error instanceof Error ? initResult.error : new Error(String(initResult.error));
        initLogger.error('Failed to initialize source DO', { sourceId, url: source.config.config.url }, error);
        return c.json({ error: 'Failed to initialize source DO' }, 500);
      }

      initLogger.info('Successfully initialized source DO', { sourceId, url: source.config.config.url });
      return c.json({ success: true });
    }
  )
  .post('/admin/initialize-dos', async c => {
    // auth check
    if (!hasValidAuthToken(c)) {
      return c.json({ error: 'Unauthorized' }, 401);
    }

    const initLogger = logger.child({ operation: 'initialize-dos' });
    initLogger.info('Initializing SourceScraperDOs from database');

    const db = getDb(c.env.HYPERDRIVE);

    // Get batch size from query params, default to 100
    const batchSize = Number(c.req.query('batchSize')) || 100;
    initLogger.info('Using batch size', { batchSize });

    const allSourcesResult = await tryCatchAsync(
      db
        .select({
          id: $data_sources.id,
          source_type: $data_sources.source_type,
          config: $data_sources.config,
          config_version_hash: $data_sources.config_version_hash,
          scrape_frequency_tier: $data_sources.scrape_frequency_minutes,
        })
        .from($data_sources)
        .where(isNull($data_sources.do_initialized_at))
    );
    if (allSourcesResult.isErr()) {
      const error =
        allSourcesResult.error instanceof Error ? allSourcesResult.error : new Error(String(allSourcesResult.error));
      initLogger.error('Failed to fetch sources from database', undefined, error);
      return c.json({ error: 'Failed to fetch sources from database' }, 500);
    }

    const allSources = allSourcesResult.value;
    initLogger.info('Sources fetched from database', { source_count: allSources.length });

    // Process sources in batches
    let processedCount = 0;
    let successCount = 0;

    // Create batches of sources
    const batches = [];
    for (let i = 0; i < allSources.length; i += batchSize) {
      batches.push(allSources.slice(i, i + batchSize));
    }

    // Process each batch sequentially
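    // Batches run one after another, while the sources inside a batch are
    // initialized concurrently via Promise.all, so at most `batchSize` DO
    // initialize calls are in flight at a time.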
    for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
      const batch = batches[batchIndex];
      initLogger.info('Processing batch', { batchIndex: batchIndex + 1, batchSize: batch.length });

      const batchResults = await Promise.all(
        batch.map(async source => {
          const sourceLogger = initLogger.child({ source_id: source.id, url: source.config.config.url });

          const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(source.config.config.url);
          const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

          sourceLogger.debug('Initializing DO');
          const result = await tryCatchAsync(stub.initialize(source));
          if (result.isErr()) {
            const error = result.error instanceof Error ? result.error : new Error(String(result.error));
            sourceLogger.error('Failed to initialize DO', undefined, error);
            return false;
          }

          sourceLogger.debug('Successfully initialized DO');
          return true;
        })
      );

      processedCount += batch.length;
      successCount += batchResults.filter(success => success).length;

      initLogger.info('Batch completed', {
        batchIndex: batchIndex + 1,
        batchSuccessful: batchResults.filter(success => success).length,
        totalProcessed: processedCount,
        totalSuccessful: successCount,
      });
    }

    initLogger.info('Initialization process complete', { total: allSources.length, successful: successCount });
    return c.json({ initialized: successCount, total: allSources.length });
  })
  .delete(
    '/admin/source/:sourceId',
    zValidator(
      'param',
      z.object({
        sourceId: z.string().min(1, 'Source ID is required'),
      })
    ),
    async c => {
      // auth check
      if (!hasValidAuthToken(c)) {
        return c.json({ error: 'Unauthorized' }, 401);
      }

      const deleteLogger = logger.child({ operation: 'delete-source' });
      const { sourceId } = c.req.valid('param');
      const db = getDb(c.env.HYPERDRIVE);

      // Get the source first to get its URL
      const sourceResult = await tryCatchAsync(
        db.query.$data_sources.findFirst({
          where: eq($data_sources.id, Number(sourceId)),
        })
      );
      if (sourceResult.isErr()) {
        const error = sourceResult.error instanceof Error ? sourceResult.error : new Error(String(sourceResult.error));
        deleteLogger.error('Failed to fetch source', { sourceId }, error);
        return c.json({ error: 'Failed to fetch source' }, 500);
      }

      const source = sourceResult.value;
      if (!source) {
        return c.json({ error: 'Source not found' }, 404);
      }

      // Delete the durable object first
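      // The DO's state is cleared before the database rows are removed; if the
      // DO delete fails we return early, so the source stays in the database
      // and the request can simply be retried.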
      const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(source.config.config.url);
      const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

      const deleteResult = await tryCatchAsync(
        stub.fetch('http://do/delete', {
          method: 'DELETE',
        })
      );
      if (deleteResult.isErr()) {
        const error = deleteResult.error instanceof Error ? deleteResult.error : new Error(String(deleteResult.error));
        deleteLogger.error('Failed to delete source DO', { sourceId, url: source.config.config.url }, error);
        return c.json({ error: 'Failed to delete source DO' }, 500);
      }

      // Then delete from database
      // delete the articles first
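      // (ingested items reference the source via data_source_id, so they have
      // to go before the parent data_sources row)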
      const articlesResult = await tryCatchAsync(
        db.delete($ingested_items).where(eq($ingested_items.data_source_id, Number(sourceId)))
      );
      if (articlesResult.isErr()) {
        const error =
          articlesResult.error instanceof Error ? articlesResult.error : new Error(String(articlesResult.error));
        deleteLogger.error('Failed to delete articles', { sourceId }, error);
        return c.json({ error: 'Failed to delete articles' }, 500);
      }

      const dbDeleteResult = await tryCatchAsync(
        db.delete($data_sources).where(eq($data_sources.id, Number(sourceId)))
      );
      if (dbDeleteResult.isErr()) {
        const error =
          dbDeleteResult.error instanceof Error ? dbDeleteResult.error : new Error(String(dbDeleteResult.error));
        deleteLogger.error('Failed to delete source from database', { sourceId }, error);
        return c.json({ error: 'Failed to delete source from database' }, 500);
      }

      deleteLogger.info('Successfully deleted source', { sourceId, url: source.config.config.url });
      return c.json({ success: true });
    }
  );

export default route;