Spaces:
Running
Running
# https://github.com/lucidrains/denoising-diffusion-pytorch/blob/main/denoising_diffusion_pytorch/attend.py | |
from functools import wraps | |
from packaging import version | |
from collections import namedtuple | |
import torch | |
from torch import nn, einsum | |
import torch.nn.functional as F | |
from einops import rearrange, repeat | |
from functools import partial | |
# constants | |
# Bundle of the three backend switches; Attend expands an instance with
# `_asdict()` into keyword arguments for `torch.backends.cuda.sdp_kernel`.
AttentionConfig = namedtuple(
    'AttentionConfig',
    ('enable_flash', 'enable_math', 'enable_mem_efficient'),
)
# helpers | |
def exists(val):
    """Return True for every value except ``None`` (falsy values such as 0 or '' count as existing)."""
    return not (val is None)
def default(val, d):
    """Return ``val`` unless it is ``None``, in which case return the fallback ``d``."""
    if val is None:
        return d
    return val
def once(fn):
    """Wrap ``fn`` so that only the first call is forwarded to it.

    The first invocation calls ``fn`` with the given arguments and returns
    its result. Every later invocation is a no-op that returns ``None``.

    The wrapper is marked as used *before* ``fn`` runs, so a first call that
    raises still counts as the one allowed call.

    Args:
        fn: the callable to guard.

    Returns:
        A wrapper carrying ``fn``'s name and docstring (via ``functools.wraps``)
        that accepts any positional and keyword arguments.
    """
    called = False

    @wraps(fn)
    def inner(*args, **kwargs):
        nonlocal called
        if called:
            return None
        called = True
        return fn(*args, **kwargs)

    return inner
# Module-level `print` guarded by `once`: the first call prints its argument,
# every later call (from any caller in this module) does nothing and returns None.
print_once = once(print)
class RMSNorm(nn.Module):
    """Normalize along dim 1 to unit L2 norm, then rescale.

    The output is ``F.normalize(x, dim=1) * g * sqrt(x.shape[1])`` where ``g``
    is a learned gain initialised to ones with shape ``(1, dim, 1, 1)``, so it
    broadcasts over inputs laid out as (batch, dim, height, width).
    """

    def __init__(self, dim):
        super().__init__()
        gain = torch.ones(1, dim, 1, 1)
        self.g = nn.Parameter(gain)

    def forward(self, x):
        channels = x.shape[1]
        unit = F.normalize(x, dim = 1)
        return unit * self.g * (channels ** 0.5)
# main class | |
class Attend(nn.Module):
    """Scaled dot-product attention with an optional fused-kernel path.

    Args:
        dropout: dropout probability applied to the attention weights; it is
            only active while the module is in training mode.
        flash: when True, ``forward`` dispatches to
            ``F.scaled_dot_product_attention`` (requires PyTorch >= 2.0)
            instead of the explicit einsum implementation.
        scale: multiplier applied to the query/key similarities. When None,
            ``head_dim ** -0.5`` is used.
    """

    def __init__(
        self,
        dropout = 0.,
        flash = False,
        scale = None
    ):
        super().__init__()
        self.dropout = dropout
        self.scale = scale
        self.attn_dropout = nn.Dropout(dropout)

        self.flash = flash
        assert not (flash and version.parse(torch.__version__) < version.parse('2.0.0')), 'in order to use flash attention, you must be using pytorch 2.0 or above'

        # determine efficient attention configs for cuda and cpu

        self.cpu_config = AttentionConfig(True, True, True)
        self.cuda_config = None

        # the cuda config is only needed when the fused path can actually run on a GPU
        if not torch.cuda.is_available() or not flash:
            return

        device_properties = torch.cuda.get_device_properties(torch.device('cuda'))
        device_version = version.parse(f'{device_properties.major}.{device_properties.minor}')

        # A100 reports compute capability 8.0 exactly, so the comparison has to be
        # inclusive; a strict `>` would route the A100 itself to the fallback branch.
        if device_version >= version.parse('8.0'):
            print_once('A100 GPU detected, using flash attention if input tensor is on cuda')
            self.cuda_config = AttentionConfig(True, False, False)
        else:
            print_once('Non-A100 GPU detected, using math or mem efficient attention if input tensor is on cuda')
            self.cuda_config = AttentionConfig(False, True, True)

    def flash_attn(self, q, k, v):
        """Run attention through ``F.scaled_dot_product_attention``.

        Args:
            q, k, v: query, key and value tensors; see ``forward`` for the
                expected layout.

        Returns:
            The attention output produced by the fused kernel.
        """
        is_cuda = q.is_cuda

        if exists(self.scale):
            # The fused kernel always multiplies the similarities by
            # head_dim ** -0.5. Pre-scaling q by (scale / that default) makes
            # the net factor equal to self.scale, matching the einsum path.
            default_scale = q.shape[-1] ** -0.5
            q = q * (self.scale / default_scale)

        q, k, v = map(lambda t: t.contiguous(), (q, k, v))

        # Check if there is a compatible device for flash attention

        config = self.cuda_config if is_cuda else self.cpu_config

        # pytorch 2.0 flash attn: q, k, v, mask, dropout, causal, softmax_scale

        with torch.backends.cuda.sdp_kernel(**config._asdict()):
            out = F.scaled_dot_product_attention(
                q, k, v,
                dropout_p = self.dropout if self.training else 0.
            )

        return out

    def forward(self, q, k, v):
        """
        einstein notation
        b - batch
        h - heads
        n, i, j - sequence length (base sequence length, source, target)
        d - feature dimension

        q is (b, h, i, d); k and v are (b, h, j, d); the result is (b, h, i, d).
        """
        if self.flash:
            return self.flash_attn(q, k, v)

        scale = default(self.scale, q.shape[-1] ** -0.5)

        # similarity

        sim = einsum("b h i d, b h j d -> b h i j", q, k) * scale

        # attention

        attn = sim.softmax(dim = -1)
        attn = self.attn_dropout(attn)

        # aggregate values

        out = einsum("b h i j, b h j d -> b h i d", attn, v)

        return out
