from huggingface_hub import HfApi
from typing import Dict
import logging

# Configure logging
logger = logging.getLogger(__name__)


class SimpleMemoryCalculator:
    def __init__(self):
        logger.info("Initializing SimpleMemoryCalculator")
        try:
            self.hf_api = HfApi()
            logger.debug("HuggingFace API initialized")
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace API: {e}")
            raise

        self.cache = {}

        # Known model memory requirements (in GB for FP16)
        self.known_models = {
            "black-forest-labs/FLUX.1-schnell": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "black-forest-labs/FLUX.1-dev": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "stabilityai/stable-diffusion-xl-base-1.0": {
                "params_billions": 3.5,
                "fp16_gb": 7.0,
                "inference_fp16_gb": 12.0
            },
            "runwayml/stable-diffusion-v1-5": {
                "params_billions": 0.86,
                "fp16_gb": 1.7,
                "inference_fp16_gb": 4.0
            }
        }

        logger.debug(f"Known models in database: {len(self.known_models)}")

    def get_model_memory_requirements(self, model_id: str) -> Dict:
        """
        Get memory requirements for a model, using known values or estimating from file sizes.
        """
        logger.info(f"Getting memory requirements for model: {model_id}")

        if model_id in self.cache:
            logger.debug(f"Using cached memory data for {model_id}")
            return self.cache[model_id]

        # Check if we have known values
        if model_id in self.known_models:
            logger.info(f"Using known memory data for {model_id}")
            known = self.known_models[model_id]
            logger.debug(f"Known data: {known}")
            result = {
                'model_id': model_id,
                'total_params': int(known['params_billions'] * 1e9),
                'total_params_billions': known['params_billions'],
                'memory_fp32_gb': known['fp16_gb'] * 2,
                'memory_fp16_gb': known['fp16_gb'],
                'memory_bf16_gb': known['fp16_gb'],
                'memory_int8_gb': known['fp16_gb'] / 2,
                'estimated_inference_memory_fp16_gb': known['inference_fp16_gb'],
                'estimated_inference_memory_bf16_gb': known['inference_fp16_gb'],
                'source': 'known_values'
            }
            self.cache[model_id] = result
            return result

        # Try to estimate from HuggingFace API
        try:
            return self._estimate_from_api(model_id)
        except Exception as e:
            # Fallback to generic estimation
            return self._generic_estimation(model_id, str(e))
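
    # Sizing rule of thumb behind the known values above and the estimators
    # below (a heuristic, not a measurement): roughly 2 bytes per parameter at
    # FP16/BF16, 4 bytes at FP32, 1 byte at INT8. Worked example:
    #   FLUX.1-schnell: 12e9 params * 2 bytes ā‰ˆ 24 GB at FP16 (matching the
    #   known_models entry), ā‰ˆ 48 GB at FP32, ā‰ˆ 12 GB at INT8.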

    def _estimate_from_api(self, model_id: str) -> Dict:
        """Estimate memory from HuggingFace model info."""
        try:
            logger.info(f"Fetching model info for: {model_id}")
            model_info = self.hf_api.model_info(model_id)
            logger.debug(f"Successfully fetched model info for: {model_id}")

            # Get file sizes from model repo
            total_size_bytes = 0
            safetensor_files = []
            files_without_size = 0

            for sibling in model_info.siblings:
                if sibling.rfilename.endswith('.safetensors'):
                    file_size_bytes = sibling.size

                    if file_size_bytes is None or file_size_bytes == 0:
                        files_without_size += 1
                        logger.warning(f"No size info for {sibling.rfilename}")
                        # Try to estimate based on typical safetensor file sizes
                        if 'unet' in sibling.rfilename.lower():
                            file_size_bytes = 3_400_000_000  # ~3.4GB typical for UNet
                        elif 'text_encoder' in sibling.rfilename.lower():
                            file_size_bytes = 500_000_000  # ~500MB typical for text encoder
                        elif 'vae' in sibling.rfilename.lower():
                            file_size_bytes = 160_000_000  # ~160MB typical for VAE
                        else:
                            file_size_bytes = 500_000_000  # Default fallback
                        logger.debug(f"Using estimated size: {file_size_bytes / (1024**3):.2f} GB")
                    else:
                        logger.debug(f"File {sibling.rfilename}: {file_size_bytes / (1024**3):.2f} GB")

                    size_mb = file_size_bytes / (1024 * 1024)
                    safetensor_files.append({
                        'filename': sibling.rfilename,
                        'size_mb': size_mb,
                        'estimated': file_size_bytes != sibling.size
                    })
                    total_size_bytes += file_size_bytes

            logger.info(f"Found {len(safetensor_files)} safetensor files, total size: {total_size_bytes / (1024**3):.2f} GB")
            if files_without_size > 0:
                logger.warning(f"{files_without_size} files had no size info, used estimates")

            # Estimate parameters from file size (assuming FP16)
            total_size_gb = total_size_bytes / (1024**3)
            estimated_params = int(total_size_bytes / 2)  # 2 bytes per param for FP16
            estimated_params_billions = estimated_params / 1e9

            # Estimate inference memory (model + activations)
            inference_multiplier = 1.5  # Conservative estimate
            estimated_inference_memory = total_size_gb * inference_multiplier

            result = {
                'model_id': model_id,
                'total_params': estimated_params,
                'total_params_billions': estimated_params_billions,
                'memory_fp32_gb': total_size_gb * 2,
                'memory_fp16_gb': total_size_gb,
                'memory_bf16_gb': total_size_gb,
                'memory_int8_gb': total_size_gb / 2,
                'estimated_inference_memory_fp16_gb': estimated_inference_memory,
                'estimated_inference_memory_bf16_gb': estimated_inference_memory,
                'safetensors_files': safetensor_files,
                'files_without_size': files_without_size,
                'source': 'api_estimation'
            }

            self.cache[model_id] = result
            logger.info(f"Successfully estimated memory for {model_id} via API")
            logger.debug(f"API estimation result: {result}")
            return result

        except Exception as api_error:
            logger.error(f"API Error for model {model_id}: {type(api_error).__name__}: {str(api_error)}")
            # Re-raise with more context
            raise Exception(f"HuggingFace API Error: {type(api_error).__name__}: {str(api_error)}") from api_error

    def _generic_estimation(self, model_id: str, error_msg: str) -> Dict:
        """Generic fallback estimation."""
        logger.warning(f"Using generic estimation for {model_id} due to: {error_msg}")

        # Default to medium-sized model estimates
        default_params_billions = 3.0
        default_fp16_gb = 6.0

        logger.debug(f"Generic estimation parameters: {default_params_billions}B params, {default_fp16_gb}GB FP16")

        result = {
            'model_id': model_id,
            'total_params': int(default_params_billions * 1e9),
            'total_params_billions': default_params_billions,
            'memory_fp32_gb': default_fp16_gb * 2,
            'memory_fp16_gb': default_fp16_gb,
            'memory_bf16_gb': default_fp16_gb,
            'memory_int8_gb': default_fp16_gb / 2,
            'estimated_inference_memory_fp16_gb': default_fp16_gb * 1.5,
            'estimated_inference_memory_bf16_gb': default_fp16_gb * 1.5,
            'source': 'generic_fallback',
            'error': error_msg
        }

        logger.info(f"Generic estimation completed for {model_id}")
        return result
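
    # The VRAM thresholds in get_memory_recommendation below are heuristics,
    # not measured breakpoints:
    #   * VRAM >= estimated inference memory -> everything fits at FP16
    #   * VRAM >= model weights (FP16)       -> weights fit; enable slicing
    #   * VRAM >= 70% of the weights         -> partial CPU offload
    #   * anything less                      -> sequential CPU offload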

    def get_memory_recommendation(self, model_id: str, available_vram_gb: float) -> Dict:
        """Get memory recommendations based on available VRAM."""
        logger.info(f"Generating memory recommendations for {model_id} with {available_vram_gb}GB VRAM")
        memory_info = self.get_model_memory_requirements(model_id)

        recommendations = {
            'model_id': model_id,
            'available_vram_gb': available_vram_gb,
            'model_memory_fp16_gb': memory_info['memory_fp16_gb'],
            'estimated_inference_memory_fp16_gb': memory_info['estimated_inference_memory_fp16_gb'],
            'recommendations': []
        }

        inference_memory_fp16 = memory_info['estimated_inference_memory_fp16_gb']
        model_memory_fp16 = memory_info['memory_fp16_gb']

        logger.debug(f"Model memory: {model_memory_fp16}GB, Inference memory: {inference_memory_fp16}GB")

        # Determine recommendations
        if available_vram_gb >= inference_memory_fp16:
            recommendations['recommendations'].append("āœ… Full model can fit in VRAM")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = False
        elif available_vram_gb >= model_memory_fp16:
            recommendations['recommendations'].append("āš ļø Model weights fit, enable memory optimizations")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        elif available_vram_gb >= model_memory_fp16 * 0.7:
            recommendations['recommendations'].append("šŸ”„ Use CPU offloading for some components")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        else:
            recommendations['recommendations'].append("šŸ”„ Requires sequential CPU offloading")
            recommendations['recommended_precision'] = 'float16'
            recommendations['sequential_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True

        return recommendations

    def format_memory_info(self, model_id: str) -> str:
        """Format memory information for display."""
        info = self.get_model_memory_requirements(model_id)

        source_text = {
            'known_values': 'šŸ“Š Known model specifications',
            'api_estimation': 'šŸ” Estimated from model files',
            'generic_fallback': 'āš ļø Generic estimation (API error)'
        }.get(info.get('source', 'unknown'), 'ā“ Unknown source')

        # Add warning if file sizes were estimated
        if info.get('files_without_size', 0) > 0:
            source_text += f" (āš ļø {info['files_without_size']} files used size estimates)"

        output = f"""
šŸ¤– **Memory Analysis for {model_id}**

{source_text}

šŸ”¢ **Parameters**: {info['total_params_billions']:.1f}B parameters

šŸ’¾ **Model Memory Requirements**:
   • FP32: {info['memory_fp32_gb']:.1f} GB
   • FP16/BF16: {info['memory_fp16_gb']:.1f} GB
   • INT8: {info['memory_int8_gb']:.1f} GB

šŸš€ **Estimated Inference Memory**:
   • FP16: {info['estimated_inference_memory_fp16_gb']:.1f} GB
   • BF16: {info['estimated_inference_memory_bf16_gb']:.1f} GB
"""

        if 'error' in info:
            output += f"\nāš ļø **Note**: {info['error']}"

        return output.strip()


# Quick test
if __name__ == "__main__":
    calc = SimpleMemoryCalculator()

    models = [
        "black-forest-labs/FLUX.1-schnell",
        "stabilityai/stable-diffusion-xl-base-1.0",
        "runwayml/stable-diffusion-v1-5"
    ]

    for model in models:
        print(f"\n{'='*60}")
        print(calc.format_memory_info(model))

        # Test recommendations
        for vram in [8, 16, 24]:
            rec = calc.get_memory_recommendation(model, vram)
            print(f"\nšŸ’” {vram}GB VRAM: {rec['recommendations'][0]}")
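

# Hedged usage sketch: one way the recommendation flags produced above might
# be applied to a diffusers pipeline. The function name and the `pipe`
# argument are illustrative assumptions; the hasattr() guards are there
# because not every diffusers pipeline class exposes every optimization method.
def apply_recommendation_to_pipeline(pipe, recommendation: Dict) -> None:
    """Best-effort application of SimpleMemoryCalculator recommendation flags."""
    if recommendation.get('attention_slicing') and hasattr(pipe, 'enable_attention_slicing'):
        pipe.enable_attention_slicing()
    if recommendation.get('vae_slicing') and hasattr(pipe, 'enable_vae_slicing'):
        pipe.enable_vae_slicing()
    # Sequential offload is the more aggressive option, so prefer it when set.
    if recommendation.get('sequential_offload') and hasattr(pipe, 'enable_sequential_cpu_offload'):
        pipe.enable_sequential_cpu_offload()
    elif recommendation.get('cpu_offload') and hasattr(pipe, 'enable_model_cpu_offload'):
        pipe.enable_model_cpu_offload()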