# Import spaces before any CUDA/torch imports
import spaces

# Other imports below
import torch
import numpy as np
from PIL import Image
from transformers import pipeline
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler
import config

class ControlNetPipeline:
    def __init__(self):
        """Initialize the ControlNet pipeline with lazy loading"""
        self.depth_estimator = None
        self.pipe = None
        self.controlnet = None
        self.is_initialized = False

    @spaces.GPU
    def initialize(self):
        """Initialize the models with GPU acceleration"""
        if self.is_initialized:
            return

        # Load depth estimator
        self.depth_estimator = pipeline('depth-estimation')

        # Load ControlNet model
        self.controlnet = ControlNetModel.from_pretrained(
            config.CONTROLNET_MODEL, 
            torch_dtype=torch.float16
        )

        # Load Stable Diffusion pipeline with ControlNet
        self.pipe = StableDiffusionControlNetPipeline.from_pretrained(
            config.BASE_MODEL, 
            controlnet=self.controlnet, 
            safety_checker=None, 
            torch_dtype=torch.float16
        )

        # Swap in the UniPC multistep scheduler, which typically needs fewer inference steps
        self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config)

        # Enable memory optimizations
        try:
            self.pipe.enable_xformers_memory_efficient_attention()
        except Exception:
            # xformers is optional; fall back to the default attention implementation
            print("xformers not available, using default attention mechanism")

        self.pipe.enable_model_cpu_offload()
        self.is_initialized = True

    @spaces.GPU
    def process_image(self, image):
        """Estimate depth for the input image and return it as a 3-channel PIL image"""
        # Ensure model is initialized
        if not self.is_initialized:
            self.initialize()

        # Generate depth map
        depth = self.depth_estimator(image)['depth']
        depth_array = np.array(depth)
        depth_array = depth_array[:, :, None]
        depth_array = np.concatenate([depth_array, depth_array, depth_array], axis=2)
        depth_image = Image.fromarray(depth_array)
        
        return depth_image

    @spaces.GPU
    def generate(self, prompt, image, negative_prompt=None, guidance_scale=7.5, num_inference_steps=20):
        """Generate an image using ControlNet with the provided prompt and input image"""
        # Ensure model is initialized
        if not self.is_initialized:
            self.initialize()

        # Process image to get depth map
        depth_image = self.process_image(image)
        
        # Generate the image
        output = self.pipe(
            prompt=prompt,
            image=depth_image,
            negative_prompt=negative_prompt,
            guidance_scale=float(guidance_scale),
            num_inference_steps=int(num_inference_steps)
        )
        
        return output.images[0]
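

# A minimal sketch of the channel expansion performed in
# ControlNetPipeline.process_image, pulled out so it can be checked without
# loading any model or requesting a GPU. The helper name `depth_to_rgb` is
# hypothetical and is not used elsewhere in this file. It assumes the depth
# estimator returns a single-channel 8-bit PIL image.
import numpy as np
from PIL import Image


def depth_to_rgb(depth):
    """Replicate a single-channel depth image into a 3-channel RGB image."""
    depth_array = np.array(depth)                    # shape (H, W)
    depth_array = depth_array[:, :, None]            # shape (H, W, 1)
    depth_array = np.repeat(depth_array, 3, axis=2)  # shape (H, W, 3)
    return Image.fromarray(depth_array)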