from huggingface_hub import HfApi, hf_hub_download
from typing import Dict, Optional
import json
import os
import logging

# Configure logging
logger = logging.getLogger(__name__)


class SimpleMemoryCalculator:
    def __init__(self):
        logger.info("Initializing SimpleMemoryCalculator")
        try:
            self.hf_api = HfApi()
            logger.debug("HuggingFace API initialized")
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace API: {e}")
            raise

        self.cache = {}

        # Known model memory requirements (in GB for FP16)
        self.known_models = {
            "black-forest-labs/FLUX.1-schnell": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "black-forest-labs/FLUX.1-dev": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "stabilityai/stable-diffusion-xl-base-1.0": {
                "params_billions": 3.5,
                "fp16_gb": 7.0,
                "inference_fp16_gb": 12.0
            },
            "runwayml/stable-diffusion-v1-5": {
                "params_billions": 0.86,
                "fp16_gb": 1.7,
                "inference_fp16_gb": 4.0
            }
        }

        logger.debug(f"Known models in database: {len(self.known_models)}")

    def get_model_memory_requirements(self, model_id: str) -> Dict:
        """
        Get memory requirements for a model, using known values or estimating from file sizes.
        """
        logger.info(f"Getting memory requirements for model: {model_id}")

        if model_id in self.cache:
            logger.debug(f"Using cached memory data for {model_id}")
            return self.cache[model_id]

        # Check if we have known values
        if model_id in self.known_models:
            logger.info(f"Using known memory data for {model_id}")
            known = self.known_models[model_id]
            logger.debug(f"Known data: {known}")
            result = {
                'model_id': model_id,
                'total_params': int(known['params_billions'] * 1e9),
                'total_params_billions': known['params_billions'],
                'memory_fp32_gb': known['fp16_gb'] * 2,
                'memory_fp16_gb': known['fp16_gb'],
                'memory_bf16_gb': known['fp16_gb'],
                'memory_int8_gb': known['fp16_gb'] / 2,
                'estimated_inference_memory_fp16_gb': known['inference_fp16_gb'],
                'estimated_inference_memory_bf16_gb': known['inference_fp16_gb'],
                'source': 'known_values'
            }
            self.cache[model_id] = result
            return result

        # Try to estimate from the HuggingFace API
        try:
            return self._estimate_from_api(model_id)
        except Exception as e:
            # Fall back to a generic estimation
            return self._generic_estimation(model_id, str(e))

    def _estimate_from_api(self, model_id: str) -> Dict:
        """Estimate memory from HuggingFace model info."""
        try:
            print(f"Fetching model info for: {model_id}")
            # files_metadata=True asks the Hub to include per-file sizes in `siblings`;
            # without it, sibling.size is typically None and the estimates below kick in.
            model_info = self.hf_api.model_info(model_id, files_metadata=True)
            print(f"Successfully fetched model info for: {model_id}")

            # Get file sizes from the model repo
            total_size_bytes = 0
            safetensor_files = []
            files_without_size = 0

            for sibling in model_info.siblings:
                if sibling.rfilename.endswith('.safetensors'):
                    file_size_bytes = sibling.size

                    if file_size_bytes is None or file_size_bytes == 0:
                        files_without_size += 1
                        print(f"Warning: No size info for {sibling.rfilename}")
                        # Fall back to typical safetensors file sizes per component
                        if 'unet' in sibling.rfilename.lower():
                            file_size_bytes = 3_400_000_000  # ~3.4GB typical for a UNet
                        elif 'text_encoder' in sibling.rfilename.lower():
                            file_size_bytes = 500_000_000  # ~500MB typical for a text encoder
                        elif 'vae' in sibling.rfilename.lower():
                            file_size_bytes = 160_000_000  # ~160MB typical for a VAE
                        else:
                            file_size_bytes = 500_000_000  # Default fallback
                        print(f"  → Using estimated size: {file_size_bytes / (1024**3):.2f} GB")
                    else:
                        print(f"File {sibling.rfilename}: {file_size_bytes / (1024**3):.2f} GB")

                    size_mb = file_size_bytes / (1024 * 1024)
                    safetensor_files.append({
                        'filename': sibling.rfilename,
                        'size_mb': size_mb,
                        'estimated': file_size_bytes != sibling.size
                    })
                    total_size_bytes += file_size_bytes

            print(f"Found {len(safetensor_files)} safetensor files, total size: {total_size_bytes / (1024**3):.2f} GB")
            if files_without_size > 0:
                print(f"Warning: {files_without_size} files had no size info, used estimates")

            # Estimate parameter count from file size, assuming FP16 weights
            # (2 bytes per parameter, so e.g. 24 GB of weights ≈ 12B parameters)
            total_size_gb = total_size_bytes / (1024**3)
            estimated_params = int(total_size_bytes / 2)
            estimated_params_billions = estimated_params / 1e9

            # Estimate inference memory (model weights + activations)
            inference_multiplier = 1.5  # Conservative estimate
            estimated_inference_memory = total_size_gb * inference_multiplier

            result = {
                'model_id': model_id,
                'total_params': estimated_params,
                'total_params_billions': estimated_params_billions,
                'memory_fp32_gb': total_size_gb * 2,
                'memory_fp16_gb': total_size_gb,
                'memory_bf16_gb': total_size_gb,
                'memory_int8_gb': total_size_gb / 2,
                'estimated_inference_memory_fp16_gb': estimated_inference_memory,
                'estimated_inference_memory_bf16_gb': estimated_inference_memory,
                'safetensors_files': safetensor_files,
                'files_without_size': files_without_size,
                'source': 'api_estimation'
            }

            self.cache[model_id] = result
            logger.info(f"Successfully estimated memory for {model_id} via API")
            logger.debug(f"API estimation result: {result}")
            return result

        except Exception as api_error:
            logger.error(f"API Error for model {model_id}: {type(api_error).__name__}: {str(api_error)}")
            # Re-raise with more context
            raise Exception(f"HuggingFace API Error: {type(api_error).__name__}: {str(api_error)}")

    def _generic_estimation(self, model_id: str, error_msg: str) -> Dict:
        """Generic fallback estimation."""
        logger.warning(f"Using generic estimation for {model_id} due to: {error_msg}")

        # Default to medium-sized model estimates
        default_params_billions = 3.0
        default_fp16_gb = 6.0
        logger.debug(f"Generic estimation parameters: {default_params_billions}B params, {default_fp16_gb}GB FP16")

        result = {
            'model_id': model_id,
            'total_params': int(default_params_billions * 1e9),
            'total_params_billions': default_params_billions,
            'memory_fp32_gb': default_fp16_gb * 2,
            'memory_fp16_gb': default_fp16_gb,
            'memory_bf16_gb': default_fp16_gb,
            'memory_int8_gb': default_fp16_gb / 2,
            'estimated_inference_memory_fp16_gb': default_fp16_gb * 1.5,
            'estimated_inference_memory_bf16_gb': default_fp16_gb * 1.5,
            'source': 'generic_fallback',
            'error': error_msg
        }
        logger.info(f"Generic estimation completed for {model_id}")
        return result

    def get_memory_recommendation(self, model_id: str, available_vram_gb: float) -> Dict:
        """Get memory recommendations based on available VRAM."""
        logger.info(f"Generating memory recommendations for {model_id} with {available_vram_gb}GB VRAM")
        memory_info = self.get_model_memory_requirements(model_id)

        recommendations = {
            'model_id': model_id,
            'available_vram_gb': available_vram_gb,
            'model_memory_fp16_gb': memory_info['memory_fp16_gb'],
            'estimated_inference_memory_fp16_gb': memory_info['estimated_inference_memory_fp16_gb'],
            'recommendations': []
        }

        inference_memory_fp16 = memory_info['estimated_inference_memory_fp16_gb']
        model_memory_fp16 = memory_info['memory_fp16_gb']
        logger.debug(f"Model memory: {model_memory_fp16}GB, Inference memory: {inference_memory_fp16}GB")

        # Determine recommendations
        if available_vram_gb >= inference_memory_fp16:
            recommendations['recommendations'].append("✅ Full model can fit in VRAM")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = False
        elif available_vram_gb >= model_memory_fp16:
            recommendations['recommendations'].append("⚠️ Model weights fit, enable memory optimizations")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        elif available_vram_gb >= model_memory_fp16 * 0.7:
            recommendations['recommendations'].append("🔄 Use CPU offloading for some components")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        else:
            recommendations['recommendations'].append("🐌 Requires sequential CPU offloading")
            recommendations['recommended_precision'] = 'float16'
            recommendations['sequential_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True

        return recommendations

    def format_memory_info(self, model_id: str) -> str:
        """Format memory information for display."""
        info = self.get_model_memory_requirements(model_id)

        source_text = {
            'known_values': '📊 Known model specifications',
            'api_estimation': '🔍 Estimated from model files',
            'generic_fallback': '⚠️ Generic estimation (API error)'
        }.get(info.get('source', 'unknown'), '❓ Unknown source')

        # Add a warning if file sizes had to be estimated
        if info.get('files_without_size', 0) > 0:
            source_text += f" (⚠️ {info['files_without_size']} files used size estimates)"

        output = f"""
🤖 **Memory Analysis for {model_id}**

{source_text}

🔢 **Parameters**: {info['total_params_billions']:.1f}B parameters

💾 **Model Memory Requirements**:
• FP32: {info['memory_fp32_gb']:.1f} GB
• FP16/BF16: {info['memory_fp16_gb']:.1f} GB
• INT8: {info['memory_int8_gb']:.1f} GB

🚀 **Estimated Inference Memory**:
• FP16: {info['estimated_inference_memory_fp16_gb']:.1f} GB
• BF16: {info['estimated_inference_memory_bf16_gb']:.1f} GB
"""

        if 'error' in info:
            output += f"\n⚠️ **Note**: {info['error']}"

        return output.strip()
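

# --- Hypothetical usage sketch (not part of the original calculator) ---------------
# The recommendation dict built above carries flags such as 'cpu_offload',
# 'sequential_offload', 'attention_slicing' and 'vae_slicing'. A minimal sketch of how
# those flags could be applied to a diffusers pipeline follows; the helper name
# `apply_recommendation` and the duck-typed `pipe` argument are assumptions for
# illustration, not part of this file's API.
def apply_recommendation(pipe, recommendation: Dict) -> None:
    """Apply recommendation flags to a diffusers-style pipeline (illustrative only)."""
    if recommendation.get('sequential_offload'):
        # Moves submodules to the GPU one at a time: slowest option, lowest VRAM usage
        pipe.enable_sequential_cpu_offload()
    elif recommendation.get('cpu_offload'):
        # Offloads whole components (UNet, text encoder, VAE) between CPU and GPU
        pipe.enable_model_cpu_offload()
    if recommendation.get('attention_slicing'):
        pipe.enable_attention_slicing()
    if recommendation.get('vae_slicing'):
        pipe.enable_vae_slicing()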


# Quick test
if __name__ == "__main__":
    calc = SimpleMemoryCalculator()

    models = [
        "black-forest-labs/FLUX.1-schnell",
        "stabilityai/stable-diffusion-xl-base-1.0",
        "runwayml/stable-diffusion-v1-5"
    ]

    for model in models:
        print(f"\n{'='*60}")
        print(calc.format_memory_info(model))

        # Test recommendations
        for vram in [8, 16, 24]:
            rec = calc.get_memory_recommendation(model, vram)
            print(f"\n💡 {vram}GB VRAM: {rec['recommendations'][0]}")