File size: 12,799 Bytes
80a1334
 
 
 
aae35f1
 
 
 
80a1334
 
 
 
aae35f1
 
 
 
 
 
 
 
80a1334
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aae35f1
 
80a1334
 
 
 
 
aae35f1
 
80a1334
aae35f1
80a1334
 
 
 
aae35f1
80a1334
aae35f1
 
80a1334
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aae35f1
 
80a1334
 
 
aae35f1
80a1334
 
 
 
 
aae35f1
 
80a1334
 
 
 
aae35f1
 
 
80a1334
 
 
 
 
 
 
 
 
 
 
 
aae35f1
 
 
80a1334
 
 
aae35f1
 
80a1334
 
 
 
 
 
 
 
 
 
 
 
 
aae35f1
 
80a1334
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
from huggingface_hub import HfApi, hf_hub_download
from typing import Dict, Optional
import json
import os
import logging

# Configure logging
logger = logging.getLogger(__name__)


class SimpleMemoryCalculator:
    def __init__(self):
        logger.info("Initializing SimpleMemoryCalculator")
        try:
            self.hf_api = HfApi()
            logger.debug("HuggingFace API initialized")
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace API: {e}")
            raise
            
        self.cache = {}
        
        # Known model memory requirements (in GB for FP16)
        self.known_models = {
            "black-forest-labs/FLUX.1-schnell": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "black-forest-labs/FLUX.1-dev": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "stabilityai/stable-diffusion-xl-base-1.0": {
                "params_billions": 3.5,
                "fp16_gb": 7.0,
                "inference_fp16_gb": 12.0
            },
            "runwayml/stable-diffusion-v1-5": {
                "params_billions": 0.86,
                "fp16_gb": 1.7,
                "inference_fp16_gb": 4.0
            }
        }
        
        logger.debug(f"Known models in database: {len(self.known_models)}")
    
    def get_model_memory_requirements(self, model_id: str) -> Dict:
        """
        Get memory requirements for a model, using known values or estimating from file sizes.
        """
        logger.info(f"Getting memory requirements for model: {model_id}")
        
        if model_id in self.cache:
            logger.debug(f"Using cached memory data for {model_id}")
            return self.cache[model_id]
        
        # Check if we have known values
        if model_id in self.known_models:
            logger.info(f"Using known memory data for {model_id}")
            known = self.known_models[model_id]
            logger.debug(f"Known data: {known}")
            
            result = {
                'model_id': model_id,
                'total_params': int(known['params_billions'] * 1e9),
                'total_params_billions': known['params_billions'],
                'memory_fp32_gb': known['fp16_gb'] * 2,
                'memory_fp16_gb': known['fp16_gb'],
                'memory_bf16_gb': known['fp16_gb'],
                'memory_int8_gb': known['fp16_gb'] / 2,
                'estimated_inference_memory_fp16_gb': known['inference_fp16_gb'],
                'estimated_inference_memory_bf16_gb': known['inference_fp16_gb'],
                'source': 'known_values'
            }
            self.cache[model_id] = result
            return result
        
        # Try to estimate from HuggingFace API
        try:
            return self._estimate_from_api(model_id)
        except Exception as e:
            # Fallback to generic estimation
            return self._generic_estimation(model_id, str(e))
    
    def _estimate_from_api(self, model_id: str) -> Dict:
        """Estimate memory from HuggingFace model info."""
        try:
            print(f"Fetching model info for: {model_id}")
            model_info = self.hf_api.model_info(model_id)
            print(f"Successfully fetched model info for: {model_id}")
            
            # Get file sizes from model repo
            total_size_bytes = 0
            safetensor_files = []
            files_without_size = 0
            
            for sibling in model_info.siblings:
                if sibling.rfilename.endswith('.safetensors'):
                    file_size_bytes = sibling.size
                    if file_size_bytes is None or file_size_bytes == 0:
                        files_without_size += 1
                        print(f"Warning: No size info for {sibling.rfilename}")
                        # Try to estimate based on typical safetensor file sizes
                        if 'unet' in sibling.rfilename.lower():
                            file_size_bytes = 3_400_000_000  # ~3.4GB typical for UNet
                        elif 'text_encoder' in sibling.rfilename.lower():
                            file_size_bytes = 500_000_000   # ~500MB typical for text encoder
                        elif 'vae' in sibling.rfilename.lower():
                            file_size_bytes = 160_000_000   # ~160MB typical for VAE
                        else:
                            file_size_bytes = 500_000_000   # Default fallback
                        print(f"  β†’ Using estimated size: {file_size_bytes / (1024**3):.2f} GB")
                    else:
                        print(f"File {sibling.rfilename}: {file_size_bytes / (1024**3):.2f} GB")
                    
                    size_mb = file_size_bytes / (1024 * 1024)
                    safetensor_files.append({
                        'filename': sibling.rfilename,
                        'size_mb': size_mb,
                        'estimated': file_size_bytes != sibling.size
                    })
                    total_size_bytes += file_size_bytes
            
            print(f"Found {len(safetensor_files)} safetensor files, total size: {total_size_bytes / (1024**3):.2f} GB")
            if files_without_size > 0:
                print(f"Warning: {files_without_size} files had no size info, used estimates")
            
            # Estimate parameters from file size (assuming FP16)
            total_size_gb = total_size_bytes / (1024**3)
            estimated_params = int((total_size_bytes / 2))  # 2 bytes per param for FP16
            estimated_params_billions = estimated_params / 1e9
            
            # Estimate inference memory (model + activations)
            inference_multiplier = 1.5  # Conservative estimate
            estimated_inference_memory = total_size_gb * inference_multiplier
            
            result = {
                'model_id': model_id,
                'total_params': estimated_params,
                'total_params_billions': estimated_params_billions,
                'memory_fp32_gb': total_size_gb * 2,
                'memory_fp16_gb': total_size_gb,
                'memory_bf16_gb': total_size_gb,
                'memory_int8_gb': total_size_gb / 2,
                'estimated_inference_memory_fp16_gb': estimated_inference_memory,
                'estimated_inference_memory_bf16_gb': estimated_inference_memory,
                'safetensors_files': safetensor_files,
                'files_without_size': files_without_size,
                'source': 'api_estimation'
            }
            
            self.cache[model_id] = result
            logger.info(f"Successfully estimated memory for {model_id} via API")
            logger.debug(f"API estimation result: {result}")
            return result
            
        except Exception as api_error:
            logger.error(f"API Error for model {model_id}: {type(api_error).__name__}: {str(api_error)}")
            # Re-raise with more context
            raise Exception(f"HuggingFace API Error: {type(api_error).__name__}: {str(api_error)}")                
    
    def _generic_estimation(self, model_id: str, error_msg: str) -> Dict:
        """Generic fallback estimation."""
        logger.warning(f"Using generic estimation for {model_id} due to: {error_msg}")
        
        # Default to medium-sized model estimates
        default_params_billions = 3.0
        default_fp16_gb = 6.0
        
        logger.debug(f"Generic estimation parameters: {default_params_billions}B params, {default_fp16_gb}GB FP16")
        
        result = {
            'model_id': model_id,
            'total_params': int(default_params_billions * 1e9),
            'total_params_billions': default_params_billions,
            'memory_fp32_gb': default_fp16_gb * 2,
            'memory_fp16_gb': default_fp16_gb,
            'memory_bf16_gb': default_fp16_gb,
            'memory_int8_gb': default_fp16_gb / 2,
            'estimated_inference_memory_fp16_gb': default_fp16_gb * 1.5,
            'estimated_inference_memory_bf16_gb': default_fp16_gb * 1.5,
            'source': 'generic_fallback',
            'error': error_msg
        }
        
        logger.info(f"Generic estimation completed for {model_id}")
        return result
    
    def get_memory_recommendation(self, model_id: str, available_vram_gb: float) -> Dict:
        """Get memory recommendations based on available VRAM."""
        logger.info(f"Generating memory recommendations for {model_id} with {available_vram_gb}GB VRAM")
        
        memory_info = self.get_model_memory_requirements(model_id)
        
        recommendations = {
            'model_id': model_id,
            'available_vram_gb': available_vram_gb,
            'model_memory_fp16_gb': memory_info['memory_fp16_gb'],
            'estimated_inference_memory_fp16_gb': memory_info['estimated_inference_memory_fp16_gb'],
            'recommendations': []
        }
        
        inference_memory_fp16 = memory_info['estimated_inference_memory_fp16_gb']
        model_memory_fp16 = memory_info['memory_fp16_gb']
        
        logger.debug(f"Model memory: {model_memory_fp16}GB, Inference memory: {inference_memory_fp16}GB")
        
        # Determine recommendations
        if available_vram_gb >= inference_memory_fp16:
            recommendations['recommendations'].append("βœ… Full model can fit in VRAM")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = False
            
        elif available_vram_gb >= model_memory_fp16:
            recommendations['recommendations'].append("⚠️ Model weights fit, enable memory optimizations")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
            
        elif available_vram_gb >= model_memory_fp16 * 0.7:
            recommendations['recommendations'].append("πŸ”„ Use CPU offloading for some components")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
            
        else:
            recommendations['recommendations'].append("πŸ”„ Requires sequential CPU offloading")
            recommendations['recommended_precision'] = 'float16'
            recommendations['sequential_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        
        return recommendations
    
    def format_memory_info(self, model_id: str) -> str:
        """Format memory information for display."""
        info = self.get_model_memory_requirements(model_id)
        
        source_text = {
            'known_values': 'πŸ“Š Known model specifications',
            'api_estimation': 'πŸ” Estimated from model files',
            'generic_fallback': '⚠️ Generic estimation (API error)'
        }.get(info.get('source', 'unknown'), '❓ Unknown source')
        
        # Add warning if file sizes were estimated
        if info.get('files_without_size', 0) > 0:
            source_text += f" (⚠️ {info['files_without_size']} files used size estimates)"
        
        output = f"""
πŸ€– **Memory Analysis for {model_id}**

{source_text}

πŸ”’ **Parameters**: {info['total_params_billions']:.1f}B parameters

πŸ’Ύ **Model Memory Requirements**:
   β€’ FP32: {info['memory_fp32_gb']:.1f} GB
   β€’ FP16/BF16: {info['memory_fp16_gb']:.1f} GB
   β€’ INT8: {info['memory_int8_gb']:.1f} GB

πŸš€ **Estimated Inference Memory**:
   β€’ FP16: {info['estimated_inference_memory_fp16_gb']:.1f} GB
   β€’ BF16: {info['estimated_inference_memory_bf16_gb']:.1f} GB
"""
        
        if 'error' in info:
            output += f"\n⚠️ **Note**: {info['error']}"
        
        return output.strip()


# Quick test
if __name__ == "__main__":
    calc = SimpleMemoryCalculator()
    
    models = [
        "black-forest-labs/FLUX.1-schnell",
        "stabilityai/stable-diffusion-xl-base-1.0",
        "runwayml/stable-diffusion-v1-5"
    ]
    
    for model in models:
        print(f"\n{'='*60}")
        print(calc.format_memory_info(model))
        
        # Test recommendations
        for vram in [8, 16, 24]:
            rec = calc.get_memory_recommendation(model, vram)
            print(f"\nπŸ’‘ {vram}GB VRAM: {rec['recommendations'][0]}")