TensorFlow系列專題(八):七步帶你實現RNN循環神經網絡小示例

【前言】:在前面的內容裡,我們已經學習了循環神經網絡的基本結構和運算過程,這一小節裡,我們將用TensorFlow實現簡單的RNN,並且用來解決時序數據的預測問題,看一看RNN究竟能達到什麼樣的效果,具體又是如何實現的。

在這個演示項目裡,我們使用隨機生成的方式生成一個數據集(由0和1組成的二進制序列),然後人為的增加一些數據間的關係。最後我們把這個數據集放進RNN裡,讓RNN去學習其中的關係,實現二進制序列的預測。數據生成的方式如下:

循環生成規模為五十萬的數據集,每次產生的數據為0或1的概率均為0.5。如果連續生成了兩個1(或兩個0)的話,則下一個數據強制為0(或1)。

1. 我們首先導入需要的Python模塊:

 #!/usr/bin/python
# -*- coding: UTF-8 -*-
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.contrib import rnn

2. 定義一個Data類,用來產生數據:

class Data:
def __init__(self, data_size, num_batch, batch_size, time_step):
self.data_size = data_size # 數據集的大小
self.batch_size = batch_size # 一個batch的大小
self.num_batch = num_batch # batch的數目(num_batch=data_size//batch_size)
self.time_step = time_step # RNN的時間步
self.data_without_rel = [] # 保存隨機生成的數據,數據間沒有聯繫

self.data_with_rel = [] # 保存有時序關係的數據

3. 在構造方法"__init__"中,我們初始化了數據集的大小"data_size"、一個batch的大小"batch_size"、一個epoch中的batch數目"num_batch"以及RNN的時間步"time_step"。接下來我們定義一個"generate_data"方法:

def generate_data(self):
# 隨機生成數據
self.data_without_rel = np.array(np.random.choice(2, size=(self.data_size,)))

for i in range(self.data_size):
if self.data_without_rel[i-1] == 1 and self.data_without_rel[i-2] == 1:
# 之前連續出現兩個1,當前數據設為0
self.data_with_rel.append(0)
continue
elif self.data_without_rel[i-1] == 0 and self.data_without_rel[i-2] == 0:
# 之前連續出現兩個0,當前數據設為1
self.data_with_rel.append(1)
continue
# np.random.rand()產生的隨機數範圍:[0,1]
else:
if np.random.rand() >= 0.5:
self.data_with_rel.append(1)
else:
self.data_with_rel.append(0)
return self.data_without_rel, self.data_with_rel

在第11行代碼中,我們用了 "np.random.choice"函數生成的由0和1組成的長串數據。接下來我們用了一個for循環,在"data_without_rel"保存的數據的基礎上重新生成了一組數據,並保存在"data_with_rel"數組中。為了使生成的數據間具有一定的序列關係,我們使用了前面介紹的很簡單的數據生成方式:以"data_without_rel"中的數據為參照,如果出現了連續兩個1(或0)則生成一個0(或1),其它情況則以相等概率隨機生成0或1。

有了數據我們接下來要用RNN去學習這些數據,看看它能不能學習到我們產生這些數據時使用的策略,即數據間的聯繫。評判RNN是否學習到規律以及學習的效果如何的依據,是我們在第三章裡介紹過的交叉熵損失函數。根據我們生成數據的規則,如果RNN沒有學習到規則,那麼它預測正確的概率就是0.5,否則它預測正確的概率為:(在"data_without_rel"中,連續出現的兩個數字的組合為:00、01、10和11。00和11出現的總概率佔0.5,在這種情況下,如果RNN學習到了規律,那麼一定能預測出下一個數字,00對應1,11對應0。而如果出現的是01或10的話,RNN預測正確的概率就只有0.5,所以綜合起來就是0.75)。

根據交叉熵損失函數,在沒有學習到規律的時候,其交叉熵損失為:

loss = - (0.5 * np.log(0.5) + 0.5 * np.log(0.5)) = 0.6931471805599453

在學習到規律的時候,其交叉熵損失為:

Loss = -0.5*(0.5 * np.log(0.5) + np.log(0.5))

=-0.25 * (1 * np.log(1) ) - 0.25 * (1 * np.log(1))=0.34657359027997264

4. 我們定義"generate_epochs"方法處理生成的數據:

def generate_epochs(self):
# 生成數據
self.generate_data()

data_x = np.zeros([self.num_batch, self.batch_size], dtype=np.int32)

data_y = np.zeros([self.num_batch, self.batch_size], dtype=np.int32)