class LinearAttention(nn.Module):
    """Linear-complexity attention over the spatial positions of a feature map.

    A bias-free 1x1 convolution produces queries, keys and values, which are
    split into ``heads`` groups of ``dim_head`` channels. Queries are
    softmax-normalised over the per-head channel axis and keys over the
    flattened spatial axis, so a small (dim_head x dim_head) context matrix
    can be formed per head instead of a position-by-position attention map.
    The result passes through a 1x1 convolution followed by ``RMSNorm``.

    Args:
        dim: number of input (and output) channels.
        heads: number of attention heads.
        dim_head: channels per head.
    """

    def __init__(self, dim, heads = 4, dim_head = 32):
        super().__init__()
        inner_dim = heads * dim_head
        self.heads = heads
        self.scale = dim_head ** -0.5

        self.to_qkv = nn.Conv2d(dim, inner_dim * 3, 1, bias = False)
        self.to_out = nn.Sequential(
            nn.Conv2d(inner_dim, dim, 1),
            RMSNorm(dim)
        )

    def forward(self, x):
        batch, _, height, width = x.shape
        positions = height * width

        # (batch, heads * c, height, width) -> (batch, heads, c, height * width)
        q, k, v = (
            t.reshape(batch, self.heads, t.shape[1] // self.heads, positions)
            for t in self.to_qkv(x).chunk(3, dim = 1)
        )

        q = q.softmax(dim = -2) * self.scale
        k = k.softmax(dim = -1)

        # per-head context: keys and values contracted over the positions
        context = torch.einsum('b h d n, b h e n -> b h d e', k, v)
        out = torch.einsum('b h d e, b h d n -> b h e n', context, q)

        # (batch, heads, c, height * width) -> (batch, heads * c, height, width)
        out = out.reshape(batch, out.shape[1] * out.shape[2], height, width)
        return self.to_out(out)
class Attention(nn.Module):
    """Full softmax self-attention over the spatial positions of a feature map.

    A bias-free 1x1 convolution produces queries, keys and values, which are
    split into ``heads`` groups of ``dim_head`` channels. Every spatial
    position attends to every other position, and a final 1x1 convolution
    maps the concatenated heads back to ``dim`` channels.

    Args:
        dim: number of input (and output) channels.
        heads: number of attention heads.
        dim_head: channels per head.
    """

    def __init__(self, dim, heads = 4, dim_head = 32):
        super().__init__()
        inner_dim = heads * dim_head
        self.heads = heads
        self.scale = dim_head ** -0.5

        self.to_qkv = nn.Conv2d(dim, inner_dim * 3, 1, bias = False)
        self.to_out = nn.Conv2d(inner_dim, dim, 1)

    def forward(self, x):
        batch, _, height, width = x.shape
        positions = height * width

        # (batch, heads * c, height, width) -> (batch, heads, c, height * width)
        q, k, v = (
            t.reshape(batch, self.heads, t.shape[1] // self.heads, positions)
            for t in self.to_qkv(x).chunk(3, dim = 1)
        )

        # similarity between every pair of positions, per head
        sim = einsum('b h d i, b h d j -> b h i j', q * self.scale, k)
        attn = sim.softmax(dim = -1)

        weighted = einsum('b h i j, b h d j -> b h i d', attn, v)

        # (batch, heads, positions, c) -> (batch, heads * c, height, width)
        merged = weighted.permute(0, 1, 3, 2).reshape(
            batch, weighted.shape[1] * weighted.shape[3], height, width
        )
        return self.to_out(merged)