6. 使用LSTM遞歸神經網絡進行時間序列預測
任務: 建立循環網絡, 對時間序列的數據進行學習預測
數據集: 1949年1月至1960年12月,即12年,144次數據記錄, 每個月飛機的乘客數量。
數據形式如下:
1. LSTM迴歸網絡
## 2019.11.1
# time_step = 1 lstm-cell個數
# n_inputs = 1 輸入大小, 也就是look-back
# n_outputs = 1 輸出大小, 全連接輸出
# iter_epoch = 100 訓練次數
# batch_size = 10 每次訓練的大小
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
## 讀取csv數據文件
def readCSV(filePath):
    """Load the passenger-count column from the CSV, plot it, and return it.

    Args:
        filePath: path to a CSV whose second column (index 1) holds the series.

    Returns:
        2-D float array of shape (n_records, 1).
    """
    dataset = pd.read_csv(filePath, usecols=[1])
    # BUGFIX: np.float was deprecated in NumPy 1.20 and removed in 1.24;
    # the builtin float (float64) is the drop-in replacement.
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset
## 將數據轉換為x, y
## 這個函數將創造一個數據, x為時間t時刻的數據, y為時間t+1時刻的數據
## look_back 指代 我們需要過去多少個時間點的數據作為輸入 預測下一個點的數據
def create_dataset(data, look_back=1):
    """Convert a series into supervised pairs: X = a window of `look_back`
    past values, y = the value immediately following the window.

    Args:
        data: 2-D array of shape (n, 1) holding the series in column 0.
        look_back: number of past time steps used as input features.

    Returns:
        (dataX, dataY): dataX of shape (n - look_back, look_back),
        dataY of shape (n - look_back, 1).
    """
    dataX, dataY = [], []
    # BUGFIX: the original loop ran to len(data) - look_back - 1, silently
    # dropping the last usable (window, target) pair.
    for i in range(len(data) - look_back):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])
def mean_square_error(pred, y):
    """Return the mean squared error between predictions and targets.

    BUGFIX: the original raised the mean to the power 0.5, i.e. it returned
    the RMSE despite its name, and callers then applied np.sqrt a second
    time. Returning the plain MSE makes the name honest and the callers'
    np.sqrt produce a correct RMSE.
    """
    return np.sum(np.power(pred - y, 2)) / pred.shape[0]
def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    """Build a single-layer LSTM regression graph (TensorFlow 1.x).

    Args:
        X: input placeholder, shape [batch_size, time_step, n_inputs].
        y: target placeholder, shape [batch_size, n_outputs].
        batch_size: fixed batch size used to build the zero initial state.
        units: number of hidden units in the LSTM cell.
        learning_rate: Adam learning rate.

    Returns:
        (result, loss, train_op): prediction tensor of shape
        [batch_size, 1], scalar MSE loss, and the Adam training op.
    """
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True)
    init_state = lstm_cell.zero_state(batch_size, dtype=tf.float32)
    # outputs: [batch_size, time_step, units]
    outputs, final_state = tf.nn.dynamic_rnn(lstm_cell, X, initial_state=init_state, time_major=False)
    # BUGFIX: regress from the LAST time step only. The original fed the
    # full [batch, time, units] tensor to the dense layer, producing a
    # [batch, time, 1] prediction; `y - result` then broadcast
    # [batch, 1] against [batch, time, 1] into [batch, batch, 1] when
    # time == 1, averaging over mismatched prediction/target pairs.
    result = tf.layers.dense(outputs[:, -1, :], units=1, activation=None)
    loss = tf.reduce_mean(tf.square(y - result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op
def _predict_in_batches(sess, result, x, y, dataX, dataY, batch_size):
    # Run the prediction op batch by batch: the graph's initial LSTM state
    # was built for a fixed batch_size, so every feed must match it.
    preds = []
    for j in range(int(dataX.shape[0] / batch_size)):
        tx = dataX[(j * batch_size):((j + 1) * batch_size), :, :]
        ty = dataY[(j * batch_size):((j + 1) * batch_size), :]
        preds.append(sess.run([result], feed_dict={x: tx, y: ty}))
    return np.array(preds).reshape([-1, 1])

def train(data):
    """Train the LSTM on the scaled series, plot the fit, and report RMSE.

    Relies on the module-level `scaler` (fit in __main__) to map scaled
    values back to passenger counts.

    Args:
        data: 2-D array of shape (n, 1), values scaled into [0, 1].
    """
    ## Positional train/test split (time series: no shuffling).
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## Build (window, next value) supervised pairs.
    trainX, trainY = create_dataset(x_train)
    testX, testY = create_dataset(x_test)
    print(trainX.shape)
    print(trainY.shape)
    ## Reshape input to [samples, time step, features].
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], 1, trainX.shape[1]])
    testX = np.reshape(testX, newshape=[testX.shape[0], 1, testX.shape[1]])
    time_step = 1     # number of unrolled LSTM steps
    n_inputs = 1      # features per step (= look_back)
    n_outputs = 1     # scalar regression output
    iter_epoch = 100  # training epochs
    batch_size = 10   # mini-batch size
    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])
            re_loss.append(temp / trainX.shape[0])
        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()
        re_train = _predict_in_batches(sess, result, x, y, trainX, trainY, batch_size)
        re_test = _predict_in_batches(sess, result, x, y, testX, testY, batch_size)
        ## Map everything back to passenger counts for plotting and metrics.
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)
        # BUGFIX: the targets must be un-scaled as well; the original
        # compared un-scaled predictions against [0, 1]-scaled targets.
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        ## plot train-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.show()
        # BUGFIX: take the square root exactly once; the original applied
        # np.sqrt on top of a helper that already returned a root, and the
        # reported value is an error ("RMSE"), not an accuracy ("Accuary").
        train_rmse = np.sqrt(np.mean(np.square(re_train - trainY_inv[0:re_train.shape[0]])))
        test_rmse = np.sqrt(np.mean(np.square(re_test - testY_inv[0:re_test.shape[0]])))
        print('Train RMSE {:.3f}'.format(train_rmse))
        print('Test RMSE {:.3f}'.format(test_rmse))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0: v1.shape[0], :]
        print(np.hstack([v1, v2]))
