import os
from dotenv import load_dotenv
import google.generativeai as genai
from hardware_detector import HardwareDetector
from typing import Dict, List

load_dotenv()
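
# The Gemini API key (GOOGLE_API_KEY or GEMINI_API_KEY) is read from a local
# .env file when present; main() falls back to prompting the user for a key.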


class AutoDiffusersGenerator:
    """Generates hardware-optimized diffusers code by prompting the Gemini API."""

    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-2.5-flash-preview-05-20')
        self.hardware_detector = HardwareDetector()
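
    # HardwareDetector (local module) is expected to expose `.specs`,
    # `.get_optimization_profile()`, and `.print_specs()`; those are the only
    # members used in this file.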

    def generate_optimized_code(self,
                                model_name: str,
                                prompt_text: str,
                                image_size: tuple = (768, 1360),
                                num_inference_steps: int = 4,
                                use_manual_specs: bool = False,
                                manual_specs: Dict = None,
                                memory_analysis: Dict = None) -> str:
        """Generate optimized diffusers code based on hardware specs and memory analysis."""
        # Get hardware specifications
        if use_manual_specs and manual_specs:
            hardware_specs = manual_specs
            # Determine optimization profile based on manual specs:
            # >= 16 GB VRAM -> performance, >= 8 GB -> balanced,
            # otherwise memory_efficient; no GPU -> cpu_only.
            if hardware_specs.get('gpu_info'):
                vram_gb = hardware_specs['gpu_info'][0]['memory_mb'] / 1024
                if vram_gb >= 16:
                    optimization_profile = 'performance'
                elif vram_gb >= 8:
                    optimization_profile = 'balanced'
                else:
                    optimization_profile = 'memory_efficient'
            else:
                optimization_profile = 'cpu_only'
        else:
            hardware_specs = self.hardware_detector.specs
            optimization_profile = self.hardware_detector.get_optimization_profile()

        # Create the prompt for Gemini API
        system_prompt = self._create_generation_prompt(
            model_name, prompt_text, image_size, num_inference_steps,
            hardware_specs, optimization_profile, memory_analysis
        )

        try:
            response = self.model.generate_content(system_prompt)
            return response.text
        except Exception as e:
            return f"Error generating code: {str(e)}"
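
    # The prompt sent to Gemini is assembled in stages: task and hardware facts
    # first, then any memory-analysis findings, then fixed optimization and
    # code-style requirements.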
    def _create_generation_prompt(self,
                                  model_name: str,
                                  prompt_text: str,
                                  image_size: tuple,
                                  num_inference_steps: int,
                                  hardware_specs: Dict,
                                  optimization_profile: str,
                                  memory_analysis: Dict = None) -> str:
        """Create the prompt for Gemini API to generate optimized code."""
        base_prompt = f"""
You are an expert in optimizing diffusers library code for different hardware configurations.

TASK: Generate optimized Python code for running a diffusion model with the following specifications:
- Model: {model_name}
- Prompt: "{prompt_text}"
- Image size: {image_size[0]}x{image_size[1]}
- Inference steps: {num_inference_steps}

HARDWARE SPECIFICATIONS:
- Platform: {hardware_specs['platform']} ({hardware_specs['architecture']})
- CPU Cores: {hardware_specs['cpu_count']}
- CUDA Available: {hardware_specs['cuda_available']}
- MPS Available: {hardware_specs['mps_available']}
- Optimization Profile: {optimization_profile}
"""

        if hardware_specs.get('gpu_info'):
            base_prompt += f"- GPU: {hardware_specs['gpu_info'][0]['name']} ({hardware_specs['gpu_info'][0]['memory_mb']/1024:.1f} GB VRAM)\n"

        # Add user dtype preference if specified
        if hardware_specs.get('user_dtype'):
            base_prompt += f"- User specified dtype: {hardware_specs['user_dtype']}\n"

        # Add memory analysis information
        if memory_analysis:
            memory_info = memory_analysis.get('memory_info', {})
            recommendations = memory_analysis.get('recommendations', {})

            base_prompt += f"\nMEMORY ANALYSIS:\n"
            if memory_info.get('estimated_inference_memory_fp16_gb'):
                base_prompt += f"- Model Memory Requirements: {memory_info['estimated_inference_memory_fp16_gb']} GB (FP16 inference)\n"
            if memory_info.get('memory_fp16_gb'):
                base_prompt += f"- Model Weights Size: {memory_info['memory_fp16_gb']} GB (FP16)\n"
            if recommendations.get('recommendations'):
                base_prompt += f"- Memory Recommendation: {', '.join(recommendations['recommendations'])}\n"
            if recommendations.get('recommended_precision'):
                base_prompt += f"- Recommended Precision: {recommendations['recommended_precision']}\n"
            if recommendations.get('cpu_offload'):
                base_prompt += f"- CPU Offloading Required: {recommendations['cpu_offload']}\n"
            if recommendations.get('attention_slicing'):
                base_prompt += f"- Attention Slicing Recommended: {recommendations['attention_slicing']}\n"
            if recommendations.get('vae_slicing'):
                base_prompt += f"- VAE Slicing Recommended: {recommendations['vae_slicing']}\n"

        base_prompt += f"""
OPTIMIZATION REQUIREMENTS:

Please scrape and analyze the latest optimization techniques from this URL: https://huggingface.co/docs/diffusers/main/en/optimization

IMPORTANT: For FLUX.1-schnell models, do NOT include guidance_scale parameter as it's not needed.

Based on the hardware specs and optimization profile, generate Python code that includes:

1. **Memory Optimizations** (if low VRAM):
   - Model offloading (enable_model_cpu_offload, enable_sequential_cpu_offload)
   - Attention slicing (enable_attention_slicing)
   - VAE slicing (enable_vae_slicing)
   - Memory efficient attention

2. **Speed Optimizations**:
   - Appropriate torch.compile() usage
   - Optimal dtype selection (torch.float16, torch.bfloat16)
   - Device placement optimization

3. **Hardware-Specific Optimizations**:
   - CUDA optimizations for NVIDIA GPUs
   - MPS optimizations for Apple Silicon
   - CPU fallbacks when needed

4. **Model-Specific Optimizations**:
   - Appropriate scheduler selection
   - Optimal inference parameters
   - Pipeline configuration

5. **Data Type (dtype) Selection**:
   - If user specified a dtype, use that exact dtype in the code
   - If no dtype specified, automatically select the optimal dtype based on hardware:
     * Apple Silicon (MPS): prefer torch.bfloat16
     * NVIDIA GPUs: prefer torch.float16 or torch.bfloat16 based on capability
     * CPU only: use torch.float32
   - Add a comment explaining why that dtype was chosen

IMPORTANT GUIDELINES:
- Include all necessary imports
- Add brief comments explaining optimization choices
- Use the most current and effective optimization techniques
- Ensure code is production-ready

CODE STYLE REQUIREMENTS - GENERATE COMPACT CODE:
- Assign static values directly to function arguments instead of using variables when possible
- Minimize variable declarations - inline values where it improves readability
- Reduce exception handling to essential cases only - assume normal operation
- Use concise, direct code patterns
- Combine operations where logical and readable
- Avoid unnecessary intermediate variables
- Keep code clean and minimal while maintaining functionality

Examples of preferred compact style:
- pipe = Pipeline.from_pretrained("model", torch_dtype=torch.float16) instead of storing dtype in variable
- image = pipe("prompt", num_inference_steps=4, height=768, width=1360) instead of separate variables
- Direct assignment: device = "cuda" if torch.cuda.is_available() else "cpu"

Generate ONLY the Python code, no explanations before or after the code block.
"""

        return base_prompt
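
    # For illustration only (not executed): a sketch of the compact style the
    # prompt above asks Gemini to produce, assuming FLUX.1-schnell on a CUDA GPU
    # with enough VRAM to keep the whole pipeline on-device:
    #
    #   import torch
    #   from diffusers import FluxPipeline
    #
    #   pipe = FluxPipeline.from_pretrained(
    #       "black-forest-labs/FLUX.1-schnell", torch_dtype=torch.bfloat16
    #   ).to("cuda")
    #   image = pipe("A cat holding a sign that says hello world",
    #                num_inference_steps=4, height=768, width=1360).images[0]
    #   image.save("flux_output.png")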

    def run_interactive_mode(self):
        """Run the generator in interactive mode."""
        print("=== Auto-Diffusers Code Generator ===")
        print("This tool generates optimized diffusers code based on your hardware.\n")

        # Check hardware
        print("=== Hardware Detection ===")
        self.hardware_detector.print_specs()

        use_manual = input("\nUse manual hardware input? (y/n): ").lower() == 'y'

        # Get user inputs
        print("\n=== Model Configuration ===")
        model_name = input("Model name (default: black-forest-labs/FLUX.1-schnell): ").strip()
        if not model_name:
            model_name = "black-forest-labs/FLUX.1-schnell"

        prompt_text = input("Prompt text (default: A cat holding a sign that says hello world): ").strip()
        if not prompt_text:
            prompt_text = "A cat holding a sign that says hello world"

        try:
            width = int(input("Image width (default: 1360): ") or "1360")
            height = int(input("Image height (default: 768): ") or "768")
            steps = int(input("Inference steps (default: 4): ") or "4")
        except ValueError:
            # Fall back to the defaults if any numeric input is invalid.
            width, height, steps = 1360, 768, 4

        print("\n=== Generating Optimized Code ===")

        # Generate code
        optimized_code = self.generate_optimized_code(
            model_name=model_name,
            prompt_text=prompt_text,
            image_size=(height, width),
            num_inference_steps=steps,
            use_manual_specs=use_manual
        )

        print("\n" + "="*60)
        print("OPTIMIZED DIFFUSERS CODE:")
        print("="*60)
        print(optimized_code)
        print("="*60)


def main():
    # Get API key from .env file
    api_key = os.getenv('GOOGLE_API_KEY')
    if not api_key:
        api_key = os.getenv('GEMINI_API_KEY')  # fallback
    if not api_key:
        api_key = input("Enter your Gemini API key: ").strip()
    if not api_key:
        print("API key is required!")
        return

    generator = AutoDiffusersGenerator(api_key)
    generator.run_interactive_mode()


if __name__ == "__main__":
    main()
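

# Example programmatic use (a sketch; assumes a valid Gemini API key and that
# hardware_detector.py is importable alongside this file):
#
#   generator = AutoDiffusersGenerator(api_key="YOUR_GEMINI_API_KEY")
#   code = generator.generate_optimized_code(
#       model_name="black-forest-labs/FLUX.1-schnell",
#       prompt_text="A cat holding a sign that says hello world",
#       image_size=(768, 1360),
#       num_inference_steps=4,
#   )
#   print(code)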