
-
大模型的文章太多了,良璓不齊,要麼太專業,上來一堆概念,要麼太淺只知道一個概念。
-
開源模型,應用還簡單,但如果定製,如何擼程式碼。

每個神經元大約有1163~11628個突觸,突觸總量在14~15個數量級,放電頻繁大約在400~500Hz,每秒最高計算量大約40萬億次,換算成當前流行詞彙,大腦大概等價於100T引數模型(140B遜爆了),而且有別當前大模型中ReLU啟用函式,大腦惰性計算是不用算0值的,效率更高。

時,Y為

,透過訓練神經網路,以求得X和Y的隱含關係,並給出X為

時,Y的值。
-
對輸入的X,分解成n個向量(舉例方便,實際是直接矩陣計算,實現batch),對每個向量的X1和X2元素,假定一個函式f(x)=x1*w1+x2*w2+b進行計算(其中w1、w2和b用隨機值初始化)
-
用假定的f(x)計算X,得到結果和樣本Y進行比照,如果有差異,調整w1、w2和b的值,重複計算。
-
直到差異收斂到某個程度後(比如小於1),訓練結束。


from torch import nn
from torch.optim import Adam
import torch
model = nn.Linear(2, 1) # 模型定義,內部是 2 * 1的矩陣
optimizer = Adam(model.parameters(), lr=1e-1)
loss_fn = nn.MSELoss()
## 輸入是 10 * 2矩陣,可以理解為10個樣本輸入,每個樣本是一對值組成,譬如:樣本A(0.3,0,6) 樣本B(0.2,0,8)
## 目標是 10 * 1矩陣,可以理解為10個樣本對應的值, 這裡簡單把每個樣本的兩個值相加,並加入一個隨機變數。
## 對於單個變數, Y = sum[x1,x2] + 隨機值, 希望模型能夠學會的是 Y = w1 * x1 + w2 * x2,這裡的w1, w2就是模型的2*1矩陣的兩個值,希望w1、w2最終學為1
input = torch.randn(10,2) * 10
#普朗克常數
bias = 6.6260693
target = torch.add(input.sum(dim=1, keepdim=True),bias)
print('訓練樣本輸入',input,'\n訓練樣本輸入-偏移量', bias, '\n訓練樣本目標值', target)
print("\n模型內部引數的初始隨機值")
for name, param in model.named_parameters():
print(name, param.data)
print("\n開始訓練,會發現差異值越來越小,說明模型在收斂")
for epoch in range(100): # 練習100輪
pred = model(input) # 輸入 10*2 乘積模型的 2*1 得到預測值 10*1
loss = loss_fn(pred, target) # 和目標值差異
# 根據差異,修正模型引數。就是調w1, w2兩個引數
optimizer.zero_grad()
loss.backward()
optimizer.step()
if epoch%10==9:
print (f"Epoch: {epoch} | loss: {loss:.2f}")
# 拿一對隨機值,測下模型, 是否產出值 = sum([x1,x2])
test = torch.randn(1,2) * 10
target = test.sum(dim=1, keepdim=True)
pred = model(test)
print('\n訓練完畢,測試模型。\n可以看到預測值和目標值近似,說明模型訓練成功\n測試輸入:',test, '模型預測:',pred.detach(), '目標值:', target)
print("\n可以看到經過訓練後的模型值w1、w2接近於1")
for name, param in model.named_parameters():
print(name, param.data)
-
如何判斷收斂狀態 -
如何引數調整方向和值

的w和b是否最優解,只需要衡量真實值(訓練樣本值)

和測量值(訓練結算值)

的均方誤差

是否是最小值(顯然趨於0為最小時)
上面程式碼中的loss_fn = nn.MSELoss(),這裡的MSELoss就是指均方誤差損失函式

求w導數,即

,當w初始值為1.8時,L'=11.417163316>0,在區域性單調遞減下,w值的調整方向應該是左,也就是w=1.8-s(比如1.5)。


,在曲率大的地方大幅快速,小的地方小幅趨近。




,L‘是多元中w1的偏導。
無法同時得出w1、w2和b的最優解,也就是全域性最優解是無法得出的,最理想的情況就是假定w2和b為最優解的情況下,求w1最優解,然後再已w1最優解和假定b為最優解的情況下,求w2最優解,這樣求出來的是區域性最優解。

(lr:learning rate,一般設定0.01~0.001)


,透過拓撲設計來滿足不同輸入輸出矩陣形狀和擬合度(對可能的任意f(x)擬合)。
因子x
|
權重w
|
這個事情決策規則
|
模型
|
天氣
|
4
|
天氣(0)&價格(0|1)&同伴(0|1)=不去
天氣(1)&價格(1)&同伴(0|1)=去
天氣(1)&價格(0)&同伴(0|1)=不去
|
天氣*4+價格*2+同伴*1-6
>=0:去
<0:不去
|
價格
|
2
|
||
同伴
|
1
|



,改造後,原先[0,1]被轉換成(0,1),形成一個連續性結果。


,優點輸出空間在(0, 1),缺點是左右兩側導數趨0,會出現梯度消失(下文講),不利於訓練,其他的主流的啟用函式還有ReLU

,tanh、ELU等函式。






-
CNN聚焦區域性資訊,丟失全域性資訊;
-
RNN無法平行計算(序列模式理論上確實也可以做到視窗無限大,然後從左到右把全部資訊帶過來,效率太差)。


-
輸入可被用於數學計算; -
編碼資訊“足夠稠密”,能夠承載訓練過程中,學習的知識圖譜;



-
所有這些單詞都有一條直的紅色列,它們在這個維度上是相似的(名詞維度); -
“woman”和“girl”在很多地方是相似的,“man”和“boy”也是一樣(性別維度); -
“boy”和“girl”也有彼此相似的地方,但這些地方卻與“woman”或“man”不同。(年齡維度);
-
對輸入X(LLM with banzang)進行分詞:"LL","M","with","ban","z","ang"。 -
查詢分詞在gpt詞庫中的索引列表:3069, 44, 351, 3958, 89, 648。 -
對索引列表向量化:經過一個全連線層(後面講,可以理解經過 x*w+b ),形成6*768的矩陣。


-
分詞有多種方式,廣泛使用的子詞分割(subword)方式,空間和效率更加平衡,在gpt2使用的是BPE(Byte-Pair Encoding)。
為什麼透過subword分詞 基於單詞的分詞:因為有running和run,dogs和dog等會形成大量的token,因為沒有覆蓋全導致標記unk,會降低擬合度。基於字母的分詞:沒有意義,比如一箇中文字元,包含太多的資訊。基於子詞的分詞:將不常見的詞拆解成多個常見詞,tokenization被拆分token和ization,一方面tokenization意義被整體保留,另一方面因為token都是出現頻率較高的詞,所以整個詞彙表會縮減到很小而覆蓋全。
-
向量化不一定非得從頭訓練產出,也可以使用GloVe這類預訓練好,含有某些聯絡的模型資料。
-
6*768,768個維度是gpt2預設設定,向量化後的作用在第一部已經說過,不再重複。


-
索引到向量過程,並沒有按論文演算法實現,而是直接隨機初始了w引數,然後起了一層前饋層訓練出768維度的最終值(暴力美學)。
-
出了詞向量外,還疊加了詞位置向量(position embedding),來解決相同詞在不同位置的語意不同,例如“你真狗”,“這是狗,不是豹子”。
論文中詞位置向量細節 引入詞位置的時候,還需要解決兩個問題
如果是用整數數值表示,推理時可能會遇到比訓練時還要長的序列;而且隨著序列長度的增加,位置值會越來越大。 如果用[-1,+1]之間進行表示,要解決序列長度不同時,能表示token在序列中的絕對位置;而且在序列長度不同時,也要在不同序列中token的相對位置/距離保持一致。所以最後的實現是,對奇數維度,sin(詞的index去除以10000的2*維度*詞向量維度(768)),偶數cos,這樣會將不管長度為多少的句子都是固定的長度的位置編碼,以及位置編碼都會被縮放在[-1, 1]的平滑範圍,對模型非常理想的,最重要的是由於正弦和餘弦函式的週期性(且是不同週期),對於任意固定偏移量 k,PE(pos+k)可以表示為 PE(pos) 的線性函式。這意味著模型可以很容易地透過學習注意力權重來關注相對位置,因為相對位置的編碼可以透過簡單的線性變換來獲得。即我知道了絕對距離,也能知道相對距離。最後向量直接相加得出embedding後的值。上面是transform論文內容,但實際上gpt2沒有按照sin和cos去計算,而是直接透過一個前饋層直接訓練得出的,然後和詞向量直接相加,最終訓練過程也不再還原位置資訊,而是以這種隱含的方式將位置資訊參與後續訓練,提高參數擬合度。

比如看到下面這張圖,短時間內大腦可能只對圖片中的“錦江飯店”有印象,即注意力集中在了“錦江飯店”處。短時間內,大腦可能並沒有注意到錦江飯店上面有一串電話號碼,下面有幾個行人,後面還有“喜運來大酒家”等資訊。 大腦在短時間內處理資訊時,主要將圖片中最吸引人注意力的部分讀出來了,類似下面:




相乘(可以簡單理解成 矩陣X乘以矩陣X的倒置),得到一個6*6的矩陣Z(seq.len,seq.len),Z代表著 [LL、M、with、ban、z、ang] 每個token之間的向量距離,換個說法就是把輸入和輸入的倒置矩陣相乘代表著LL、MM、with、ban、zang的token之間兩兩所有維度疊加後的“相似度”,這個是Attention最核心部分。


對映到象限表後,相似肉眼看夾角度數,如下


,比如說LL和M的頂點距離是

,因為cos結果區間是[-1, 1],所以兩個矩陣的相似度也就相當於 兩個矩陣的點積透過模長和cos給歸一成[-1, 1]的值,等價於LL和M兩個矩陣相似度,先算點積再歸一,比如簡化下上面的Q、K、Z矩陣。

回想下,上一篇講到的詞向量,LL和M、以及M和with之間的相似度是不一樣的,經過的訓練後,已經包含了某種掛鏈,比如下圖中cat與kitten在所有維度的接近,要高於dog和houses。

,然後再縮放

(

中k稍後在多頭部分講,此時等於向量維度也就是

),最後再歸一

得到每個token的兩兩相似度(歸一後值)。
-
避免數值過大,再最後歸一的時候會導致梯度消失(vanishing gradients)迴歸效果差。
-
歸一函式的輸入如果太大會導致梯度非常的小,訓練會很難。

實現很簡單,就是對向量中所有元素,指數放大後,算了一個出現機率(指數用e的原因是e的-1次方是0.3679,也在(0, 1)的區間)


再乘以V矩陣,得到一個6*6的矩陣Z',也就是token兩兩關係在每個維度下,和token序列的關係度。




,整個過程如下





,但每層的輸出其實是對X不同維度和力度的調整,需歸一成正態分佈後再相加,實現原理也很簡單:
-
求矩陣每行所有列的均值(例如上面例子,取LL的768維度的均值)
-
算出每行所有列的方差
-
最後用矩陣每行每列,減去這行的均值,再除以這行的標準差(𝜖是一個小的常數,防止除0),然後再引入a和b訓練引數,抵消這個過程的損失


也不是不可以,但不連線其他模組效能會差很多,同時也沒有辦法實現並行訓練(Teacher Forcing方式),所以又設計了Decoder模組,這兩種模組的組合可以實現下面的場景:

-
文字分類(僅使用Encoder模組),label=f(tokens)。
-
下一個token預測(僅使用Decoder模組),next_token=f(token)。比如給定一些詞,做出下一個詞的預測,而且是逐字向後推測。
-
文字翻譯(Encoder+Decoder模組),中文=f(英文)。

,即輸入"LL、M、with、ban、z、ang"(此時的輸入相比Encoder已經包含了token之間的相互關注資訊),調整引數計算出結果和"大模型、與、半臧"比較,loss收斂後結束,但實現上有兩個特別的點:


Teacher Forcing RNN模型,逐字推測訓練過程,會產生大量的預測分支計算,比如“大”->[模型(90%)、小(4%)、量(1%)…],可能在某個迭代會推測出下一個詞是“小”,倒置推測從“大模型”走向“大小變數”的分支,越走越遠,loss收斂不了。Teacher Forcing的概念是仍然正常推測訓練,但下一個token不採納推測實際結果,而按照訓練樣本直接指定正確的值,進行下一個token的預測。在transformer中,把完整的token序列參與訓練,但是遮蔽了每個token的後面資訊,形成了可以並行的執行任務,每個token序列推測過程都無視其他任務推測結果,而採用訓練樣本值進行前序context推測下一個token。

論文中Encoder和Decoder是可以堆疊的,即輸入透過堆疊n層Encoder(論文中使用了2/4/6/8層,只有6層效果最好,但實際在只有一行訓練樣本場景下,n_layers=1是效果最好的)處理後在傳遞到n層Deocder繼續處理,同時每層Deocder的KV都來自最後一層的Encoder的輸出。
-
訓練文字(輸入/輸出)['LLM with banzang', '大模型和半臧']的輸入['LLM with banzang']進行分詞和embedding。
-
Encoder計算訓練文字輸入X的Self-AttentionScore,形成輸入X的token間關注資訊帶入Decoder。
-
訓練文字(輸入/輸出)['LLM with banzang', '大模型和半臧']的輸出['大模型和半臧']進行分詞和embedding。
-
第一層Decoder對訓練文字輸出X'進行右上三角形MASK遮蓋操作後進行Self-AttentionScore,形成輸出X'從左往右詞序的token間關注資訊(即只能瞭解過去資訊,無法提前知道未來詞序,因為要預測),事實上形成了訓練輸出長度為m(
)的並行任務。
-
第二層Decoder將第一層Deocder輸出的結果做為輸入Q,使用Encoder的KV引數,拼接訓練輸入和輸出再次做AttentionScore。
-
預測下一個結果和訓練輸出對比後,反向傳播調整各層引數,直到Loss收斂到預期結果。


-
LL -> LLM -
LLM -> LLM with -
LLM with -> LLM with ban -
LLM with ban -> LLM with banz -
LLM with banz -> LLM with banzang
-
LLM with banzang -> LLM with banzang 大 -> LLM with banzang 大模型 -> LLM with banzang 大模型和半臧; -
-> LLM with banzang 法學碩士 -> LLM with banzang 法學碩士半臧; -
LLM with banzang 蘿莉 -> LLM with banzang 蘿莉男 -> LLM with banzang 蘿莉男和半臧; -
-> LLM with banzang 蘿莉塔 -> LLM with banzang 半臧是蘿莉塔;

),對輸入矩陣做線形變化成token_size*logits_size,同時引入訓練引數做擬合,再經過一個softmax歸一,擬合每個token在字典表中最高機率的index,拿index還原最終token。


