File size: 5,863 Bytes
ada2c6f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python3
"""
パフォーマンステストスクリプト
動画生成の各ステップの実行時間を計測
"""

import time
import logging
from test_api_client import TalkingHeadAPIClient
import os

# ロギング設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

class TimingStats:
    def __init__(self):
        self.stats = {}
        self.start_times = {}
    
    def start(self, name):
        self.start_times[name] = time.time()
    
    def end(self, name):
        if name in self.start_times:
            duration = time.time() - self.start_times[name]
            self.stats[name] = duration
            return duration
        return None
    
    def report(self):
        print("\n=== パフォーマンス計測結果 ===")
        total_time = sum(self.stats.values())
        for name, duration in self.stats.items():
            percentage = (duration / total_time) * 100 if total_time > 0 else 0
            print(f"{name}: {duration:.2f}秒 ({percentage:.1f}%)")
        print(f"\n合計時間: {total_time:.2f}秒")
        
        # 音声ファイルの長さを取得
        try:
            import librosa
            audio_path = "example/audio.wav"
            y, sr = librosa.load(audio_path, sr=None)
            audio_duration = len(y) / sr
            print(f"音声ファイルの長さ: {audio_duration:.2f}秒")
            print(f"処理時間比率: {total_time/audio_duration:.2f}x")
        except Exception as e:
            print(f"音声長さの取得失敗: {e}")

def test_performance():
    """パフォーマンステストを実行"""
    timer = TimingStats()
    
    # 全体の開始時間
    timer.start("全体処理")
    
    # クライアント初期化
    timer.start("API接続")
    try:
        client = TalkingHeadAPIClient()
        timer.end("API接続")
    except Exception as e:
        logging.error(f"クライアント初期化失敗: {e}")
        return
    
    # サンプルファイル
    audio_path = "example/audio.wav"
    image_path = "example/image.png"
    
    # ファイル情報を表示
    audio_size = os.path.getsize(audio_path) / 1024 / 1024  # MB
    image_size = os.path.getsize(image_path) / 1024 / 1024  # MB
    print(f"\n入力ファイル情報:")
    print(f"- 音声: {audio_path} ({audio_size:.2f} MB)")
    print(f"- 画像: {image_path} ({image_size:.2f} MB)")
    
    # 動画生成
    timer.start("動画生成(API呼び出し)")
    try:
        result = client.generate_video(audio_path, image_path)
        video_data, status = result
        timer.end("動画生成(API呼び出し)")
        
        if video_data:
            # 保存処理
            timer.start("動画保存")
            if isinstance(video_data, dict) and 'video' in video_data:
                saved_path = client.save_with_timestamp(video_data['video'])
                timer.end("動画保存")
                
                # 出力ファイル情報
                output_size = os.path.getsize(saved_path) / 1024 / 1024  # MB
                print(f"\n出力ファイル情報:")
                print(f"- 動画: {saved_path} ({output_size:.2f} MB)")
            
            timer.end("全体処理")
            timer.report()
            
            print(f"\n✅ テスト成功!")
            print(f"ステータス: {status}")
        else:
            print(f"\n❌ テスト失敗")
            print(f"ステータス: {status}")
            
    except Exception as e:
        logging.error(f"エラー発生: {e}")
        import traceback
        traceback.print_exc()

def test_multiple_runs(runs=3):
    """複数回実行して平均時間を計測"""
    print(f"\n=== {runs}回連続実行テスト ===")
    
    times = []
    for i in range(runs):
        print(f"\n--- 実行 {i+1}/{runs} ---")
        start = time.time()
        
        try:
            client = TalkingHeadAPIClient()
            result = client.generate_video("example/audio.wav", "example/image.png")
            if result[0]:
                duration = time.time() - start
                times.append(duration)
                print(f"実行時間: {duration:.2f}秒")
        except Exception as e:
            print(f"エラー: {e}")
    
    if times:
        avg_time = sum(times) / len(times)
        min_time = min(times)
        max_time = max(times)
        print(f"\n=== 統計 ===")
        print(f"平均時間: {avg_time:.2f}秒")
        print(f"最小時間: {min_time:.2f}秒")
        print(f"最大時間: {max_time:.2f}秒")

def analyze_bottlenecks():
    """ボトルネック分析のための詳細テスト"""
    print("\n=== ボトルネック分析 ===")
    
    # ローカルファイルの読み込み時間
    start = time.time()
    with open("example/audio.wav", "rb") as f:
        audio_data = f.read()
    with open("example/image.png", "rb") as f:
        image_data = f.read()
    local_read_time = time.time() - start
    print(f"ローカルファイル読み込み: {local_read_time:.3f}秒")
    
    # ネットワーク遅延の推定(Hugging Face Spaceへのping相当)
    import requests
    start = time.time()
    try:
        response = requests.get("https://o-ken5481-talkingavater-bgk.hf.space", timeout=10)
        network_time = time.time() - start
        print(f"ネットワーク遅延(推定): {network_time:.3f}秒")
    except:
        print("ネットワーク遅延の測定失敗")

if __name__ == "__main__":
    print("DittoTalkingHead パフォーマンステスト")
    print("=" * 50)
    
    # 1. 詳細な時間計測
    test_performance()
    
    # 2. 複数回実行テスト
    # test_multiple_runs(3)
    
    # 3. ボトルネック分析
    analyze_bottlenecks()