Recurrent Neural Networks: Demystifying the Core of RNNs and Their Applications
Recurrent Neural Networks (RNNs) Explained
1. Core Concepts
A recurrent neural network (RNN) is a class of neural networks designed for sequential data. It keeps an internal state (the hidden state) that retains information from earlier steps, which lets it model temporal dependencies. Its key characteristics, illustrated by the short sketch after this list, are:
- Parameters shared across time steps: the same set of weights is reused at every step.
- Memory: the hidden state passes historical information forward, which makes RNNs well suited to variable-length sequences.
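A minimal sketch of these two properties, assuming PyTorch (which the example code later in this article also uses); the layer sizes and variable names here are illustrative, not taken from the article:

import torch

torch.manual_seed(0)
input_size, hidden_size, seq_len = 4, 8, 5           # illustrative sizes
W_xh = 0.01 * torch.randn(input_size, hidden_size)   # one set of weights ...
W_hh = 0.01 * torch.randn(hidden_size, hidden_size)
b_h = torch.zeros(hidden_size)

xs = torch.randn(seq_len, input_size)                # a toy sequence x_1 ... x_T
h = torch.zeros(hidden_size)                         # initial hidden state
for x_t in xs:                                       # ... reused at every time step
    h = torch.tanh(x_t @ W_xh + h @ W_hh + b_h)      # h carries the history forward
print(h.shape)                                       # torch.Size([8])

The loop body never changes; only h does, which is why the same network can process sequences of any length.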
2. The Classic RNN Architecture
(1) The basic cell
h_t = tanh(W_{xh} x_t + W_{hh} h_{t-1} + b_h)
y_t = W_{hy} h_t + b_y
- Inputs: the current time step's input x_t and the previous hidden state h_{t-1}.
- Outputs: the new hidden state h_t and the prediction y_t.
- Activation: usually tanh or ReLU, to control the range of the hidden-state values (a single-step sketch of these equations follows this list).
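The two equations map almost line for line onto code. A sketch of one time step, again with illustrative batch and layer sizes (names such as h_prev are assumptions made for this example):

import torch

batch, input_size, hidden_size, output_size = 2, 4, 8, 3   # illustrative sizes
x_t = torch.randn(batch, input_size)       # current input x_t
h_prev = torch.zeros(batch, hidden_size)   # previous hidden state h_{t-1}

W_xh = 0.01 * torch.randn(input_size, hidden_size)
W_hh = 0.01 * torch.randn(hidden_size, hidden_size)
b_h = torch.zeros(hidden_size)
W_hy = 0.01 * torch.randn(hidden_size, output_size)
b_y = torch.zeros(output_size)

h_t = torch.tanh(x_t @ W_xh + h_prev @ W_hh + b_h)   # h_t = tanh(W_xh x_t + W_hh h_{t-1} + b_h)
y_t = h_t @ W_hy + b_y                               # y_t = W_hy h_t + b_y
print(h_t.shape, y_t.shape)                          # torch.Size([2, 8]) torch.Size([2, 3])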
(2) Unrolled view
Time step 1: x₁ → h₁ → y₁
Time step 2: x₂ → h₂ → y₂
...
Time step T: x_T → h_T → y_T
- The hidden state h_t passes information along like a kind of "memory" (see the sketch below).
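This unrolling is exactly what the framework performs when it is handed a whole sequence. As a sketch (sizes are again illustrative), torch.nn.RNN runs the recurrence over all T steps and returns every hidden state plus the final one; a separate output layer would then map each h_t to y_t:

import torch

seq_len, batch, input_size, hidden_size = 6, 2, 4, 8   # illustrative sizes
rnn = torch.nn.RNN(input_size, hidden_size)            # single-layer tanh RNN

x = torch.randn(seq_len, batch, input_size)            # x_1 ... x_T
out, h_T = rnn(x)                                      # unrolled over all T time steps
print(out.shape)   # torch.Size([6, 2, 8])  -> h_1 ... h_T
print(h_T.shape)   # torch.Size([1, 2, 8])  -> final hidden state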
3. The Sequence Problems RNNs Can Solve
- Language modeling: predicting the next word (e.g., "hello" → "world").
- Time-series forecasting: stock prices, weather data.
- Machine translation: mapping a sentence from one language into another.
4. Gradient Problems and Improvements
(1) Vanishing / exploding gradients
- Problem: when gradients are backpropagated through a long sequence, they decay or blow up exponentially.
- Solutions:
- LSTM (Long Short-Term Memory): introduces a gating mechanism (input gate, forget gate, output gate).
  f_t = σ(W_f [h_{t-1}, x_t] + b_f)   # forget gate
  i_t = σ(W_i [h_{t-1}, x_t] + b_i)   # input gate
  o_t = σ(W_o [h_{t-1}, x_t] + b_o)   # output gate
- GRU (Gated Recurrent Unit): a simplified LSTM that merges gates (a short usage sketch follows this list).
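In practice you rarely write the gate equations by hand: torch.nn.LSTM and torch.nn.GRU implement them (together with the cell-state and candidate updates not shown above) and can serve as drop-in replacements for a plain RNN. A minimal sketch with illustrative sizes:

import torch

seq_len, batch, input_size, hidden_size = 6, 2, 4, 8   # illustrative sizes
x = torch.randn(seq_len, batch, input_size)

lstm = torch.nn.LSTM(input_size, hidden_size)
out, (h_T, c_T) = lstm(x)        # the LSTM also carries a cell state c_t
print(out.shape, h_T.shape, c_T.shape)

gru = torch.nn.GRU(input_size, hidden_size)
out, h_T = gru(x)                # the GRU is gated but has no separate cell state
print(out.shape, h_T.shape)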
- Example code (generating song lyrics):
import zipfile
import random
import torch
import torch.nn.functional as F
import numpy as np
import time
import math
# import d2lzh as d2l
import torch.optim as optim

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

with zipfile.ZipFile('data/jaychou_lyrics.txt.zip') as zin:
    with zin.open('jaychou_lyrics.txt') as f:
        corpus_chars = f.read().decode('utf-8')
corpus_chars = corpus_chars.replace('\n', ' ').replace('\r', ' ')
corpus_chars = corpus_chars[:10000]
# print('length of corpus: ', len(corpus_chars))
idx_to_char = list(set(corpus_chars))
# print('number of unique characters: ', len(idx_to_char))
char_to_idx = dict([(char, i) for i, char in enumerate(idx_to_char)])
vocab_size = len(char_to_idx)
# print('vocab size: ', len(char_to_idx))
corpus_indices = [char_to_idx[char] for char in corpus_chars]
# print('corpus_indices[:20]: ', corpus_indices[:20])

# Random sampling: examples are drawn in random order
def data_iter_random(corpus_indices, batch_size, num_steps, device=None):
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Number of examples (minus 1 because the label is the input shifted by one step)
    num_examples = (len(corpus_indices) - 1) // num_steps
    # Total number of batches
    num_batches = num_examples // batch_size
    # Shuffle the example indices
    example_indices = list(range(num_examples))
    random.shuffle(example_indices)
    for i in range(num_batches):
        i = i * batch_size
        batch_indices = example_indices[i: i + batch_size]
        X = [corpus_indices[j * num_steps: (j + 1) * num_steps] for j in batch_indices]
        Y = [corpus_indices[j * num_steps + 1: (j + 1) * num_steps + 1] for j in batch_indices]
        yield (torch.tensor(X, dtype=torch.float32, device=device),
               torch.tensor(Y, dtype=torch.float32, device=device))

# Consecutive sampling: adjacent batches are contiguous in the corpus
def data_iter_consecutive(corpus_indices, batch_size, num_steps, device=None):
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    corpus_indices = torch.tensor(corpus_indices, dtype=torch.float32, device=device)
    # Length of the raw data
    data_len = len(corpus_indices)
    batch_len = data_len // batch_size
    indices = corpus_indices[0: batch_size * batch_len].view(batch_size, batch_len)
    # Subtract 1 because the label sequence is the input sequence shifted by one step
    epoch_size = (batch_len - 1) // num_steps
    for i in range(epoch_size):
        i = i * num_steps
        X = indices[:, i: i + num_steps]
        Y = indices[:, i + 1: i + num_steps + 1]
        yield X, Y

# temp = F.one_hot(torch.tensor([0, 2, 1]), vocab_size)
# print('text one-hot:', temp)

# One-hot encode a (batch_size, num_steps) index tensor into
# num_steps matrices of shape (batch_size, vocab_size)
def to_onehot(X, size):
    return F.one_hot(X.t(), size)

# X = torch.arange(10).view(2, 5)
# temp = to_onehot(X, vocab_size)
# print(len(temp), temp.shape)

# Initialize the model parameters
def get_params():
    def _one(shape):
        ts = torch.tensor(np.random.normal(0, 0.01, size=shape),
                          device=device, dtype=torch.float32)
        return torch.nn.Parameter(ts, requires_grad=True)
    # Hidden-layer parameters
    W_xh = _one((num_inputs, num_hiddens))
    W_hh = _one((num_hiddens, num_hiddens))
    b_h = torch.nn.Parameter(torch.zeros(num_hiddens, device=device, requires_grad=True))
    # Output-layer parameters
    W_hq = _one((num_hiddens, num_outputs))
    b_q = torch.nn.Parameter(torch.zeros(num_outputs, device=device, requires_grad=True))
    return torch.nn.ParameterList([W_xh, W_hh, b_h, W_hq, b_q])

# Initialize the hidden state to zeros
def init_rnn_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device), )

def rnn(inputs, state, params):
    # inputs and outputs are both num_steps matrices of shape (batch_size, vocab_size)
    W_xh, W_hh, b_h, W_hq, b_q = params
    # Hidden state carried over from the previous time step
    H, = state
    outputs = []
    for X in inputs:
        H = torch.tanh(torch.matmul(X.float(), W_xh) +
                       torch.matmul(H, W_hh) + b_h)
        Y = torch.matmul(H, W_hq) + b_q
        outputs.append(Y)
    return outputs, (H, )

# X = torch.arange(10).view(2, 5)
# state = init_rnn_state(X.shape[0], num_hiddens, device)
# inputs = to_onehot(X.to(device), vocab_size)
# print('inputs:', inputs.shape)
# params = get_params()
# outputs, state_new = rnn(inputs, state, params)
# print(len(outputs), outputs[0].shape, state_new[0].shape)

# Prediction: generate num_chars characters following the given prefix
def predict_rnn(prefix, num_chars, rnn, params, init_rnn_state, num_hiddens,
                vocab_size, device, idx_to_char, char_to_idx):
    # Initial hidden state
    state = init_rnn_state(1, num_hiddens, device)
    # Seed the output sequence with the first character of the prefix
    output = [char_to_idx[prefix[0]]]
    for t in range(num_chars + len(prefix) - 1):
        # Use the previous time step's output as the current input
        X = to_onehot(torch.tensor([[output[-1]]], device=device), vocab_size)
        # Compute the output and update the hidden state
        (Y, state) = rnn(X, state, params)
        # The next input is either the next prefix character or the current best prediction
        if t < len(prefix) - 1:
            output.append(char_to_idx[prefix[t + 1]])
        else:
            output.append(int(Y[0].argmax(dim=1).item()))
    return ''.join([idx_to_char[i] for i in output])

# out = predict_rnn('分开', 10, rnn, params, init_rnn_state,
# num_hiddens, vocab_size, device,
# idx_to_char, char_to_idx)
# print(out)

# Gradient clipping: rescale the gradients if their global L2 norm exceeds theta
def grad_clipping(params, theta, device):
    # Square root of the sum of squared gradient values
    norm = torch.tensor([0.0], device=device)
    for param in params:
        norm += (param.grad.data ** 2).sum()
    norm = norm.sqrt().item()
    if norm > theta:
        for param in params:
            param.grad.data *= (theta / norm)

# rnn - the RNN model to use
# get_params - initializes the model parameters
# init_rnn_state - initializes the hidden state
# num_hiddens - number of hidden units
# vocab_size - vocabulary size
# device - run the computation on GPU or CPU
# corpus_indices - the encoded corpus
# idx_to_char - index-to-character mapping
# char_to_idx - character-to-index mapping
# is_random_iter - whether to use random sampling
# num_epochs - number of training epochs
# num_steps - number of time steps per example in each batch
# lr - learning rate
# clipping_theta - gradient clipping threshold
# batch_size - batch size
# pred_period - print training results every pred_period epochs
# pred_len - length of the generated sample
# prefixes - prefixes used to seed generation
def train_and_predict_rnn(rnn, get_params, init_rnn_state, num_hiddens,
                          vocab_size, device, corpus_indices, idx_to_char,
                          char_to_idx, is_random_iter, num_epochs, num_steps,
                          lr, clipping_theta, batch_size, pred_period,
                          pred_len, prefixes):
    if is_random_iter:
        # Random sampling
        data_iter_fn = data_iter_random
    else:
        # Consecutive sampling
        data_iter_fn = data_iter_consecutive
    # Initialize the model parameters
    params = get_params()
    # Cross-entropy loss
    loss = torch.nn.CrossEntropyLoss()
    for epoch in range(num_epochs):
        if not is_random_iter:
            # With consecutive sampling, initialize the hidden state at the start of each epoch
            state = init_rnn_state(batch_size, num_hiddens, device)
        # Running loss, number of characters, start time
        l_sum, n, start = 0.0, 0, time.time()
        # Generate the training examples and labels
        data_iter = data_iter_fn(corpus_indices, batch_size, num_steps, device)
        for X, Y in data_iter:
            if is_random_iter:
                # With random sampling, initialize the hidden state before each mini-batch
                state = init_rnn_state(batch_size, num_hiddens, device)
            else:
                # Otherwise detach the hidden state from the computation graph so that the
                # parameter gradients depend only on the mini-batch read in this iteration
                # (see https://www.cnblogs.com/catnofishing/p/13287322.html?tdsourcetag=s_pctim_aiomsg)
                for s in state:
                    s.detach_()
            # One-hot encoding
            inputs = to_onehot(X.long(), vocab_size)
            # outputs is num_steps matrices of shape (batch_size, vocab_size)
            (outputs, state) = rnn(inputs, state, params)
            # After concatenation the shape is (num_steps * batch_size, vocab_size)
            outputs = torch.cat(outputs, dim=0)
            # Y has shape (batch_size, num_steps); transpose and flatten it into a vector
            # of length batch_size * num_steps so it lines up with the output rows
            y = torch.transpose(Y, 0, 1).contiguous().view(-1)
            # Average classification error via cross-entropy
            l = loss(outputs, y.long())
            # Zero the gradients
            if params[0].grad is not None:
                for param in params:
                    param.grad.data.zero_()
            l.backward(retain_graph=True)
            # Clip the gradients
            grad_clipping(params, clipping_theta, device)
            # The loss is already a mean, so the gradients need no further averaging
            # d2l.sgd(params, lr, 1)
            optimizer = optim.SGD(params, lr=lr)  # params are the model parameters, lr is the learning rate
            optimizer.step()                      # take one SGD step
            optimizer.zero_grad()
            l_sum += l.item() * y.shape[0]
            n += y.shape[0]
        if (epoch + 1) % pred_period == 0:
            print('epoch %d, perplexity %f, time %.2f sec' %
                  (epoch + 1, math.exp(l_sum / n), time.time() - start))
            for prefix in prefixes:
                print(' -', predict_rnn(prefix, pred_len, rnn, params,
                                        init_rnn_state, num_hiddens, vocab_size,
                                        device, idx_to_char, char_to_idx))

# Train the model and generate text
num_inputs, num_hiddens, num_outputs = vocab_size, 256, vocab_size
print('will use', device)
num_epochs, num_steps, batch_size, lr = 250, 35, 32, 1e2
clipping_theta = 1e-2
pred_period, pred_len, prefixes = 50, 50, ['分开', '不分开']
train_and_predict_rnn(rnn, get_params, init_rnn_state, num_hiddens,
                      vocab_size, device, corpus_indices, idx_to_char,
                      char_to_idx, True, num_epochs, num_steps, lr,
                      clipping_theta, batch_size, pred_period,
                      pred_len, prefixes)