# 將數據劃分成num_batch組
for i in range(self.num_batch):
data_x[i] = self.data_without_rel[self.batch_size * i:self.batch_size * (i + 1)]
data_y[i] = self.data_with_rel[self.batch_size * i:self.batch_size * (i + 1)]
# 將每個batch的數據按time_step進行切分
epoch_size = self.batch_size // self.time_step

# 返回最終的數據
for i in range(epoch_size):
x = data_x[:, self.time_step * i:self.time_step * (i + 1)]
y = data_y[:, self.time_step * i:self.time_step * (i + 1)]
yield (x, y)

5. 接下來實現RNN部分:

class Model:
def __init__(self, data_size, batch_size, time_step, state_size):
self.data_size = data_size
self.batch_size = batch_size
self.num_batch = self.data_size // self.batch_size
self.time_step = time_step
self.state_size = state_size

# 輸入數據的佔位符
self.x = tf.placeholder(tf.int32, [self.num_batch, self.time_step], name='input_placeholder')
self.y = tf.placeholder(tf.int32, [self.num_batch, self.time_step], name='labels_placeholder')

# 記憶單元的佔位符
self.init_state = tf.zeros([self.num_batch, self.state_size])
# 將輸入數據進行one-hot編碼
self.rnn_inputs = tf.one_hot(self.x, 2)

# 隱藏層的權重矩陣和偏置項
self.W = tf.get_variable('W', [self.state_size, 2])
self.b = tf.get_variable('b', [2], initializer=tf.constant_initializer(0.0))

# RNN隱藏層的輸出
self.rnn_outputs, self.final_state = self.model()


# 計算輸出層的輸出
logits = tf.reshape( tf.matmul(tf.reshape(self.rnn_outputs, [-1, self.state_size]), self.W) + self.b, [self.num_batch, self.time_step, 2])

self.losses = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=self.y, logits=logits)
self.total_loss = tf.reduce_mean(self.losses)
self.train_step = tf.train.AdagradOptimizer(0.1).minimize(self.total_loss)

6. 定義RNN模型:

 def model(self):
cell = rnn.BasicRNNCell(self.state_size)
rnn_outputs, final_state = tf.nn.dynamic_rnn(cell, self.rnn_inputs,
initial_state=self.init_state)
return rnn_outputs, final_state

這裡我們使用了"dynamic_rnn",因此每次會同時處理所有batch的第一組數據,總共處理的次數為:batch_size / time_step。

 def train(self):
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
training_losses = []
d = Data(self.data_size, self.num_batch, self.batch_size, self.time_step)
training_loss = 0
training_state = np.zeros((self.num_batch, self.state_size))
for step, (X, Y) in enumerate(d.generate_epoch()):
tr_losses, training_loss_, training_state, _ = \
sess.run([self.losses, self.total_loss, self.final_state, self.train_step],
feed_dict={self.x: X, self.y: Y, self.init_state: training_state})
training_loss += training_loss_
if step % 20 == 0 and step > 0:
training_losses.append(training_loss/20)
training_loss = 0
return training_losses

7. 到這裡,我們已經實現了整個RNN模型,接下來初始化相關數據,看看RNN的學習效果如何:

 if __name__ == '__main__':
data_size = 500000
batch_size = 2000
time_step = 5
state_size = 6

m = Model(data_size, batch_size, time_step, state_size)
training_losses = m.train()
plt.plot(training_losses)
plt.show()

定義數據集的大小為500000,每個batch的大小為2000,RNN的"時間步"設為5,隱藏層的神經元數目為6。將訓練過程中的loss可視化,結果如下圖中的左側圖像所示:

TensorFlow系列專題(八):七步帶你實現RNN循環神經網絡小示例

TensorFlow系列專題(八):七步帶你實現RNN循環神經網絡小示例

圖1 二進制序列數據訓練的loss曲線

從左側loss曲線可以看到,loss最終穩定在了0.35左右,這與我們之前的計算結果一致,說明RNN學習到了序列數據中的規則。右側的loss曲線是在調整了序列關係的時間間隔後(此時的time_step過小,導致RNN無法學習到序列數據的規則)的結果,此時loss穩定在0.69左右,與之前的計算也吻合。

下一篇,我們將介紹幾種常見的RNN循環神經網絡結構以及部分代碼示例。


對深度學習感興趣,熱愛Tensorflow的小夥伴,歡迎關注我們的網站http://www.panchuang.net 我們的公眾號:磐創AI。


分享到:


相關文章: