Spaces:
Running
on
Zero
Running
on
Zero
# Copyright 2024 The HuggingFace Team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from typing import Callable, List, Optional, Tuple, Union | |
import os | |
import torch | |
import torch.nn.functional as F | |
from diffusers.models.attention_processor import Attention | |
from diffusers.utils import logging | |
from diffusers.utils.import_utils import is_torch_npu_available, is_xformers_available | |
from diffusers.utils.torch_utils import is_torch_version, maybe_allow_in_graph | |
from einops import rearrange | |
from torch import nn | |
# add sageattention support | |
scaled_dot_product_attention = F.scaled_dot_product_attention | |
if os.environ.get("USE_SAGEATTN", "0") == "1": | |
try: | |
from sageattention import sageattn | |
except ImportError: | |
raise ImportError( | |
'Please install the package "sageattention" to use this USE_SAGEATTN.' | |
) | |
scaled_dot_product_attention = sageattn | |
logger = logging.get_logger(__name__) # pylint: disable=invalid-name | |
class AttnProcessor2_0: | |
r""" | |
Processor for implementing scaled dot-product attention (enabled by default if you're using PyTorch 2.0). | |
""" | |
def __init__(self): | |
if not hasattr(F, "scaled_dot_product_attention"): | |
raise ImportError( | |
"AttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0." | |
) | |
def __call__( | |
self, | |
attn: Attention, | |
hidden_states: torch.Tensor, | |
encoder_hidden_states: Optional[torch.Tensor] = None, | |
attention_mask: Optional[torch.Tensor] = None, | |
temb: Optional[torch.Tensor] = None, | |
*args, | |
**kwargs, | |
) -> torch.Tensor: | |
if len(args) > 0 or kwargs.get("scale", None) is not None: | |
deprecation_message = "The `scale` argument is deprecated and will be ignored. Please remove it, as passing it will raise an error in the future. `scale` should directly be passed while calling the underlying pipeline component i.e., via `cross_attention_kwargs`." | |
deprecate("scale", "1.0.0", deprecation_message) | |
residual = hidden_states | |
if attn.spatial_norm is not None: | |
hidden_states = attn.spatial_norm(hidden_states, temb) | |
input_ndim = hidden_states.ndim | |
if input_ndim == 4: | |
batch_size, channel, height, width = hidden_states.shape | |
hidden_states = hidden_states.view( | |
batch_size, channel, height * width | |
).transpose(1, 2) | |
batch_size, sequence_length, _ = ( | |
hidden_states.shape | |
if encoder_hidden_states is None | |
else encoder_hidden_states.shape | |
) | |
if attention_mask is not None: | |
attention_mask = attn.prepare_attention_mask( | |
attention_mask, sequence_length, batch_size | |
) | |
# scaled_dot_product_attention expects attention_mask shape to be | |
# (batch, heads, source_length, target_length) | |
attention_mask = attention_mask.view( | |
batch_size, attn.heads, -1, attention_mask.shape[-1] | |
) | |
if attn.group_norm is not None: | |
hidden_states = attn.group_norm(hidden_states.transpose(1, 2)).transpose( | |
1, 2 | |
) | |
query = attn.to_q(hidden_states) | |
if encoder_hidden_states is None: | |
encoder_hidden_states = hidden_states | |
elif attn.norm_cross: | |
encoder_hidden_states = attn.norm_encoder_hidden_states( | |
encoder_hidden_states | |
) | |
key = attn.to_k(encoder_hidden_states) | |
value = attn.to_v(encoder_hidden_states) | |
inner_dim = key.shape[-1] | |
head_dim = inner_dim // attn.heads | |
query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
if attn.norm_q is not None: | |
query = attn.norm_q(query) | |
if attn.norm_k is not None: | |
key = attn.norm_k(key) | |
# the output of sdp = (batch, num_heads, seq_len, head_dim) | |
# TODO: add support for attn.scale when we move to Torch 2.1 | |
hidden_states = scaled_dot_product_attention( | |
query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False | |
) | |
hidden_states = hidden_states.transpose(1, 2).reshape( | |
batch_size, -1, attn.heads * head_dim | |
) | |
hidden_states = hidden_states.to(query.dtype) | |
# linear proj | |
hidden_states = attn.to_out[0](hidden_states) | |
# dropout | |
hidden_states = attn.to_out[1](hidden_states) | |
if input_ndim == 4: | |
hidden_states = hidden_states.transpose(-1, -2).reshape( | |
batch_size, channel, height, width | |
) | |
if attn.residual_connection: | |
hidden_states = hidden_states + residual | |
hidden_states = hidden_states / attn.rescale_output_factor | |
return hidden_states | |
class FusedAttnProcessor2_0: | |
r""" | |
Processor for implementing scaled dot-product attention (enabled by default if you're using PyTorch 2.0). It uses | |
fused projection layers. For self-attention modules, all projection matrices (i.e., query, key, value) are fused. | |
For cross-attention modules, key and value projection matrices are fused. | |
<Tip warning={true}> | |
This API is currently 🧪 experimental in nature and can change in future. | |
</Tip> | |
""" | |
def __init__(self): | |
if not hasattr(F, "scaled_dot_product_attention"): | |
raise ImportError( | |
"FusedAttnProcessor2_0 requires at least PyTorch 2.0, to use it. Please upgrade PyTorch to > 2.0." | |
) | |
def __call__( | |
self, | |
attn: Attention, | |
hidden_states: torch.Tensor, | |
encoder_hidden_states: Optional[torch.Tensor] = None, | |
attention_mask: Optional[torch.Tensor] = None, | |
temb: Optional[torch.Tensor] = None, | |
*args, | |
**kwargs, | |
) -> torch.Tensor: | |
if len(args) > 0 or kwargs.get("scale", None) is not None: | |
deprecation_message = "The `scale` argument is deprecated and will be ignored. Please remove it, as passing it will raise an error in the future. `scale` should directly be passed while calling the underlying pipeline component i.e., via `cross_attention_kwargs`." | |
deprecate("scale", "1.0.0", deprecation_message) | |
residual = hidden_states | |
if attn.spatial_norm is not None: | |
hidden_states = attn.spatial_norm(hidden_states, temb) | |
input_ndim = hidden_states.ndim | |
if input_ndim == 4: | |
batch_size, channel, height, width = hidden_states.shape | |
hidden_states = hidden_states.view( | |
batch_size, channel, height * width | |
).transpose(1, 2) | |
batch_size, sequence_length, _ = ( | |
hidden_states.shape | |
if encoder_hidden_states is None | |
else encoder_hidden_states.shape | |
) | |
if attention_mask is not None: | |
attention_mask = attn.prepare_attention_mask( | |
attention_mask, sequence_length, batch_size | |
) | |
# scaled_dot_product_attention expects attention_mask shape to be | |
# (batch, heads, source_length, target_length) | |
attention_mask = attention_mask.view( | |
batch_size, attn.heads, -1, attention_mask.shape[-1] | |
) | |
if attn.group_norm is not None: | |
hidden_states = attn.group_norm(hidden_states.transpose(1, 2)).transpose( | |
1, 2 | |
) | |
if encoder_hidden_states is None: | |
qkv = attn.to_qkv(hidden_states) | |
split_size = qkv.shape[-1] // 3 | |
query, key, value = torch.split(qkv, split_size, dim=-1) | |
else: | |
if attn.norm_cross: | |
encoder_hidden_states = attn.norm_encoder_hidden_states( | |
encoder_hidden_states | |
) | |
query = attn.to_q(hidden_states) | |
kv = attn.to_kv(encoder_hidden_states) | |
split_size = kv.shape[-1] // 2 | |
key, value = torch.split(kv, split_size, dim=-1) | |
inner_dim = key.shape[-1] | |
head_dim = inner_dim // attn.heads | |
query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
if attn.norm_q is not None: | |
query = attn.norm_q(query) | |
if attn.norm_k is not None: | |
key = attn.norm_k(key) | |
# the output of sdp = (batch, num_heads, seq_len, head_dim) | |
# TODO: add support for attn.scale when we move to Torch 2.1 | |
hidden_states = F.scaled_dot_product_attention( | |
query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False | |
) | |
hidden_states = hidden_states.transpose(1, 2).reshape( | |
batch_size, -1, attn.heads * head_dim | |
) | |
hidden_states = hidden_states.to(query.dtype) | |
# linear proj | |
hidden_states = attn.to_out[0](hidden_states) | |
# dropout | |
hidden_states = attn.to_out[1](hidden_states) | |
if input_ndim == 4: | |
hidden_states = hidden_states.transpose(-1, -2).reshape( | |
batch_size, channel, height, width | |
) | |
if attn.residual_connection: | |
hidden_states = hidden_states + residual | |
hidden_states = hidden_states / attn.rescale_output_factor | |
return hidden_states | |
class FluxAttnProcessor2_0: | |
"""Attention processor used typically in processing the SD3-like self-attention projections.""" | |
def __init__(self): | |
if not hasattr(F, "scaled_dot_product_attention"): | |
raise ImportError( | |
"FluxAttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0." | |
) | |
def __call__( | |
self, | |
attn: Attention, | |
hidden_states: torch.FloatTensor, | |
encoder_hidden_states: torch.FloatTensor = None, | |
attention_mask: Optional[torch.FloatTensor] = None, | |
image_rotary_emb: Optional[torch.Tensor] = None, | |
) -> torch.FloatTensor: | |
batch_size, _, _ = ( | |
hidden_states.shape | |
if encoder_hidden_states is None | |
else encoder_hidden_states.shape | |
) | |
# `sample` projections. | |
query = attn.to_q(hidden_states) | |
key = attn.to_k(hidden_states) | |
value = attn.to_v(hidden_states) | |
inner_dim = key.shape[-1] | |
head_dim = inner_dim // attn.heads | |
query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
if attn.norm_q is not None: | |
query = attn.norm_q(query) | |
if attn.norm_k is not None: | |
key = attn.norm_k(key) | |
# the attention in FluxSingleTransformerBlock does not use `encoder_hidden_states` | |
if encoder_hidden_states is not None: | |
# `context` projections. | |
encoder_hidden_states_query_proj = attn.add_q_proj(encoder_hidden_states) | |
encoder_hidden_states_key_proj = attn.add_k_proj(encoder_hidden_states) | |
encoder_hidden_states_value_proj = attn.add_v_proj(encoder_hidden_states) | |
encoder_hidden_states_query_proj = encoder_hidden_states_query_proj.view( | |
batch_size, -1, attn.heads, head_dim | |
).transpose(1, 2) | |
encoder_hidden_states_key_proj = encoder_hidden_states_key_proj.view( | |
batch_size, -1, attn.heads, head_dim | |
).transpose(1, 2) | |
encoder_hidden_states_value_proj = encoder_hidden_states_value_proj.view( | |
batch_size, -1, attn.heads, head_dim | |
).transpose(1, 2) | |
if attn.norm_added_q is not None: | |
encoder_hidden_states_query_proj = attn.norm_added_q( | |
encoder_hidden_states_query_proj | |
) | |
if attn.norm_added_k is not None: | |
encoder_hidden_states_key_proj = attn.norm_added_k( | |
encoder_hidden_states_key_proj | |
) | |
# attention | |
query = torch.cat([encoder_hidden_states_query_proj, query], dim=2) | |
key = torch.cat([encoder_hidden_states_key_proj, key], dim=2) | |
value = torch.cat([encoder_hidden_states_value_proj, value], dim=2) | |
if image_rotary_emb is not None: | |
from .embeddings import apply_rotary_emb | |
query = apply_rotary_emb(query, image_rotary_emb) | |
key = apply_rotary_emb(key, image_rotary_emb) | |
hidden_states = scaled_dot_product_attention( | |
query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False | |
) | |
hidden_states = hidden_states.transpose(1, 2).reshape( | |
batch_size, -1, attn.heads * head_dim | |
) | |
hidden_states = hidden_states.to(query.dtype) | |
if encoder_hidden_states is not None: | |
encoder_hidden_states, hidden_states = ( | |
hidden_states[:, : encoder_hidden_states.shape[1]], | |
hidden_states[:, encoder_hidden_states.shape[1] :], | |
) | |
# linear proj | |
hidden_states = attn.to_out[0](hidden_states) | |
# dropout | |
hidden_states = attn.to_out[1](hidden_states) | |
encoder_hidden_states = attn.to_add_out(encoder_hidden_states) | |
return hidden_states, encoder_hidden_states | |
else: | |
return hidden_states | |
class FusedFluxAttnProcessor2_0: | |
"""Attention processor used typically in processing the SD3-like self-attention projections.""" | |
def __init__(self): | |
if not hasattr(F, "scaled_dot_product_attention"): | |
raise ImportError( | |
"FusedFluxAttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0." | |
) | |
def __call__( | |
self, | |
attn: Attention, | |
hidden_states: torch.FloatTensor, | |
encoder_hidden_states: torch.FloatTensor = None, | |
attention_mask: Optional[torch.FloatTensor] = None, | |
image_rotary_emb: Optional[torch.Tensor] = None, | |
) -> torch.FloatTensor: | |
batch_size, _, _ = ( | |
hidden_states.shape | |
if encoder_hidden_states is None | |
else encoder_hidden_states.shape | |
) | |
# `sample` projections. | |
qkv = attn.to_qkv(hidden_states) | |
split_size = qkv.shape[-1] // 3 | |
query, key, value = torch.split(qkv, split_size, dim=-1) | |
inner_dim = key.shape[-1] | |
head_dim = inner_dim // attn.heads | |
query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2) | |
if attn.norm_q is not None: | |
query = attn.norm_q(query) | |
if attn.norm_k is not None: | |
key = attn.norm_k(key) | |
# the attention in FluxSingleTransformerBlock does not use `encoder_hidden_states` | |
# `context` projections. | |
if encoder_hidden_states is not None: | |
encoder_qkv = attn.to_added_qkv(encoder_hidden_states) | |
split_size = encoder_qkv.shape[-1] // 3 | |
( | |
encoder_hidden_states_query_proj, | |
encoder_hidden_states_key_proj, | |
encoder_hidden_states_value_proj, | |
) = torch.split(encoder_qkv, split_size, dim=-1) | |
encoder_hidden_states_query_proj = encoder_hidden_states_query_proj.view( | |
batch_size, -1, attn.heads, head_dim | |
).transpose(1, 2) | |
encoder_hidden_states_key_proj = encoder_hidden_states_key_proj.view( | |
batch_size, -1, attn.heads, head_dim | |
).transpose(1, 2) | |
encoder_hidden_states_value_proj = encoder_hidden_states_value_proj.view( | |
batch_size, -1, attn.heads, head_dim | |
).transpose(1, 2) | |
if attn.norm_added_q is not None: | |
encoder_hidden_states_query_proj = attn.norm_added_q( | |
encoder_hidden_states_query_proj | |
) | |
if attn.norm_added_k is not None: | |
encoder_hidden_states_key_proj = attn.norm_added_k( | |
encoder_hidden_states_key_proj | |
) | |
# attention | |
query = torch.cat([encoder_hidden_states_query_proj, query], dim=2) | |
key = torch.cat([encoder_hidden_states_key_proj, key], dim=2) | |
value = torch.cat([encoder_hidden_states_value_proj, value], dim=2) | |
if image_rotary_emb is not None: | |
from .embeddings import apply_rotary_emb | |
query = apply_rotary_emb(query, image_rotary_emb) | |
key = apply_rotary_emb(key, image_rotary_emb) | |
hidden_states = scaled_dot_product_attention( | |
query, key, value, dropout_p=0.0, is_causal=False | |
) | |
hidden_states = hidden_states.transpose(1, 2).reshape( | |
batch_size, -1, attn.heads * head_dim | |
) | |
hidden_states = hidden_states.to(query.dtype) | |
if encoder_hidden_states is not None: | |
encoder_hidden_states, hidden_states = ( | |
hidden_states[:, : encoder_hidden_states.shape[1]], | |
hidden_states[:, encoder_hidden_states.shape[1] :], | |
) | |
# linear proj | |
hidden_states = attn.to_out[0](hidden_states) | |
# dropout | |
hidden_states = attn.to_out[1](hidden_states) | |
encoder_hidden_states = attn.to_add_out(encoder_hidden_states) | |
return hidden_states, encoder_hidden_states | |
else: | |
return hidden_states | |