/**
 * Smart Document Ingestion Service
 * Combines Modal's serverless compute with Nebius AI analysis
 */
import { modalClient, type DocumentProcessingTask } from './modal-client';
import { nebiusClient } from './nebius-client';
import { storage } from './storage';
import type { InsertDocument } from '@shared/schema';

/** Embedding model name passed to both the Nebius embeddings call and the Modal batch job. */
const EMBEDDING_MODEL = 'text-embedding-3-large';
/** Maximum number of characters of document text sent to the embeddings call. */
const EMBEDDING_INPUT_MAX_CHARS = 8000;
/** Number of documents per batch requested from Modal. */
const BATCH_SIZE = 20;
/** Rough per-document processing estimate used for `estimatedCompletion`. */
const ESTIMATED_MS_PER_DOCUMENT = 2000;
/** Name of the Modal vector index queried by `enhancedSearch`. */
const MAIN_INDEX_NAME = 'main_index';
/** Length of the content prefix used as a snippet when a result has none. */
const SNIPPET_LENGTH = 200;
/** Score assigned when AI relevance scoring fails and the result has no score of its own. */
const FALLBACK_RELEVANCE_SCORE = 0.5;
/** Polling budget for `waitForTaskCompletion`. */
const TASK_POLL_MAX_ATTEMPTS = 30;
const TASK_POLL_INTERVAL_MS = 2000;
/** Upper bound on the number of key points returned by `parseKeyPoints`. */
const MAX_KEY_POINTS = 5;
/** Quality score reported when the analysis text contains no "<n>/10"-style score. */
const DEFAULT_QUALITY_SCORE = 7.0;
/** Leading list marker of a key-point line: "-", "•" or "<digits>." plus optional spacing. */
const KEY_POINT_MARKER = /^(?:[-•]|\d+\.)\s*/;

/** Maps a keyword looked for in the classification text to the stored source type. */
const SOURCE_TYPE_BY_KEYWORD: Record<string, string> = {
  'academic_paper': 'academic',
  'technical_documentation': 'technical',
  'research_report': 'research',
  'code_repository': 'code',
  'blog_post': 'web',
  'news_article': 'news'
};

/** A single document handed to the ingestion pipeline. */
interface DocumentUpload {
  /** Raw file contents; converted with `toString()` when no OCR is needed. */
  file: Buffer | string;
  filename: string;
  /** Content type string; values containing "pdf" or "image" are routed through OCR. */
  contentType: string;
  // Free-form caller metadata; `any` values because entries (e.g. `url`) are
  // forwarded untyped into the stored document.
  metadata?: Record<string, any>;
}

/** Outcome of `ingestDocument`. */
interface IngestionResult {
  documentId: number;
  /** Task id of the vector-indexing job queued with Modal. */
  processingTaskId: string;
  analysis: {
    category: string;
    summary: string;
    keyPoints: string[];
    qualityScore: number;
  };
  embeddings: number[];
  status: 'processing' | 'completed' | 'failed';
}