if __name__ == '__main__':
    # Path to the airline-passengers CSV (month, monthly passenger count).
    pathFile = 'data/airline-passengers.csv'
    ## Load the raw series.
    data = readCSV(pathFile)
    ## Normalise values into [0, 1]. `scaler` is deliberately left at module
    ## scope: train() reads it as a global to invert the transform —
    ## TODO(review): confirm this implicit coupling is intended.
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)
2.LSTM for Regression Using the Window Method
將look_back = 3, 即是把窗口size設置為3,對於每個lstm-cell, 我們輸入n_inputs = look_back, 我們使用(t-look_back, t-1)間的數據進行預測t時刻
## 2019.11.1
# time_step = 1 lstm-cell個數
# n_inputs = 3 輸入大小, 也就是look-back
# n_outputs = 1 輸出大小, 全連接輸出
# iter_epoch = 100 訓練次數
# batch_size = 10 每次訓練的大小
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
## 讀取csv數據文件
def readCSV(filePath):
    """Load the passenger-count column from the CSV, plot it, and return it.

    Args:
        filePath: path to a CSV whose second column (index 1) holds the series.

    Returns:
        2-D float array of shape (n_records, 1).
    """
    dataset = pd.read_csv(filePath, usecols=[1])
    # BUGFIX: np.float was deprecated in NumPy 1.20 and removed in 1.24;
    # the builtin float (float64) is the drop-in replacement.
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset
## 將數據轉換為x, y
## 這個函數將創造一個數據, x為時間t時刻的數據, y為時間t+1時刻的數據
## look_back 指代 我們需要過去多少個時間點的數據作為輸入 預測下一個點的數據
def create_dataset(data, look_back=1):
    """Convert a series into supervised pairs: X = a window of `look_back`
    past values, y = the value immediately following the window.

    Args:
        data: 2-D array of shape (n, 1) holding the series in column 0.
        look_back: number of past time steps used as input features.

    Returns:
        (dataX, dataY): dataX of shape (n - look_back, look_back),
        dataY of shape (n - look_back, 1).
    """
    dataX, dataY = [], []
    # BUGFIX: the original loop ran to len(data) - look_back - 1, silently
    # dropping the last usable (window, target) pair.
    for i in range(len(data) - look_back):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])
def mean_square_error(pred, y):
    """Return the mean squared error between predictions and targets.

    BUGFIX: the original raised the mean to the power 0.5, i.e. it returned
    the RMSE despite its name, and callers then applied np.sqrt a second
    time. Returning the plain MSE makes the name honest and the callers'
    np.sqrt produce a correct RMSE.
    """
    return np.sum(np.power(pred - y, 2)) / pred.shape[0]
