"""
Parallel Processing Module for DittoTalkingHead

Implements concurrent audio and image preprocessing
"""

import asyncio
import concurrent.futures
import queue
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple

import numpy as np
import torch


class ParallelProcessor:
    """
    Parallel processing for audio and image preprocessing

    Owns a thread pool, a process pool and (when CUDA is available and
    requested) two CUDA streams. Can be used as a context manager, in which
    case the executors are shut down on exit.
    """

    def __init__(
        self,
        num_threads: int = 4,
        num_processes: int = 2,
        use_cuda_streams: bool = True
    ):
        """
        Initialize parallel processor

        Args:
            num_threads: Number of threads for I/O operations
            num_processes: Number of processes for CPU-intensive tasks
            use_cuda_streams: Use CUDA streams for GPU operations
                (silently disabled when CUDA is not available)
        """
        self.num_threads = num_threads
        self.num_processes = num_processes
        self.use_cuda_streams = use_cuda_streams and torch.cuda.is_available()

        # Thread pool for I/O operations
        self.thread_executor = ThreadPoolExecutor(max_workers=num_threads)

        # Process pool for CPU-intensive operations
        self.process_executor = ProcessPoolExecutor(max_workers=num_processes)

        # CUDA streams for GPU operations (one per model in process_gpu_parallel)
        if self.use_cuda_streams:
            self.cuda_streams = [torch.cuda.Stream() for _ in range(2)]
        else:
            self.cuda_streams = None

        print(f"✅ ParallelProcessor initialized: {num_threads} threads, {num_processes} processes")
        if self.use_cuda_streams:
            print("✅ CUDA streams enabled for GPU parallelism")

    def __enter__(self):
        """Return the processor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Shut down the executors when leaving a ``with`` block."""
        self.shutdown()
        return False

    def preprocess_audio_parallel(self, audio_path: str) -> Dict[str, Any]:
        """
        Load an audio file and extract features from it

        The steps inside this method run sequentially; the parallelism comes
        from running this method next to the image preprocessing (see
        ``preprocess_parallel_sync`` / ``preprocess_parallel_async``).

        Args:
            audio_path: Path to audio file

        Returns:
            Dict with keys 'audio' (samples), 'sample_rate', 'features'
            ('mfcc', 'spectral_centroid', 'spectral_rolloff') and
            'duration' (number of samples divided by the sample rate)
        """
        # Imported lazily so the module can be imported without librosa.
        import librosa

        # Load audio, resampled to 16 kHz
        audio, sr = librosa.load(audio_path, sr=16000)

        features = {
            # MFCC features
            'mfcc': librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13),
            # Spectral features
            'spectral_centroid': librosa.feature.spectral_centroid(y=audio, sr=sr),
            'spectral_rolloff': librosa.feature.spectral_rolloff(y=audio, sr=sr),
        }

        return {
            'audio': audio,
            'sample_rate': sr,
            'features': features,
            'duration': len(audio) / sr
        }

    def preprocess_image_parallel(self, image_path: str, target_size: int = 320) -> Dict[str, Any]:
        """
        Load an image, convert it to RGB and resize it to a square

        Args:
            image_path: Path to image file
            target_size: Target resolution (width and height in pixels)

        Returns:
            Dict with keys 'image' (numpy array), 'shape' and 'landmarks'
            (currently a placeholder, no face detection is performed)
        """
        # Imported lazily so the module can be imported without Pillow.
        from PIL import Image

        # The work is done inline on purpose: this method is itself submitted
        # to self.thread_executor, and waiting here on another future from the
        # same pool can deadlock once every worker is occupied.
        with Image.open(image_path) as source:
            resized = source.convert('RGB').resize(
                (target_size, target_size), Image.Resampling.LANCZOS
            )
        img_array = np.array(resized)

        # Face detection and landmark extraction
        # Simplified version - in production, use MediaPipe or similar
        landmarks = {
            'has_face': True,
            'landmarks': None  # Placeholder
        }

        return {
            'image': img_array,
            'shape': img_array.shape,
            'landmarks': landmarks
        }

    async def preprocess_parallel_async(
        self,
        audio_path: str,
        image_path: str,
        target_size: int = 320
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Asynchronously preprocess audio and image in parallel

        Both jobs run on the thread pool; the coroutine resumes once both
        have finished.

        Args:
            audio_path: Path to audio file
            image_path: Path to image file
            target_size: Target image resolution

        Returns:
            Tuple of (audio_data, image_data)
        """
        # We are inside a coroutine, so a running loop always exists.
        loop = asyncio.get_running_loop()

        # Create tasks for parallel execution
        audio_task = loop.run_in_executor(
            self.thread_executor,
            self.preprocess_audio_parallel,
            audio_path
        )
        image_task = loop.run_in_executor(
            self.thread_executor,
            partial(self.preprocess_image_parallel, target_size=target_size),
            image_path
        )

        # Wait for both tasks to complete
        audio_data, image_data = await asyncio.gather(audio_task, image_task)

        return audio_data, image_data

    def preprocess_parallel_sync(
        self,
        audio_path: str,
        image_path: str,
        target_size: int = 320
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Synchronously preprocess audio and image in parallel

        Both jobs are submitted to the thread pool and this call blocks until
        both have finished. An exception raised by either job is re-raised
        here.

        Args:
            audio_path: Path to audio file
            image_path: Path to image file
            target_size: Target image resolution

        Returns:
            Tuple of (audio_data, image_data)
        """
        # Submit tasks to thread pool
        audio_future = self.thread_executor.submit(
            self.preprocess_audio_parallel,
            audio_path
        )
        image_future = self.thread_executor.submit(
            self.preprocess_image_parallel,
            image_path,
            target_size
        )

        # Wait for results
        return audio_future.result(), image_future.result()

    def process_gpu_parallel(
        self,
        audio_tensor: torch.Tensor,
        image_tensor: torch.Tensor,
        model_audio: torch.nn.Module,
        model_image: torch.nn.Module
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process audio and image through models using CUDA streams

        Without CUDA streams the two models are simply called one after the
        other.

        Args:
            audio_tensor: Audio tensor
            image_tensor: Image tensor
            model_audio: Audio processing model
            model_image: Image processing model

        Returns:
            Tuple of processed tensors (audio_out, image_out)
        """
        if not self.use_cuda_streams:
            # Fallback to sequential processing
            audio_out = model_audio(audio_tensor)
            image_out = model_image(image_tensor)
            return audio_out, image_out

        audio_stream = self.cuda_streams[0]
        image_stream = self.cuda_streams[1]

        # Work that produced the inputs may still be queued on the caller's
        # stream; the side streams must not start before it has finished.
        caller_stream = torch.cuda.current_stream()
        audio_stream.wait_stream(caller_stream)
        image_stream.wait_stream(caller_stream)

        # Use CUDA streams for parallel GPU processing
        with torch.cuda.stream(audio_stream):
            audio_out = model_audio(audio_tensor)

        with torch.cuda.stream(image_stream):
            image_out = model_image(image_tensor)

        # Synchronize streams
        torch.cuda.synchronize()

        # The outputs were allocated on the side streams but will be consumed
        # on the caller's stream; tell the caching allocator about it so the
        # memory is not recycled while still in use there.
        for produced in (audio_out, image_out):
            if isinstance(produced, torch.Tensor) and produced.is_cuda:
                produced.record_stream(caller_stream)

        return audio_out, image_out

    def shutdown(self):
        """Shutdown executors, waiting for already submitted work to finish"""
        self.thread_executor.shutdown(wait=True)
        self.process_executor.shutdown(wait=True)
        print("✅ ParallelProcessor shutdown complete")


class _StageFailure:
    """Private marker forwarded between pipeline stages for a failed item."""

    __slots__ = ("stage_name", "error")

    def __init__(self, stage_name: str, error: BaseException):
        self.stage_name = stage_name
        self.error = error


class PipelineProcessor:
    """
    Pipeline-based processing for continuous operations

    Each stage runs in its own thread and hands its result to the next stage
    through a bounded queue. ``None`` is reserved as the stop sentinel, so
    neither pipeline inputs nor stage results should be ``None``.
    """

    def __init__(self, stages: Dict[str, Callable], buffer_size: int = 10):
        """
        Initialize pipeline processor

        Args:
            stages: Dictionary of stage_name -> processing_function; stages
                run in the dictionary's iteration order
            buffer_size: Size of queues between stages
        """
        self.stages = stages
        self.buffer_size = buffer_size

        # Create queues between stages
        self.queues = {}
        stage_names = list(stages.keys())
        for upstream, downstream in zip(stage_names, stage_names[1:]):
            self.queues[f"{upstream}_to_{downstream}"] = queue.Queue(maxsize=buffer_size)

        # Input and output queues
        self.input_queue = queue.Queue(maxsize=buffer_size)
        self.output_queue = queue.Queue(maxsize=buffer_size)

        # Worker threads
        self.workers = []
        self.stop_event = threading.Event()

    def _put_or_abandon(self, target_q: queue.Queue, item: Any) -> bool:
        """
        Put ``item`` on ``target_q``, giving up once a stop was requested

        Returns:
            True if the item was enqueued, False if the queue stayed full
            until ``stop_event`` was set
        """
        while True:
            try:
                target_q.put(item, timeout=0.1)
                return True
            except queue.Full:
                # Nobody is draining the queue; do not block stop() forever.
                if self.stop_event.is_set():
                    return False

    def _worker(self, stage_name: str, process_func: Callable,
                input_q: queue.Queue, output_q: queue.Queue):
        """
        Worker thread for a pipeline stage

        Args:
            stage_name: Name used in error messages
            process_func: Function applied to every item
            input_q: Queue this stage reads from
            output_q: Queue this stage writes to
        """
        is_last_stage = output_q is self.output_queue

        while not self.stop_event.is_set():
            try:
                # Get input with timeout so stop_event is checked regularly
                item = input_q.get(timeout=0.1)
            except queue.Empty:
                continue

            if item is None:  # Poison pill
                self._put_or_abandon(output_q, None)
                break

            if isinstance(item, _StageFailure):
                # Failed upstream: pass it along untouched so exactly one
                # result per input still reaches the output queue.
                result = item
            else:
                try:
                    result = process_func(item)
                except Exception as e:
                    print(f"Error in stage {stage_name}: {e}")
                    # Must not forward None here: downstream stages would
                    # take it for the poison pill and terminate.
                    result = _StageFailure(stage_name, e)

            # Consumers of the output queue see a failed item as None.
            if is_last_stage and isinstance(result, _StageFailure):
                result = None

            if not self._put_or_abandon(output_q, result):
                break

    def start(self):
        """Start pipeline processing by launching one thread per stage"""
        stage_names = list(self.stages.keys())

        # Queues in pipeline order: stage i reads chain[i], writes chain[i + 1]
        chain = [self.input_queue]
        chain.extend(
            self.queues[f"{upstream}_to_{downstream}"]
            for upstream, downstream in zip(stage_names, stage_names[1:])
        )
        chain.append(self.output_queue)

        # Create worker threads
        for i, (stage_name, process_func) in enumerate(self.stages.items()):
            worker = threading.Thread(
                target=self._worker,
                args=(stage_name, process_func, chain[i], chain[i + 1])
            )
            worker.start()
            self.workers.append(worker)

        print(f"✅ Pipeline started with {len(self.workers)} stages")

    def process(self, item: Any) -> Any:
        """
        Process an item through the pipeline

        Blocks until a result is available on the output queue, so the
        pipeline must have been started.

        Args:
            item: Input for the first stage (``None`` acts as stop sentinel)

        Returns:
            Result of the last stage, or ``None`` if a stage raised
        """
        self.input_queue.put(item)
        return self.output_queue.get()

    def stop(self):
        """Stop pipeline processing and wait for all worker threads"""
        self.stop_event.set()

        # Send poison pill. Best effort only: workers also watch stop_event,
        # and blocking on a full queue here could hang forever.
        try:
            self.input_queue.put_nowait(None)
        except queue.Full:
            pass

        # Wait for workers
        for worker in self.workers:
            worker.join()

        print("✅ Pipeline stopped")


def benchmark_parallel_processing():
    """
    Benchmark parallel vs sequential processing

    Uses the example files "example/audio.wav" and "example/image.png".

    Returns:
        Dict with 'sequential_time', 'parallel_time' (seconds) and 'speedup'
    """
    print("\n=== Parallel Processing Benchmark ===")

    # Create processor
    processor = ParallelProcessor(num_threads=4)

    # Test files (using example files)
    audio_path = "example/audio.wav"
    image_path = "example/image.png"

    try:
        # Sequential processing
        start_seq = time.perf_counter()
        processor.preprocess_audio_parallel(audio_path)
        processor.preprocess_image_parallel(image_path)
        time_seq = time.perf_counter() - start_seq

        # Parallel processing
        start_par = time.perf_counter()
        processor.preprocess_parallel_sync(audio_path, image_path)
        time_par = time.perf_counter() - start_par

        # Guard against a zero reading from the clock
        speedup = time_seq / time_par if time_par > 0 else float('inf')

        # Results
        print(f"Sequential processing: {time_seq:.3f}s")
        print(f"Parallel processing: {time_par:.3f}s")
        print(f"Speedup: {speedup:.2f}x")
    finally:
        # Release the pools even when the example files are missing
        processor.shutdown()

    return {
        'sequential_time': time_seq,
        'parallel_time': time_par,
        'speedup': speedup
    }