# auto-diffuser-config/simple_memory_calculator.py
from huggingface_hub import HfApi, hf_hub_download
from typing import Dict, Optional
import json
import os
import logging
# Configure logging
logger = logging.getLogger(__name__)
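
# Sizing rule of thumb used below (an approximation, not a measurement):
# FP16/BF16 weights take about 2 bytes per parameter, so a model with N billion
# parameters needs on the order of 2*N GB for the weights alone, e.g. 12B
# params -> ~24 GB. Inference needs additional headroom for activations and
# temporary buffers on top of the weights.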
class SimpleMemoryCalculator:
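    """
    Estimate memory requirements for diffusion model checkpoints.

    Illustrative usage (figures come from the known-model table defined in
    __init__; models outside that table fall back to Hub-API or generic
    estimates, so their numbers will differ):

        calc = SimpleMemoryCalculator()
        info = calc.get_model_memory_requirements("black-forest-labs/FLUX.1-schnell")
        info["memory_fp16_gb"]                      # 24.0
        rec = calc.get_memory_recommendation("black-forest-labs/FLUX.1-schnell", 24.0)
        rec["attention_slicing"]                    # True
    """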
    def __init__(self):
        logger.info("Initializing SimpleMemoryCalculator")
        try:
            self.hf_api = HfApi()
            logger.debug("HuggingFace API initialized")
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace API: {e}")
            raise

        self.cache = {}

        # Known model memory requirements (in GB for FP16)
        self.known_models = {
            "black-forest-labs/FLUX.1-schnell": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "black-forest-labs/FLUX.1-dev": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "stabilityai/stable-diffusion-xl-base-1.0": {
                "params_billions": 3.5,
                "fp16_gb": 7.0,
                "inference_fp16_gb": 12.0
            },
            "runwayml/stable-diffusion-v1-5": {
                "params_billions": 0.86,
                "fp16_gb": 1.7,
                "inference_fp16_gb": 4.0
            }
        }
        logger.debug(f"Known models in database: {len(self.known_models)}")

    def get_model_memory_requirements(self, model_id: str) -> Dict:
        """
        Get memory requirements for a model, using known values or estimating from file sizes.
        """
        logger.info(f"Getting memory requirements for model: {model_id}")

        if model_id in self.cache:
            logger.debug(f"Using cached memory data for {model_id}")
            return self.cache[model_id]

        # Check if we have known values
        if model_id in self.known_models:
            logger.info(f"Using known memory data for {model_id}")
            known = self.known_models[model_id]
            logger.debug(f"Known data: {known}")
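            # Per-dtype figures below are derived from the FP16 number:
            # FP32 = 2x, BF16 = same as FP16, INT8 = 0.5x. This is a rough
            # scaling that ignores quantization overhead (scales, zero-points).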
            result = {
                'model_id': model_id,
                'total_params': int(known['params_billions'] * 1e9),
                'total_params_billions': known['params_billions'],
                'memory_fp32_gb': known['fp16_gb'] * 2,
                'memory_fp16_gb': known['fp16_gb'],
                'memory_bf16_gb': known['fp16_gb'],
                'memory_int8_gb': known['fp16_gb'] / 2,
                'estimated_inference_memory_fp16_gb': known['inference_fp16_gb'],
                'estimated_inference_memory_bf16_gb': known['inference_fp16_gb'],
                'source': 'known_values'
            }
            self.cache[model_id] = result
            return result

        # Try to estimate from the HuggingFace API
        try:
            return self._estimate_from_api(model_id)
        except Exception as e:
            # Fall back to generic estimation
            return self._generic_estimation(model_id, str(e))

    def _estimate_from_api(self, model_id: str) -> Dict:
        """Estimate memory from HuggingFace model info."""
        try:
            print(f"Fetching model info for: {model_id}")
            model_info = self.hf_api.model_info(model_id)
            print(f"Successfully fetched model info for: {model_id}")

            # Get file sizes from model repo
            total_size_bytes = 0
            safetensor_files = []
            files_without_size = 0

            for sibling in model_info.siblings:
                if sibling.rfilename.endswith('.safetensors'):
                    file_size_bytes = sibling.size

                    if file_size_bytes is None or file_size_bytes == 0:
                        files_without_size += 1
                        print(f"Warning: No size info for {sibling.rfilename}")
                        # Try to estimate based on typical safetensor file sizes
                        if 'unet' in sibling.rfilename.lower():
                            file_size_bytes = 3_400_000_000  # ~3.4GB typical for UNet
                        elif 'text_encoder' in sibling.rfilename.lower():
                            file_size_bytes = 500_000_000  # ~500MB typical for text encoder
                        elif 'vae' in sibling.rfilename.lower():
                            file_size_bytes = 160_000_000  # ~160MB typical for VAE
                        else:
                            file_size_bytes = 500_000_000  # Default fallback
                        print(f"  → Using estimated size: {file_size_bytes / (1024**3):.2f} GB")
                    else:
                        print(f"File {sibling.rfilename}: {file_size_bytes / (1024**3):.2f} GB")

                    size_mb = file_size_bytes / (1024 * 1024)
                    safetensor_files.append({
                        'filename': sibling.rfilename,
                        'size_mb': size_mb,
                        'estimated': file_size_bytes != sibling.size
                    })
                    total_size_bytes += file_size_bytes

            print(f"Found {len(safetensor_files)} safetensor files, total size: {total_size_bytes / (1024**3):.2f} GB")
            if files_without_size > 0:
                print(f"Warning: {files_without_size} files had no size info, used estimates")

            # Estimate parameters from file size (assuming FP16)
            total_size_gb = total_size_bytes / (1024**3)
            estimated_params = int(total_size_bytes / 2)  # 2 bytes per param for FP16
            estimated_params_billions = estimated_params / 1e9
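            # Illustrative arithmetic: a 6_900_000_000-byte FP16 checkpoint
            # works out to roughly 3.45e9 parameters (~3.45B).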

            # Estimate inference memory (model + activations)
            inference_multiplier = 1.5  # Conservative estimate
            estimated_inference_memory = total_size_gb * inference_multiplier

            result = {
                'model_id': model_id,
                'total_params': estimated_params,
                'total_params_billions': estimated_params_billions,
                'memory_fp32_gb': total_size_gb * 2,
                'memory_fp16_gb': total_size_gb,
                'memory_bf16_gb': total_size_gb,
                'memory_int8_gb': total_size_gb / 2,
                'estimated_inference_memory_fp16_gb': estimated_inference_memory,
                'estimated_inference_memory_bf16_gb': estimated_inference_memory,
                'safetensors_files': safetensor_files,
                'files_without_size': files_without_size,
                'source': 'api_estimation'
            }
            self.cache[model_id] = result
            logger.info(f"Successfully estimated memory for {model_id} via API")
            logger.debug(f"API estimation result: {result}")
            return result
        except Exception as api_error:
            logger.error(f"API Error for model {model_id}: {type(api_error).__name__}: {str(api_error)}")
            # Re-raise with more context
            raise Exception(f"HuggingFace API Error: {type(api_error).__name__}: {str(api_error)}") from api_error

    def _generic_estimation(self, model_id: str, error_msg: str) -> Dict:
        """Generic fallback estimation."""
        logger.warning(f"Using generic estimation for {model_id} due to: {error_msg}")

        # Default to medium-sized model estimates
        default_params_billions = 3.0
        default_fp16_gb = 6.0
        logger.debug(f"Generic estimation parameters: {default_params_billions}B params, {default_fp16_gb}GB FP16")

        result = {
            'model_id': model_id,
            'total_params': int(default_params_billions * 1e9),
            'total_params_billions': default_params_billions,
            'memory_fp32_gb': default_fp16_gb * 2,
            'memory_fp16_gb': default_fp16_gb,
            'memory_bf16_gb': default_fp16_gb,
            'memory_int8_gb': default_fp16_gb / 2,
            'estimated_inference_memory_fp16_gb': default_fp16_gb * 1.5,
            'estimated_inference_memory_bf16_gb': default_fp16_gb * 1.5,
            'source': 'generic_fallback',
            'error': error_msg
        }
        logger.info(f"Generic estimation completed for {model_id}")
        return result

    def get_memory_recommendation(self, model_id: str, available_vram_gb: float) -> Dict:
        """Get memory recommendations based on available VRAM."""
        logger.info(f"Generating memory recommendations for {model_id} with {available_vram_gb}GB VRAM")
        memory_info = self.get_model_memory_requirements(model_id)
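        # Illustrative outcome (using the known-model table): for FLUX.1-schnell
        # with 24 GB of VRAM, the weights (24 GB) fit but the ~36 GB inference
        # estimate does not, so the slicing tier below is selected.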
        recommendations = {
            'model_id': model_id,
            'available_vram_gb': available_vram_gb,
            'model_memory_fp16_gb': memory_info['memory_fp16_gb'],
            'estimated_inference_memory_fp16_gb': memory_info['estimated_inference_memory_fp16_gb'],
            'recommendations': []
        }

        inference_memory_fp16 = memory_info['estimated_inference_memory_fp16_gb']
        model_memory_fp16 = memory_info['memory_fp16_gb']
        logger.debug(f"Model memory: {model_memory_fp16}GB, Inference memory: {inference_memory_fp16}GB")

        # Determine recommendations
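        # Heuristic tiers, compared against the FP16 estimates above:
        #   1. full inference estimate fits         -> no offload, no slicing
        #   2. weights fit, activations may not     -> attention + VAE slicing
        #   3. at least ~70% of weight memory free  -> model CPU offload + slicing
        #   4. otherwise                            -> sequential CPU offload + slicing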
        if available_vram_gb >= inference_memory_fp16:
            recommendations['recommendations'].append("✅ Full model can fit in VRAM")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = False
        elif available_vram_gb >= model_memory_fp16:
            recommendations['recommendations'].append("⚠️ Model weights fit, enable memory optimizations")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        elif available_vram_gb >= model_memory_fp16 * 0.7:
            recommendations['recommendations'].append("🔄 Use CPU offloading for some components")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        else:
            recommendations['recommendations'].append("🔄 Requires sequential CPU offloading")
            recommendations['recommended_precision'] = 'float16'
            recommendations['sequential_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True

        return recommendations

    def format_memory_info(self, model_id: str) -> str:
        """Format memory information for display."""
        info = self.get_model_memory_requirements(model_id)

        source_text = {
            'known_values': '📊 Known model specifications',
            'api_estimation': '🔍 Estimated from model files',
            'generic_fallback': '⚠️ Generic estimation (API error)'
        }.get(info.get('source', 'unknown'), '❓ Unknown source')

        # Add warning if file sizes were estimated
        if info.get('files_without_size', 0) > 0:
            source_text += f" (⚠️ {info['files_without_size']} files used size estimates)"

        output = f"""
🤖 **Memory Analysis for {model_id}**

{source_text}

🔢 **Parameters**: {info['total_params_billions']:.1f}B parameters

💾 **Model Memory Requirements**:
• FP32: {info['memory_fp32_gb']:.1f} GB
• FP16/BF16: {info['memory_fp16_gb']:.1f} GB
• INT8: {info['memory_int8_gb']:.1f} GB

🚀 **Estimated Inference Memory**:
• FP16: {info['estimated_inference_memory_fp16_gb']:.1f} GB
• BF16: {info['estimated_inference_memory_bf16_gb']:.1f} GB
"""
        if 'error' in info:
            output += f"\n⚠️ **Note**: {info['error']}"

        return output.strip()

# Quick test
if __name__ == "__main__":
    calc = SimpleMemoryCalculator()

    models = [
        "black-forest-labs/FLUX.1-schnell",
        "stabilityai/stable-diffusion-xl-base-1.0",
        "runwayml/stable-diffusion-v1-5"
    ]

    for model in models:
        print(f"\n{'='*60}")
        print(calc.format_memory_info(model))

        # Test recommendations
        for vram in [8, 16, 24]:
            rec = calc.get_memory_recommendation(model, vram)
            print(f"\n💡 {vram}GB VRAM: {rec['recommendations'][0]}")
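            # Example of what this prints for the 8 GB case with
            # runwayml/stable-diffusion-v1-5 (4 GB estimated inference need):
            #   💡 8GB VRAM: ✅ Full model can fit in VRAM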