def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    """Build a single-layer LSTM regression graph (TensorFlow 1.x).

    Args:
        X: input placeholder, shape [batch_size, time_step, n_inputs].
        y: target placeholder, shape [batch_size, n_outputs].
        batch_size: fixed batch size used to build the zero initial state.
        units: number of hidden units in the LSTM cell.
        learning_rate: Adam learning rate.

    Returns:
        (result, loss, train_op): prediction tensor of shape
        [batch_size, 1], scalar MSE loss, and the Adam training op.
    """
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True)
    init_state = lstm_cell.zero_state(batch_size, dtype=tf.float32)
    # outputs: [batch_size, time_step, units]
    outputs, final_state = tf.nn.dynamic_rnn(lstm_cell, X, initial_state=init_state, time_major=False)
    # BUGFIX: regress from the LAST time step only. The original fed the
    # full [batch, time, units] tensor to the dense layer, producing a
    # [batch, time, 1] prediction; `y - result` then broadcast
    # [batch, 1] against [batch, time, 1] into [batch, batch, 1] when
    # time == 1, averaging over mismatched prediction/target pairs.
    result = tf.layers.dense(outputs[:, -1, :], units=1, activation=None)
    loss = tf.reduce_mean(tf.square(y - result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op
def _predict_in_batches(sess, result, x, y, dataX, dataY, batch_size):
    # Run the prediction op batch by batch: the graph's initial LSTM state
    # was built for a fixed batch_size, so every feed must match it.
    preds = []
    for j in range(int(dataX.shape[0] / batch_size)):
        tx = dataX[(j * batch_size):((j + 1) * batch_size), :, :]
        ty = dataY[(j * batch_size):((j + 1) * batch_size), :]
        preds.append(sess.run([result], feed_dict={x: tx, y: ty}))
    return np.array(preds).reshape([-1, 1])

def train(data):
    """Train the LSTM (window method, look_back = 3), plot, and report RMSE.

    Relies on the module-level `scaler` (fit in __main__) to map scaled
    values back to passenger counts.

    Args:
        data: 2-D array of shape (n, 1), values scaled into [0, 1].
    """
    ## Positional train/test split (time series: no shuffling).
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## Build (window, next value) supervised pairs with a 3-step window.
    trainX, trainY = create_dataset(x_train, 3)
    testX, testY = create_dataset(x_test, 3)
    print(trainX.shape)
    print(trainY.shape)
    ## Reshape input to [samples, time step, features]: here the whole
    ## window is fed as 3 features of a single time step.
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], 1, trainX.shape[1]])
    testX = np.reshape(testX, newshape=[testX.shape[0], 1, testX.shape[1]])
    time_step = 1     # number of unrolled LSTM steps
    n_inputs = 3      # features per step (= look_back)
    n_outputs = 1     # scalar regression output
    iter_epoch = 100  # training epochs
    batch_size = 10   # mini-batch size
    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])
            re_loss.append(temp / trainX.shape[0])
        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()
        re_train = _predict_in_batches(sess, result, x, y, trainX, trainY, batch_size)
        re_test = _predict_in_batches(sess, result, x, y, testX, testY, batch_size)
        ## Map everything back to passenger counts for plotting and metrics.
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)
        # BUGFIX: the targets must be un-scaled as well; the original
        # compared un-scaled predictions against [0, 1]-scaled targets.
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        ## plot train-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.show()
        # BUGFIX: take the square root exactly once; the original applied
        # np.sqrt on top of a helper that already returned a root, and the
        # reported value is an error ("RMSE"), not an accuracy ("Accuary").
        train_rmse = np.sqrt(np.mean(np.square(re_train - trainY_inv[0:re_train.shape[0]])))
        test_rmse = np.sqrt(np.mean(np.square(re_test - testY_inv[0:re_test.shape[0]])))
        print('Train RMSE {:.3f}'.format(train_rmse))
        print('Test RMSE {:.3f}'.format(test_rmse))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0: v1.shape[0], :]
        print(np.hstack([v1, v2]))
