"""
Ingestion Pipeline Module
This module orchestrates the complete document ingestion workflow, from raw
documents and URLs through text extraction and embedding generation to
storage in the vector database.
Components: DocumentProcessor, URLProcessor, TextExtractor, EmbeddingGenerator, VectorDB
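
Example (illustrative sketch; the config path and file names below are
placeholders, and the package is assumed to be importable from the src root):

    from ingestion.pipeline import create_pipeline

    pipeline = create_pipeline("config/config.yaml")
    report = pipeline.process_documents(["docs/report.pdf", "docs/notes.docx"])
    print(report["success_rate"])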
"""
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from .document_processor import DocumentProcessor
from .url_processor import URLProcessor
from .text_extractor import TextExtractor
from embedding.embedding_generator import EmbeddingGenerator
from storage.vector_db import VectorDB
from utils.config_manager import ConfigManager
from utils.error_handler import error_handler, ErrorType, RAGError
class IngestionPipeline:
"""
Complete ingestion pipeline that orchestrates document processing, text extraction,
embedding generation, and vector storage.
Features:
- End-to-end document ingestion
- URL content processing
- Batch processing capabilities
- Progress tracking and statistics
- Error handling and recovery
"""
def __init__(self, config_path: Optional[str] = None):
"""
Initialize the ingestion pipeline.
Args:
config_path: Path to configuration file
"""
self.logger = logging.getLogger(__name__)
# Load configuration
self.config_manager = ConfigManager(config_path)
self.config = self.config_manager.config
# Initialize statistics
self.stats = {
"documents_processed": 0,
"urls_processed": 0,
"chunks_created": 0,
"embeddings_generated": 0,
"vectors_stored": 0,
"errors_encountered": 0,
"start_time": None,
"end_time": None,
}
# Initialize components
self._initialize_components()
def _initialize_components(self):
"""Initialize all pipeline components."""
try:
            # Document processor
doc_config = self.config.get("document_processing", {})
self.document_processor = DocumentProcessor(doc_config)
# URL processor
url_config = self.config.get("url_processing", {})
self.url_processor = URLProcessor(url_config)
# Text extractor
text_config = self.config.get("document_processing", {})
self.text_extractor = TextExtractor(text_config)
            # Embedding generator
embedding_config = self.config.get("embedding", {})
embedding_config["api_key"] = self.config.get("api_keys", {}).get(
"gemini_api_key"
)
self.embedding_generator = EmbeddingGenerator(embedding_config)
# Vector database
vector_config = self.config.get("vector_db", {})
vector_config["api_key"] = self.config.get("api_keys", {}).get(
"pinecone_api_key"
)
self.vector_db = VectorDB(vector_config)
self.logger.info("All pipeline components initialized successfully")
except Exception as e:
self.logger.error(f"โŒ Failed to initialize pipeline components: {str(e)}")
raise RAGError(f"Pipeline initialization failed: {str(e)}")
@error_handler(ErrorType.DOCUMENT_PROCESSING)
def process_documents(self, file_paths: List[str]) -> Dict[str, Any]:
"""
Process multiple documents through the complete pipeline.
Args:
file_paths: List of document file paths
Returns:
Processing results and statistics
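        Example result shape (illustrative values):
            {
                "results": [{"file_path": "docs/report.pdf", "chunks_created": 12, ...}],
                "statistics": {...},
                "success_rate": 100.0,
            }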
"""
self.logger.info(
f"Starting document processing pipeline for {len(file_paths)} files"
)
self.stats["start_time"] = datetime.now()
all_results = []
for i, file_path in enumerate(file_paths):
try:
self.logger.info(
f"๐Ÿ“„ Processing document {i+1}/{len(file_paths)}: {file_path}"
)
                # Step 1: Process document
doc_result = self.document_processor.process_document(file_path)
self.stats["documents_processed"] += 1
# Step 2: Extract and chunk text
text_chunks = self.text_extractor.process_text(
doc_result["content"], doc_result["metadata"]
)
self.stats["chunks_created"] += len(text_chunks)
                # Step 3: Generate embeddings
embedded_chunks = self.embedding_generator.generate_embeddings(
text_chunks
)
valid_embeddings = [
chunk for chunk in embedded_chunks if chunk.get("embedding")
]
self.stats["embeddings_generated"] += len(valid_embeddings)
                # Step 4: Store in vector database
                storage_success = False
                if valid_embeddings:
                    storage_success = self.vector_db.store_embeddings(valid_embeddings)
                    if storage_success:
                        self.stats["vectors_stored"] += len(valid_embeddings)
# Compile results
result = {
"file_path": file_path,
"document_type": doc_result.get("document_type"),
"chunks_created": len(text_chunks),
"embeddings_generated": len(valid_embeddings),
"storage_success": storage_success if valid_embeddings else False,
"metadata": doc_result["metadata"],
}
all_results.append(result)
self.logger.info(
f"Document processed: {len(text_chunks)} chunks, {len(valid_embeddings)} embeddings"
)
except Exception as e:
self.stats["errors_encountered"] += 1
self.logger.error(f"โŒ Error processing {file_path}: {str(e)}")
all_results.append(
{
"file_path": file_path,
"error": str(e),
"chunks_created": 0,
"embeddings_generated": 0,
"storage_success": False,
}
)
self.stats["end_time"] = datetime.now()
return {
"results": all_results,
"statistics": self.get_statistics(),
"success_rate": self._calculate_success_rate(all_results),
}
@error_handler(ErrorType.URL_PROCESSING)
def process_urls(self, urls: List[str]) -> Dict[str, Any]:
"""
Process multiple URLs through the complete pipeline.
Args:
urls: List of URLs to process
Returns:
Processing results and statistics
"""
self.logger.info(f"Starting URL processing pipeline for {len(urls)} URLs")
self.stats["start_time"] = datetime.now()
all_results = []
for i, url in enumerate(urls):
try:
self.logger.info(f"Processing URL {i+1}/{len(urls)}: {url}")
# Step 1: Process URL
url_result = self.url_processor.process_url(url)
if not url_result:
self.logger.warning(f"No content extracted from URL: {url}")
continue
self.stats["urls_processed"] += 1
# Step 2: Extract and chunk text
text_chunks = self.text_extractor.process_text(
url_result["content"], url_result["metadata"]
)
self.stats["chunks_created"] += len(text_chunks)
                # Step 3: Generate embeddings
embedded_chunks = self.embedding_generator.generate_embeddings(
text_chunks
)
valid_embeddings = [
chunk for chunk in embedded_chunks if chunk.get("embedding")
]
self.stats["embeddings_generated"] += len(valid_embeddings)
# Step 4: Store in vector database
storage_success = False
if valid_embeddings:
storage_success = self.vector_db.store_embeddings(valid_embeddings)
if storage_success:
self.stats["vectors_stored"] += len(valid_embeddings)
# Process linked documents if any
linked_results = []
for linked_doc in url_result.get("linked_documents", []):
if linked_doc.get("content"):
linked_chunks = self.text_extractor.process_text(
linked_doc["content"], linked_doc["metadata"]
)
linked_embedded = self.embedding_generator.generate_embeddings(
linked_chunks
)
linked_valid = [
chunk for chunk in linked_embedded if chunk.get("embedding")
]
if linked_valid:
self.vector_db.store_embeddings(linked_valid)
linked_results.append(
{
"url": linked_doc["source"],
"chunks": len(linked_chunks),
"embeddings": len(linked_valid),
}
)
# Compile results
result = {
"url": url,
"chunks_created": len(text_chunks),
"embeddings_generated": len(valid_embeddings),
"storage_success": storage_success,
"linked_documents": linked_results,
"metadata": url_result["metadata"],
}
all_results.append(result)
self.logger.info(
f"URL processed: {len(text_chunks)} chunks, {len(valid_embeddings)} embeddings"
)
except Exception as e:
self.stats["errors_encountered"] += 1
self.logger.error(f"โŒ Error processing {url}: {str(e)}")
all_results.append(
{
"url": url,
"error": str(e),
"chunks_created": 0,
"embeddings_generated": 0,
"storage_success": False,
}
)
self.stats["end_time"] = datetime.now()
return {
"results": all_results,
"statistics": self.get_statistics(),
"success_rate": self._calculate_success_rate(all_results),
}
def process_mixed_sources(
self, file_paths: Optional[List[str]] = None, urls: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Process both documents and URLs in a single pipeline run.
Args:
file_paths: Optional list of document file paths
urls: Optional list of URLs
Returns:
Combined processing results
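        Example (illustrative; the paths and URL below are placeholders):
            pipeline = IngestionPipeline()
            report = pipeline.process_mixed_sources(
                file_paths=["docs/handbook.pdf"],
                urls=["https://example.com/article"],
            )
            print(report["overall_success_rate"])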
"""
self.logger.info("Starting mixed source processing pipeline")
results = {
"document_results": [],
"url_results": [],
"combined_statistics": {},
"overall_success_rate": 0.0,
}
        # Process documents
if file_paths:
doc_results = self.process_documents(file_paths)
results["document_results"] = doc_results["results"]
# Process URLs
if urls:
url_results = self.process_urls(urls)
results["url_results"] = url_results["results"]
# Combine statistics
results["combined_statistics"] = self.get_statistics()
        # Calculate overall success rate
all_items = results["document_results"] + results["url_results"]
results["overall_success_rate"] = self._calculate_success_rate(all_items)
return results
def _calculate_success_rate(self, results: List[Dict[str, Any]]) -> float:
"""
Calculate success rate from results.
Args:
results: List of processing results
Returns:
Success rate as percentage
"""
if not results:
return 0.0
successful = sum(
1 for result in results if result.get("storage_success", False)
)
return (successful / len(results)) * 100
def get_statistics(self) -> Dict[str, Any]:
"""
Get comprehensive pipeline statistics.
Returns:
Statistics dictionary
"""
stats = self.stats.copy()
if stats["start_time"] and stats["end_time"]:
runtime = stats["end_time"] - stats["start_time"]
stats["runtime_seconds"] = runtime.total_seconds()
stats["processing_rate"] = (
stats["documents_processed"] + stats["urls_processed"]
) / max(1, runtime.total_seconds())
        # Add component statistics
stats["embedding_stats"] = self.embedding_generator.get_statistics()
stats["vector_db_stats"] = self.vector_db.get_index_stats()
stats["url_processor_stats"] = self.url_processor.get_statistics()
return stats
def health_check(self) -> Dict[str, Any]:
"""
Perform comprehensive health check on all components.
Returns:
Health check results
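        Example result shape (illustrative; values depend on the deployment):
            {
                "overall_status": "healthy",
                "timestamp": "2025-01-01T12:00:00",
                "components": {
                    "embedding_generator": "Ready",
                    "vector_database": "healthy",
                },
                "details": {...},
            }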
"""
health = {
"overall_status": "healthy",
"timestamp": datetime.now().isoformat(),
"components": {},
}
try:
            # Check embedding generator
if self.embedding_generator.client:
health["components"]["embedding_generator"] = "Ready"
else:
health["components"]["embedding_generator"] = "โŒ Not configured"
health["overall_status"] = "degraded"
# Check vector database
vector_health = self.vector_db.health_check()
health["components"]["vector_database"] = vector_health["status"]
if vector_health["status"] != "healthy":
health["overall_status"] = "degraded"
# Add component details
health["details"] = {
"vector_db_health": vector_health,
"embedding_stats": self.embedding_generator.get_statistics(),
"pipeline_stats": self.get_statistics(),
}
except Exception as e:
health["overall_status"] = "unhealthy"
health["error"] = str(e)
return health
def reset_statistics(self):
"""Reset pipeline statistics."""
self.stats = {
"documents_processed": 0,
"urls_processed": 0,
"chunks_created": 0,
"embeddings_generated": 0,
"vectors_stored": 0,
"errors_encountered": 0,
"start_time": None,
"end_time": None,
}
# Reset component statistics
self.embedding_generator.stats = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"cache_hits": 0,
"total_tokens_processed": 0,
"start_time": datetime.now(),
}
self.vector_db.reset_stats()
self.url_processor.reset()
self.logger.info("All pipeline statistics reset")
# Convenience function for quick pipeline usage
def create_pipeline(config_path: Optional[str] = None) -> IngestionPipeline:
"""
Create and return a configured ingestion pipeline.
Args:
config_path: Optional path to configuration file
Returns:
Configured IngestionPipeline instance
"""
return IngestionPipeline(config_path)
# Example usage functions
def process_documents_simple(
file_paths: List[str], config_path: Optional[str] = None
) -> Dict[str, Any]:
"""
    Simple function to process documents with default configuration.
Args:
file_paths: List of document file paths
config_path: Optional configuration file path
Returns:
Processing results
"""
pipeline = create_pipeline(config_path)
return pipeline.process_documents(file_paths)
def process_urls_simple(
urls: List[str], config_path: Optional[str] = None
) -> Dict[str, Any]:
"""
Simple function to process URLs with default configuration.
Args:
urls: List of URLs to process
config_path: Optional configuration file path
Returns:
Processing results
"""
pipeline = create_pipeline(config_path)
return pipeline.process_urls(urls)
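

if __name__ == "__main__":
    # Minimal demo sketch: the config path, document path, and URL below are
    # placeholders, and valid Gemini/Pinecone API keys are assumed to be
    # present in the configuration.
    logging.basicConfig(level=logging.INFO)

    demo_pipeline = create_pipeline("config/config.yaml")
    print(f"Health: {demo_pipeline.health_check()['overall_status']}")

    demo_report = demo_pipeline.process_mixed_sources(
        file_paths=["docs/sample.pdf"],
        urls=["https://example.com/article"],
    )
    print(f"Overall success rate: {demo_report['overall_success_rate']:.1f}%")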