import { zValidator } from '@hono/zod-validator';
import { $data_sources, $ingested_items, eq, isNull } from '@meridian/database';
import { Hono } from 'hono';
import { z } from 'zod';
import type { HonoEnv } from '../app';
import { Logger } from '../lib/logger';
import { tryCatchAsync } from '../lib/tryCatchAsync';
import { getDb, hasValidAuthToken } from '../lib/utils';

const logger = new Logger({ router: 'durable-objects' });
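
// Durable Object management routes:
//   GET    /source/:sourceId/*          → proxy the remaining path to that source's DO
//   POST   /admin/source/:sourceId/init → initialize the DO for a single source
//   POST   /admin/initialize-dos        → batch-initialize DOs for sources not yet initialized
//   DELETE /admin/source/:sourceId      → delete the DO, its ingested items, and the source row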

const route = new Hono<HonoEnv>()
  // proxy DO-specific routes: forward requests under /source/:sourceId/* to that source's Durable Object
  .get(
    '/source/:sourceId/*',
    zValidator(
      'param',
      z.object({
        sourceId: z.string().min(1, 'Source ID is required'),
      })
    ),
    async c => {
      const { sourceId } = c.req.valid('param');
      const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(decodeURIComponent(sourceId));
      const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

      // reconstruct the path for the DO: keep only the wildcard tail (everything after the
      // first four path segments) and preserve the original query string
      const url = new URL(c.req.url);
      const pathParts = url.pathname.split('/');
      const doPath = `/${pathParts.slice(4).join('/')}`;
      const doUrl = new URL(doPath + url.search, 'http://do');

      const doRequest = new Request(doUrl.toString(), c.req.raw);
      return stub.fetch(doRequest);
    }
  )
  // admin endpoints
  .post(
    '/admin/source/:sourceId/init',
    zValidator(
      'param',
      z.object({
        sourceId: z.string().min(1, 'Source ID is required'),
      })
    ),
    async c => {
      // auth check
      if (!hasValidAuthToken(c)) {
        return c.json({ error: 'Unauthorized' }, 401);
      }

      const initLogger = logger.child({ operation: 'init-source' });
      const { sourceId } = c.req.valid('param');

      const db = getDb(c.env.HYPERDRIVE);

      // Get the source first
      const sourceResult = await tryCatchAsync(
        db.query.$data_sources.findFirst({
          where: eq($data_sources.id, Number(sourceId)),
        })
      );

      if (sourceResult.isErr()) {
        const error = sourceResult.error instanceof Error ? sourceResult.error : new Error(String(sourceResult.error));
        initLogger.error('Failed to fetch source', { sourceId }, error);
        return c.json({ error: 'Failed to fetch source' }, 500);
      }

      const source = sourceResult.value;
      if (!source) {
        return c.json({ error: 'Source not found' }, 404);
      }

      // Initialize the DO
      const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(source.config.config.url);
      const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

      const initResult = await tryCatchAsync(
        stub.initialize({
          id: source.id,
          source_type: source.source_type,
          config: source.config,
          config_version_hash: source.config_version_hash,
          scrape_frequency_tier: source.scrape_frequency_minutes,
        })
      );
      if (initResult.isErr()) {
        const error = initResult.error instanceof Error ? initResult.error : new Error(String(initResult.error));
        initLogger.error('Failed to initialize source DO', { sourceId, url: source.config.config.url }, error);
        return c.json({ error: 'Failed to initialize source DO' }, 500);
      }

      initLogger.info('Successfully initialized source DO', { sourceId, url: source.config.config.url });
      return c.json({ success: true });
    }
  )
  .post('/admin/initialize-dos', async c => {
    // auth check
    if (!hasValidAuthToken(c)) {
      return c.json({ error: 'Unauthorized' }, 401);
    }

    const initLogger = logger.child({ operation: 'initialize-dos' });
    initLogger.info('Initializing SourceScraperDOs from database');

    const db = getDb(c.env.HYPERDRIVE);

    // Get batch size from query params, default to 100
    const batchSize = Number(c.req.query('batchSize')) || 100;
    initLogger.info('Using batch size', { batchSize });

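    // only fetch sources whose DO has not yet been marked as initialized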
    const allSourcesResult = await tryCatchAsync(
      db
        .select({
          id: $data_sources.id,
          source_type: $data_sources.source_type,
          config: $data_sources.config,
          config_version_hash: $data_sources.config_version_hash,
          scrape_frequency_tier: $data_sources.scrape_frequency_minutes,
        })
        .from($data_sources)
        .where(isNull($data_sources.do_initialized_at))
    );
    if (allSourcesResult.isErr()) {
      const error =
        allSourcesResult.error instanceof Error ? allSourcesResult.error : new Error(String(allSourcesResult.error));
      initLogger.error('Failed to fetch sources from database', undefined, error);
      return c.json({ error: 'Failed to fetch sources from database' }, 500);
    }

    const allSources = allSourcesResult.value;
    initLogger.info('Sources fetched from database', { source_count: allSources.length });

    // Process sources in batches
    let processedCount = 0;
    let successCount = 0;

    // Create batches of sources
    const batches = [];
    for (let i = 0; i < allSources.length; i += batchSize) {
      batches.push(allSources.slice(i, i + batchSize));
    }

    // Process each batch sequentially
    for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
      const batch = batches[batchIndex];
      initLogger.info('Processing batch', { batchIndex: batchIndex + 1, batchSize: batch.length });

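      // initialize every DO in this batch concurrently; failures are logged and counted rather than thrown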
      const batchResults = await Promise.all(
        batch.map(async source => {
          const sourceLogger = initLogger.child({ source_id: source.id, url: source.config.config.url });
          const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(source.config.config.url);
          const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

          sourceLogger.debug('Initializing DO');
          const result = await tryCatchAsync(stub.initialize(source));
          if (result.isErr()) {
            const error = result.error instanceof Error ? result.error : new Error(String(result.error));
            sourceLogger.error('Failed to initialize DO', undefined, error);
            return false;
          }

          sourceLogger.debug('Successfully initialized DO');
          return true;
        })
      );

      processedCount += batch.length;
      successCount += batchResults.filter(success => success).length;

      initLogger.info('Batch completed', {
        batchIndex: batchIndex + 1,
        batchSuccessful: batchResults.filter(success => success).length,
        totalProcessed: processedCount,
        totalSuccessful: successCount,
      });
    }

    initLogger.info('Initialization process complete', { total: allSources.length, successful: successCount });
    return c.json({ initialized: successCount, total: allSources.length });
  })
  .delete(
    '/admin/source/:sourceId',
    zValidator(
      'param',
      z.object({
        sourceId: z.string().min(1, 'Source ID is required'),
      })
    ),
    async c => {
      // auth check
      if (!hasValidAuthToken(c)) {
        return c.json({ error: 'Unauthorized' }, 401);
      }

      const deleteLogger = logger.child({ operation: 'delete-source' });
      const { sourceId } = c.req.valid('param');

      const db = getDb(c.env.HYPERDRIVE);

      // Get the source first to get its URL
      const sourceResult = await tryCatchAsync(
        db.query.$data_sources.findFirst({
          where: eq($data_sources.id, Number(sourceId)),
        })
      );

      if (sourceResult.isErr()) {
        const error = sourceResult.error instanceof Error ? sourceResult.error : new Error(String(sourceResult.error));
        deleteLogger.error('Failed to fetch source', { sourceId }, error);
        return c.json({ error: 'Failed to fetch source' }, 500);
      }

      const source = sourceResult.value;
      if (!source) {
        return c.json({ error: 'Source not found' }, 404);
      }

      // Delete the durable object first
      const doId = c.env.DATA_SOURCE_INGESTOR.idFromName(source.config.config.url);
      const stub = c.env.DATA_SOURCE_INGESTOR.get(doId);

      const deleteResult = await tryCatchAsync(
        stub.fetch('http://do/delete', {
          method: 'DELETE',
        })
      );
      if (deleteResult.isErr()) {
        const error = deleteResult.error instanceof Error ? deleteResult.error : new Error(String(deleteResult.error));
        deleteLogger.error('Failed to delete source DO', { sourceId, url: source.config.config.url }, error);
        return c.json({ error: 'Failed to delete source DO' }, 500);
      }

      // Then delete from the database: remove the ingested items first, since they
      // reference the source, then the source row itself
      const articlesResult = await tryCatchAsync(
        db.delete($ingested_items).where(eq($ingested_items.data_source_id, Number(sourceId)))
      );
      if (articlesResult.isErr()) {
        const error =
          articlesResult.error instanceof Error ? articlesResult.error : new Error(String(articlesResult.error));
        deleteLogger.error('Failed to delete articles', { sourceId }, error);
        return c.json({ error: 'Failed to delete articles' }, 500);
      }

      const dbDeleteResult = await tryCatchAsync(
        db.delete($data_sources).where(eq($data_sources.id, Number(sourceId)))
      );
      if (dbDeleteResult.isErr()) {
        const error =
          dbDeleteResult.error instanceof Error ? dbDeleteResult.error : new Error(String(dbDeleteResult.error));
        deleteLogger.error('Failed to delete source from database', { sourceId }, error);
        return c.json({ error: 'Failed to delete source from database' }, 500);
      }

      deleteLogger.info('Successfully deleted source', { sourceId, url: source.config.config.url });
      return c.json({ success: true });
    }
  );

export default route;