本文最后更新于:1 年前
针对于有顺序的数据类型,我们需要设计特定模型。我们不仅仅可以接收一个序列作为输入,而是还可能期望继续猜测这个序列的后续。如果说卷积神经网络可以有效地处理空间信息, 那么本章的循环神经网络 (recurrent neural network,RNN)则可以更好地处理序列信息。 循环神经网络通过引入状态变量存储过去的信息和当前的输入,从而可以确定当前的输出。
循环神经网络 (RNN) 是深度学习模型,通过循环 连接捕获序列的动态,循环连接可以被视为节点网络中的循环。乍一看这似乎违反直觉。毕竟,正是神经网络的前馈性质使得计算顺序变得明确。然而,循环边缘以精确的方式定义,以确保不会出现此类歧义。循环神经网络 跨时间步长(或序列步长)展开 ,每个步骤应用相同的基础参数。**标准连接同步 应用以在同一时间步长 将每一层的激活传播到后续层,循环连接是 动态的 ,跨相邻时间步传递信息。
序列模型
数据呈现时间性,季节性等,会随着时间而改变!
预测明天的股价要比过去的股价更困难,尽管两者都只是估计一个数字。 毕竟,先见之明比事后诸葛亮难得多。 在统计学中,前者(对超出已知观测范围进行预测)称为外推法 (extrapolation), 而后者(在现有观测值之间进行估计)称为内插法 (interpolation)。前者比后者困难的多得多!!!
统计工具
用$x_t$表示价格,即在时间步 (time step)$t \in \mathbb{Z}^+$时,观察到的价格$x_t$。 请注意,$t$对于本文中的序列通常是离散的,并在整数或其子集上变化。 假设一个交易员想在$t$日的股市中表现良好,于是通过以下途径预测$x_t$:
$$ x_t \sim P(x_t \mid x_{t-1}, \ldots, x_1). $$
自回归模型
输入数据的数量, 输入$x_{t-1}, \ldots, x_1$本身因$t$而异。 也就是说,输入数据的数量这个数字将会随着我们遇到的数据量的增加而增加, 因此需要一个近似方法来使这个计算变得容易处理。 本章后面的大部分内容将围绕着如何有效估计$P(x_t \mid x_{t-1}, \ldots, x_1)$展开。 简单地说,它归结为以下两种策略。
第一种策略,假设在现实情况下相当长的序列$x_{t-1}, \ldots, x_1$可能是不必要的, 因此我们只需要满足某个长度为$\tau$的时间跨度, 即使用观测序列$x_{t-1}, \ldots, x_{t-\tau}$。 当下获得的最直接的好处就是参数的数量总是不变的, 至少在$t > \tau$时如此,这就使我们能够训练一个上面提及的深度网络。 这种模型被称为自回归模型 (autoregressive models), 因为它们是对自己执行回归。
第二种策略,保留一些对过去观测的总结$h_t$, 并且同时更新预测$\hat{x}t$和总结$h_t$。 这就产生了基于$\hat{x}t = P(x_t \mid h {t})$估计$x_t$, 以及公式$h_t = g(h {t-1}, x_{t-1})$更新的模型。 由于$h_t$从未被观测到,这类模型也被称为 隐变量自回归模型 (latent autoregressive models)。
如何生成训练数据? 一个经典方法是使用历史观测来预测下一个未来观测。 显然,我们并不指望时间会停滞不前。 然而,一个常见的假设是虽然特定值$x_t$可能会改变, 但是序列本身的动力学不会改变。 这样的假设是合理的,因为新的动力学一定受新的数据影响, 而我们不可能用目前所掌握的数据来预测新的动力学。 统计学家称不变的动力学为静止的 (stationary)。 因此,整个序列的估计值都将通过以下的方式获得:
$$ P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}, \ldots, x_1). $$
注意,如果我们处理的是离散的对象(如单词), 而不是连续的数字,则上述的考虑仍然有效。 唯一的差别是,对于离散的对象, 我们需要使用分类器而不是回归模型来估计$P(x_t \mid x_{t-1}, \ldots, x_1)$。
马尔可夫模型
在自回归模型的近似法中, 我们使用$x_{t-1}, \ldots, x_{t-\tau}$而不是$x_{t-1}, \ldots, x_1$来估计$x_t$。 只要这种是近似精确的,我们就说序列满足马尔可夫条件 (Markov condition)。 特别是,如果$\tau = 1$,得到一个 一阶马尔可夫模型 (first-order Markov model), $P(x)$由下式给出:
$$ P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}) \text{ 当 } P(x_1 \mid x_0) = P(x_1). $$
假设$x_t$仅是离散值时,这样的模型特别棒, 因为在这种情况下,使用动态规划可以沿着马尔可夫链精确地计算结果。 例如,我们可以高效地计算$P(x_{t+1} \mid x_{t-1})$:
$$ \begin{split}\begin{aligned} P(x_{t+1} \mid x_{t-1}) &= \frac{\sum_{x_t} P(x_{t+1}, x_t, x_{t-1})}{P(x_{t-1})}\ &= \frac{\sum_{x_t} P(x_{t+1} \mid x_t, x_{t-1}) P(x_t, x_{t-1})}{P(x_{t-1})}\ &= \sum_{x_t} P(x_{t+1} \mid x_t) P(x_t \mid x_{t-1}) \end{aligned}\end{split} $$
离散的情况下,只需要对可能出现的情况进行罗列,对每种出现情况的概率进行求和罢了。参考最后的马尔可夫链昂!
因果关系
$P(x_1, \ldots, x_T)$基于条件概率公式,总是可以写出:
$$ P(x_1, \ldots, x_T) = \prod_{t=T}^1 P(x_t \mid x_{t+1}, \ldots, x_T). $$
如果基于一个马尔可夫模型, 我们还可以得到一个反向的条件概率分布。 然而,在许多情况下,数据存在一个自然的方向,即在时间上是前进的。 很明显,未来的事件不能影响过去。
训练
首先,我们生成一些数据:使用正弦函数和一些可加性噪声来生成序列数据。
1 2 3 4 5 6 7 8 9 %matplotlib inlineimport torchfrom torch import nnfrom d2l import torch as d2l T = 1000 time = torch.arange(1 , T + 1 , dtype=torch.float32) x = torch.sin(0.01 * time) + torch.normal(0 , 0.2 , (T,)) d2l.plot(time, [x], 'time' , 'x' , xlim=[1 , 1000 ], figsize=(6 , 3 ))
我们使用一个相当简单的架构训练模型: 一个拥有两个全连接层的多层感知机,ReLU激活函数和平方损失。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def init_weights (m ): if type (m) == nn.Linear: nn.init.xavier_uniform_(m.weight)def get_net (): net = nn.Sequential(nn.Linear(4 , 10 ), nn.ReLU(), nn.Linear(10 , 1 )) net.apply(init_weights) return net loss = nn.MSELoss(reduction='none' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def train (net, train_iter, loss, epochs, lr ): trainer = torch.optim.Adam(net.parameters(), lr) for epoch in range (epochs): for X, y in train_iter: trainer.zero_grad() l = loss(net(X), y) l.sum ().backward() trainer.step() print (f'epoch {epoch + 1 } , ' f'loss: {d2l.evaluate_loss(net, train_iter, loss):f} ' ) net = get_net() train(net, train_iter, loss, 5 , 0.01 ) epoch 1 , loss: 0.063133 epoch 2 , loss: 0.053832 epoch 3 , loss: 0.051174 epoch 4 , loss: 0.050547 epoch 5 , loss: 0.047369
预测
由于训练损失很小,因此我们期望模型能有很好的工作效果。 让我们看看这在实践中意味着什么。 首先是检查模型预测下一个时间步的能力, 也就是单步预测 (one-step-ahead prediction)。
1 2 3 4 5 onestep_preds = net(features) d2l.plot([time, time[tau:]], [x.detach().numpy(), onestep_preds.detach().numpy()], 'time' , 'x' , legend=['data' , '1-step preds' ], xlim=[1 , 1000 ], figsize=(6 , 3 ))
单步预测效果不错。 即使这些预测的时间步超过了600+4(n_train + tau), 其结果看起来仍然是可信的。
对于直到$x_t$的观测序列,其在时间步$t+k$处的预测输出$\hat{x}_{t+k}$ 称为$k$步预测(k-step-ahead-prediction)。换句话说,我们必须使用我们自己的预测(而不是原始数据)来进行多步预测。
1 2 3 4 5 6 7 8 9 10 11 multistep_preds = torch.zeros(T) multistep_preds[: n_train + tau] = x[: n_train + tau]for i in range (n_train + tau, T): multistep_preds[i] = net( multistep_preds[i - tau:i].reshape((1 , -1 ))) d2l.plot([time, time[tau:], time[n_train + tau:]], [x.detach().numpy(), onestep_preds.detach().numpy(), multistep_preds[n_train + tau:].detach().numpy()], 'time' , 'x' , legend=['data' , '1-step preds' , 'multistep preds' ], xlim=[1 , 1000 ], figsize=(6 , 3 ))
绿线的预测显然并不理想。经过几个预测步骤之后,预测的结果很快就会衰减到一个常数。 事实是由于错误的累积,预测误差会接着影响下一次的预测,依此类推。 误差可能会相当快地偏离真实的观测结果。
基于$k = 1, 4, 16, 64$,通过对整个序列预测的计算, 让我们更仔细地看一下$k$步预测的困难。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 max_steps = 64 features = torch.zeros((T - tau - max_steps + 1 , tau + max_steps))for i in range (tau): features[:, i] = x[i: i + T - tau - max_steps + 1 ]for i in range (tau, tau + max_steps): features[:, i] = net(features[:, i - tau:i]).reshape(-1 ) steps = (1 , 4 , 16 , 64 ) d2l.plot([time[tau + i - 1 : T - max_steps + i] for i in steps], [features[:, tau + i - 1 ].detach().numpy() for i in steps], 'time' , 'x' , legend=[f'{i} -step preds' for i in steps], xlim=[5 , 1000 ], figsize=(6 , 3 ))
以上例子清楚地说明了当我们试图预测更远的未来时,预测的质量是如何变化的。 虽然“4步预测”看起来仍然不错,但超过这个跨度的任何预测几乎都是无用的。
文本预处理
一篇文章可以被简单地看作一串单词序列,甚至是一串字符序列。
读取数据集 1 2 3 4 5 6 7 8 9 10 11 12 13 14 d2l.DATA_HUB['time_machine' ] = (d2l.DATA_URL + 'timemachine.txt' , '090b5e7e70c295757f55df93cb0a180b9691891a' )def read_time_machine (): """将时间机器数据集加载到文本行的列表中""" with open (d2l.download('time_machine' ), 'r' ) as f: lines = f.readlines() return [re.sub('[^A-Za-z]+' , ' ' , line).strip().lower() for line in lines] lines = read_time_machine()print (f'# 文本总行数: {len (lines)} ' )print (lines[0 ])print (lines[10 ])
下面的函数将数据集读取到由多条文本行组成的列表中,其中每条文本行都是一个字符串。 为简单起见,我们在这里忽略了标点符号和字母大写。
词元化
下面的tokenize
函数将文本行列表(lines
)作为输入, 列表中的每个元素是一个文本序列(如一条文本行)。 每个文本序列又被拆分成一个词元列表,词元 (token)是文本的基本单位。 最后,返回一个由词元列表组成的列表,其中的每个词元都是一个字符串(string)。
1 2 3 4 5 6 7 8 9 10 11 12 def tokenize (lines, token='word' ): """将文本行拆分为单词或字符词元""" if token == 'word' : return [line.split() for line in lines] elif token == 'char' : return [list (line) for line in lines] else : print ('错误:未知词元类型:' + token) tokens = tokenize(lines)for i in range (11 ): print (tokens[i])
词表
我们构建一个字典,通常也叫做词表 (vocabulary), 用来将字符串类型的词元映射到从0开始的数字索引中。 我们先将训练集中的所有文档合并在一起,对它们的唯一词元进行统计, 得到的统计结果称之为语料 (corpus)。 然后根据每个唯一词元的出现频率,为其分配一个数字索引。 很少出现的词元通常被移除,这可以降低复杂性。 另外,语料库中不存在或已删除的任何词元都将映射到一个特定的未知词元“”。 我们可以选择增加一个列表,用于保存那些被保留的词元, 例如:填充词元(“”); 序列开始词元(“”); 序列结束词元(“”)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 class Vocab : """文本词表""" def __init__ (self, tokens=None , min_freq=0 , reserved_tokens=None ): if tokens is None : tokens = [] if reserved_tokens is None : reserved_tokens = [] counter = count_corpus(tokens) self ._token_freqs = sorted (counter.items(), key=lambda x: x[1 ], reverse=True ) self .idx_to_token = ['<unk>' ] + reserved_tokens self .token_to_idx = {token: idx for idx, token in enumerate (self .idx_to_token)} for token, freq in self ._token_freqs: if freq < min_freq: break if token not in self .token_to_idx: self .idx_to_token.append(token) self .token_to_idx[token] = len (self .idx_to_token) - 1 def __len__ (self ): return len (self .idx_to_token) def __getitem__ (self, tokens ): if not isinstance (tokens, (list , tuple )): return self .token_to_idx.get(tokens, self .unk) return [self .__getitem__(token) for token in tokens] def to_tokens (self, indices ): if not isinstance (indices, (list , tuple )): return self .idx_to_token[indices] return [self .idx_to_token[index] for index in indices] @property def unk (self ): return 0 @property def token_freqs (self ): return self ._token_freqsdef count_corpus (tokens ): """统计词元的频率""" if len (tokens) == 0 or isinstance (tokens[0 ], list ): tokens = [token for line in tokens for token in line] return collections.Counter(tokens)
1 2 vocab = Vocab(tokens)print (list (vocab.token_to_idx.items())[:10 ])
1 2 3 for i in [0 , 10 ]: print ('文本:' , tokens[i]) print ('索引:' , vocab[tokens[i]])
整合所有功能
我们将所有功能打包到load_corpus_time_machine
函数中, 该函数返回corpus
(词元索引列表)和vocab
(时光机器语料库的词表)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def load_corpus_time_machine (max_tokens=-1 ): """返回时光机器数据集的词元索引列表和词表""" lines = read_time_machine() tokens = tokenize(lines, 'char' ) vocab = Vocab(tokens) corpus = [vocab[token] for line in tokens for token in line] if max_tokens > 0 : corpus = corpus[:max_tokens] return corpus, vocab corpus, vocab = load_corpus_time_machine()len (corpus), len (vocab)
为了简化后面章节中的训练,我们使用字符(而不是单词)实现文本词元化;
时光机器数据集中的每个文本行不一定是一个句子或一个段落,还可能是一个单词,因此返回的corpus
仅处理为单个列表,而不是使用多词元列表构成的一个列表。
语言模型和数据集
将文本数据映射为词元, 以及将这些词元可以视为一系列离散的观测,例如单词或字符。 假设长度为$T$的文本序列中的词元依次为$x_1, x_2, \ldots, x_T$。 于是,$x_t(1 \leq t \leq T)$可以被认为是文本序列在时间步$t$处的观测或标签。 在给定这样的文本序列时,语言模型 (language model)的目标是估计序列的联合概率
$$ P(x_1, x_2, \ldots, x_T). $$
例如:只需要一次抽取一个词元$x_t \sim P(x_t \mid x_{t-1}, \ldots, x_1)$,一个理想的语言模型就能够基于模型本身生成自然文本。
学习语言模型
$$ P(x_1, x_2, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_1, \ldots, x_{t-1}). \ Example: \ P(\text{deep}, \text{learning}, \text{is}, \text{fun}) = P(\text{deep}) P(\text{learning} \mid \text{deep}) P(\text{is} \mid \text{deep}, \text{learning}) P(\text{fun} \mid \text{deep}, \text{learning}, \text{is}). $$
我们需要计算单词的概率, 以及给定前面几个单词后出现某个单词的条件概率。 这些概率本质上就是语言模型的参数。
一种(稍稍不太精确的)方法是统计单词“deep”在数据集中的出现次数, 然后将其除以整个语料库中的单词总数。 这种方法效果不错,特别是对于频繁出现的单词。 接下来,我们可以尝试估计
$$ \hat{P}(\text{learning} \mid \text{deep}) = \frac{n(\text{deep, learning})}{n(\text{deep})}, $$
$n(x)$和$n(x, x’)$分别是单个单词和连续单词对的出现次数。由于连续单词对“deep learning”的出现频率要低得多, 所以估计这类单词正确的概率要困难得多。 特别是对于一些不常见的单词组合,要想找到足够的出现次数来获得准确的估计可能都不容易。 而对于三个或者更多的单词组合,情况会变得更糟。 许多合理的三个单词组合可能是存在的,但是在数据集中却找不到。 如果数据集很小,或者单词非常罕见,那么这类单词出现一次的机会可能都找不到。
一种常见的策略是执行某种形式的拉普拉斯平滑 (Laplace smoothing), 具体方法是在所有计数中添加一个小常量。 用n表示训练集中的单词总数,用m表示唯一单词的数量。 此解决方案有助于处理单元素问题,例如通过:
$$ \begin{split}\begin{aligned} \hat{P}(x) & = \frac{n(x) + \epsilon_1/m}{n + \epsilon_1}, \ \hat{P}(x’ \mid x) & = \frac{n(x, x’) + \epsilon_2 \hat{P}(x’)}{n(x) + \epsilon_2}, \ \hat{P}(x’’ \mid x,x’) & = \frac{n(x, x’,x’’) + \epsilon_3 \hat{P}(x’’)}{n(x, x’) + \epsilon_3}. \end{aligned}\end{split} $$
$\epsilon_1,\epsilon_2,\epsilon_3$是超参数。 当$\epsilon$为0的时候,不应用平滑。无穷大的时候,$\hat{P}(x)$接近均匀概率分布1/m。这样的模型很容易变得无效,原因如下: 首先,我们需要存储所有的计数; 其次,这完全忽略了单词的意思。 例如,“猫”(cat)和“猫科动物”(feline)可能出现在相关的上下文中, 但是想根据上下文调整这类模型其实是相当困难的。 最后,长单词序列大部分是没出现过的, 因此一个模型如果只是简单地统计先前“看到”的单词序列频率, 那么模型面对这种问题肯定是表现不佳的。
马尔可夫模型与n元语法
序列上的分布满足一阶马尔可夫性质。 阶数越高,对应的依赖关系就越长。 这种性质推导出了许多可以应用于序列建模的近似公式:
$$ \begin{split}\begin{aligned} P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2) P(x_3) P(x_4),\ P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2 \mid x_1) P(x_3 \mid x_2) P(x_4 \mid x_3),\ P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2 \mid x_1) P(x_3 \mid x_1, x_2) P(x_4 \mid x_2, x_3). \end{aligned}\end{split} $$
涉及一个、两个和三个变量的概率公式分别被称为 一元语法 (unigram)、二元语法 (bigram)和三元语法 (trigram)模型。
本质就是$\tau = 0, \tau = 1, \tau = 2$,每个词和前面多少个词相关,因此就有这个概率了昂!
自然语言统计
1 2 3 4 5 6 7 8 9 import randomimport torchfrom d2l import torch as d2l tokens = d2l.tokenize(d2l.read_time_machine()) corpus = [token for line in tokens for token in line] vocab = d2l.Vocab(corpus) vocab.token_freqs[:10 ]
最流行的词看起来很无聊, 这些词通常被称为停用词 (stop words),因此可以被过滤掉。 尽管如此,它们本身仍然是有意义的,我们仍然会在模型中使用它们。 此外,还有个明显的问题是词频衰减的速度相当地快。
1 2 3 freqs = [freq for token, freq in vocab.token_freqs] d2l.plot(freqs, xlabel='token: x' , ylabel='frequency: n(x)' , xscale='log' , yscale='log' )
词频以一种明确的方式迅速衰减。 将前几个单词作为例外消除后,剩余的所有单词大致遵循双对数坐标图上的一条直线。 这意味着单词的频率满足齐普夫定律 (Zipf’s law), 即第$i$个最常用单词的频率$n_i$为:
$$ n_i \propto \frac{1}{i^\alpha}, $$
等价于
$$ \log n_i = -\alpha \log i + c, $$
$\alpha$是刻画分布的指数,$c$是常数。 这告诉我们想要通过计数统计和平滑来建模单词是不可行的, 因为这样建模的结果会大大高估尾部单词的频率,也就是所谓的不常用单词。 那么其他的词元组合,比如二元语法、三元语法等等,又会如何呢? 我们来看看二元语法的频率是否与一元语法的频率表现出相同的行为方式。
1 2 3 bigram_tokens = [pair for pair in zip (corpus[:-1 ], corpus[1 :])] bigram_vocab = d2l.Vocab(bigram_tokens) bigram_vocab.token_freqs[:10 ]
在十个最频繁的词对中,有九个是由两个停用词组成的, 只有一个与“the time”有关。 我们再进一步看看三元语法的频率是否表现出相同的行为方式。
1 2 3 4 trigram_tokens = [triple for triple in zip ( corpus[:-2 ], corpus[1 :-1 ], corpus[2 :])] trigram_vocab = d2l.Vocab(trigram_tokens) trigram_vocab.token_freqs[:10 ]
我们直观地对比三种模型中的词元频率:一元语法、二元语法和三元语法。
1 2 3 4 5 bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs] trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs] d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x' , ylabel='frequency: n(x)' , xscale='log' , yscale='log' , legend=['unigram' , 'bigram' , 'trigram' ])
这张图非常令人振奋!原因有很多:
除了一元语法词,单词序列似乎也遵循齐普夫定律, 尽管公式 (8.3.7) 中的指数$\alpha$更小 (指数的大小受序列长度的影响);
词表中n元组的数量并没有那么大,这说明语言中存在相当多的结构, 这些结构给了我们应用模型的希望;
很多n元组很少出现,这使得拉普拉斯平滑非常不适合语言建模。 作为代替,我们将使用基于深度学习的模型。
读取长序列数据
当序列变得太长而不能被模型一次性全部处理时, 我们可能希望拆分这样的序列方便模型读取。我们看一下总体策略。 假设我们将使用神经网络来训练语言模型, 模型中的网络一次处理具有预定义长度 (例如n个时间步)的一个小批量序列。 现在的问题是如何随机生成一个小批量数据的特征和标签以供读取。
由于文本序列可以是任意长的, 例如整本《时光机器》(The Time Machine ), 于是任意长的序列可以被我们划分为具有相同时间步数的子序列。 当训练我们的神经网络时,这样的小批量子序列将被输入到模型中。 假设网络一次只处理具有n个时间步的子序列。每个时间步的词元对应于一个字符,如果大小为5,我们可以选择任意偏移量来指示初始位置,所以我们有相当大的自由度。
如果我们只选择一个偏移量, 那么用于训练网络的、所有可能的子序列的覆盖范围将是有限的。 因此,我们可以从随机偏移量开始划分序列, 以同时获得覆盖性 (coverage)和随机性 (randomness)。 下面,我们将描述如何实现随机采样 (random sampling)和 顺序分区 (sequential partitioning)策略。
随机采样
在随机采样中,每个样本都是在原始的长序列上任意捕获的子序列。 在迭代过程中,来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻。 对于语言建模,目标是基于到目前为止我们看到的词元来预测下一个词元, 因此标签是移位了一个词元的原始序列。
下面的代码每次可以从数据中随机生成一个小批量。 在这里,参数batch_size
指定了每个小批量中子序列样本的数目, 参数num_steps
是每个子序列中预定义的时间步数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def seq_data_iter_random (corpus, batch_size, num_steps ): """使用随机抽样生成一个小批量子序列""" corpus = corpus[random.randint(0 , num_steps - 1 ):] num_subseqs = (len (corpus) - 1 ) // num_steps initial_indices = list (range (0 , num_subseqs * num_steps, num_steps)) random.shuffle(initial_indices) def data (pos ): return corpus[pos: pos + num_steps] num_batches = num_subseqs // batch_size for i in range (0 , batch_size * num_batches, batch_size): initial_indices_per_batch = initial_indices[i: i + batch_size] X = [data(j) for j in initial_indices_per_batch] Y = [data(j + 1 ) for j in initial_indices_per_batch] yield torch.tensor(X), torch.tensor(Y)
下面我们生成一个从0到34的序列。 假设批量大小为2,时间步数为5,这意味着可以生成 ⌊(35−1)/5⌋=6个“特征-标签”子序列对。 如果设置小批量大小为2,我们只能得到3个小批量。
1 2 3 my_seq = list (range (35 ))for X, Y in seq_data_iter_random(my_seq, batch_size=2 , num_steps=5 ): print ('X: ' , X, '\nY:' , Y)
顺序分区
除了对原始序列可以随机抽样外, 我们还可以保证两个相邻的小批量中的子序列在原始序列上也是相邻的。 这种策略在基于小批量的迭代过程中保留了拆分的子序列的顺序,因此称为顺序分区。
1 2 3 4 5 6 7 8 9 10 11 12 13 def seq_data_iter_sequential (corpus, batch_size, num_steps ): """使用顺序分区生成一个小批量子序列""" offset = random.randint(0 , num_steps) num_tokens = ((len (corpus) - offset - 1 ) // batch_size) * batch_size Xs = torch.tensor(corpus[offset: offset + num_tokens]) Ys = torch.tensor(corpus[offset + 1 : offset + 1 + num_tokens]) Xs, Ys = Xs.reshape(batch_size, -1 ), Ys.reshape(batch_size, -1 ) num_batches = Xs.shape[1 ] // num_steps for i in range (0 , num_steps * num_batches, num_steps): X = Xs[:, i: i + num_steps] Y = Ys[:, i: i + num_steps] yield X, Y
通过顺序分区读取每个小批量的子序列的特征X
和标签Y
。 通过将它们打印出来可以发现: 迭代期间来自两个相邻的小批量中的子序列在原始序列中确实是相邻的。
1 2 for X, Y in seq_data_iter_sequential(my_seq, batch_size=2 , num_steps=5 ): print ('X: ' , X, '\nY:' , Y)
Summary
我们将上面的两个采样函数包装到一个类中, 以便稍后可以将其用作数据迭代器。
1 2 3 4 5 6 7 8 9 10 11 12 class SeqDataLoader : """加载序列数据的迭代器""" def __init__ (self, batch_size, num_steps, use_random_iter, max_tokens ): if use_random_iter: self .data_iter_fn = d2l.seq_data_iter_random else : self .data_iter_fn = d2l.seq_data_iter_sequential self .corpus, self .vocab = d2l.load_corpus_time_machine(max_tokens) self .batch_size, self .num_steps = batch_size, num_steps def __iter__ (self ): return self .data_iter_fn(self .corpus, self .batch_size, self .num_steps)
最后,我们定义了一个总函数load_data_time_machine
, 它同时返回数据迭代器和词表。
1 2 3 4 5 6 def load_data_time_machine (batch_size, num_steps, use_random_iter=False , max_tokens=10000 ): """返回时光机器数据集的迭代器和词表""" data_iter = SeqDataLoader( batch_size, num_steps, use_random_iter, max_tokens) return data_iter, data_iter.vocab
RNN
我们可以基于当前输入$x_{t}$和先前隐状态$h_{t-1}$来计算时间步t处的任何时间的隐状态:
$$ h_t = f(x_{t}, h_{t-1}). $$
无隐状态的神经网络
隐藏层的输出$\mathbf{H} \in \mathbb{R}^{n \times h}$通过下式计算:
$$ \mathbf{H} = \phi(\mathbf{X} \mathbf{W}_{xh} + \mathbf{b}_h). $$
$$ \mathbf{O} = \mathbf{H} \mathbf{W}_{hq} + \mathbf{b}_q, $$
只要可以随机选择“特征-标签”对, 并且通过自动微分和随机梯度下降能够学习网络参数就可以了。
有隐状态的循环神经网络
使用latent variable来总结过去的信息
更新输出状态:
更新隐藏状态:$\mathbf{H}t = \phi(\mathbf{X}t \mathbf{W} {xh} + \mathbf{H} {t-1} \mathbf{W}_{hh} + \mathbf{b}_h).$
输出层的输出类似于多层感知机中的计算:$\mathbf{O}_t = \mathbf{H}t \mathbf{W} {hq} + \mathbf{b}_q.$
步骤:
先更新隐藏状态
再根据当前的隐藏状态和输入,算出输出
损失就是当前输出,和下一个输入之间的不同产生的昂!
在任意时间步t,隐状态的计算可以被视为:
拼接当前时间步t的输入$X_t$和前一时间步t-1的隐状态$X_{t-1}$;
将拼接的结果送入带有激活函数$\phi$的全连接层。 全连接层的输出是当前时间步t的隐状态$H_t$。
困惑度(perplexity)
我们可以通过一个序列中所有的n个词元的交叉熵损失的平均值来衡量语言模型的好坏(准确程度),这里又涉及到交叉熵啊,Softmax的那一套,本质是一个概率的估算:
$$ \frac{1}{n} \sum_{t=1}^n -\log P(x_t \mid x_{t-1}, \ldots, x_1), $$
这使得不同长度的文档的性能具有了可比性。 由于历史原因,自然语言处理的科学家更喜欢使用一个叫做困惑度 (perplexity)的量。
$$ \exp\left(-\frac{1}{n} \sum_{t=1}^n \log P(x_t \mid x_{t-1}, \ldots, x_1)\right). $$
梯度剪裁
长度为$O(T)$的矩阵乘法链,导致数值不稳定。梯度剪裁可以有效预防梯度爆炸。
更多应用
RNN手动实现
1 2 3 4 5 6 7 8 9 %matplotlib inlineimport mathimport torchfrom torch import nnfrom torch.nn import functional as Ffrom d2l import torch as d2l batch_size, num_steps = 32 , 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
独热编码 1 F.one_hot(torch.tensor([0 , 2 ]), len (vocab))
将这些索引直接输入神经网络可能会使学习变得困难。 我们通常将每个词元表示为更具表现力的特征向量。 最简单的表示称为独热编码 (one-hot encoding)
我们每次采样的小批量数据形状是二维张量: (批量大小,时间步数)。 one_hot
函数将这样一个小批量数据转换成三维张量, 张量的最后一个维度等于词表大小(len(vocab)
)。 我们经常转换输入的维度,以便获得形状为 (时间步数,批量大小,词表大小)的输出。 这将使我们能够更方便地通过最外层的维度, 一步一步地更新小批量数据的隐状态。
1 2 X = torch.arange(10 ).reshape((2 , 5 )) F.one_hot(X.T, 28 ).shape
转置?把时间放在第一个维度,每次访问都是连续的(batch + one_hot vector)
初始化模型参数
我们初始化循环神经网络模型的模型参数。 隐藏单元数num_hiddens
是一个可调的超参数。 当训练语言模型时,输入和输出来自相同的词表。 因此,它们具有相同的维度,即词表的大小。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def get_params (vocab_size, num_hiddens, device ): num_inputs = num_outputs = vocab_size def normal (shape ): return torch.randn(size=shape, device=device) * 0.01 W_xh = normal((num_inputs, num_hiddens)) W_hh = normal((num_hiddens, num_hiddens)) b_h = torch.zeros(num_hiddens, device=device) W_hq = normal((num_hiddens, num_outputs)) b_q = torch.zeros(num_outputs, device=device) params = [W_xh, W_hh, b_h, W_hq, b_q] for param in params: param.requires_grad_(True ) return params
W_hh移掉之后,就是一个单隐藏层的MLP嘛,一个输入的W和b,一个输出的W和b。
循环神经网络模型
我们首先需要一个init_rnn_state
函数在初始化时返回隐状态。
1 2 def init_rnn_state (batch_size, num_hiddens, device ): return (torch.zeros((batch_size, num_hiddens), device=device), )
下面的rnn
函数定义了如何在一个时间步内计算隐状态和输出。 循环神经网络模型通过inputs
最外层的维度实现循环, 以便逐时间步更新小批量数据的隐状态H
。 此外,这里使用tanh函数作为激活函数。
1 2 3 4 5 6 7 8 9 10 11 def rnn (inputs, state, params ): W_xh, W_hh, b_h, W_hq, b_q = params H, = state outputs = [] for X in inputs: H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h) Y = torch.mm(H, W_hq) + b_q outputs.append(Y) return torch.cat(outputs, dim=0 ), (H,)
定义了所有需要的函数之后,接下来我们创建一个类来包装这些函数, 并存储从零开始实现的循环神经网络模型的参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class RNNModelScratch : """从零开始实现的循环神经网络模型""" def __init__ (self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn ): self .vocab_size, self .num_hiddens = vocab_size, num_hiddens self .params = get_params(vocab_size, num_hiddens, device) self .init_state, self .forward_fn = init_state, forward_fn def __call__ (self, X, state ): X = F.one_hot(X.T, self .vocab_size).type (torch.float32) return self .forward_fn(X, state, self .params) def begin_state (self, batch_size, device ): return self .init_state(batch_size, self .num_hiddens, device)
检查输出是否具有正确的形状。 例如,隐状态的维数是否保持不变。
1 2 3 4 5 6 num_hiddens = 512 net = RNNModelScratch(len (vocab), num_hiddens, d2l.try_gpu(), get_params, init_rnn_state, rnn) state = net.begin_state(X.shape[0 ], d2l.try_gpu()) Y, new_state = net(X.to(d2l.try_gpu()), state) Y.shape, len (new_state), new_state[0 ].shape
我们可以看到输出形状是(时间步数×批量大小,词表大小), 而隐状态形状保持不变,即(批量大小,隐藏单元数)。
预测
我们首先定义预测函数来生成prefix
之后的新字符, 其中的prefix
是一个用户提供的包含多个字符的字符串。 在循环遍历prefix
中的开始字符时, 我们不断地将隐状态传递到下一个时间步,但是不生成任何输出。 这被称为预热 (warm-up)期, 因为在此期间模型会自我更新(例如,更新隐状态), 但不会进行预测。 预热期结束后,隐状态的值通常比刚开始的初始值更适合预测, 从而预测字符并输出它们。
1 2 3 4 5 6 7 8 9 10 11 12 def predict_ch8 (prefix, num_preds, net, vocab, device ): """在prefix后面生成新字符""" state = net.begin_state(batch_size=1 , device=device) outputs = [vocab[prefix[0 ]]] get_input = lambda : torch.tensor([outputs[-1 ]], device=device).reshape((1 , 1 )) for y in prefix[1 :]: _, state = net(get_input(), state) outputs.append(vocab[y]) for _ in range (num_preds): y, state = net(get_input(), state) outputs.append(int (y.argmax(dim=1 ).reshape(1 ))) return '' .join([vocab.idx_to_token[i] for i in outputs])
梯度裁剪
对于长度为T的序列,我们在迭代中计算这T个时间步上的梯度, 将会在反向传播过程中产生长度为O(T)的矩阵乘法链。 如 4.8节 所述, 当T较大时,它可能导致数值不稳定, 例如可能导致梯度爆炸或梯度消失。 因此,循环神经网络模型往往需要额外的方式来支持稳定训练。
有时梯度可能很大,从而优化算法可能无法收敛。我们可以通过降低$\eta$的学习率来解决这个问题。一个流行的替代方案是通过将梯度g投影回给定半径 (例如$\theta$)的球来裁剪梯度g。 如下式:
$$ \mathbf{g} \leftarrow \min\left(1, \frac{\theta}{|\mathbf{g}|}\right) \mathbf{g}. $$
我们知道梯度范数永远不会超过$\theta$, 并且更新后的梯度完全与g的原始方向对齐。 它还有一个值得拥有的副作用, 即限制任何给定的小批量数据(以及其中任何给定的样本)对参数向量的影响, 这赋予了模型一定程度的稳定性。 梯度裁剪提供了一个快速修复梯度爆炸的方法, 虽然它并不能完全解决问题,但它是众多有效的技术之一。
1 2 3 4 5 6 7 8 9 10 def grad_clipping (net, theta ): """裁剪梯度""" if isinstance (net, nn.Module): params = [p for p in net.parameters() if p.requires_grad] else : params = net.params norm = torch.sqrt(sum (torch.sum ((p.grad ** 2 )) for p in params)) if norm > theta: for param in params: param.grad[:] *= theta / norm
全局梯度裁剪
训练
我们定义一个函数在一个迭代周期内训练模型:
序列数据的不同采样方法(随机采样和顺序分区)将导致隐状态初始化的差异。
我们在更新模型参数之前裁剪梯度。 这样的操作的目的是,即使训练过程中某个点上发生了梯度爆炸,也能保证模型不会发散。
我们用困惑度来评价模型。如 8.4.4节 所述, 这样的度量确保了不同长度的序列具有可比性。
定义一个函数在一个迭代周期内训练模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 def train_epoch_ch8 (net, train_iter, loss, updater, device, use_random_iter ): """训练网络一个迭代周期(定义见第8章)""" state, timer = None , d2l.Timer() metric = d2l.Accumulator(2 ) for X, Y in train_iter: if state is None or use_random_iter: state = net.begin_state(batch_size=X.shape[0 ], device=device) else : if isinstance (net, nn.Module) and not isinstance (state, tuple ): state.detach_() else : for s in state: s.detach_() y = Y.T.reshape(-1 ) X, y = X.to(device), y.to(device) y_hat, state = net(X, state) l = loss(y_hat, y.long()).mean() if isinstance (updater, torch.optim.Optimizer): updater.zero_grad() l.backward() grad_clipping(net, 1 ) updater.step() else : l.backward() grad_clipping(net, 1 ) updater(batch_size=1 ) metric.add(l * y.numel(), y.numel()) return math.exp(metric[0 ] / metric[1 ]), metric[1 ] / timer.stop()
循环神经网络模型的训练函数既支持从零开始实现, 也可以使用高级API来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def train_ch8 (net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False ): """训练模型(定义见第8章)""" loss = nn.CrossEntropyLoss() animator = d2l.Animator(xlabel='epoch' , ylabel='perplexity' , legend=['train' ], xlim=[10 , num_epochs]) if isinstance (net, nn.Module): updater = torch.optim.SGD(net.parameters(), lr) else : updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size) predict = lambda prefix: predict_ch8(prefix, 50 , net, vocab, device) for epoch in range (num_epochs): ppl, speed = train_epoch_ch8( net, train_iter, loss, updater, device, use_random_iter) if (epoch + 1 ) % 10 == 0 : print (predict('time traveller' )) animator.add(epoch + 1 , [ppl]) print (f'困惑度 {ppl:.1 f} , {speed:.1 f} 词元/秒 {str (device)} ' ) print (predict('time traveller' )) print (predict('traveller' ))
我们训练循环神经网络模型。 因为我们在数据集中只使用了10000个词元, 所以模型需要更多的迭代周期来更好地收敛。
1 2 num_epochs, lr = 500 , 1 train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())
1 2 3 4 net = RNNModelScratch(len (vocab), num_hiddens, d2l.try_gpu(), get_params, init_rnn_state, rnn) train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu(), use_random_iter=True )
RNN简洁实现
1 2 3 4 5 6 7 import torchfrom torch import nnfrom torch.nn import functional as Ffrom d2l import torch as d2l batch_size, num_steps = 32 , 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
定义模型 1 2 num_hiddens = 256 rnn_layer = nn.RNN(len (vocab), num_hiddens)
我们使用张量来初始化隐状态,它的形状是(隐藏层数,批量大小,隐藏单元数)。
1 2 state = torch.zeros((1 , batch_size, num_hiddens)) state.shape
通过一个隐状态和一个输入,我们就可以用更新后的隐状态计算输出。 需要强调的是,rnn_layer
的“输出”(Y
)不涉及输出层的计算: 它是指每个时间步的隐状态,这些隐状态可以用作后续输出层的输入。
1 2 3 X = torch.rand(size=(num_steps, batch_size, len (vocab))) Y, state_new = rnn_layer(X, state) Y.shape, state_new.shape
我们为一个完整的循环神经网络模型定义了一个RNNModel
类。 注意,rnn_layer
只包含隐藏的循环层,我们还需要创建一个单独的输出层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class RNNModel (nn.Module): """循环神经网络模型""" def __init__ (self, rnn_layer, vocab_size, **kwargs ): super (RNNModel, self ).__init__(**kwargs) self .rnn = rnn_layer self .vocab_size = vocab_size self .num_hiddens = self .rnn.hidden_size if not self .rnn.bidirectional: self .num_directions = 1 self .linear = nn.Linear(self .num_hiddens, self .vocab_size) else : self .num_directions = 2 self .linear = nn.Linear(self .num_hiddens * 2 , self .vocab_size) def forward (self, inputs, state ): X = F.one_hot(inputs.T.long(), self .vocab_size) X = X.to(torch.float32) Y, state = self .rnn(X, state) output = self .linear(Y.reshape((-1 , Y.shape[-1 ]))) return output, state def begin_state (self, device, batch_size=1 ): if not isinstance (self .rnn, nn.LSTM): return torch.zeros((self .num_directions * self .rnn.num_layers, batch_size, self .num_hiddens), device=device) else : return (torch.zeros(( self .num_directions * self .rnn.num_layers, batch_size, self .num_hiddens), device=device), torch.zeros(( self .num_directions * self .rnn.num_layers, batch_size, self .num_hiddens), device=device))
训练与预测
1 2 3 4 device = d2l.try_gpu() net = RNNModel(rnn_layer, vocab_size=len (vocab)) net = net.to(device) d2l.predict_ch8('time traveller' , 10 , net, vocab, device)
我们使用 8.5节 中 定义的超参数调用train_ch8
,并且使用高级API训练模型。
1 2 num_epochs, lr = 500 , 1 d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)
References
LLM面试题
d2l
马尔可夫链
序列模型 Q&A
文本预处理 Q&A
语音模型 Q&A
Introduction of n-gram
RNN Q&A
RNN实现 Q&A