if __name__ == '__main__':
    # Path to the airline-passengers CSV (month, monthly passenger count).
    pathFile = 'data/airline-passengers.csv'
    ## Load the raw series.
    data = readCSV(pathFile)
    ## Normalise values into [0, 1]. `scaler` is deliberately left at module
    ## scope: train() reads it as a global to invert the transform —
    ## TODO(review): confirm this implicit coupling is intended.
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)
3.LSTM for Regression with Time Steps
## 2019.11.1
# time_step = 3 lstm-cell個數
# n_inputs = 1 輸入大小, 也就是look-back
# n_outputs = 1 輸出大小, 全連接輸出
# iter_epoch = 100 訓練次數
# batch_size = 10 每次訓練的大小
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
## 讀取csv數據文件
def readCSV(filePath):
    """Load the passenger-count column from the CSV, plot it, and return it.

    Args:
        filePath: path to a CSV whose second column (index 1) holds the series.

    Returns:
        2-D float array of shape (n_records, 1).
    """
    dataset = pd.read_csv(filePath, usecols=[1])
    # BUGFIX: np.float was deprecated in NumPy 1.20 and removed in 1.24;
    # the builtin float (float64) is the drop-in replacement.
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset
## 將數據轉換為x, y
## 這個函數將創造一個數據, x為時間t時刻的數據, y為時間t+1時刻的數據
## look_back 指代 我們需要過去多少個時間點的數據作為輸入 預測下一個點的數據
def create_dataset(data, look_back=1):
    """Convert a series into supervised pairs: X = a window of `look_back`
    past values, y = the value immediately following the window.

    Args:
        data: 2-D array of shape (n, 1) holding the series in column 0.
        look_back: number of past time steps used as input features.

    Returns:
        (dataX, dataY): dataX of shape (n - look_back, look_back),
        dataY of shape (n - look_back, 1).
    """
    dataX, dataY = [], []
    # BUGFIX: the original loop ran to len(data) - look_back - 1, silently
    # dropping the last usable (window, target) pair.
    for i in range(len(data) - look_back):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])
def mean_square_error(pred, y):
    """Return the mean squared error between predictions and targets.

    BUGFIX: the original raised the mean to the power 0.5, i.e. it returned
    the RMSE despite its name, and callers then applied np.sqrt a second
    time. Returning the plain MSE makes the name honest and the callers'
    np.sqrt produce a correct RMSE.
    """
    return np.sum(np.power(pred - y, 2)) / pred.shape[0]
def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    """Build a single-layer LSTM regression graph over multiple time steps
    (TensorFlow 1.x).

    Args:
        X: input placeholder, shape [batch_size, time_step, n_inputs].
        y: target placeholder, shape [batch_size, n_outputs].
        batch_size: fixed batch size used to build the zero initial state.
        units: number of hidden units in the LSTM cell.
        learning_rate: Adam learning rate.

    Returns:
        (result, loss, train_op): prediction tensor, scalar MSE loss, and
        the Adam training op.
    """
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True)
    init_state = lstm_cell.zero_state(batch_size, dtype=tf.float32)
    # outputs: [batch_size, time_step, units]
    outputs, final_state = tf.nn.dynamic_rnn(lstm_cell, X, initial_state=init_state, time_major=False)
    # Concatenate every time step's hidden state into one feature vector
    # per sample, so the dense layer regresses from all steps at once.
    outputs = tf.reshape(outputs, [-1, units * X.shape[1]])
    result = tf.layers.dense(outputs, units=1, activation=None)
    loss = tf.reduce_mean(tf.square(y-result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op
def _predict_in_batches(sess, result, x, y, dataX, dataY, batch_size):
    # Run the prediction op batch by batch: the graph's initial LSTM state
    # was built for a fixed batch_size, so every feed must match it.
    preds = []
    for j in range(int(dataX.shape[0] / batch_size)):
        tx = dataX[(j * batch_size):((j + 1) * batch_size), :, :]
        ty = dataY[(j * batch_size):((j + 1) * batch_size), :]
        preds.append(sess.run([result], feed_dict={x: tx, y: ty}))
    return np.array(preds).reshape([-1, 1])

def train(data):
    """Train the LSTM (3 time steps, 1 feature per step), plot, and report RMSE.

    Relies on the module-level `scaler` (fit in __main__) to map scaled
    values back to passenger counts.

    Args:
        data: 2-D array of shape (n, 1), values scaled into [0, 1].
    """
    ## Positional train/test split (time series: no shuffling).
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## Build (window, next value) supervised pairs with a 3-step window.
    trainX, trainY = create_dataset(x_train, 3)
    testX, testY = create_dataset(x_test, 3)
    print(trainX.shape)
    print(trainY.shape)
    ## Reshape input to [samples, time steps, features]: each window value
    ## becomes its own time step with a single feature.
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], trainX.shape[1], 1])
    testX = np.reshape(testX, newshape=[testX.shape[0], testX.shape[1], 1])
    time_step = 3     # number of unrolled LSTM steps (= look_back)
    n_inputs = 1      # features per step
    n_outputs = 1     # scalar regression output
    iter_epoch = 100  # training epochs
    batch_size = 10   # mini-batch size
    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])
            re_loss.append(temp / trainX.shape[0])
        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()
        re_train = _predict_in_batches(sess, result, x, y, trainX, trainY, batch_size)
        re_test = _predict_in_batches(sess, result, x, y, testX, testY, batch_size)
        ## Map everything back to passenger counts for plotting and metrics.
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)
        # BUGFIX: the targets must be un-scaled as well; the original
        # compared un-scaled predictions against [0, 1]-scaled targets.
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        ## plot train-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.show()
        # BUGFIX: take the square root exactly once; the original applied
        # np.sqrt on top of a helper that already returned a root, and the
        # reported value is an error ("RMSE"), not an accuracy ("Accuary").
        train_rmse = np.sqrt(np.mean(np.square(re_train - trainY_inv[0:re_train.shape[0]])))
        test_rmse = np.sqrt(np.mean(np.square(re_test - testY_inv[0:re_test.shape[0]])))
        print('Train RMSE {:.3f}'.format(train_rmse))
        print('Test RMSE {:.3f}'.format(test_rmse))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0: v1.shape[0], :]
        print(np.hstack([v1, v2]))