class SmartIngestionService {
  /**
   * Process uploaded document with full AI pipeline: text extraction,
   * Nebius analysis, embedding generation, storage, and vector indexing.
   *
   * @param upload - The document to ingest.
   * @returns Ids of the stored document and the queued indexing task, plus
   *   the parsed analysis and the embedding vector.
   * @throws Error with an "Ingestion failed: ..." message when any step fails.
   */
  async ingestDocument(upload: DocumentUpload): Promise<IngestionResult> {
    try {
      // Step 1: Extract text content using Modal OCR if needed
      const textContent = await this.extractTextContent(upload);

      // Step 2: Analyze document using Nebius AI (the four analyses are independent)
      const [
        categoryAnalysis,
        summaryAnalysis,
        keyPointsAnalysis,
        qualityAnalysis
      ] = await Promise.all([
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'classification' }),
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'summary' }),
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'key_points' }),
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'quality_score' })
      ]);
      const qualityScore = this.extractQualityScore(qualityAnalysis.analysis);

      // Step 3: Generate embeddings using Nebius
      const embeddingResponse = await nebiusClient.createEmbeddings({
        input: textContent.substring(0, EMBEDDING_INPUT_MAX_CHARS), // Limit for token constraints
        model: EMBEDDING_MODEL
      });
      // Validate before persisting anything so a bad embedding response
      // cannot leave behind a stored document that never gets indexed.
      const embedding = embeddingResponse.data?.[0]?.embedding;
      if (!embedding) {
        throw new Error('Embedding response contained no embedding');
      }

      // Step 4: Create document in storage
      const documentData: InsertDocument = {
        title: upload.filename,
        content: textContent,
        sourceType: this.extractSourceType(categoryAnalysis.analysis),
        source: upload.filename,
        url: upload.metadata?.url,
        metadata: {
          ...upload.metadata,
          analysis: {
            category: categoryAnalysis.analysis,
            summary: summaryAnalysis.analysis,
            keyPoints: keyPointsAnalysis.analysis,
            qualityScore
          },
          contentType: upload.contentType,
          // Size in bytes: a string's `.length` counts UTF-16 code units, not bytes.
          fileSize: Buffer.isBuffer(upload.file)
            ? upload.file.length
            : Buffer.byteLength(upload.file),
          processingTimestamp: new Date().toISOString()
        }
      };

      const document = await storage.createDocument(documentData);

      // Step 5: Queue vector indexing with Modal
      const indexTask = await modalClient.buildVectorIndex([{
        id: document.id.toString(),
        content: textContent,
        embeddings: embedding,
        metadata: document.metadata
      }]);

      return {
        documentId: document.id,
        processingTaskId: indexTask.taskId,
        analysis: {
          category: this.extractCategory(categoryAnalysis.analysis),
          summary: summaryAnalysis.analysis,
          keyPoints: this.parseKeyPoints(keyPointsAnalysis.analysis),
          qualityScore
        },
        embeddings: embedding,
        status: 'completed'
      };
    } catch (error) {
      console.error('Document ingestion failed:', error);
      throw new Error(`Ingestion failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
    }
  }

  /**
   * Batch process multiple documents using Modal's distributed compute.
   *
   * @param uploads - Documents to queue; each file is converted with `toString()`.
   * @returns The Modal task id, a completion estimate (2s per document), and
   *   the number of documents queued.
   */
  async batchIngestDocuments(uploads: DocumentUpload[]): Promise<{
    taskId: string;
    estimatedCompletion: Date;
    documentsQueued: number;
  }> {
    // One timestamp for the whole batch so every id shares the same prefix.
    const batchTimestamp = Date.now();

    // Prepare documents for batch processing
    const documents = uploads.map((upload, index) => ({
      id: `batch_${batchTimestamp}_${index}`,
      content: upload.file.toString(),
      metadata: {
        filename: upload.filename,
        contentType: upload.contentType,
        // Spread last: caller-supplied metadata overrides the two keys above.
        ...upload.metadata
      }
    }));

    // Submit to Modal for distributed processing
    const task = await modalClient.batchProcessDocuments({
      documents,
      modelName: EMBEDDING_MODEL,
      batchSize: BATCH_SIZE
    });

    return {
      taskId: task.taskId,
      estimatedCompletion: new Date(Date.now() + uploads.length * ESTIMATED_MS_PER_DOCUMENT), // 2s per doc estimate
      documentsQueued: uploads.length
    };
  }

  /**
   * Enhanced search using both Modal vector search and Nebius query understanding.
   *
   * @param query - The user's raw query; also the text results are scored against.
   * @param options - `maxResults` (default 10), `searchType` (default
   *   'semantic'), and `useQueryEnhancement` (default true).
   * @returns The top results sorted by relevance score, the query-enhancement
   *   payload (if enhancement ran), and summary statistics.
   */
  async enhancedSearch(query: string, options: {
    maxResults?: number;
    searchType?: 'semantic' | 'hybrid';
    useQueryEnhancement?: boolean;
  } = {}): Promise<{
    results: any[];
    enhancedQuery?: any;
    searchInsights?: any;
  }> {
    const {
      maxResults = 10,
      searchType = 'semantic',
      useQueryEnhancement = true
    } = options;

    // Step 1: Enhance query using Nebius AI
    const enhancedQueryData = useQueryEnhancement
      ? await nebiusClient.enhanceQuery(query)
      : undefined;
    // Fall back to the raw query when enhancement yields nothing usable.
    const searchQuery: string = enhancedQueryData?.enhancedQuery || query;

    // Step 2: Perform vector search using Modal's high-performance endpoint
    const modalResults = await this.searchModalIndex(searchQuery, maxResults);

    // Step 3: Get local results as backup/supplement
    const localResults = await storage.searchDocuments({
      query: searchQuery,
      searchType: searchType as "semantic" | "keyword" | "hybrid",
      limit: maxResults,
      offset: 0
    });

    // Step 4: Combine results; keep more than maxResults so re-ranking has room
    const combinedResults = [...modalResults, ...localResults.results]
      .slice(0, maxResults * 2);

    // Step 5: Score relevance using Nebius AI
    const scoredResults = await Promise.all(
      combinedResults.map((result) => this.scoreResult(query, result))
    );

    // Step 6: Sort by AI-enhanced relevance scores
    const finalResults = scoredResults
      .sort((a, b) => (b.relevanceScore || 0) - (a.relevanceScore || 0))
      .slice(0, maxResults);

    const totalRelevance = finalResults.reduce((acc, r) => acc + (r.relevanceScore || 0), 0);

    return {
      results: finalResults,
      enhancedQuery: enhancedQueryData,
      searchInsights: {
        totalResults: finalResults.length,
        // Guard the division so an empty result set reports 0 instead of NaN.
        avgRelevanceScore: finalResults.length > 0 ? totalRelevance / finalResults.length : 0,
        modalResultsCount: modalResults.length,
        localResultsCount: localResults.results.length
      }
    };
  }

  /**
   * Generate research synthesis using Nebius AI.
   *
   * @param query - The research question passed along with the documents.
   * @param documents - Source documents; only `title`, `content` and
   *   `metadata` of each are forwarded.
   * @returns A fixed placeholder synthesis when `documents` is empty,
   *   otherwise whatever `nebiusClient.generateResearchInsights` returns.
   */
  async generateResearchSynthesis(query: string, documents: any[]): Promise<any> {
    if (documents.length === 0) {
      return {
        synthesis: 'No documents available for synthesis',
        keyFindings: [],
        gaps: ['Insufficient source material'],
        recommendations: ['Search for more relevant documents']
      };
    }

    return nebiusClient.generateResearchInsights(
      documents.map(doc => ({
        title: doc.title,
        content: doc.content,
        metadata: doc.metadata
      })),
      query
    );
  }

  // Helper methods

  /** Returns the upload's text: via Modal OCR for pdf/image content types, else `file.toString()`. */
  private async extractTextContent(upload: DocumentUpload): Promise<string> {
    const needsOcr = upload.contentType.includes('pdf') || upload.contentType.includes('image');
    if (!needsOcr) {
      return upload.file.toString();
    }

    const ocrTask = await modalClient.extractTextFromDocuments([
      this.uploadToTempStorage(upload)
    ]);
    // Wait for OCR completion (simplified for demo)
    return this.waitForTaskCompletion(ocrTask.taskId);
  }

  /** Runs the Modal vector search; returns [] when Modal is unconfigured or the call fails. */
  private async searchModalIndex(searchQuery: string, maxResults: number): Promise<any[]> {
    // Skip Modal if not configured properly
    if (!process.env.MODAL_TOKEN_ID || !process.env.MODAL_TOKEN_SECRET) {
      console.log('⚠️ Modal not configured, using local search only');
      return [];
    }

    try {
      console.log('🔄 Attempting Modal vector search...');
      const modalResponse = await modalClient.vectorSearch(
        searchQuery,
        MAIN_INDEX_NAME, // Assuming we have a main index
        maxResults
      );
      const modalResults: any[] = modalResponse.results || [];
      console.log(`✅ Modal search returned ${modalResults.length} results`);
      return modalResults;
    } catch (error) {
      console.log('❌ Modal search failed:', error instanceof Error ? error.message : String(error));
      console.log('🔄 Falling back to local search');
      return [];
    }
  }

  /** Attaches a Nebius relevance score to one result; on failure keeps the result with a fallback score. */
  private async scoreResult(query: string, result: any): Promise<any> {
    try {
      const relevanceData = await nebiusClient.scoreCitationRelevance(query, {
        title: result.title,
        content: result.content,
        snippet: result.snippet || result.content.substring(0, SNIPPET_LENGTH)
      });

      return {
        ...result,
        relevanceScore: relevanceData.relevanceScore,
        aiExplanation: relevanceData.explanation,
        keyReasons: relevanceData.keyReasons
      };
    } catch (error) {
      // Best effort: one failed scoring call must not fail the whole search.
      console.warn('Relevance scoring failed:', error instanceof Error ? error.message : String(error));
      // `??` keeps a legitimate pre-existing score of 0.
      return { ...result, relevanceScore: result.relevanceScore ?? FALLBACK_RELEVANCE_SCORE };
    }
  }

  /** Returns the temporary location string handed to the OCR call for this upload. */
  private uploadToTempStorage(upload: DocumentUpload): string {
    // In production, upload to cloud storage and return URL
    return `temp://documents/${upload.filename}`;
  }

  /**
   * Polls the Modal task status until it completes, fails, or the attempt
   * budget (30 polls, 2s apart) runs out.
   *
   * @returns The extracted text, or a placeholder message when the completed
   *   task carries none.
   * @throws Error when the task reports failure or does not finish in time.
   */
  private async waitForTaskCompletion(taskId: string): Promise<string> {
    // Simplified polling for demo - in production use webhooks
    for (let attempt = 1; attempt <= TASK_POLL_MAX_ATTEMPTS; attempt++) {
      const status = await modalClient.getTaskStatus(taskId);

      if (status.status === 'completed') {
        return status.result?.extractedText || 'Text extraction completed';
      }
      if (status.status === 'failed') {
        throw new Error(`Task failed: ${status.error}`);
      }

      // No point sleeping after the final poll; report the timeout right away.
      if (attempt < TASK_POLL_MAX_ATTEMPTS) {
        await new Promise<void>((resolve) => setTimeout(resolve, TASK_POLL_INTERVAL_MS));
      }
    }

    throw new Error('Task timed out');
  }

  /** Maps classification text to a source type by keyword lookup; 'general' when nothing matches. */
  private extractSourceType(analysis: string): string {
    const normalized = analysis.toLowerCase();

    for (const [keyword, sourceType] of Object.entries(SOURCE_TYPE_BY_KEYWORD)) {
      if (normalized.includes(keyword)) {
        return sourceType;
      }
    }

    return 'general';
  }

  /** Returns the first line of the classification text, or 'Unknown' when that line is empty. */
  private extractCategory(analysis: string): string {
    return analysis.split('\n')[0] || 'Unknown';
  }

  /**
   * Extracts up to five key points from bulleted ("-", "•") or numbered
   * ("1.") lines, with the list marker and surrounding whitespace removed.
   */
  private parseKeyPoints(analysis: string): string[] {
    const points: string[] = [];

    for (const rawLine of analysis.split('\n')) {
      // Trim first so indented list items are recognised and stripped too.
      const line = rawLine.trim();
      const marker = KEY_POINT_MARKER.exec(line);
      if (!marker) {
        continue;
      }

      // Remove the whole marker (e.g. "12."), not just its first character.
      const point = line.slice(marker[0].length).trim();
      if (point) {
        points.push(point);
      }
      if (points.length === MAX_KEY_POINTS) {
        break;
      }
    }

    return points;
  }

  /**
   * Pulls the number preceding "10" (optionally separated by "/") out of the
   * analysis text, e.g. "8.5/10" -> 8.5; returns 7.0 when no such pattern exists.
   */
  private extractQualityScore(analysis: string): number {
    // NOTE(review): the pattern also matches the "85/10" prefix of "85/100" — confirm
    // the model is always prompted for a score out of 10.
    const scoreMatch = analysis.match(/(\d+(?:\.\d+)?)\s*\/?\s*10/);
    if (scoreMatch) {
      return parseFloat(scoreMatch[1]);
    }
    return DEFAULT_QUALITY_SCORE; // Default score
  }
}

export const smartIngestionService = new SmartIngestionService();
export type { DocumentUpload, IngestionResult };