Experiment: Time Series Prediction with LSTM Recurrent Neural Networks

Task: build a recurrent network that learns from and predicts time-series data.

Dataset: the number of airline passengers each month from January 1949 to December 1960, i.e. 12 years and 144 records.

The data looks like this (first rows of the standard airline-passengers.csv; the second column, the passenger count, is the one used below):

Month,Passengers
1949-01,112
1949-02,118
1949-03,132
...

1. LSTM Regression Network
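
This first version uses look_back = 1: each input sample is the single (scaled) value at time t, fed to the LSTM as one time step with one feature, and the target is the value at t + 1. The full listing follows.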

## 2019.11.1
# time_step  = 1   number of LSTM cells (unrolled time steps)
# n_inputs   = 1   input size per step, i.e. look_back
# n_outputs  = 1   output size of the fully connected layer
# iter_epoch = 100 number of training epochs
# batch_size = 10  samples per training step
# Note: written against the TensorFlow 1.x graph API (tf.contrib was removed in TF 2.x).
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd

## Read the csv data file
def readCSV(filePath):
    dataset = pd.read_csv(filePath, usecols=[1])
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset

## Turn the series into supervised (x, y) pairs:
## x is the data at time t, y is the data at time t+1.
## look_back is how many past points are used as input to predict the next point.
def create_dataset(data, look_back=1):
    dataX, dataY = [], []
    for i in range(len(data) - look_back - 1):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])
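
# Example, with hypothetical toy values and look_back=1:
#   data = [[112], [118], [132], [129]]
#   -> X = [[112], [118]],  y = [[118], [132]]
# (the trailing "- 1" in the range means the last usable pair is not emitted)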

def mean_square_error(pred, y):
    # despite the name, this returns the root-mean-square error (RMSE)
    return np.power(np.sum(np.power(pred - y, 2)) / pred.shape[0], 0.5)

def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True)
    init_state = lstm_cell.zero_state(batch_size, dtype=tf.float32)

    # outputs: [batch_size, time_step, units]
    outputs, final_state = tf.nn.dynamic_rnn(lstm_cell, X, initial_state=init_state, time_major=False)

    # time_step is 1 here, so this flattens outputs to [batch_size, units];
    # feeding the 3-D outputs straight into the dense layer would broadcast
    # incorrectly against y inside the loss
    outputs = tf.reshape(outputs, [-1, units])
    result = tf.layers.dense(outputs, units=1, activation=None)

    loss = tf.reduce_mean(tf.square(y - result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op

def train(data):
    ## split into training and test sets
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## build the supervised samples
    trainX, trainY = create_dataset(x_train)
    testX, testY = create_dataset(x_test)
    print(trainX.shape)
    print(trainY.shape)
    ## reshape input to [samples, time step, features]
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], 1, trainX.shape[1]])
    testX = np.reshape(testX, newshape=[testX.shape[0], 1, testX.shape[1]])

    time_step = 1
    n_inputs = 1
    n_outputs = 1
    iter_epoch = 100
    batch_size = 10

    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])  # epoch-averaged training loss
            re_loss.append(temp / trainX.shape[0])

        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()

        ## predictions on the training set (any partial final batch is skipped)
        re_train = []
        for j in range(int(trainX.shape[0] / batch_size)):
            tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_train.append(pred)

        ## predictions on the test set
        re_test = []
        for j in range(int(testX.shape[0] / batch_size)):
            tx = testX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = testY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_test.append(pred)

        re_train = np.array(re_train).reshape([-1, 1])
        re_test = np.array(re_test).reshape([-1, 1])

        ## undo the [0, 1] scaling before plotting and scoring
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)

        ## plot training-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot the actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.legend()
        plt.show()

        ## mean_square_error already returns RMSE, so no extra np.sqrt; the
        ## targets are inverse-transformed so both sides are in passenger counts
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        print('Train RMSE {:.3f}'.format(mean_square_error(re_train, trainY_inv[0:re_train.shape[0]])))
        print('Test RMSE {:.3f}'.format(mean_square_error(re_test, testY_inv[0:re_test.shape[0]])))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0:v1.shape[0], :]
        print(np.hstack([v1, v2]))

if __name__ == '__main__':
    pathFile = 'data/airline-passengers.csv'
    ## load the data
    data = readCSV(pathFile)
    ## scale the data into [0, 1]
    ## (scaler is a module-level global, so train() can use it directly)
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)

2. LSTM for Regression Using the Window Method

Set look_back = 3, i.e. a window size of 3: each LSTM cell now receives n_inputs = look_back values, and the data in the interval (t - look_back, t - 1) is used to predict the value at time t, as the sketch below shows.
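
A minimal sketch of what create_dataset (defined in the listing below) produces here, using toy values taken from the first rows of the dataset, with the [0, 1] scaling omitted for readability:

data = np.array([[112.], [118.], [132.], [129.], [121.]])
X, y = create_dataset(data, look_back=3)
# X == [[112., 118., 132.]]   one window of the 3 past values
# y == [[129.]]               the next value to predict
X = np.reshape(X, [X.shape[0], 1, X.shape[1]])  # [samples, time_step=1, n_inputs=3]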

## 2019.11.1
# time_step  = 1   number of LSTM cells (unrolled time steps)
# n_inputs   = 3   input size per step, i.e. look_back
# n_outputs  = 1   output size of the fully connected layer
# iter_epoch = 100 number of training epochs
# batch_size = 10  samples per training step
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd

## Read the csv data file
def readCSV(filePath):
    dataset = pd.read_csv(filePath, usecols=[1])
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset

## Turn the series into supervised (x, y) pairs:
## x is the window of data ending at time t, y is the data at time t+1.
## look_back is how many past points are used as input to predict the next point.
def create_dataset(data, look_back=1):
    dataX, dataY = [], []
    for i in range(len(data) - look_back - 1):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])

def mean_square_error(pred, y):
    # despite the name, this returns the root-mean-square error (RMSE)
    return np.power(np.sum(np.power(pred - y, 2)) / pred.shape[0], 0.5)

def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True)
    init_state = lstm_cell.zero_state(batch_size, dtype=tf.float32)

    # outputs: [batch_size, time_step, units]
    outputs, final_state = tf.nn.dynamic_rnn(lstm_cell, X, initial_state=init_state, time_major=False)

    # time_step is 1 here, so this flattens outputs to [batch_size, units]
    # and result matches the shape of y
    outputs = tf.reshape(outputs, [-1, units])
    result = tf.layers.dense(outputs, units=1, activation=None)

    loss = tf.reduce_mean(tf.square(y - result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op

def train(data):
    ## split into training and test sets
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## build the supervised samples with a window of 3
    trainX, trainY = create_dataset(x_train, 3)
    testX, testY = create_dataset(x_test, 3)
    print(trainX.shape)
    print(trainY.shape)
    ## reshape input to [samples, time step, features]
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], 1, trainX.shape[1]])
    testX = np.reshape(testX, newshape=[testX.shape[0], 1, testX.shape[1]])

    time_step = 1
    n_inputs = 3
    n_outputs = 1
    iter_epoch = 100
    batch_size = 10

    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])  # epoch-averaged training loss
            re_loss.append(temp / trainX.shape[0])

        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()

        ## predictions on the training set (any partial final batch is skipped)
        re_train = []
        for j in range(int(trainX.shape[0] / batch_size)):
            tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_train.append(pred)

        ## predictions on the test set
        re_test = []
        for j in range(int(testX.shape[0] / batch_size)):
            tx = testX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = testY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_test.append(pred)

        re_train = np.array(re_train).reshape([-1, 1])
        re_test = np.array(re_test).reshape([-1, 1])

        ## undo the [0, 1] scaling before plotting and scoring
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)

        ## plot training-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot the actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.legend()
        plt.show()

        ## mean_square_error already returns RMSE, so no extra np.sqrt; the
        ## targets are inverse-transformed so both sides are in passenger counts
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        print('Train RMSE {:.3f}'.format(mean_square_error(re_train, trainY_inv[0:re_train.shape[0]])))
        print('Test RMSE {:.3f}'.format(mean_square_error(re_test, testY_inv[0:re_test.shape[0]])))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0:v1.shape[0], :]
        print(np.hstack([v1, v2]))

if __name__ == '__main__':
    pathFile = 'data/airline-passengers.csv'
    ## load the data
    data = readCSV(pathFile)
    ## scale the data into [0, 1]
    ## (scaler is a module-level global, so train() can use it directly)
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)

3. LSTM for Regression with Time Steps
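
Here the same window of 3 past values is presented to the network as 3 consecutive time steps of 1 feature each, rather than as 1 time step with 3 features; only the reshape and the matching placeholder shape change. A minimal sketch of the difference:

# section 2 (window as features):   trainX.shape == (samples, 1, 3)
# section 3 (window as time steps): trainX.shape == (samples, 3, 1)
trainX = np.reshape(trainX, newshape=[trainX.shape[0], trainX.shape[1], 1])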

## 2019.11.1
# time_step  = 3   number of LSTM cells (unrolled time steps)
# n_inputs   = 1   input size per step
# n_outputs  = 1   output size of the fully connected layer
# iter_epoch = 100 number of training epochs
# batch_size = 10  samples per training step
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd

## Read the csv data file
def readCSV(filePath):
    dataset = pd.read_csv(filePath, usecols=[1])
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset

## Turn the series into supervised (x, y) pairs:
## x is the window of data ending at time t, y is the data at time t+1.
## look_back is how many past points are used as input to predict the next point.
def create_dataset(data, look_back=1):
    dataX, dataY = [], []
    for i in range(len(data) - look_back - 1):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])

def mean_square_error(pred, y):
    # despite the name, this returns the root-mean-square error (RMSE)
    return np.power(np.sum(np.power(pred - y, 2)) / pred.shape[0], 0.5)

def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True)
    init_state = lstm_cell.zero_state(batch_size, dtype=tf.float32)

    # outputs: [batch_size, time_step, units]
    outputs, final_state = tf.nn.dynamic_rnn(lstm_cell, X, initial_state=init_state, time_major=False)

    # concatenate the outputs of all time steps, then map to a single value
    outputs = tf.reshape(outputs, [-1, units * int(X.shape[1])])
    result = tf.layers.dense(outputs, units=1, activation=None)

    loss = tf.reduce_mean(tf.square(y - result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op

def train(data):
    ## split into training and test sets
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## build the supervised samples with a window of 3
    trainX, trainY = create_dataset(x_train, 3)
    testX, testY = create_dataset(x_test, 3)
    print(trainX.shape)
    print(trainY.shape)
    ## reshape input to [samples, time step, features]
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], trainX.shape[1], 1])
    testX = np.reshape(testX, newshape=[testX.shape[0], testX.shape[1], 1])

    time_step = 3
    n_inputs = 1
    n_outputs = 1
    iter_epoch = 100
    batch_size = 10

    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])  # epoch-averaged training loss
            re_loss.append(temp / trainX.shape[0])

        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()

        ## predictions on the training set (any partial final batch is skipped)
        re_train = []
        for j in range(int(trainX.shape[0] / batch_size)):
            tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_train.append(pred)

        ## predictions on the test set
        re_test = []
        for j in range(int(testX.shape[0] / batch_size)):
            tx = testX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = testY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_test.append(pred)

        re_train = np.array(re_train).reshape([-1, 1])
        re_test = np.array(re_test).reshape([-1, 1])

        ## undo the [0, 1] scaling before plotting and scoring
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)

        ## plot training-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot the actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.legend()
        plt.show()

        ## mean_square_error already returns RMSE, so no extra np.sqrt; the
        ## targets are inverse-transformed so both sides are in passenger counts
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        print('Train RMSE {:.3f}'.format(mean_square_error(re_train, trainY_inv[0:re_train.shape[0]])))
        print('Test RMSE {:.3f}'.format(mean_square_error(re_test, testY_inv[0:re_test.shape[0]])))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0:v1.shape[0], :]
        print(np.hstack([v1, v2]))

if __name__ == '__main__':
    pathFile = 'data/airline-passengers.csv'
    ## load the data
    data = readCSV(pathFile)
    ## scale the data into [0, 1]
    ## (scaler is a module-level global, so train() can use it directly)
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)

4. Multi-Layer (Stacked) LSTM Network
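
Relative to section 3, only lstm_net changes: two BasicLSTMCell objects are stacked with MultiRNNCell, so the output sequence of the first layer becomes the input sequence of the second. A sketch of just the changed lines, mirroring the full listing below:

lstm_cells = [tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True)
              for _ in range(2)]  # one separate cell object per layer
multi_lstm = tf.contrib.rnn.MultiRNNCell(lstm_cells, state_is_tuple=True)
outputs, final_state = tf.nn.dynamic_rnn(multi_lstm, X,
                                         initial_state=multi_lstm.zero_state(batch_size, tf.float32),
                                         time_major=False)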

## 2019.11.1
# time_step  = 3   number of LSTM cells (unrolled time steps)
# n_inputs   = 1   input size per step
# n_outputs  = 1   output size of the fully connected layer
# iter_epoch = 100 number of training epochs
# batch_size = 10  samples per training step
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd

## Read the csv data file
def readCSV(filePath):
    dataset = pd.read_csv(filePath, usecols=[1])
    dataset = dataset.values.astype(float)
    plt.plot(dataset)
    plt.show()
    return dataset

## Turn the series into supervised (x, y) pairs:
## x is the window of data ending at time t, y is the data at time t+1.
## look_back is how many past points are used as input to predict the next point.
def create_dataset(data, look_back=1):
    dataX, dataY = [], []
    for i in range(len(data) - look_back - 1):
        dataX.append(data[i:(i + look_back), 0])
        dataY.append(data[i + look_back, 0])
    return np.array(dataX), np.array(dataY).reshape([-1, 1])

def mean_square_error(pred, y):
    # despite the name, this returns the root-mean-square error (RMSE)
    return np.power(np.sum(np.power(pred - y, 2)) / pred.shape[0], 0.5)

def lstm_net(X, y, batch_size, units=100, learning_rate=0.002):
    # stack two LSTM layers; each layer needs its own cell object
    lstm_cells = []
    for i in range(2):
        lstm_cells.append(tf.contrib.rnn.BasicLSTMCell(units, forget_bias=1.0, state_is_tuple=True))

    multi_lstm = tf.contrib.rnn.MultiRNNCell(lstm_cells, state_is_tuple=True)
    init_state = multi_lstm.zero_state(batch_size, dtype=tf.float32)

    # outputs: [batch_size, time_step, units] (from the top layer)
    outputs, final_state = tf.nn.dynamic_rnn(multi_lstm, X, initial_state=init_state, time_major=False)

    # concatenate the outputs of all time steps, then map to a single value
    outputs = tf.reshape(outputs, [-1, units * int(X.shape[1])])
    result = tf.layers.dense(outputs, units=1, activation=None)

    loss = tf.reduce_mean(tf.square(y - result))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)
    return result, loss, train_op

def train(data):
    ## split into training and test sets
    tst = 100
    x_train, x_test = data[0:tst, :], data[tst:, :]
    ## build the supervised samples with a window of 3
    trainX, trainY = create_dataset(x_train, 3)
    testX, testY = create_dataset(x_test, 3)
    print(trainX.shape)
    print(trainY.shape)
    ## reshape input to [samples, time step, features]
    trainX = np.reshape(trainX, newshape=[trainX.shape[0], trainX.shape[1], 1])
    testX = np.reshape(testX, newshape=[testX.shape[0], testX.shape[1], 1])

    time_step = 3
    n_inputs = 1
    n_outputs = 1
    iter_epoch = 100
    batch_size = 10

    x = tf.placeholder(tf.float32, shape=[None, time_step, n_inputs])
    y = tf.placeholder(tf.float32, shape=[None, n_outputs])
    result, loss, train_op = lstm_net(x, y, batch_size)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        re_loss = []
        for i in range(iter_epoch):
            temp = 0
            for j in range(int(trainX.shape[0] / batch_size)):
                tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
                ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
                ls, _ = sess.run([loss, train_op], feed_dict={x: tx, y: ty})
                temp += ls
            print(temp / trainX.shape[0])  # epoch-averaged training loss
            re_loss.append(temp / trainX.shape[0])

        plt.plot(range(iter_epoch), re_loss, c='red')
        plt.show()

        ## predictions on the training set (any partial final batch is skipped)
        re_train = []
        for j in range(int(trainX.shape[0] / batch_size)):
            tx = trainX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = trainY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_train.append(pred)

        ## predictions on the test set
        re_test = []
        for j in range(int(testX.shape[0] / batch_size)):
            tx = testX[(j * batch_size):((j + 1) * batch_size), :, :]
            ty = testY[(j * batch_size):((j + 1) * batch_size), :]
            pred = sess.run([result], feed_dict={x: tx, y: ty})
            re_test.append(pred)

        re_train = np.array(re_train).reshape([-1, 1])
        re_test = np.array(re_test).reshape([-1, 1])

        ## undo the [0, 1] scaling before plotting and scoring
        re_train = scaler.inverse_transform(re_train)
        re_test = scaler.inverse_transform(re_test)
        data = scaler.inverse_transform(data)

        ## plot training-set predictions
        plt.plot(range(re_train.shape[0]), re_train, label='Train Predict')
        ## plot test-set predictions
        plt.plot(range(tst, tst + re_test.shape[0]), re_test, label='Test Predict')
        ## plot the actual data
        plt.plot(range(data.shape[0]), data, label='Data')
        plt.legend()
        plt.show()

        ## mean_square_error already returns RMSE, so no extra np.sqrt; the
        ## targets are inverse-transformed so both sides are in passenger counts
        trainY_inv = scaler.inverse_transform(trainY)
        testY_inv = scaler.inverse_transform(testY)
        print('Train RMSE {:.3f}'.format(mean_square_error(re_train, trainY_inv[0:re_train.shape[0]])))
        print('Test RMSE {:.3f}'.format(mean_square_error(re_test, testY_inv[0:re_test.shape[0]])))
        v1 = np.array(re_train).reshape([-1, 1])
        v2 = trainY_inv[0:v1.shape[0], :]
        print(np.hstack([v1, v2]))

if __name__ == '__main__':
    pathFile = 'data/airline-passengers.csv'
    ## load the data
    data = readCSV(pathFile)
    ## scale the data into [0, 1]
    ## (scaler is a module-level global, so train() can use it directly)
    scaler = MinMaxScaler(feature_range=(0, 1))
    data = scaler.fit_transform(data)
    train(data)
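
For reference, a minimal tf.keras sketch of the section-3 model, assuming TensorFlow 2.x (not part of the original experiment, which uses the TF 1.x graph API and tf.contrib):

import tensorflow as tf

look_back = 3
model = tf.keras.Sequential([
    tf.keras.layers.LSTM(100, input_shape=(look_back, 1)),  # input: [time_steps, features]
    tf.keras.layers.Dense(1),
])
model.compile(optimizer=tf.keras.optimizers.Adam(0.002), loss='mse')
# model.fit(trainX, trainY, epochs=100, batch_size=10)  # trainX: [samples, 3, 1]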