talkingAvater_bgk / test_performance_optimized.py
Commit b27232b: Updated README_jp.md with the Phase 3 performance-optimization implementation status and an example of usage via the API; also added the Phase 3 dependencies to requirements.txt.
"""
Performance test script for Phase 3 optimizations
Tests various optimization strategies and measures performance improvements
"""
import time
import os
import sys
import tempfile  # used by the test_* methods below for temporary audio/image/video files
import numpy as np
from pathlib import Path
import torch
from typing import Dict, List, Tuple
import json
from datetime import datetime
# Add project root to path
sys.path.append(str(Path(__file__).parent))
from model_manager import ModelManager
from core.optimization import (
FixedResolutionProcessor,
GPUOptimizer,
AvatarCache,
AvatarTokenManager,
ColdStartOptimizer
)
class PerformanceTester:
"""Performance testing framework for DittoTalkingHead optimizations"""
def __init__(self):
self.results = []
self.resolution_optimizer = FixedResolutionProcessor()
self.gpu_optimizer = GPUOptimizer()
self.cold_start_optimizer = ColdStartOptimizer()
self.avatar_cache = AvatarCache()
# Test configurations
self.test_configs = {
"audio_durations": [4, 8, 16, 32], # seconds
"resolutions": [256, 320, 512], # will test 320 fixed vs others
"optimization_levels": ["none", "gpu_only", "resolution_only", "full"]
}
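        # Note: "resolutions" is kept for reference only; the optimized code paths
        # below always use the fixed size returned by
        # FixedResolutionProcessor.get_max_dim() (320x320, per the inline comments).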
def setup_test_environment(self):
"""Set up test environment"""
print("=== Setting up test environment ===")
# Initialize models
USE_PYTORCH = True
model_manager = ModelManager(cache_dir="/tmp/ditto_models", use_pytorch=USE_PYTORCH)
if not model_manager.setup_models():
raise RuntimeError("Failed to setup models")
# Initialize SDK
if USE_PYTORCH:
data_root = "./checkpoints/ditto_pytorch"
cfg_pkl = "./checkpoints/ditto_cfg/v0.4_hubert_cfg_pytorch.pkl"
else:
data_root = "./checkpoints/ditto_trt_Ampere_Plus"
cfg_pkl = "./checkpoints/ditto_cfg/v0.4_hubert_cfg_trt.pkl"
from stream_pipeline_offline import StreamSDK
self.sdk = StreamSDK(cfg_pkl, data_root)
print("✅ Test environment ready")
def generate_test_data(self, duration: int) -> Tuple[str, str]:
"""
Generate test audio and image files
Args:
duration: Audio duration in seconds
Returns:
Tuple of (audio_path, image_path)
"""
import tempfile
from scipy.io import wavfile
from PIL import Image
# Generate test audio (sine wave)
sample_rate = 16000
t = np.linspace(0, duration, duration * sample_rate)
audio_data = np.sin(2 * np.pi * 440 * t).astype(np.float32) * 0.5
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
wavfile.write(tmp.name, sample_rate, audio_data)
audio_path = tmp.name
# Generate test image
img = Image.new('RGB', (512, 512), color='white')
# Add some features
from PIL import ImageDraw
draw = ImageDraw.Draw(img)
draw.ellipse([156, 156, 356, 356], fill='lightblue') # Face
draw.ellipse([200, 200, 220, 220], fill='black') # Left eye
draw.ellipse([292, 200, 312, 220], fill='black') # Right eye
draw.arc([220, 250, 292, 300], 0, 180, fill='red', width=3) # Mouth
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
img.save(tmp.name)
image_path = tmp.name
return audio_path, image_path
def test_baseline(self, audio_duration: int) -> Dict[str, float]:
"""
Test baseline performance without optimizations
Args:
audio_duration: Test audio duration in seconds
Returns:
Performance metrics
"""
print(f"\n--- Testing baseline (no optimizations, {audio_duration}s audio) ---")
audio_path, image_path = self.generate_test_data(audio_duration)
try:
            # Baseline: disable cuDNN autotuning so no GPU optimizations are applied
            torch.backends.cudnn.benchmark = False
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
output_path = tmp.name
# Run without optimizations
from inference import run, seed_everything
seed_everything(1024)
start_time = time.time()
run(self.sdk, audio_path, image_path, output_path)
process_time = time.time() - start_time
# Clean up
for path in [audio_path, image_path, output_path]:
if os.path.exists(path):
os.unlink(path)
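            # realtime_factor = processing time / audio duration; values below 1.0
            # mean the clip was generated faster than real time.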
return {
"audio_duration": audio_duration,
"process_time": process_time,
"realtime_factor": process_time / audio_duration,
"optimization": "none"
}
except Exception as e:
print(f"Error in baseline test: {e}")
return None
def test_gpu_optimization(self, audio_duration: int) -> Dict[str, float]:
"""Test with GPU optimizations only"""
print(f"\n--- Testing GPU optimization ({audio_duration}s audio) ---")
audio_path, image_path = self.generate_test_data(audio_duration)
try:
# Apply GPU optimizations
self.gpu_optimizer._setup_cuda_optimizations()
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
output_path = tmp.name
from inference import run, seed_everything
seed_everything(1024)
start_time = time.time()
run(self.sdk, audio_path, image_path, output_path)
process_time = time.time() - start_time
# Clean up
for path in [audio_path, image_path, output_path]:
if os.path.exists(path):
os.unlink(path)
return {
"audio_duration": audio_duration,
"process_time": process_time,
"realtime_factor": process_time / audio_duration,
"optimization": "gpu_only"
}
except Exception as e:
print(f"Error in GPU optimization test: {e}")
return None
def test_resolution_optimization(self, audio_duration: int) -> Dict[str, float]:
"""Test with resolution optimization (320x320)"""
print(f"\n--- Testing resolution optimization ({audio_duration}s audio) ---")
audio_path, image_path = self.generate_test_data(audio_duration)
try:
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
output_path = tmp.name
# Apply resolution optimization
setup_kwargs = {
"max_size": self.resolution_optimizer.get_max_dim(), # 320
"sampling_timesteps": self.resolution_optimizer.get_diffusion_steps() # 25
}
from inference import run, seed_everything
seed_everything(1024)
start_time = time.time()
run(self.sdk, audio_path, image_path, output_path,
more_kwargs={"setup_kwargs": setup_kwargs})
process_time = time.time() - start_time
# Clean up
for path in [audio_path, image_path, output_path]:
if os.path.exists(path):
os.unlink(path)
return {
"audio_duration": audio_duration,
"process_time": process_time,
"realtime_factor": process_time / audio_duration,
"optimization": "resolution_only",
"resolution": f"{self.resolution_optimizer.get_max_dim()}x{self.resolution_optimizer.get_max_dim()}"
}
except Exception as e:
print(f"Error in resolution optimization test: {e}")
return None
def test_full_optimization(self, audio_duration: int) -> Dict[str, float]:
"""Test with all optimizations enabled"""
print(f"\n--- Testing full optimization ({audio_duration}s audio) ---")
audio_path, image_path = self.generate_test_data(audio_duration)
try:
# Apply all optimizations
self.gpu_optimizer._setup_cuda_optimizations()
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
output_path = tmp.name
setup_kwargs = {
"max_size": self.resolution_optimizer.get_max_dim(),
"sampling_timesteps": self.resolution_optimizer.get_diffusion_steps()
}
from inference import run, seed_everything
seed_everything(1024)
start_time = time.time()
run(self.sdk, audio_path, image_path, output_path,
more_kwargs={"setup_kwargs": setup_kwargs})
process_time = time.time() - start_time
# Clean up
for path in [audio_path, image_path, output_path]:
if os.path.exists(path):
os.unlink(path)
return {
"audio_duration": audio_duration,
"process_time": process_time,
"realtime_factor": process_time / audio_duration,
"optimization": "full",
"resolution": f"{self.resolution_optimizer.get_max_dim()}x{self.resolution_optimizer.get_max_dim()}",
"gpu_optimized": True
}
except Exception as e:
print(f"Error in full optimization test: {e}")
return None
def run_comprehensive_test(self):
"""Run comprehensive performance tests"""
print("\n" + "="*60)
print("Starting comprehensive performance test")
print("="*60)
self.setup_test_environment()
# Test different audio durations and optimization levels
for duration in self.test_configs["audio_durations"]:
print(f"\n{'='*60}")
print(f"Testing with {duration}s audio")
print(f"{'='*60}")
# Run tests with different optimization levels
tests = [
("Baseline", self.test_baseline),
("GPU Only", self.test_gpu_optimization),
("Resolution Only", self.test_resolution_optimization),
("Full Optimization", self.test_full_optimization)
]
duration_results = []
for test_name, test_func in tests:
result = test_func(duration)
if result:
duration_results.append(result)
print(f"{test_name}: {result['process_time']:.2f}s (RT factor: {result['realtime_factor']:.2f}x)")
# Clear GPU cache between tests
self.gpu_optimizer.clear_cache()
time.sleep(1) # Brief pause
self.results.extend(duration_results)
# Generate report
self.generate_report()
def generate_report(self):
"""Generate performance test report"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = f"performance_report_{timestamp}.json"
# Calculate improvements
summary = {
"test_date": timestamp,
"gpu_info": self.gpu_optimizer.get_memory_stats(),
"optimization_config": self.resolution_optimizer.get_performance_config(),
"results": self.results
}
        # Calculate average improvements by optimization type.
        # Compare every optimized run against the baseline run with the same audio
        # duration, and average only over the pairs that actually have a baseline
        # (this avoids an IndexError when a test type produced no results and avoids
        # diluting the average with unmatched runs).
        avg_improvements = {}
        baseline_results = [r for r in self.results if r.get("optimization") == "none"]
        for opt_type in ["gpu_only", "resolution_only", "full"]:
            opt_results = [r for r in self.results if r.get("optimization") == opt_type]
            improvements = []
            for opt_r in opt_results:
                baseline_r = next((b for b in baseline_results
                                   if b["audio_duration"] == opt_r["audio_duration"]), None)
                if baseline_r:
                    improvements.append(
                        (baseline_r["process_time"] - opt_r["process_time"])
                        / baseline_r["process_time"] * 100
                    )
            if improvements:
                avg_improvements[opt_type] = sum(improvements) / len(improvements)
summary["average_improvements"] = avg_improvements
# Save report
with open(report_file, 'w') as f:
json.dump(summary, f, indent=2)
# Print summary
print("\n" + "="*60)
print("PERFORMANCE TEST SUMMARY")
print("="*60)
print("\nAverage Performance Improvements:")
for opt_type, improvement in avg_improvements.items():
print(f"- {opt_type}: {improvement:.1f}% faster")
print(f"\nDetailed results saved to: {report_file}")
# Check if we meet the target (16s audio in <10s)
target_results = [r for r in self.results
if r.get("optimization") == "full" and r["audio_duration"] == 16]
if target_results:
meets_target = target_results[0]["process_time"] <= 10.0
print(f"\n✅ Target Achievement (16s audio < 10s): {'YES' if meets_target else 'NO'}")
print(f" Actual time: {target_results[0]['process_time']:.2f}s")
if __name__ == "__main__":
tester = PerformanceTester()
tester.run_comprehensive_test()
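    # For a quick single-configuration check (a sketch; assumes the same environment
    # as the full run), the individual test methods can also be called directly:
    #   tester.setup_test_environment()
    #   print(tester.test_full_optimization(audio_duration=8))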