if __name__ == '__main__':
    # Path to the airline-passengers CSV (month, monthly passenger count).
    pathFile = 'data/airline-passengers.csv'
    ## Load the raw series.
    data = readCSV(pathFile)
    ## Normalise values into [0, 1]. `scaler` is deliberately left at module
    ## scope: train() reads it as a global to invert the transform —
    ## TODO(review): confirm this implicit coupling is intended.
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)
4.多層的LSTM網絡
## 2019.11.1
# time_step = 3 lstm-cell個數
# n_inputs = 1 輸入大小, 也就是look-back
# n_outputs = 1 輸出大小, 全連接輸出
# iter_epoch = 100 訓練次數
# batch_size = 10 每次訓練的大小
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
## 讀取csv數據文件
def readCSV(filePath):
    """Load the passenger-count column from the CSV, plot it, and return it.

    Args:
        filePath: path to a CSV whose second column (index 1) holds the series.

    Returns:
        2-D float array of shape (n_records, 1).
    """
    dataset = pd.read_csv(filePath, usecols=[1])
    # BUGFIX: np.float was deprecated in NumPy 1.20 and removed in 1.24;
    # the builtin float (float64) is the drop-in replacement.
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset
## 將數據轉換為x, y
## 這個函數將創造一個數據, x為時間t時刻的數據, y為時間t+1時刻的數據
## look_back 指代 我們需要過去多少個時間點的數據作為輸入 預測下一個點的數據
def create_dataset(data, look_back=1):
    """Convert a series into supervised pairs: X = a window of `look_back`
    past values, y = the value immediately following the window.

    Args:
        data: 2-D array of shape (n, 1) holding the series in column 0.
        look_back: number of past time steps used as input features.

    Returns:
        (dataX, dataY): dataX of shape (n - look_back, look_back),
        dataY of shape (n - look_back, 1).
    """
    dataX, dataY = [], []
    # BUGFIX: the original loop ran to len(data) - look_back - 1, silently
    # dropping the last usable (window, target) pair.
    for i in range(len(data) - look_back):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])
def mean_square_error(pred, y):
    """Return the mean squared error between predictions and targets.

    BUGFIX: the original raised the mean to the power 0.5, i.e. it returned
    the RMSE despite its name, and callers then applied np.sqrt a second
    time. Returning the plain MSE makes the name honest and the callers'
    np.sqrt produce a correct RMSE.
    """
    return np.sum(np.power(pred - y, 2)) / pred.shape[0]
