# Captured from a Hugging Face Space whose status banner read: "Runtime error".
"""
Performance test script for Phase 3 optimizations.

Tests various optimization strategies and measures performance improvements.
"""
import json
import os
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import torch

# Add project root to path
sys.path.append(str(Path(__file__).parent))

from model_manager import ModelManager
from core.optimization import (
    FixedResolutionProcessor,
    GPUOptimizer,
    AvatarCache,
    AvatarTokenManager,
    ColdStartOptimizer
)
class PerformanceTester:
    """Performance testing framework for DittoTalkingHead optimizations.

    Runs one synthetic audio/image pair through the inference pipeline under
    four configurations (baseline, GPU-only, resolution-only, full), collects
    wall-clock timings in ``self.results`` and writes a JSON report.
    """

    # Optimization labels that the report compares against the "none" baseline.
    _OPTIMIZED_LABELS = ("gpu_only", "resolution_only", "full")
    # Seed handed to ``seed_everything`` before every timed run so that all
    # configurations start from the same random state.
    _SEED = 1024

    def __init__(self):
        # One metrics dict per successful timed run, in execution order.
        self.results: List[Dict[str, Any]] = []
        self.resolution_optimizer = FixedResolutionProcessor()
        self.gpu_optimizer = GPUOptimizer()
        self.cold_start_optimizer = ColdStartOptimizer()
        self.avatar_cache = AvatarCache()

        # Test configurations
        self.test_configs = {
            "audio_durations": [4, 8, 16, 32],  # seconds
            "resolutions": [256, 320, 512],  # will test 320 fixed vs others
            "optimization_levels": ["none", "gpu_only", "resolution_only", "full"],
        }

    def setup_test_environment(self):
        """Set up models and the inference SDK used by every test.

        Stores the SDK on ``self.sdk``.

        Raises:
            RuntimeError: If ``ModelManager.setup_models()`` reports failure.
        """
        print("=== Setting up test environment ===")

        # Initialize models
        USE_PYTORCH = True
        model_manager = ModelManager(cache_dir="/tmp/ditto_models", use_pytorch=USE_PYTORCH)
        if not model_manager.setup_models():
            raise RuntimeError("Failed to setup models")

        # Initialize SDK; the checkpoint layout depends on the backend toggle.
        if USE_PYTORCH:
            data_root = "./checkpoints/ditto_pytorch"
            cfg_pkl = "./checkpoints/ditto_cfg/v0.4_hubert_cfg_pytorch.pkl"
        else:
            data_root = "./checkpoints/ditto_trt_Ampere_Plus"
            cfg_pkl = "./checkpoints/ditto_cfg/v0.4_hubert_cfg_trt.pkl"

        from stream_pipeline_offline import StreamSDK
        self.sdk = StreamSDK(cfg_pkl, data_root)
        print("✅ Test environment ready")

    @staticmethod
    def _reserve_temp_path(suffix: str) -> str:
        """Create an empty temp file with ``suffix`` and return its path.

        The handle is closed before returning so that other libraries can
        reopen the path for writing on every platform.
        """
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            return tmp.name

    @staticmethod
    def _remove_files(paths) -> None:
        """Best-effort removal of temp files; failures are reported, not raised."""
        for path in paths:
            try:
                if os.path.exists(path):
                    os.unlink(path)
            except OSError as exc:
                print(f"Warning: could not remove temp file {path}: {exc}")

    @staticmethod
    def _sync_gpu() -> None:
        """Wait for queued CUDA work so wall-clock timings include it."""
        if torch.cuda.is_available():
            torch.cuda.synchronize()

    @staticmethod
    def _disable_cudnn_benchmark() -> None:
        """Turn cuDNN autotuning off for runs that must not use GPU tuning.

        NOTE(review): any other global state changed by
        ``GPUOptimizer._setup_cuda_optimizations`` is not visible here and may
        persist across runs - confirm against that implementation.
        """
        torch.backends.cudnn.benchmark = False

    def generate_test_data(self, duration: int) -> Tuple[str, str]:
        """
        Generate test audio and image files

        The audio is a 440 Hz sine wave at 16 kHz, stored as float32 WAV; the
        image is a 512x512 PNG with a simple drawn face. The caller owns both
        files and is responsible for deleting them.

        Args:
            duration: Audio duration in seconds

        Returns:
            Tuple of (audio_path, image_path)
        """
        from scipy.io import wavfile
        from PIL import Image, ImageDraw

        # Generate test audio (sine wave). endpoint=False keeps the sample
        # spacing at exactly 1 / sample_rate.
        sample_rate = 16000
        num_samples = int(duration * sample_rate)
        t = np.linspace(0, duration, num_samples, endpoint=False)
        audio_data = np.sin(2 * np.pi * 440 * t).astype(np.float32) * 0.5

        audio_path = self._reserve_temp_path(".wav")
        image_path = None
        try:
            wavfile.write(audio_path, sample_rate, audio_data)

            # Generate test image
            img = Image.new('RGB', (512, 512), color='white')
            # Add some features
            draw = ImageDraw.Draw(img)
            draw.ellipse([156, 156, 356, 356], fill='lightblue')  # Face
            draw.ellipse([200, 200, 220, 220], fill='black')  # Left eye
            draw.ellipse([292, 200, 312, 220], fill='black')  # Right eye
            draw.arc([220, 250, 292, 300], 0, 180, fill='red', width=3)  # Mouth

            image_path = self._reserve_temp_path(".png")
            img.save(image_path)
        except Exception:
            # Do not leave half-written fixtures behind when generation fails.
            self._remove_files([p for p in (audio_path, image_path) if p])
            raise
        return audio_path, image_path

    def _run_timed_inference(self, audio_duration, optimization, error_label,
                             prepare=None, use_fixed_resolution=False,
                             extra_metrics=None) -> Optional[Dict[str, Any]]:
        """Time one inference run and return its metrics.

        Args:
            audio_duration: Test audio duration in seconds.
            optimization: Label stored under the ``"optimization"`` key.
            error_label: Name used in the error message when the run fails.
            prepare: Optional zero-argument callable invoked before the run to
                configure global state (for example GPU settings).
            use_fixed_resolution: When true, pass the resolution optimizer's
                ``max_size`` / ``sampling_timesteps`` to ``run`` and record
                the resolution in the metrics.
            extra_metrics: Optional dict merged into the metrics last.

        Returns:
            Metrics dict, or ``None`` if anything inside the run failed. Temp
            files are removed in both cases.
        """
        audio_path, image_path = self.generate_test_data(audio_duration)
        temp_paths = [audio_path, image_path]
        try:
            if prepare is not None:
                prepare()

            output_path = self._reserve_temp_path(".mp4")
            temp_paths.append(output_path)

            run_kwargs = {}
            resolution_metrics = {}
            if use_fixed_resolution:
                max_dim = self.resolution_optimizer.get_max_dim()
                setup_kwargs = {
                    "max_size": max_dim,
                    "sampling_timesteps": self.resolution_optimizer.get_diffusion_steps(),
                }
                run_kwargs["more_kwargs"] = {"setup_kwargs": setup_kwargs}
                resolution_metrics["resolution"] = f"{max_dim}x{max_dim}"

            from inference import run, seed_everything
            seed_everything(self._SEED)

            # perf_counter is monotonic; synchronizing on both sides keeps
            # earlier or still-queued GPU work from skewing the measurement.
            self._sync_gpu()
            start_time = time.perf_counter()
            run(self.sdk, audio_path, image_path, output_path, **run_kwargs)
            self._sync_gpu()
            process_time = time.perf_counter() - start_time

            metrics = {
                "audio_duration": audio_duration,
                "process_time": process_time,
                "realtime_factor": process_time / audio_duration,
                "optimization": optimization,
            }
            metrics.update(resolution_metrics)
            if extra_metrics:
                metrics.update(extra_metrics)
            return metrics
        except Exception as e:
            # A single failed configuration must not abort the whole suite.
            print(f"Error in {error_label}: {e}")
            return None
        finally:
            self._remove_files(temp_paths)

    def test_baseline(self, audio_duration: int) -> Optional[Dict[str, Any]]:
        """
        Test baseline performance without optimizations

        Args:
            audio_duration: Test audio duration in seconds

        Returns:
            Performance metrics, or None if the run failed
        """
        print(f"\n--- Testing baseline (no optimizations, {audio_duration}s audio) ---")
        return self._run_timed_inference(
            audio_duration, "none", "baseline test",
            prepare=self._disable_cudnn_benchmark,
        )

    def test_gpu_optimization(self, audio_duration: int) -> Optional[Dict[str, Any]]:
        """Test with GPU optimizations only.

        Returns the metrics dict, or None if the run failed.
        """
        print(f"\n--- Testing GPU optimization ({audio_duration}s audio) ---")
        return self._run_timed_inference(
            audio_duration, "gpu_only", "GPU optimization test",
            prepare=self.gpu_optimizer._setup_cuda_optimizations,
        )

    def test_resolution_optimization(self, audio_duration: int) -> Optional[Dict[str, Any]]:
        """Test with resolution optimization only.

        cuDNN autotuning is switched off first so that a preceding GPU-optimized
        run does not leak into this measurement. Returns the metrics dict, or
        None if the run failed.
        """
        print(f"\n--- Testing resolution optimization ({audio_duration}s audio) ---")
        return self._run_timed_inference(
            audio_duration, "resolution_only", "resolution optimization test",
            prepare=self._disable_cudnn_benchmark,
            use_fixed_resolution=True,
        )

    def test_full_optimization(self, audio_duration: int) -> Optional[Dict[str, Any]]:
        """Test with all optimizations enabled.

        Returns the metrics dict, or None if the run failed.
        """
        print(f"\n--- Testing full optimization ({audio_duration}s audio) ---")
        return self._run_timed_inference(
            audio_duration, "full", "full optimization test",
            prepare=self.gpu_optimizer._setup_cuda_optimizations,
            use_fixed_resolution=True,
            extra_metrics={"gpu_optimized": True},
        )

    def run_comprehensive_test(self):
        """Run every configuration for every audio duration, then report."""
        print("\n" + "="*60)
        print("Starting comprehensive performance test")
        print("="*60)

        self.setup_test_environment()

        # Run tests with different optimization levels
        tests = [
            ("Baseline", self.test_baseline),
            ("GPU Only", self.test_gpu_optimization),
            ("Resolution Only", self.test_resolution_optimization),
            ("Full Optimization", self.test_full_optimization),
        ]

        # Test different audio durations and optimization levels
        for duration in self.test_configs["audio_durations"]:
            print(f"\n{'='*60}")
            print(f"Testing with {duration}s audio")
            print(f"{'='*60}")

            duration_results = []
            for test_name, test_func in tests:
                result = test_func(duration)
                if result:
                    duration_results.append(result)
                    print(f"{test_name}: {result['process_time']:.2f}s (RT factor: {result['realtime_factor']:.2f}x)")

                # Clear GPU cache between tests
                self.gpu_optimizer.clear_cache()
                time.sleep(1)  # Brief pause

            self.results.extend(duration_results)

        # Generate report
        self.generate_report()

    @classmethod
    def _average_improvements(cls, results) -> Dict[str, float]:
        """Average percentage speed-up per optimization type.

        Each optimized result is paired with the first baseline ("none")
        result of the same audio duration. Results without a usable baseline
        are skipped, and only paired results count towards the average.
        Optimization types with no paired result are omitted.
        """
        baseline_times = {}
        for entry in results:
            if entry.get("optimization") == "none":
                # setdefault keeps the first baseline seen for a duration.
                baseline_times.setdefault(entry["audio_duration"], entry["process_time"])

        averages = {}
        for opt_type in cls._OPTIMIZED_LABELS:
            gains = []
            for entry in results:
                if entry.get("optimization") != opt_type:
                    continue
                baseline_time = baseline_times.get(entry["audio_duration"])
                if not baseline_time:
                    # No baseline for this duration (or a zero time that
                    # cannot be used as a denominator).
                    continue
                gains.append((baseline_time - entry["process_time"]) / baseline_time * 100)
            if gains:
                averages[opt_type] = sum(gains) / len(gains)
        return averages

    def generate_report(self):
        """Write the JSON report to the working directory and print a summary.

        The report file is named ``performance_report_<timestamp>.json`` and
        contains the raw results, the GPU memory stats, the resolution
        optimizer's performance config and the average improvements.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_file = f"performance_report_{timestamp}.json"

        # Calculate average improvements by optimization type
        avg_improvements = self._average_improvements(self.results)

        summary = {
            "test_date": timestamp,
            "gpu_info": self.gpu_optimizer.get_memory_stats(),
            "optimization_config": self.resolution_optimizer.get_performance_config(),
            "results": self.results,
            "average_improvements": avg_improvements,
        }

        # Save report
        with open(report_file, 'w', encoding="utf-8") as f:
            json.dump(summary, f, indent=2)

        # Print summary
        print("\n" + "="*60)
        print("PERFORMANCE TEST SUMMARY")
        print("="*60)
        print("\nAverage Performance Improvements:")
        for opt_type, improvement in avg_improvements.items():
            print(f"- {opt_type}: {improvement:.1f}% faster")
        print(f"\nDetailed results saved to: {report_file}")

        # Check if we meet the target (16s audio in <10s)
        target_results = [r for r in self.results
                          if r.get("optimization") == "full" and r["audio_duration"] == 16]
        if target_results:
            meets_target = target_results[0]["process_time"] <= 10.0
            # The marker follows the outcome so a miss is not shown as a pass.
            marker = '✅' if meets_target else '❌'
            print(f"\n{marker} Target Achievement (16s audio < 10s): {'YES' if meets_target else 'NO'}")
            print(f"   Actual time: {target_results[0]['process_time']:.2f}s")
def main() -> None:
    """Run the full benchmark suite; the JSON report is written at the end."""
    tester = PerformanceTester()
    tester.run_comprehensive_test()


# `tempfile` is imported with the other top-level imports rather than under
# this guard, so the tester also works when this module is imported.
if __name__ == "__main__":
    main()