Spaces:
Runtime error
Runtime error
"""
Parallel processing module for DittoTalkingHead.

Implements concurrent audio and image preprocessing.
"""
import asyncio | |
import concurrent.futures | |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
import time | |
from typing import Tuple, Dict, Any, Optional, Callable | |
import numpy as np | |
from pathlib import Path | |
import threading | |
import queue | |
import torch | |
from functools import partial | |
class ParallelProcessor:
    """
    Run audio and image preprocessing concurrently.

    The instance owns a thread pool, a process pool and, when CUDA is both
    requested and available, two CUDA streams. Call :meth:`shutdown` once the
    processor is no longer needed so the executors are released.
    """

    def __init__(
        self,
        num_threads: int = 4,
        num_processes: int = 2,
        use_cuda_streams: bool = True
    ):
        """
        Initialize parallel processor

        Args:
            num_threads: Number of threads for I/O operations
            num_processes: Number of processes for CPU-intensive tasks
            use_cuda_streams: Use CUDA streams for GPU operations (silently
                disabled when ``torch.cuda.is_available()`` is False)
        """
        self.num_threads = num_threads
        self.num_processes = num_processes
        self.use_cuda_streams = use_cuda_streams and torch.cuda.is_available()

        # Thread pool for I/O operations
        self.thread_executor = ThreadPoolExecutor(max_workers=num_threads)

        # Process pool for CPU-intensive operations (no method of this class
        # submits to it; it is kept as a public attribute for callers)
        self.process_executor = ProcessPoolExecutor(max_workers=num_processes)

        # CUDA streams for GPU operations
        if self.use_cuda_streams:
            self.cuda_streams = [torch.cuda.Stream() for _ in range(2)]
        else:
            self.cuda_streams = None

        print(f"✅ ParallelProcessor initialized: {num_threads} threads, {num_processes} processes")
        if self.use_cuda_streams:
            print("✅ CUDA streams enabled for GPU parallelism")

    def preprocess_audio_parallel(self, audio_path: str) -> Dict[str, Any]:
        """
        Load an audio file and extract spectral features.

        Despite the name, loading and feature extraction run sequentially in
        the calling thread; concurrency comes from running this method next
        to the image preprocessing (see :meth:`preprocess_parallel_sync`).

        Args:
            audio_path: Path to audio file

        Returns:
            Dict with keys ``audio`` (samples), ``sample_rate``, ``features``
            (``mfcc``, ``spectral_centroid``, ``spectral_rolloff``) and
            ``duration`` (number of samples divided by the sample rate).
        """
        import librosa

        # Load audio, resampled to 16 kHz
        audio, sr = librosa.load(audio_path, sr=16000)

        features = {
            # MFCC features
            'mfcc': librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13),
            # Spectral features
            'spectral_centroid': librosa.feature.spectral_centroid(y=audio, sr=sr),
            'spectral_rolloff': librosa.feature.spectral_rolloff(y=audio, sr=sr),
        }

        return {
            'audio': audio,
            'sample_rate': sr,
            'features': features,
            'duration': len(audio) / sr
        }

    def preprocess_image_parallel(self, image_path: str, target_size: int = 320) -> Dict[str, Any]:
        """
        Load an image, convert it to RGB and resize it to a square.

        Args:
            image_path: Path to image file
            target_size: Target resolution (both width and height)

        Returns:
            Dict with keys ``image`` (numpy array), ``shape`` and
            ``landmarks`` (currently a placeholder dict).
        """
        from PIL import Image

        # The load runs in the calling thread on purpose. This method is
        # itself executed on self.thread_executor by the preprocess_parallel_*
        # entry points; submitting a nested task to the same pool and blocking
        # on its result deadlocks as soon as no worker is free (for example
        # with num_threads=1).
        with Image.open(image_path) as source:
            resized = source.convert('RGB').resize(
                (target_size, target_size), Image.Resampling.LANCZOS
            )
        img_array = np.array(resized)

        # Face detection and landmark extraction
        # Simplified version - in production, use MediaPipe or similar
        landmarks = {
            'has_face': True,
            'landmarks': None  # Placeholder
        }

        return {
            'image': img_array,
            'shape': img_array.shape,
            'landmarks': landmarks
        }

    async def preprocess_parallel_async(
        self,
        audio_path: str,
        image_path: str,
        target_size: int = 320
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Asynchronously preprocess audio and image in parallel

        Both jobs are dispatched to the thread pool and awaited together.

        Args:
            audio_path: Path to audio file
            image_path: Path to image file
            target_size: Target image resolution

        Returns:
            Tuple of (audio_data, image_data)
        """
        # Inside a coroutine the running loop is the one to schedule on.
        loop = asyncio.get_running_loop()

        # Create tasks for parallel execution
        audio_task = loop.run_in_executor(
            self.thread_executor,
            self.preprocess_audio_parallel,
            audio_path
        )
        image_task = loop.run_in_executor(
            self.thread_executor,
            partial(self.preprocess_image_parallel, target_size=target_size),
            image_path
        )

        # Wait for both tasks to complete
        audio_data, image_data = await asyncio.gather(audio_task, image_task)
        return audio_data, image_data

    def preprocess_parallel_sync(
        self,
        audio_path: str,
        image_path: str,
        target_size: int = 320
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Synchronously preprocess audio and image in parallel

        Blocks the calling thread until both jobs have finished; an exception
        raised by either job is re-raised here.

        Args:
            audio_path: Path to audio file
            image_path: Path to image file
            target_size: Target image resolution

        Returns:
            Tuple of (audio_data, image_data)
        """
        # Submit tasks to thread pool
        audio_future = self.thread_executor.submit(
            self.preprocess_audio_parallel,
            audio_path
        )
        image_future = self.thread_executor.submit(
            self.preprocess_image_parallel,
            image_path,
            target_size
        )

        # Wait for results
        audio_data = audio_future.result()
        image_data = image_future.result()
        return audio_data, image_data

    def process_gpu_parallel(
        self,
        audio_tensor: torch.Tensor,
        image_tensor: torch.Tensor,
        model_audio: torch.nn.Module,
        model_image: torch.nn.Module
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process audio and image through models using CUDA streams

        Without CUDA streams the two models are simply called one after the
        other.

        Args:
            audio_tensor: Audio tensor
            image_tensor: Image tensor
            model_audio: Audio processing model
            model_image: Image processing model

        Returns:
            Tuple of processed tensors (audio_out, image_out)
        """
        if not self.use_cuda_streams:
            # Fallback to sequential processing
            audio_out = model_audio(audio_tensor)
            image_out = model_image(image_tensor)
            return audio_out, image_out

        audio_stream, image_stream = self.cuda_streams

        # Work queued on the current stream (e.g. the copies or kernels that
        # produced the input tensors) must finish before the side streams
        # read those tensors.
        producer_stream = torch.cuda.current_stream()
        audio_stream.wait_stream(producer_stream)
        image_stream.wait_stream(producer_stream)

        # Use CUDA streams for parallel GPU processing
        with torch.cuda.stream(audio_stream):
            audio_out = model_audio(audio_tensor)
        with torch.cuda.stream(image_stream):
            image_out = model_image(image_tensor)

        # Synchronize streams so the outputs are complete on return
        torch.cuda.synchronize()
        return audio_out, image_out

    def shutdown(self):
        """Shutdown executors, waiting for already submitted work to finish."""
        self.thread_executor.shutdown(wait=True)
        self.process_executor.shutdown(wait=True)
        print("✅ ParallelProcessor shutdown complete")
class PipelineProcessor:
    """
    Pipeline-based processing for continuous operations

    Each stage runs in its own thread and hands its result to the next stage
    through a bounded queue. ``None`` is reserved as the poison pill that
    shuts the stages down, so stage functions should not return ``None`` for
    a successfully processed item.
    """

    # Private marker forwarded between stages for an item that failed
    # upstream. It must differ from the ``None`` poison pill, otherwise one
    # failing item would terminate every downstream worker.
    _FAILED = object()

    def __init__(self, stages: Dict[str, Callable], buffer_size: int = 10):
        """
        Initialize pipeline processor

        Args:
            stages: Dictionary of stage_name -> processing_function; stages
                run in the dictionary's iteration order
            buffer_size: Size of queues between stages
        """
        self.stages = stages
        self.buffer_size = buffer_size

        # Create queues between consecutive stages
        stage_names = list(stages.keys())
        self.queues = {
            f"{upstream}_to_{downstream}": queue.Queue(maxsize=buffer_size)
            for upstream, downstream in zip(stage_names, stage_names[1:])
        }

        # Input and output queues
        self.input_queue = queue.Queue(maxsize=buffer_size)
        self.output_queue = queue.Queue(maxsize=buffer_size)

        # Worker threads
        self.workers = []
        self.stop_event = threading.Event()

    def _put(self, target_q: queue.Queue, item: Any) -> bool:
        """
        Put ``item`` on ``target_q`` without blocking forever.

        Retries while the queue is full and gives up once the stop event is
        set, so a worker can never hang on a queue nobody drains anymore.

        Returns:
            True if the item was enqueued, False if the pipeline is stopping.
        """
        while True:
            try:
                target_q.put(item, timeout=0.1)
                return True
            except queue.Full:
                if self.stop_event.is_set():
                    return False

    def _worker(self, stage_name: str, process_func: Callable, input_q: queue.Queue, output_q: queue.Queue):
        """
        Worker thread for a pipeline stage

        Reads items from ``input_q``, applies ``process_func`` and writes the
        result to ``output_q`` until the stop event is set or a poison pill
        (``None``) arrives. A failing item is logged and reported as ``None``
        on the pipeline's output queue while the stage keeps running.
        """
        is_last_stage = output_q is self.output_queue

        while not self.stop_event.is_set():
            try:
                # Get input with timeout so the stop event is re-checked
                item = input_q.get(timeout=0.1)
            except queue.Empty:
                continue

            if item is None:  # Poison pill
                self._put(output_q, None)
                break

            if item is self._FAILED:
                # Failed upstream: pass the marker along untouched
                result = self._FAILED
            else:
                try:
                    result = process_func(item)
                except Exception as e:
                    print(f"Error in stage {stage_name}: {e}")
                    result = self._FAILED

            # Consumers of the output queue see a failure as None.
            if is_last_stage and result is self._FAILED:
                result = None

            if not self._put(output_q, result):
                break

    def start(self):
        """Start pipeline processing by launching one worker thread per stage"""
        stage_names = list(self.stages.keys())
        last_index = len(stage_names) - 1

        # Create worker threads
        for i, (stage_name, process_func) in enumerate(self.stages.items()):
            # Determine input and output queues
            if i == 0:
                input_q = self.input_queue
            else:
                input_q = self.queues[f"{stage_names[i-1]}_to_{stage_names[i]}"]

            if i == last_index:
                output_q = self.output_queue
            else:
                output_q = self.queues[f"{stage_names[i]}_to_{stage_names[i+1]}"]

            # Create and start worker
            worker = threading.Thread(
                target=self._worker,
                args=(stage_name, process_func, input_q, output_q)
            )
            worker.start()
            self.workers.append(worker)

        print(f"✅ Pipeline started with {len(self.workers)} stages")

    def process(self, item: Any) -> Any:
        """
        Process an item through the pipeline

        Blocks until a result is available on the output queue. Returns
        ``None`` when a stage raised while handling the item.
        """
        self.input_queue.put(item)
        return self.output_queue.get()

    def stop(self):
        """Stop pipeline processing and wait for all workers to exit"""
        self.stop_event.set()

        # Send poison pill. The stop event already makes every worker exit,
        # so a full input queue must not block shutdown.
        try:
            self.input_queue.put_nowait(None)
        except queue.Full:
            pass

        # Wait for workers
        for worker in self.workers:
            worker.join()

        print("✅ Pipeline stopped")
def benchmark_parallel_processing():
    """
    Benchmark parallel vs sequential processing

    Runs the audio and image preprocessing once back to back and once through
    ``preprocess_parallel_sync``, prints both timings and the speedup.

    Returns:
        Dict with ``sequential_time``, ``parallel_time`` (seconds) and
        ``speedup`` (sequential / parallel; ``inf`` if the parallel run took
        no measurable time).
    """
    import time

    print("\n=== Parallel Processing Benchmark ===")

    # Create processor
    processor = ParallelProcessor(num_threads=4)

    # Test files (using example files)
    audio_path = "example/audio.wav"
    image_path = "example/image.png"

    # The executors must be released even when preprocessing fails
    # (for instance when the example files are missing).
    try:
        # Sequential processing; perf_counter is monotonic and has the
        # highest available resolution for interval timing
        start_seq = time.perf_counter()
        processor.preprocess_audio_parallel(audio_path)
        processor.preprocess_image_parallel(image_path)
        time_seq = time.perf_counter() - start_seq

        # Parallel processing
        start_par = time.perf_counter()
        processor.preprocess_parallel_sync(audio_path, image_path)
        time_par = time.perf_counter() - start_par

        # Guard against a zero-length interval
        speedup = time_seq / time_par if time_par > 0 else float('inf')

        # Results
        print(f"Sequential processing: {time_seq:.3f}s")
        print(f"Parallel processing: {time_par:.3f}s")
        print(f"Speedup: {speedup:.2f}x")

        return {
            'sequential_time': time_seq,
            'parallel_time': time_par,
            'speedup': speedup
        }
    finally:
        processor.shutdown()