def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    """Build a 2-layer stacked LSTM regression graph (TensorFlow 1.x).

    Args:
        X: input placeholder, shape [batch_size, time_step, n_inputs].
        y: target placeholder, shape [batch_size, n_outputs].
        batch_size: fixed batch size used to build the zero initial state.
        units: number of hidden units in each LSTM cell.
        learning_rate: Adam learning rate.

    Returns:
        (result, loss, train_op): prediction tensor, scalar MSE loss, and
        the Adam training op.
    """
    # Stack two independent LSTM cells into one multi-layer cell.
    lstm_cells = []
    for i in range(2):
        lstm_cells.append(tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True))
    multi_lstm = tf.contrib.rnn.MultiRNNCell(lstm_cells, state_is_tuple=True)
    init_state = multi_lstm.zero_state(batch_size, dtype=tf.float32)
    # outputs (top layer): [batch_size, time_step, units]
    outputs, final_state = tf.nn.dynamic_rnn(multi_lstm, X, initial_state=init_state, time_major=False)
    # Concatenate every time step's hidden state into one feature vector
    # per sample, so the dense layer regresses from all steps at once.
    outputs = tf.reshape(outputs, [-1, units * X.shape[1]])
    result = tf.layers.dense(outputs, units=1, activation=None)
    loss = tf.reduce_mean(tf.square(y - result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op
def _predict_in_batches(sess, result, x, y, dataX, dataY, batch_size):
    # Run the prediction op batch by batch: the graph's initial LSTM state
    # was built for a fixed batch_size, so every feed must match it.
    preds = []
    for j in range(int(dataX.shape[0] / batch_size)):
        tx = dataX[(j * batch_size):((j + 1) * batch_size), :, :]
        ty = dataY[(j * batch_size):((j + 1) * batch_size), :]
        preds.append(sess.run([result], feed_dict={x: tx, y: ty}))
    return np.array(preds).reshape([-1, 1])

def train(data):
    """Train the stacked LSTM (3 time steps), plot the fit, and report RMSE.

    Relies on the module-level `scaler` (fit in __main__) to map scaled
    values back to passenger counts.

    Args:
        data: 2-D array of shape (n, 1), values scaled into [0, 1].
    """
    ## Positional train/test split (time series: no shuffling).
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## Build (window, next value) supervised pairs with a 3-step window.
    trainX, trainY = create_dataset(x_train, 3)
    testX, testY = create_dataset(x_test, 3)
    print(trainX.shape)
    print(trainY.shape)
    ## Reshape input to [samples, time steps, features]: each window value
    ## becomes its own time step with a single feature.
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], trainX.shape[1], 1])
    testX = np.reshape(testX, newshape=[testX.shape[0], testX.shape[1], 1])
    time_step = 3     # number of unrolled LSTM steps (= look_back)
    n_inputs = 1      # features per step
    n_outputs = 1     # scalar regression output
    iter_epoch = 100  # training epochs
    batch_size = 10   # mini-batch size
    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])
            re_loss.append(temp / trainX.shape[0])
        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()
        re_train = _predict_in_batches(sess, result, x, y, trainX, trainY, batch_size)
        re_test = _predict_in_batches(sess, result, x, y, testX, testY, batch_size)
        ## Map everything back to passenger counts for plotting and metrics.
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)
        # BUGFIX: the targets must be un-scaled as well; the original
        # compared un-scaled predictions against [0, 1]-scaled targets.
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        ## plot train-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.show()
        # BUGFIX: take the square root exactly once; the original applied
        # np.sqrt on top of a helper that already returned a root, and the
        # reported value is an error ("RMSE"), not an accuracy ("Accuary").
        train_rmse = np.sqrt(np.mean(np.square(re_train - trainY_inv[0:re_train.shape[0]])))
        test_rmse = np.sqrt(np.mean(np.square(re_test - testY_inv[0:re_test.shape[0]])))
        print('Train RMSE {:.3f}'.format(train_rmse))
        print('Test RMSE {:.3f}'.format(test_rmse))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0: v1.shape[0], :]
        print(np.hstack([v1, v2]))
if __name__ == '__main__':
    # Path to the airline-passengers CSV (month, monthly passenger count).
    pathFile = 'data/airline-passengers.csv'
    ## Load the raw series.
    data = readCSV(pathFile)
    ## Normalise values into [0, 1]. `scaler` is deliberately left at module
    ## scope: train() reads it as a global to invert the transform —
    ## TODO(review): confirm this implicit coupling is intended.
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)
閱讀更多 潛水烏賊 的文章