""" | |
Ingestion Pipeline Module | |
This module orchestrates the complete document ingestion process, | |
integrating all components for a seamless workflow. | |
Components: DocumentProcessor, URLProcessor, TextExtractor, EmbeddingGenerator, VectorDB | |
""" | |
import logging | |
from typing import Dict, List, Any, Optional, Union | |
from pathlib import Path | |
import asyncio | |
from datetime import datetime | |
from .document_processor import DocumentProcessor | |
from .url_processor import URLProcessor | |
from ingestion.text_extractor import TextExtractor | |
from embedding.embedding_generator import EmbeddingGenerator | |
from storage.vector_db import VectorDB | |
from utils.config_manager import ConfigManager | |
from utils.error_handler import error_handler, ErrorType, RAGError | |


class IngestionPipeline:
    """
    Complete ingestion pipeline that orchestrates document processing, text extraction,
    embedding generation, and vector storage.

    Features:
        - End-to-end document ingestion
        - URL content processing
        - Batch processing capabilities
        - Progress tracking and statistics
        - Error handling and recovery
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize the ingestion pipeline.

        Args:
            config_path: Path to configuration file
        """
        self.logger = logging.getLogger(__name__)

        # Load configuration
        self.config_manager = ConfigManager(config_path)
        self.config = self.config_manager.config

        # Initialize statistics
        self.stats = {
            "documents_processed": 0,
            "urls_processed": 0,
            "chunks_created": 0,
            "embeddings_generated": 0,
            "vectors_stored": 0,
            "errors_encountered": 0,
            "start_time": None,
            "end_time": None,
        }

        # Initialize components
        self._initialize_components()

    def _initialize_components(self):
        """Initialize all pipeline components."""
        try:
            # Document processor
            doc_config = self.config.get("document_processing", {})
            self.document_processor = DocumentProcessor(doc_config)

            # URL processor
            url_config = self.config.get("url_processing", {})
            self.url_processor = URLProcessor(url_config)

            # Text extractor
            text_config = self.config.get("document_processing", {})
            self.text_extractor = TextExtractor(text_config)

            # Embedding generator
            embedding_config = self.config.get("embedding", {})
            embedding_config["api_key"] = self.config.get("api_keys", {}).get(
                "gemini_api_key"
            )
            self.embedding_generator = EmbeddingGenerator(embedding_config)

            # Vector database
            vector_config = self.config.get("vector_db", {})
            vector_config["api_key"] = self.config.get("api_keys", {}).get(
                "pinecone_api_key"
            )
            self.vector_db = VectorDB(vector_config)

            self.logger.info("All pipeline components initialized successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize pipeline components: {str(e)}")
            raise RAGError(f"Pipeline initialization failed: {str(e)}")
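
    # The configuration sections read above are expected to look roughly like the
    # sketch below. Only the top-level keys and the two api_keys entries appear in
    # this module; the nested option names are illustrative assumptions, not a
    # schema guaranteed by ConfigManager:
    #
    #   document_processing: {...}   # parsing / chunking options (assumed)
    #   url_processing: {...}        # fetching / crawling options (assumed)
    #   embedding: {...}             # receives api_key from api_keys.gemini_api_key
    #   vector_db: {...}             # receives api_key from api_keys.pinecone_api_key
    #   api_keys:
    #       gemini_api_key: "..."
    #       pinecone_api_key: "..."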

    def process_documents(self, file_paths: List[str]) -> Dict[str, Any]:
        """
        Process multiple documents through the complete pipeline.

        Args:
            file_paths: List of document file paths

        Returns:
            Processing results and statistics
        """
        self.logger.info(
            f"Starting document processing pipeline for {len(file_paths)} files"
        )
        self.stats["start_time"] = datetime.now()
        all_results = []

        for i, file_path in enumerate(file_paths):
            try:
                self.logger.info(
                    f"Processing document {i + 1}/{len(file_paths)}: {file_path}"
                )

                # Step 1: Process document
                doc_result = self.document_processor.process_document(file_path)
                self.stats["documents_processed"] += 1

                # Step 2: Extract and chunk text
                text_chunks = self.text_extractor.process_text(
                    doc_result["content"], doc_result["metadata"]
                )
                self.stats["chunks_created"] += len(text_chunks)

                # Step 3: Generate embeddings
                embedded_chunks = self.embedding_generator.generate_embeddings(
                    text_chunks
                )
                valid_embeddings = [
                    chunk for chunk in embedded_chunks if chunk.get("embedding")
                ]
                self.stats["embeddings_generated"] += len(valid_embeddings)

                # Step 4: Store in vector database
                storage_success = False
                if valid_embeddings:
                    storage_success = self.vector_db.store_embeddings(valid_embeddings)
                    if storage_success:
                        self.stats["vectors_stored"] += len(valid_embeddings)

                # Compile results
                result = {
                    "file_path": file_path,
                    "document_type": doc_result.get("document_type"),
                    "chunks_created": len(text_chunks),
                    "embeddings_generated": len(valid_embeddings),
                    "storage_success": storage_success,
                    "metadata": doc_result["metadata"],
                }
                all_results.append(result)

                self.logger.info(
                    f"Document processed: {len(text_chunks)} chunks, "
                    f"{len(valid_embeddings)} embeddings"
                )
            except Exception as e:
                self.stats["errors_encountered"] += 1
                self.logger.error(f"Error processing {file_path}: {str(e)}")
                all_results.append(
                    {
                        "file_path": file_path,
                        "error": str(e),
                        "chunks_created": 0,
                        "embeddings_generated": 0,
                        "storage_success": False,
                    }
                )

        self.stats["end_time"] = datetime.now()
        return {
            "results": all_results,
            "statistics": self.get_statistics(),
            "success_rate": self._calculate_success_rate(all_results),
        }
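
    # Return-value sketch for process_documents (the shape follows the dicts built
    # in the loop above; the concrete values are placeholders):
    #
    #   {
    #       "results": [
    #           {"file_path": "report.pdf", "document_type": ..., "chunks_created": 12,
    #            "embeddings_generated": 12, "storage_success": True, "metadata": {...}},
    #       ],
    #       "statistics": {...},    # see get_statistics()
    #       "success_rate": 100.0,  # percentage of results with storage_success=True
    #   }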

    def process_urls(self, urls: List[str]) -> Dict[str, Any]:
        """
        Process multiple URLs through the complete pipeline.

        Args:
            urls: List of URLs to process

        Returns:
            Processing results and statistics
        """
        self.logger.info(f"Starting URL processing pipeline for {len(urls)} URLs")
        self.stats["start_time"] = datetime.now()
        all_results = []

        for i, url in enumerate(urls):
            try:
                self.logger.info(f"Processing URL {i + 1}/{len(urls)}: {url}")

                # Step 1: Process URL
                url_result = self.url_processor.process_url(url)
                if not url_result:
                    self.logger.warning(f"No content extracted from URL: {url}")
                    continue
                self.stats["urls_processed"] += 1

                # Step 2: Extract and chunk text
                text_chunks = self.text_extractor.process_text(
                    url_result["content"], url_result["metadata"]
                )
                self.stats["chunks_created"] += len(text_chunks)

                # Step 3: Generate embeddings
                embedded_chunks = self.embedding_generator.generate_embeddings(
                    text_chunks
                )
                valid_embeddings = [
                    chunk for chunk in embedded_chunks if chunk.get("embedding")
                ]
                self.stats["embeddings_generated"] += len(valid_embeddings)

                # Step 4: Store in vector database
                storage_success = False
                if valid_embeddings:
                    storage_success = self.vector_db.store_embeddings(valid_embeddings)
                    if storage_success:
                        self.stats["vectors_stored"] += len(valid_embeddings)

                # Process linked documents, if any
                linked_results = []
                for linked_doc in url_result.get("linked_documents", []):
                    if linked_doc.get("content"):
                        linked_chunks = self.text_extractor.process_text(
                            linked_doc["content"], linked_doc["metadata"]
                        )
                        linked_embedded = self.embedding_generator.generate_embeddings(
                            linked_chunks
                        )
                        linked_valid = [
                            chunk for chunk in linked_embedded if chunk.get("embedding")
                        ]
                        if linked_valid:
                            self.vector_db.store_embeddings(linked_valid)
                        linked_results.append(
                            {
                                "url": linked_doc["source"],
                                "chunks": len(linked_chunks),
                                "embeddings": len(linked_valid),
                            }
                        )

                # Compile results
                result = {
                    "url": url,
                    "chunks_created": len(text_chunks),
                    "embeddings_generated": len(valid_embeddings),
                    "storage_success": storage_success,
                    "linked_documents": linked_results,
                    "metadata": url_result["metadata"],
                }
                all_results.append(result)

                self.logger.info(
                    f"URL processed: {len(text_chunks)} chunks, "
                    f"{len(valid_embeddings)} embeddings"
                )
            except Exception as e:
                self.stats["errors_encountered"] += 1
                self.logger.error(f"Error processing {url}: {str(e)}")
                all_results.append(
                    {
                        "url": url,
                        "error": str(e),
                        "chunks_created": 0,
                        "embeddings_generated": 0,
                        "storage_success": False,
                    }
                )

        self.stats["end_time"] = datetime.now()
        return {
            "results": all_results,
            "statistics": self.get_statistics(),
            "success_rate": self._calculate_success_rate(all_results),
        }

    def process_mixed_sources(
        self, file_paths: Optional[List[str]] = None, urls: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Process both documents and URLs in a single pipeline run.

        Args:
            file_paths: Optional list of document file paths
            urls: Optional list of URLs

        Returns:
            Combined processing results
        """
        self.logger.info("Starting mixed source processing pipeline")
        results = {
            "document_results": [],
            "url_results": [],
            "combined_statistics": {},
            "overall_success_rate": 0.0,
        }

        # Process documents
        if file_paths:
            doc_results = self.process_documents(file_paths)
            results["document_results"] = doc_results["results"]

        # Process URLs
        if urls:
            url_results = self.process_urls(urls)
            results["url_results"] = url_results["results"]

        # Combine statistics
        results["combined_statistics"] = self.get_statistics()

        # Calculate overall success rate
        all_items = results["document_results"] + results["url_results"]
        results["overall_success_rate"] = self._calculate_success_rate(all_items)
        return results

    def _calculate_success_rate(self, results: List[Dict[str, Any]]) -> float:
        """
        Calculate the success rate from a list of results.

        Args:
            results: List of processing results

        Returns:
            Success rate as a percentage (0-100)
        """
        if not results:
            return 0.0
        successful = sum(
            1 for result in results if result.get("storage_success", False)
        )
        return (successful / len(results)) * 100
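
    # Worked example: with 4 results of which 3 have storage_success=True,
    # the method above returns (3 / 4) * 100 = 75.0.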

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive pipeline statistics.

        Returns:
            Statistics dictionary
        """
        stats = self.stats.copy()
        if stats["start_time"] and stats["end_time"]:
            runtime = stats["end_time"] - stats["start_time"]
            stats["runtime_seconds"] = runtime.total_seconds()
            # Items (documents + URLs) processed per second of runtime
            stats["processing_rate"] = (
                stats["documents_processed"] + stats["urls_processed"]
            ) / max(1, runtime.total_seconds())

        # Add component statistics
        stats["embedding_stats"] = self.embedding_generator.get_statistics()
        stats["vector_db_stats"] = self.vector_db.get_index_stats()
        stats["url_processor_stats"] = self.url_processor.get_statistics()
        return stats

    def health_check(self) -> Dict[str, Any]:
        """
        Perform a comprehensive health check on all components.

        Returns:
            Health check results
        """
        health = {
            "overall_status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "components": {},
        }
        try:
            # Check embedding generator
            if self.embedding_generator.client:
                health["components"]["embedding_generator"] = "Ready"
            else:
                health["components"]["embedding_generator"] = "Not configured"
                health["overall_status"] = "degraded"

            # Check vector database
            vector_health = self.vector_db.health_check()
            health["components"]["vector_database"] = vector_health["status"]
            if vector_health["status"] != "healthy":
                health["overall_status"] = "degraded"

            # Add component details
            health["details"] = {
                "vector_db_health": vector_health,
                "embedding_stats": self.embedding_generator.get_statistics(),
                "pipeline_stats": self.get_statistics(),
            }
        except Exception as e:
            health["overall_status"] = "unhealthy"
            health["error"] = str(e)
        return health

    def reset_statistics(self):
        """Reset pipeline statistics."""
        self.stats = {
            "documents_processed": 0,
            "urls_processed": 0,
            "chunks_created": 0,
            "embeddings_generated": 0,
            "vectors_stored": 0,
            "errors_encountered": 0,
            "start_time": None,
            "end_time": None,
        }

        # Reset component statistics
        self.embedding_generator.stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "cache_hits": 0,
            "total_tokens_processed": 0,
            "start_time": datetime.now(),
        }
        self.vector_db.reset_stats()
        self.url_processor.reset()
        self.logger.info("All pipeline statistics reset")


# Convenience function for quick pipeline usage
def create_pipeline(config_path: Optional[str] = None) -> IngestionPipeline:
    """
    Create and return a configured ingestion pipeline.

    Args:
        config_path: Optional path to configuration file

    Returns:
        Configured IngestionPipeline instance
    """
    return IngestionPipeline(config_path)


# Example usage functions
def process_documents_simple(
    file_paths: List[str], config_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Simple function to process documents with default configuration.

    Args:
        file_paths: List of document file paths
        config_path: Optional configuration file path

    Returns:
        Processing results
    """
    pipeline = create_pipeline(config_path)
    return pipeline.process_documents(file_paths)


def process_urls_simple(
    urls: List[str], config_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Simple function to process URLs with default configuration.

    Args:
        urls: List of URLs to process
        config_path: Optional configuration file path

    Returns:
        Processing results
    """
    pipeline = create_pipeline(config_path)
    return pipeline.process_urls(urls)
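

# Minimal usage sketch (assumes a local config file and sample inputs; the config
# path, document path, and URL below are placeholders, not files shipped with
# this module):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    pipeline = create_pipeline("config/config.yaml")  # hypothetical config path

    # Verify that the embedding client and vector database are reachable.
    print("Health:", pipeline.health_check()["overall_status"])

    # Ingest one document and one URL through the full pipeline.
    doc_report = pipeline.process_documents(["docs/example.pdf"])  # placeholder path
    url_report = pipeline.process_urls(["https://example.com/article"])  # placeholder URL

    print(f"Document success rate: {doc_report['success_rate']:.1f}%")
    print(f"URL success rate: {url_report['success_rate']:.1f}%")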