import torch
from torch import nn
from torch import optim
from torch.utils import data as Data
import numpy as np
d_model = 6# Embedding的大小
max_len = 1024# 輸入序列的最長大小
d_ff = 12# 前饋神經網路的隱藏層大小, 一般是d_model的四倍
d_k = d_v = 3# 自注意力中K和V的維度, Q的維度直接用K的維度代替, 因為這二者必須始終相等
n_layers = 1# Encoder和Decoder的層數
n_heads = 8# 自注意力多頭的頭數
p_drop = 0.1# propability of dropout
# 對encoder_input的PAD(0)做Mask,可以支援對訓練樣本打掩碼
defget_attn_pad_mask(seq_q, seq_k):
batch, len_q = seq_q.size()
batch, len_k = seq_k.size()
# we define index of PAD is 0, if tensor equals (zero) PAD tokens
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # [batch, 1, len_k]
return pad_attn_mask.expand(batch, len_q, len_k) # [batch, len_q, len_k]
# 在deocder_input做上三角掩碼
defget_attn_subsequent_mask(seq):
attn_shape = [seq.size(0), seq.size(1), seq.size(1)] # [batch, target_len, target_len]
subsequent_mask = np.triu(np.ones(attn_shape), k=1) # [batch, target_len, target_len]
subsequent_mask = torch.from_numpy(subsequent_mask)
return subsequent_mask # [batch, target_len, target_len]
# 做encoder_input新增位置編碼
classPositionalEncoding(nn.Module):
def__init__(self, d_model, dropout=.1, max_len=1024):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=p_drop)
positional_encoding = torch.zeros(max_len, d_model) # [max_len, d_model]
position = torch.arange(0, max_len).float().unsqueeze(1) # [max_len, 1]
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-torch.log(torch.Tensor([10000])) / d_model)) # [max_len / 2]
positional_encoding[:, 0::2] = torch.sin(position * div_term) # even
positional_encoding[:, 1::2] = torch.cos(position * div_term) # odd
# [max_len, d_model] -> [1, max_len, d_model] -> [max_len, 1, d_model]
positional_encoding = positional_encoding.unsqueeze(0).transpose(0, 1)
# register_buffer能夠申請一個緩衝區中的常量, 並且它不會被加入到計算圖中, 也就不會參與反向傳播.
self.register_buffer('pe', positional_encoding)
defforward(self, x):
# x: [seq_len, batch, d_model]
# we can add positional encoding to x directly, and ignore other dimension
x = x + self.pe[:x.size(0), ...]
returnself.dropout(x)
# Encoder和Deocder後的前饋層(含歸一層)
classFeedForwardNetwork(nn.Module):
def__init__(self):
super(FeedForwardNetwork, self).__init__()
self.ff1 = nn.Linear(d_model, d_ff)
# 線性變化還原
self.ff2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(p=p_drop)
self.layer_norm = nn.LayerNorm(d_model)
defforward(self, x):
x = self.ff1(x)
x = self.relu(x)
x = self.ff2(x)
returnself.layer_norm(x)
# 多頭注意力(含歸一層)
classMultiHeadAttention(nn.Module):
def__init__(self, n_heads=8):
super(MultiHeadAttention, self).__init__()
# do not use more instance to implement multihead attention
# it can be complete in one matrix
self.n_heads = n_heads
# we can't use bias because there is no bias term in formular
# 多頭放在同一個矩陣計算
self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
self.fc = nn.Linear(d_v * n_heads, d_model, bias=False)
self.layer_norm = nn.LayerNorm(d_model)
defforward(self, input_Q, input_K, input_V, attn_mask):
'''
To make sure multihead attention can be used both in encoder and decoder,
we use Q, K, V respectively.
input_Q: [batch, len_q, d_model]
input_K: [batch, len_k, d_model]
input_V: [batch, len_v, d_model]
'''
residual, batch = input_Q, input_Q.size(0)
# [batch, len_q, d_model] -- matmul W_Q --> [batch, len_q, d_q * n_heads] -- view -->
# [batch, len_q, n_heads, d_k,] -- transpose --> [batch, n_heads, len_q, d_k]
Q = self.W_Q(input_Q).view(batch, -1, n_heads, d_k).transpose(1, 2) # [batch, n_heads, len_q, d_k]
K = self.W_K(input_K).view(batch, -1, n_heads, d_k).transpose(1, 2) # [batch, n_heads, len_k, d_k]
V = self.W_V(input_V).view(batch, -1, n_heads, d_v).transpose(1, 2) # [batch, n_heads, len_v, d_v]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # [batch, n_heads, seq_len, seq_len]
# prob: [batch, n_heads, len_q, d_v] attn: [batch, n_heads, len_q, len_k]
prob, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
prob = prob.transpose(1, 2).contiguous() # [batch, len_q, n_heads, d_v]
prob = prob.view(batch, -1, n_heads * d_v).contiguous() # [batch, len_q, n_heads * d_v]
output = self.fc(prob) # [batch, len_q, d_model]
returnself.layer_norm(residual + output), attn
# 點積,QKV
classScaledDotProductAttention(nn.Module):
def__init__(self):
super(ScaledDotProductAttention, self).__init__()
defforward(self, Q, K, V, attn_mask):
'''
Q: [batch, n_heads, len_q, d_k]
K: [batch, n_heads, len_k, d_k]
V: [batch, n_heads, len_v, d_v]
attn_mask: [batch, n_heads, seq_len, seq_len]
'''
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # [batch, n_heads, len_q, len_k]
# -1e9是用很大的負數,使得其在Softmax中可以被忽略,實現mask效果
scores.masked_fill_(attn_mask, -1e9)
attn = nn.Softmax(dim=-1)(scores) # [batch, n_heads, len_q, len_k]
prob = torch.matmul(attn, V) # [batch, n_heads, len_q, d_v]
return prob, attn
# Encoder層,可以構建多層Encoder,由多頭注意力和前饋層構成
classEncoderLayer(nn.Module):
def__init__(self):
super(EncoderLayer, self).__init__()
self.encoder_self_attn = MultiHeadAttention()
self.ffn = FeedForwardNetwork()
defforward(self, encoder_input, encoder_pad_mask):
'''
encoder_input: [batch, source_len, d_model]
encoder_pad_mask: [batch, n_heads, source_len, source_len]
encoder_output: [batch, source_len, d_model]
attn: [batch, n_heads, source_len, source_len]
'''
encoder_output, attn = self.encoder_self_attn(encoder_input, encoder_input, encoder_input, encoder_pad_mask)
encoder_output = self.ffn(encoder_output) # [batch, source_len, d_model]
return encoder_output, attn
# Encoder
classEncoder(nn.Module):
def__init__(self):
super(Encoder, self).__init__()
self.source_embedding = nn.Embedding(source_vocab_size, d_model)
self.positional_embedding = PositionalEncoding(d_model)
self.layers = nn.ModuleList([EncoderLayer() for layer in range(n_layers)])
defforward(self, encoder_input):
# encoder_input: [batch, source_len]
encoder_output = self.source_embedding(encoder_input) # [batch, source_len, d_model]
encoder_output = self.positional_embedding(encoder_output.transpose(0, 1)).transpose(0, 1) # [batch, source_len, d_model]
encoder_self_attn_mask = get_attn_pad_mask(encoder_input, encoder_input) # [batch, source_len, source_len]
encoder_self_attns = list()
for layer inself.layers:
# encoder_output: [batch, source_len, d_model]
# encoder_self_attn: [batch, n_heads, source_len, source_len]
encoder_output, encoder_self_attn = layer(encoder_output, encoder_self_attn_mask)
encoder_self_attns.append(encoder_self_attn)
return encoder_output, encoder_self_attns
# Decoder層,可以構建多層Decoder,由多頭注意力和前饋層構成
classDecoderLayer(nn.Module):
def__init__(self):
super(DecoderLayer, self).__init__()
self.decoder_self_attn = MultiHeadAttention()
self.encoder_decoder_attn = MultiHeadAttention()
self.ffn = FeedForwardNetwork()
defforward(self, decoder_input, encoder_output, decoder_self_mask, decoder_encoder_mask):
'''
decoder_input: [batch, target_len, d_mdoel]
encoder_output: [batch, source_len, d_model]
decoder_self_mask: [batch, target_len, target_len]
decoder_encoder_mask: [batch, target_len, source_len]
'''
# masked mutlihead attention
# Q, K, V all from decoder it self
# decoder_output: [batch, target_len, d_model]
# decoder_self_attn: [batch, n_heads, target_len, target_len]
decoder_output, decoder_self_attn = self.decoder_self_attn(decoder_input, decoder_input, decoder_input, decoder_self_mask)
# Q from decoder, K, V from encoder
# decoder_output: [batch, target_len, d_model]
# decoder_encoder_attn: [batch, n_heads, target_len, source_len]
decoder_output, decoder_encoder_attn = self.encoder_decoder_attn(decoder_output, encoder_output, encoder_output, decoder_encoder_mask)
decoder_output = self.ffn(decoder_output) # [batch, target_len, d_model]
return decoder_output, decoder_self_attn, decoder_encoder_attn
# Decoder
classDecoder(nn.Module):
def__init__(self):
super(Decoder, self).__init__()
self.target_embedding = nn.Embedding(target_vocab_size, d_model)
self.positional_embedding = PositionalEncoding(d_model)
self.layers = nn.ModuleList([DecoderLayer() for layer in range(n_layers)])
defforward(self, decoder_input, encoder_input, encoder_output):
'''
decoder_input: [batch, target_len]
encoder_input: [batch, source_len]
encoder_output: [batch, source_len, d_model]
'''
decoder_output = self.target_embedding(decoder_input) # [batch, target_len, d_model]
decoder_output = self.positional_embedding(decoder_output.transpose(0, 1)).transpose(0, 1) # [batch, target_len, d_model]
decoder_self_attn_mask = get_attn_pad_mask(decoder_input, decoder_input) # [batch, target_len, target_len]
decoder_subsequent_mask = get_attn_subsequent_mask(decoder_input) # [batch, target_len, target_len]
decoder_encoder_attn_mask = get_attn_pad_mask(decoder_input, encoder_input) # [batch, target_len, source_len]
decoder_self_mask = torch.gt(decoder_self_attn_mask + decoder_subsequent_mask, 0)
decoder_self_attns, decoder_encoder_attns = [], []
for layer inself.layers:
# decoder_output: [batch, target_len, d_model]
# decoder_self_attn: [batch, n_heads, target_len, target_len]
# decoder_encoder_attn: [batch, n_heads, target_len, source_len]
decoder_output, decoder_self_attn, decoder_encoder_attn = layer(decoder_output, encoder_output, decoder_self_mask, decoder_encoder_attn_mask)
decoder_self_attns.append(decoder_self_attn)
decoder_encoder_attns.append(decoder_encoder_attn)
return decoder_output, decoder_self_attns, decoder_encoder_attns
classTransformer(nn.Module):
def__init__(self):
super(Transformer, self).__init__()
self.encoder = Encoder()
self.decoder = Decoder()
self.fc = nn.Linear(d_model, target_vocab_size, bias=False)
defforward(self, encoder_input, decoder_input):
'''
encoder_input: [batch, source_len]
decoder_input: [batch, target_len]
'''
# encoder_output: [batch, source_len, d_model]
# encoder_attns: [n_layers, batch, n_heads, source_len, source_len]
encoder_output, encoder_attns = self.encoder(encoder_input)
# decoder_output: [batch, target_len, d_model]
# decoder_self_attns: [n_layers, batch, n_heads, target_len, target_len]
# decoder_encoder_attns: [n_layers, batch, n_heads, target_len, source_len]
decoder_output, decoder_self_attns, decoder_encoder_attns = self.decoder(decoder_input, encoder_input, encoder_output)
decoder_logits = self.fc(decoder_output) # [batch, target_len, target_vocab_size]
# decoder_logits: [batch * target_len, target_vocab_size]
return decoder_logits.view(-1, decoder_logits.size(-1))
classTokenizer:
def__init__(self, sentences):
super(Tokenizer, self).__init__()
self.sentences = sentences
defget_source_vocab(self):
returnself.source_vocab
defget_target_vocab(self):
returnself.target_vocab
defconvert_token_to_ids(self):
source_inputs = " ".join([sentences[i][0] for i in range(len(sentences))]).replace('E', '').split()
source_inputs.insert(0, 'E')
self.source_vocab = {k: v for v, k in enumerate(source_inputs)}
target_inputs = " ".join([sentences[i][1] for i in range(len(sentences))]).replace('E', '').replace('S', '').split()
target_inputs.insert(0, 'E')
target_inputs.insert(1, 'S')
self.target_vocab = {k: v for v, k in enumerate(target_inputs)}
encoder_inputs, decoder_inputs, decoder_outputs = [], [], []
for i in range(len(sentences)):
encoder_input = [self.source_vocab[word] for word in sentences[i][0].split()]
decoder_input = [self.target_vocab[word] for word in sentences[i][1].split()]
decoder_output = [self.target_vocab[word] for word in sentences[i][2].split()]
encoder_inputs.append(encoder_input)
decoder_inputs.append(decoder_input)
decoder_outputs.append(decoder_output)
return torch.LongTensor(encoder_inputs), torch.LongTensor(decoder_inputs), torch.LongTensor(decoder_outputs)
defconvert_ids_to_source_sentences(self, ids, split_word=' '):
return split_word.join([key for key inself.source_vocab][ids[i].item()] for i in range(len(ids)))
defconvert_ids_to_target_sentences(self, ids, split_word=' '):
return split_word.join([key for key inself.target_vocab][ids[i].item()] for i in range(len(ids)))
# 訓練
device = torch.device('cuda'if torch.cuda.is_available() else'cpu')
epochs = 400
lr = 1e-1
model = Transformer().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
sentences = [
# Encoder的輸入 Decoder的輸入(teaching force) #Decoder的輸出(樣本值)
['LLM with banzang E', 'S 半臧 和 大模型', '半臧 和 大模型 E']
]
tokenizer = Tokenizer(sentences)
encoder_inputs, decoder_inputs, decoder_outputs = tokenizer.convert_token_to_ids()
dataset = Seq2SeqDataset(encoder_inputs, decoder_inputs, decoder_outputs)
data_loader = Data.DataLoader(dataset, 2, True)
source_vocab_size = len(tokenizer.get_source_vocab())
target_vocab_size = len(tokenizer.get_target_vocab())
for epoch in range(epochs):
for encoder_input, decoder_input, decoder_output indata_loader:
encoder_input = encoder_input.to(device)
decoder_input = decoder_input.to(device)
decoder_output = decoder_output.to(device)
output = model(encoder_input, decoder_input)
loss = criterion(output, decoder_output.view(-1))
print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss), '\t預測:', tokenizer.convert_ids_to_target_sentences(output.max(dim=1, keepdim=False)[1].data), '\t(訓練樣本:', tokenizer.convert_ids_to_target_sentences(decoder_output.view(-1).data), ')')
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 預測
output_len = len(decoder_outputs.squeeze(0))
for encoder_input, decoder_input, decoder_output indata_loader:
encoder_input = encoder_input.to(device)
decoder_input = torch.zeros(1, output_len).type_as(encoder_input.data)
# 從"S"開始,source_vocab={'E':0, 'S':1, ...}
next_symbol = 1
print('輸入:', tokenizer.convert_ids_to_source_sentences(encoder_input.data.squeeze(0)))
for i in range(output_len):
decoder_input[0][i] = next_symbol
output = model(encoder_input, decoder_input)
prob = output.max(dim=1, keepdim=False)[1]
next_symbol = prob.data[i].item()
print('輸出:', tokenizer.convert_ids_to_target_sentences(prob.data[:i+1], ''))
if next_symbol == 0:
break


