""" KnowledgeBridge Modal App Provides distributed computing capabilities for document processing and vector search """ import modal import json import numpy as np from typing import List, Dict, Any, Optional import os import requests from io import BytesIO import PyPDF2 import pytesseract from PIL import Image import faiss import pickle import hashlib # Create Modal app app = modal.App("knowledgebridge-main") # Define the image with required dependencies image = ( modal.Image.debian_slim(python_version="3.11") .pip_install([ "numpy", "faiss-cpu", "PyPDF2", "pillow", "pytesseract", "requests", "scikit-learn", "sentence-transformers", "openai", "tiktoken" ]) .apt_install(["tesseract-ocr", "tesseract-ocr-eng", "poppler-utils"]) ) # Shared volume for storing vector indices volume = modal.Volume.from_name("knowledgebridge-storage", create_if_missing=True) @app.function( image=image, volumes={"/storage": volume}, timeout=300, memory=2048 ) def extract_text_from_documents(documents: List[Dict[str, Any]]) -> Dict[str, Any]: """ Extract text from documents using OCR and PDF parsing """ results = [] for doc in documents: try: doc_id = doc.get('id', f"doc_{len(results)}") content_type = doc.get('contentType', 'text/plain') content = doc.get('content', '') extracted_text = "" if content_type == 'application/pdf': # Handle PDF content try: # Assume content is base64 encoded PDF import base64 pdf_data = base64.b64decode(content) pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_data)) for page_num, page in enumerate(pdf_reader.pages): page_text = page.extract_text() extracted_text += f"Page {page_num + 1}:\n{page_text}\n\n" except Exception as pdf_error: extracted_text = f"PDF extraction failed: {str(pdf_error)}" elif content_type.startswith('image/'): # Handle image content with OCR try: import base64 image_data = base64.b64decode(content) image = Image.open(BytesIO(image_data)) extracted_text = pytesseract.image_to_string(image) except Exception as ocr_error: extracted_text = f"OCR extraction failed: {str(ocr_error)}" else: # Plain text or other formats extracted_text = content results.append({ 'id': doc_id, 'extracted_text': extracted_text, 'original_type': content_type, 'status': 'completed' }) except Exception as e: results.append({ 'id': doc.get('id', f"doc_{len(results)}"), 'extracted_text': "", 'original_type': doc.get('contentType', 'unknown'), 'status': 'failed', 'error': str(e) }) return { 'task_id': f"extract_{hash(str(documents))[:8]}", 'status': 'completed', 'results': results, 'processed_count': len(results) } @app.function( image=image, volumes={"/storage": volume}, timeout=600, memory=4096, cpu=2 ) def build_vector_index(documents: List[Dict[str, Any]], index_name: str = "main_index") -> Dict[str, Any]: """ Build FAISS vector index from documents """ try: from sentence_transformers import SentenceTransformer # Load embedding model model = SentenceTransformer('all-MiniLM-L6-v2') # Extract texts and create embeddings texts = [] doc_metadata = [] for doc in documents: text = doc.get('content', doc.get('extracted_text', '')) if text and len(text.strip()) > 10: # Only process non-empty texts texts.append(text[:8000]) # Limit text length doc_metadata.append({ 'id': doc.get('id'), 'title': doc.get('title', 'Untitled'), 'source': doc.get('source', 'Unknown'), 'content': text }) if not texts: return { 'task_id': f"index_{index_name}_{hash(str(documents))[:8]}", 'status': 'failed', 'error': 'No valid texts to index' } # Generate embeddings embeddings = model.encode(texts, show_progress_bar=False) embeddings = np.array(embeddings).astype('float32') # Create FAISS index dimension = embeddings.shape[1] index = faiss.IndexFlatIP(dimension) # Inner product for cosine similarity # Normalize embeddings for cosine similarity faiss.normalize_L2(embeddings) index.add(embeddings) # Save index and metadata index_path = f"/storage/{index_name}.index" metadata_path = f"/storage/{index_name}_metadata.pkl" faiss.write_index(index, index_path) with open(metadata_path, 'wb') as f: pickle.dump(doc_metadata, f) volume.commit() return { 'task_id': f"index_{index_name}_{hash(str(documents))[:8]}", 'status': 'completed', 'index_name': index_name, 'document_count': len(doc_metadata), 'dimension': dimension, 'index_path': index_path } except Exception as e: return { 'task_id': f"index_{index_name}_{hash(str(documents))[:8]}", 'status': 'failed', 'error': str(e) } @app.function( image=image, volumes={"/storage": volume}, timeout=60, memory=2048 ) def vector_search(query: str, index_name: str = "main_index", max_results: int = 10) -> Dict[str, Any]: """ Perform vector search using FAISS index """ try: from sentence_transformers import SentenceTransformer # Load embedding model model = SentenceTransformer('all-MiniLM-L6-v2') # Load index and metadata index_path = f"/storage/{index_name}.index" metadata_path = f"/storage/{index_name}_metadata.pkl" if not os.path.exists(index_path) or not os.path.exists(metadata_path): return { 'status': 'failed', 'error': f'Index {index_name} not found. Please build index first.', 'results': [] } # Load FAISS index index = faiss.read_index(index_path) # Load metadata with open(metadata_path, 'rb') as f: doc_metadata = pickle.load(f) # Generate query embedding query_embedding = model.encode([query]) query_embedding = np.array(query_embedding).astype('float32') faiss.normalize_L2(query_embedding) # Search scores, indices = index.search(query_embedding, min(max_results, len(doc_metadata))) # Format results results = [] for i, (score, idx) in enumerate(zip(scores[0], indices[0])): if idx >= 0 and idx < len(doc_metadata): # Valid index doc = doc_metadata[idx] results.append({ 'id': doc['id'], 'title': doc['title'], 'content': doc['content'], 'source': doc['source'], 'relevanceScore': float(score), 'rank': i + 1, 'snippet': doc['content'][:200] + '...' if len(doc['content']) > 200 else doc['content'] }) return { 'status': 'completed', 'results': results, 'query': query, 'total_found': len(results) } except Exception as e: return { 'status': 'failed', 'error': str(e), 'results': [] } @app.function( image=image, timeout=300, memory=2048 ) def batch_process_documents(request: Dict[str, Any]) -> Dict[str, Any]: """ Process multiple documents in batch """ try: documents = request.get('documents', []) operations = request.get('operations', ['extract_text']) results = { 'task_id': f"batch_{hash(str(request))[:8]}", 'status': 'completed', 'operations_completed': [], 'document_count': len(documents) } # Extract text if requested if 'extract_text' in operations: extraction_result = extract_text_from_documents(documents) results['operations_completed'].append('extract_text') results['extraction_results'] = extraction_result.get('results', []) # Build index if requested if 'build_index' in operations: index_name = request.get('index_name', 'batch_index') index_result = build_vector_index(documents, index_name) results['operations_completed'].append('build_index') results['index_results'] = index_result return results except Exception as e: return { 'task_id': f"batch_{hash(str(request))[:8]}", 'status': 'failed', 'error': str(e) } # Simple task status tracking (in-memory for demo) task_statuses = {} @app.function(timeout=30) def get_task_status(task_id: str) -> Dict[str, Any]: """ Get status of a processing task """ # In a real implementation, this would check a database # For now, return a simple status return { 'task_id': task_id, 'status': 'completed', # Simplified for demo 'progress': 100, 'message': 'Task completed successfully' } # Web endpoints @app.function() @modal.web_endpoint(method="POST", label="vector-search") def web_vector_search(request_data: Dict[str, Any]) -> Dict[str, Any]: """HTTP endpoint for vector search""" query = request_data.get('query', '') index_name = request_data.get('index_name', 'main_index') max_results = request_data.get('max_results', 10) return vector_search.remote(query, index_name, max_results) @app.function() @modal.web_endpoint(method="POST", label="extract-text") def web_extract_text(request_data: Dict[str, Any]) -> Dict[str, Any]: """HTTP endpoint for text extraction""" documents = request_data.get('documents', []) return extract_text_from_documents.remote(documents) @app.function() @modal.web_endpoint(method="POST", label="build-index") def web_build_index(request_data: Dict[str, Any]) -> Dict[str, Any]: """HTTP endpoint for building vector index""" documents = request_data.get('documents', []) index_name = request_data.get('index_name', 'main_index') return build_vector_index.remote(documents, index_name) @app.function() @modal.web_endpoint(method="POST", label="batch-process") def web_batch_process(request_data: Dict[str, Any]) -> Dict[str, Any]: """HTTP endpoint for batch processing""" return batch_process_documents.remote(request_data) @app.function() @modal.web_endpoint(method="GET", label="task-status") def web_task_status(task_id: str) -> Dict[str, Any]: """HTTP endpoint for task status""" return get_task_status.remote(task_id) @app.function() @modal.web_endpoint(method="GET", label="health") def health_check() -> Dict[str, Any]: """Health check endpoint""" return { 'status': 'healthy', 'service': 'KnowledgeBridge Modal App', 'version': '1.0.0', 'timestamp': str(modal.functions.current_timestamp()) } if __name__ == "__main__": print("KnowledgeBridge Modal App") print("Available functions:") print("- extract_text_from_documents") print("- build_vector_index") print("- vector_search") print("- batch_process_documents") print("- get_task_status")