#!/usr/bin/env python3 """ パフォーマンステストスクリプト 動画生成の各ステップの実行時間を計測 """ import time import logging from test_api_client import TalkingHeadAPIClient import os # ロギング設定 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) class TimingStats: def __init__(self): self.stats = {} self.start_times = {} def start(self, name): self.start_times[name] = time.time() def end(self, name): if name in self.start_times: duration = time.time() - self.start_times[name] self.stats[name] = duration return duration return None def report(self): print("\n=== パフォーマンス計測結果 ===") total_time = sum(self.stats.values()) for name, duration in self.stats.items(): percentage = (duration / total_time) * 100 if total_time > 0 else 0 print(f"{name}: {duration:.2f}秒 ({percentage:.1f}%)") print(f"\n合計時間: {total_time:.2f}秒") # 音声ファイルの長さを取得 try: import librosa audio_path = "example/audio.wav" y, sr = librosa.load(audio_path, sr=None) audio_duration = len(y) / sr print(f"音声ファイルの長さ: {audio_duration:.2f}秒") print(f"処理時間比率: {total_time/audio_duration:.2f}x") except Exception as e: print(f"音声長さの取得失敗: {e}") def test_performance(): """パフォーマンステストを実行""" timer = TimingStats() # 全体の開始時間 timer.start("全体処理") # クライアント初期化 timer.start("API接続") try: client = TalkingHeadAPIClient() timer.end("API接続") except Exception as e: logging.error(f"クライアント初期化失敗: {e}") return # サンプルファイル audio_path = "example/audio.wav" image_path = "example/image.png" # ファイル情報を表示 audio_size = os.path.getsize(audio_path) / 1024 / 1024 # MB image_size = os.path.getsize(image_path) / 1024 / 1024 # MB print(f"\n入力ファイル情報:") print(f"- 音声: {audio_path} ({audio_size:.2f} MB)") print(f"- 画像: {image_path} ({image_size:.2f} MB)") # 動画生成 timer.start("動画生成(API呼び出し)") try: result = client.generate_video(audio_path, image_path) video_data, status = result timer.end("動画生成(API呼び出し)") if video_data: # 保存処理 timer.start("動画保存") if isinstance(video_data, dict) and 'video' in video_data: saved_path = client.save_with_timestamp(video_data['video']) timer.end("動画保存") # 出力ファイル情報 output_size = os.path.getsize(saved_path) / 1024 / 1024 # MB print(f"\n出力ファイル情報:") print(f"- 動画: {saved_path} ({output_size:.2f} MB)") timer.end("全体処理") timer.report() print(f"\n✅ テスト成功!") print(f"ステータス: {status}") else: print(f"\n❌ テスト失敗") print(f"ステータス: {status}") except Exception as e: logging.error(f"エラー発生: {e}") import traceback traceback.print_exc() def test_multiple_runs(runs=3): """複数回実行して平均時間を計測""" print(f"\n=== {runs}回連続実行テスト ===") times = [] for i in range(runs): print(f"\n--- 実行 {i+1}/{runs} ---") start = time.time() try: client = TalkingHeadAPIClient() result = client.generate_video("example/audio.wav", "example/image.png") if result[0]: duration = time.time() - start times.append(duration) print(f"実行時間: {duration:.2f}秒") except Exception as e: print(f"エラー: {e}") if times: avg_time = sum(times) / len(times) min_time = min(times) max_time = max(times) print(f"\n=== 統計 ===") print(f"平均時間: {avg_time:.2f}秒") print(f"最小時間: {min_time:.2f}秒") print(f"最大時間: {max_time:.2f}秒") def analyze_bottlenecks(): """ボトルネック分析のための詳細テスト""" print("\n=== ボトルネック分析 ===") # ローカルファイルの読み込み時間 start = time.time() with open("example/audio.wav", "rb") as f: audio_data = f.read() with open("example/image.png", "rb") as f: image_data = f.read() local_read_time = time.time() - start print(f"ローカルファイル読み込み: {local_read_time:.3f}秒") # ネットワーク遅延の推定(Hugging Face Spaceへのping相当) import requests start = time.time() try: response = requests.get("https://o-ken5481-talkingavater-bgk.hf.space", timeout=10) network_time = time.time() - start print(f"ネットワーク遅延(推定): {network_time:.3f}秒") except: print("ネットワーク遅延の測定失敗") if __name__ == "__main__": print("DittoTalkingHead パフォーマンステスト") print("=" * 50) # 1. 詳細な時間計測 test_performance() # 2. 複数回実行テスト # test_multiple_runs(3) # 3. ボトルネック分析 analyze_bottlenecks()