-
程式碼中Encoder和Deocder有Layer的實現,意思是Encoder或者Decoder可以堆疊執行,Encoder堆疊後的輸出統一提供給每個Decoder(論文中使用了2/4/6/8層,只有6層效果最好,但實際在只有一行訓練樣本場景下,n_layers=1是效果最好的)
#第一層的Encoder的輸出會做為第二層Encoder的輸入
for layer in self.layers:
encoder_output, encoder_self_attn = layer(encoder_output, encoder_self_attn_mask)
encoder_self_attns.append(encoder_self_attn)
-
包括多頭n_heads = 8的實現,也不是對input拆分成8份分別計算,而是和Layer一樣直接合並在一個矩陣中做一次計算(並行實現)
-
透過訓練已經存在的“子技術PM”需求列表(1000+),求出剩餘(1000+)需求的“子技術PM”欄位。
Hugging Face Transformers 是一個開源 Python 庫,其提供了數以千計的預訓練 transformer 模型,可廣泛用於自然語言處理 (NLP) 、計算機視覺、音訊等各種任務。它透過對底層 ML 框架 (如 PyTorch、TensorFlow 和 JAX) 進行抽象,簡化了 transformer 模型的實現,從而大大降低了 transformer 模型訓練或部署的複雜性。

目前預訓練模型存在引數量大,推理時間長,部署難度大的問題,為了減少模型引數及儲存空間,加快推理速度,我們推出了實用性強、適用面廣的中文小型預訓練模型MiniRBT,我們採用瞭如下技術: 全詞掩碼技術:全詞掩碼技術(Whole Word Masking)是預訓練階段的訓練樣本生成策略。簡單來說,原有基於WordPiece的分詞方式會把一個完整的詞切分成若干個子詞,在生成訓練樣本時,這些被分開的子詞會隨機被mask(替換成[MASK];保持原詞彙;隨機替換成另外一個詞)。而在WWM中,如果一個完整的詞的部分WordPiece子詞被mask,則同屬該詞的其他部分也會被mask。更詳細的說明及樣例請參考:Chinese-BERT-wwm,本工作中我們使用哈工大LTP作為分詞工具。兩段式蒸餾:相較於教師模型直接蒸餾到學生模型的傳統方法,我們採用中間模型輔助教師模型到學生模型蒸餾的兩段式蒸餾方法,即教師模型先蒸餾到助教模型(Teacher Assistant),學生模型透過對助教模型蒸餾得到,以此提升學生模型在下游任務的表現。並在下文中貼出了下游任務上兩段式蒸餾與一段式蒸餾的實驗對比,結果表明兩段式蒸餾能取得相比一段式蒸餾更優的效果。構建窄而深的學生模型。相較於寬而淺的網路結構,如TinyBERT結構(4層,隱層維數312),我們構建了窄而深的網路結構作為學生模型MiniRBT(6層,隱層維數256和288),實驗表明窄而深的結構下游任務表現更優異。MiniRBT目前有兩個分支模型,分別為MiniRBT-H256和MiniRBT-H288,表示隱層維數256和288,均為6層Transformer結構,由兩段式蒸餾得到。同時為了方便實驗效果對比,我們也提供了TinyBERT結構的RBT4-H312模型下載。
import torch
from transformers import AdamW, AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import Dataset, DataLoader
from tqdm.autoimport tqdm
checkpoint = "hfl/minirbt-h288"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=3)
device = torch.device('cuda'if torch.cuda.is_available() else'cpu')
model.to(device)
# 1. 訓練和評估資料準備
class TrainDataset(Dataset):
def __init__(self, sentences):
self.sentences = sentences
def __len__(self):
return len(self.sentences)
def __getitem__(self, idx):
return sentences['train'][idx][0], sentences['train'][idx][1], sentences['train'][idx][2]
class ValidationDataset(Dataset):
def __init__(self, sentences):
self.sentences = sentences
def __len__(self):
return len(self.sentences)
def __getitem__(self, idx):
return sentences['validation'][idx][0], sentences['validation'][idx][1], sentences['validation'][idx][2]
def data_collator(batch):
sentence ,labels = [],[]
for item in batch:
sentence.append([item[0], item[1]])
labels.append(item[2])
inputs = tokenizer(sentence, padding=True, truncation=True, return_tensors="pt")
inputs['labels'] = torch.tensor(labels)
return inputs
# 2. 準備訓練和評估資料
sentences = {
"train": [
["I have a green apple", "apple", 0],
["I have a black apple", "apple", 1],
["I have a red apple", "apple", 2],
["I have a red banner", "banner", 2]
],
"validation": [
["I have a black apple", "apple", 1],
["I have a green banner", "banner", 0]
]
}
# 3. 評估模型
import evaluate
from transformers import TrainingArguments
from transformers import Trainer
import numpy as np
def compute_metrics(eval_preds):
metric = evaluate.load("glue", "mrpc")
logits, labels = eval_preds
predictions = np.argmax(logits, axis=-1)
return metric.compute(predictions=predictions, references=labels)
training_args = TrainingArguments("test-trainer", evaluation_strategy="epoch")
trainer = Trainer(
model,
training_args,
train_dataset = TrainDataset(sentences),
eval_dataset = ValidationDataset(sentences),
data_collator = data_collator,
tokenizer = tokenizer,
compute_metrics=compute_metrics
)
trainer.train()
# 4. 進行訓練
train_dataset = TrainDataset(sentences)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=8, collate_fn=data_collator)
num_epochs = 10
progress_bar = tqdm(range(num_epochs))
model.train()
# 觀測指標
metric = evaluate.load("glue", "mrpc")
optimizer = AdamW(model.parameters(), lr=5e-5)
for epoch in range(num_epochs):
for batch in train_dataloader:
optimizer.zero_grad()
output = model(**batch)
loss, logits = output[:2]
loss.backward()
optimizer.step()
predictions = torch.argmax(logits, dim=-1)
metric.add_batch(predictions=predictions, references=batch["labels"])
# 過程日誌
print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
progress_bar.update(1)
# 觀測指標結果
metric.compute()
# 5. 預測
model.eval()
sentences = ["I have a black banner", "banner"]
logits = model(**tokenizer(sentences[0], sentences[1], return_tensors="pt")).logits
pred = torch.argmax(logits,dim=-1)
print(pred)
-
模型引入:AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=3)(num_labels在案例中,等於“子技術PM”的去重數)
-
checkpoint = "hfl/minirbt-h288"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=3)
-
訓練樣本準備:分成兩部分Train和Validation兩個訓練集,每個訓練集分成三個維度,前兩個是文字的兩個普通維度,最後一個是labels表明了文字的分類,在案例中就是“子技術PM”值 -
模型評估:使用Transformer的高階工具Trainer,看模型對資料的準確性指標(evaluate需要安裝,預設了指標)
-
訓練:同樣對整個訓練結束增加指標觀測(evaluation_strategy="epoch"),tqdm實現進度條。
-
預測:
連線本地執行後,執行時會出現一些問題
依賴的包透過pip,按錯誤提示在本機上挨個安裝一遍,比如說 pip install torch 透過hugginface線上引入的模型,可能會遇到網路連線問題,遇到的話,下載模型到本地https://huggingface.co/docs/transformers/installation#offline-mode Trainer包可能出現依賴版本問題,指標觀察部分可以註釋掉,包括trainer.train()、metric.compute()
import torch
from transformers import AdamW, AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import Dataset, DataLoader
from tqdm.autoimport tqdm
from datasets import load_dataset
device = 'mps'if torch.backends.mps.is_available() else'cpu'
class UniqueLabelsDataset(Dataset):
def __init__(self, data, labels):
self.data = data
self.labels = labels
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.labels[idx], pre_process_input(self.data[idx].values())
# 訓練和評估資料準備
def data_collator(batch):
sentence, labels = [],[]
for l, item in batch:
sentence.append(item)
labels.append(l.item())
inputs = tokenizer(sentence, padding=True, truncation=True, return_tensors="pt")
inputs['labels'] = torch.tensor(labels)
return inputs
def pre_process_input(inputs):
return f" ".join([str(value).replace(' ', '') for value in inputs])
# 準備訓練和評估資料
'''
{
'標題': Value(dtype='string', id=None),
'需求指派人': Value(dtype='string', id=None),
'***': Value(dtype='int64', id=None),
'***': Value(dtype='string', id=None),
'***': Value(dtype='string', id=None),
'***': Value(dtype='string', id=None),
'子需求技術pm': Value(dtype='int64', id=None),
'team_alias': Value(dtype='string', id=None)}
'''
train_dataset = load_dataset("csv", data_files = "./tt_l_demands.csv", delimiter=",")['train']
labels_vocab, unique_labels = torch.unique(torch.tensor(train_dataset['子需求技術pm']), return_inverse=True)
num_labels = len(labels_vocab)
train_dataset.remove_columns(["子需求技術pm"])
train_dataloader = DataLoader(UniqueLabelsDataset(train_dataset, unique_labels), shuffle=True, batch_size=64, collate_fn=data_collator)
# 初始化模型
checkpoint = "/Users/nanzhang/minirbt-h288"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=num_labels)
model.to(device)
from transformers import get_scheduler
# 進行訓練
num_epochs = 100
# 顯示訓練進度,dynamic_ncols=True保持在一行
progress_bar = tqdm(range(num_epochs), dynamic_ncols=True)
# lr從 1e-1 的預熱,爬坡到 5e-5
num_training_steps = num_epochs * len(train_dataloader)
lr_scheduler = get_scheduler(
"linear",
optimizer=optimizer,
num_warmup_steps=1e-1,
num_training_steps=num_training_steps,
)
model.train()
# 觀測指標
optimizer = AdamW(model.parameters(), lr=5e-5)
for epoch in range(num_epochs):
for batch in train_dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
output = model(**batch)
loss, logits = output[:2]
loss.backward()
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# 顯示進度
#print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
progress_bar.set_postfix(loss=loss.item())
progress_bar.update(1)
# 持久化訓練好的引數
model.save_pretrained('./model')
tokenizer.save_pretrained('./model')
# 預測
sentences = ["***", '407080', '407080', '2722', '4066', '淘天專案', '50932674', '***']
input = tokenizer(pre_process_input(sentences), padding=True, truncation=True, return_tensors="pt")
input.to(device)
logits = model(**input).logits
pred = torch.argmax(logits,dim=-1)
print(labels_vocab[pred.item()])
-
device = 'mps' if torch.backends.mps.is_available() else 'cpu',是啟用M3的GPU加速
-
訓練的資料結構中,“子需求技術PM”欄位是labels,即“分類值”,訓練目標
{
'標題': Value(dtype='string', id=None),
'需求指派人': Value(dtype='string', id=None),
'***': Value(dtype='int64', id=None),
'***': Value(dtype='string', id=None),
'***': Value(dtype='string', id=None),
'子需求pd': Value(dtype='string', id=None),
'子需求技術pm': Value(dtype='int64', id=None),
'***': Value(dtype='string', id=None)
}
-
lr_scheduler,是預熱LR值,保持動態的LR(1e-1到5e-5範圍),加速收斂。

