import copy
import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

class Bert(nn.Module):
    "A BERT-style model: an encoder stack plus an input embedding."
    def __init__(self, encoder, src_embed):
        super(Bert, self).__init__()
        self.encoder = encoder
        self.src_embed = src_embed

    def forward(self, src, src_mask):
        return self.encoder(self.src_embed(src), src_mask)

class Encoder(nn.Module):
    "The Encoder is a stack of N EncoderLayers."
    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        # layer is a single EncoderLayer; clone it N times
        self.layers = clones(layer, N)
        # plus one final LayerNorm
        self.norm = LayerNorm(layer.size)

    def forward(self, x, mask):
        "Pass the input (x, mask) through each layer in turn."
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)  # a final LayerNorm after the N EncoderLayers

class LayerNorm(nn.Module):
    "Construct a LayerNorm module."
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))   # learnable gain
        self.b_2 = nn.Parameter(torch.zeros(features))  # learnable bias
        self.eps = eps

    def forward(self, x):
        # Normalize over the last dimension; note this divides by (std + eps)
        # rather than by sqrt(var + eps) as nn.LayerNorm does.
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

class SublayerConnection(nn.Module):
    """
    LayerNorm + sublayer (self-attention/feed-forward) + dropout + residual connection.
    For simplicity the LayerNorm is applied first (pre-LN); this differs slightly
    from the original paper, where LayerNorm comes last (see the sketch below).
    """
    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        # Apply a residual connection to any sublayer of the same size.
        return x + self.dropout(sublayer(self.norm(x)))
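
# For reference, a minimal sketch of the post-LN variant used in the original
# Transformer paper; the class name is hypothetical and it is not used
# elsewhere in this file:
#
#     class PostLNSublayerConnection(nn.Module):
#         def __init__(self, size, dropout):
#             super().__init__()
#             self.norm = LayerNorm(size)
#             self.dropout = nn.Dropout(dropout)
#
#         def forward(self, x, sublayer):
#             # residual first, LayerNorm last
#             return self.norm(x + self.dropout(sublayer(x)))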

class EncoderLayer(nn.Module):
    "An EncoderLayer is made up of self-attention and a feed-forward network."
    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 2)
        self.size = size

    def forward(self, x, mask):
        "Self-attention sublayer first, then the feed-forward sublayer."
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
        return self.sublayer[1](x, self.feed_forward)

class PositionwiseFeedForward(nn.Module):
    "Implements the FFN equation: FFN(x) = max(0, x*W1 + b1)*W2 + b2."
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # dropout is applied between the two linear layers
        return self.w_2(self.dropout(F.relu(self.w_1(x))))

def make_bert(src_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
    "Construct the model from hyperparameters."
    c = copy.deepcopy
    attn = MultiHeadedAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    model = Bert(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
        nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
    )
    # Initialize parameters with Glorot (fan_avg) uniform; this matters a lot.
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
    return model
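
# A minimal usage sketch (the vocabulary size and shapes are arbitrary
# assumptions; the mask is (batch, seq_len) with 1 = attend, 0 = ignore):
#
#     model = make_bert(src_vocab=10000)
#     src = torch.randint(0, 10000, (2, 16))   # (batch, seq_len) token ids
#     src_mask = torch.ones(2, 16)             # no padding anywhere
#     out = model(src, src_mask)               # (2, 16, 512)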

def make_bert_without_emb(d_model=128, N=2, d_ff=512, h=8, dropout=0.1):
    "Construct a trainable encoder stack without the embedding layer."
    c = copy.deepcopy
    attn = MultiHeadedAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    trainable_encoder = Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N)
    return trainable_encoder
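
# Usage sketch (shapes are illustrative): the caller must supply inputs that
# are already d_model-dimensional, e.g. precomputed features. Note that unlike
# make_bert, no explicit Xavier initialization is applied here:
#
#     encoder = make_bert_without_emb()   # d_model=128, N=2
#     x = torch.randn(2, 10, 128)         # (batch, seq_len, d_model)
#     mask = torch.ones(2, 10)
#     h = encoder(x, mask)                # (2, 10, 128)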

def clones(module, N):
    "Produce N identical copies of a sublayer, using copy.deepcopy."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

def subsequent_mask(size):
    "Mask out subsequent positions."
    attn_shape = (1, size, size)
    subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
    return torch.from_numpy(subsequent_mask) == 0
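
# Example: subsequent_mask(3) yields a (1, 3, 3) boolean tensor in which True
# marks the positions a query may attend to (itself and earlier positions):
#
#     >>> subsequent_mask(3)
#     tensor([[[ True, False, False],
#              [ True,  True, False],
#              [ True,  True,  True]]])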

def attention(query, key, value, mask=None, dropout=None):
    "Compute 'Scaled Dot Product Attention'."
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        mask = mask.unsqueeze(-2)
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn
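
# Shape sketch for attention() when called from MultiHeadedAttention below:
#   query, key, value: (batch, h, seq_len, d_k)
#   scores / p_attn:   (batch, h, seq_len, seq_len)
#   returned context:  (batch, h, seq_len, d_k)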

class MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        "Take in the number of heads and the model dimension."
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        "Implements multi-head attention (Figure 2 of the Transformer paper)."
        if mask is not None:
            # The same mask is applied to all h heads.
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)
        # 1) Apply the linear projections, then split d_model across the h
        #    heads, each of dimension d_k = d_model / h.
        query, key, value = \
            [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
             for l, x in zip(self.linears, (query, key, value))]
        # 2) Apply scaled dot-product attention to the projected vectors.
        x, self.attn = attention(query, key, value, mask=mask,
                                 dropout=self.dropout)
        # 3) "Concat": transpose back and merge the heads with view, e.g. 8
        #    heads of 64 dims into one 512-dim vector, then apply a final
        #    linear layer (512 -> 512); the shape is unchanged.
        x = x.transpose(1, 2).contiguous() \
             .view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x)

class Embeddings(nn.Module):
    def __init__(self, d_model, vocab):
        super(Embeddings, self).__init__()
        self.lut = nn.Embedding(vocab, d_model)
        self.d_model = d_model

    def forward(self, x):
        # Scale embeddings by sqrt(d_model), as in the Transformer paper.
        return self.lut(x) * math.sqrt(self.d_model)

class PositionalEncoding(nn.Module):
    "Implements the positional encoding (PE) function."
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        # Compute the positional encodings once, in log space:
        #   PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
        #   PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add the (non-trainable) positional encodings to the embeddings.
        x = x + self.pe[:, :x.size(1)].clone().detach()
        return self.dropout(x)
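

if __name__ == "__main__":
    # A small smoke test; the vocabulary size, shapes, and hyperparameters
    # here are arbitrary assumptions chosen to keep the check fast.
    vocab_size, batch, seq_len = 1000, 2, 10
    model = make_bert(vocab_size, N=2, d_model=64, d_ff=256, h=4)
    src = torch.randint(0, vocab_size, (batch, seq_len))
    src_mask = torch.ones(batch, seq_len)  # (batch, seq_len): no padding
    out = model(src, src_mask)
    print(out.shape)  # expected: torch.Size([2, 10, 64])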