一、任务背景
本次实战我们使用来自Kaggle的特斯拉股价数据《TESLA Stock Data》,这个数据集收集了从2010年6月29日到2022年3月24日之间的共2956条特斯拉股价数据,字段如下图所示。我们将使用调整后的收盘价“adj close”作为本次本次实践的时间序列数据集,并基于前面2656条数据构建训练集,基于后面300条数据构建测试集。然后,我们将训练一个时序预测领域的经典模型——LSTM,来预测测试集中的股价走势并与真实股价走势进行对比。
二、python建模
1、数据读取
首先,我们读取数据集并打印部分数据样例看一下。为了便于后期数据探索,我们将Date列转为datetime类型并设置为索引列。
import pandas as pdpath = '/kaggle/input/tesla-stock-data-updated-till-28jun2021/TSLA.csv'
df = pd.read_csv(path)
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
print('数据量:', len(df))
df.head()
根据任务要求,我们的目标数据是Adj Close,因此单独获取该列数据作为我们的数据集,并可视化看看。
import matplotlib.pyplot as plt
import matplotlib.dates as mdatesadjClose = df['Adj Close'].tolist()
plt.figure(figsize=(20, 6))
plt.plot(df.index, adjClose)
# 设置x轴仅显示每年1月1日的刻度
ax = plt.gca()
ax.xaxis.set_major_locator(mdates.YearLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
plt.xlabel('Date')
plt.xticks(rotation=-80)
plt.ylabel('Price($)')
plt.show()
可以看到,老马的特斯拉股价在2020年以后开始暴涨。
2、数据集构建
这里,我们自定义一个数据集构建函数。具体来说,我们以长度为seq_length的滑动窗口对数据进行滚动遍历,每seq_length条股价数据为训练数据x,而其后的那一条股价数据则为标签y。完成数据划分之后,我们直接将数据转为torch.tensor并构建DataLoader,测试数据暂时不构建DataLoader。
import torch
from torch.utils.data import DataLoader, TensorDataset# 数据集构建函数
def create_sequences(data, seq_length):xs = []ys = []for i in range(len(data) - seq_length):x = data[i:i+seq_length]y = data[i+seq_length]xs.append(x)ys.append(y)return np.array(xs), np.array(ys)seq_length = 10
X_train, y_train = create_sequences(adjClose[:-300], seq_length)
X_test, y_test = create_sequences(adjClose[-300:], seq_length)# 转换为PyTorch张量
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)# 创建数据加载器
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
3、模型构建
下面,我们构建一个LSTM模型,使用2层单向的LSTM,后面接一个全连接层用于回归预测。
import torch.nn as nnclass StockPriceLSTM(nn.Module):def __init__(self, input_size, hidden_size, num_layers, output_size):super(StockPriceLSTM, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layersself.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)self.fc = nn.Linear(hidden_size, output_size)def forward(self, x):h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)out, _ = self.lstm(x, (h0, c0))out = self.fc(out[:, -1, :])return outinput_size = 1 # 每个时间步的输入特征数
hidden_size = 128
num_layers = 2
output_size = 1model = StockPriceLSTM(input_size, hidden_size, num_layers, output_size)
4、模型训练
接着,我们构建模型训练的代码,迭代训练模型500个Epoch。
import torch.optim as optim
from tqdm import tqdm# 定义损失函数和优化器
device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
model.to(device)# 训练模型
num_epochs = 500
model.train()
for epoch in tqdm(range(num_epochs)):total_loss = 0for inputs, targets in train_loader:inputs = inputs.unsqueeze(-1) # 添加一个维度以匹配LSTM输入targets = targets.unsqueeze(-1)inputs = inputs.to(device)targets = targets.to(device)outputs = model(inputs)loss = criterion(outputs, targets)total_loss += loss.item()optimizer.zero_grad()loss.backward()optimizer.step()if (epoch+1) % 100 == 0:print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}')
可以看到,损失在逐步下降,模型训练是有效的。
5、模型预测
最后,我们结合测试数据,来看看训练的LSTM预测效果。要注意的是,我们这里的模型预测相当于单步预测,因为是每次给定真实的前10条股价数据,模型来预测当前的一条股价数据。如果要采用滚动的多步预测,则需要在每次预测之后将模型的预测结果加入到后续的特征x中,迭代一定的预测轮次之后,模型的预测输入x将全部基于其历史预测结果而非真实股价。
# 预测股价
model.eval()
predictions = []
with torch.no_grad():for inputs in X_test:inputs = inputs.reshape(1, -1).unsqueeze(-1) # 调整维度以匹配LSTM输入inputs = inputs.to(device)outputs = model(inputs)predictions.append(outputs.squeeze(-1).cpu().numpy().tolist()[0])plt.figure(figsize=(15, 6))
# 绘制训练数据
plt.plot(df.iloc[:-300].index, df.iloc[:-300]['Adj Close'], label='Training Data')
# 绘制实际股价
plt.plot(df.iloc[-290:].index, y_test, label='Actual Price', color='skyblue')
# 绘制预测股价,需要注意这里实际预测的结果为最后290条,因为测试数据的前10条并没有被模型预测到
plt.plot(df.iloc[-290:].index, predictions, label='Predicted Price', color='red', linestyle='--')
plt.xlabel('Date')
plt.ylabel('Price($)')
plt.legend()
plt.show()
我们再单独plot一下最后预测的数据跟实际数据的图像。可以看到,模型预测的结果在前半部分较为接近真实股价,趋势基本一致,而后半部分的效果欠佳,说明还有很大的优化空间。
plt.figure(figsize=(15, 6))
plt.plot(df.iloc[-290:].index, y_test, label='Actual Price', color='skyblue')
# 绘制预测股价,需要注意这里实际预测的结果为最后290条,因为测试数据的前10条并没有被模型预测到
plt.plot(df.iloc[-290:].index, predictions, label='Predicted Price', color='red', linestyle='--')
plt.xlabel('Date')
plt.ylabel('Price($)')
plt.legend()
plt.show()
三、完整代码
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import pandas as pdpath = '/kaggle/input/tesla-stock-data-updated-till-28jun2021/TSLA.csv'
df = pd.read_csv(path)
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
print('数据量:', len(df))
print(df.head())adjClose = df['Adj Close'].tolist()
plt.figure(figsize=(20, 6))
plt.plot(df.index, adjClose)
# 设置x轴仅显示每年1月1日的刻度
ax = plt.gca()
ax.xaxis.set_major_locator(mdates.YearLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
plt.xlabel('Date')
plt.xticks(rotation=-80)
plt.ylabel('Price($)')
plt.show()# 数据集构建函数
def create_sequences(data, seq_length):xs = []ys = []for i in range(len(data) - seq_length):x = data[i:i+seq_length]y = data[i+seq_length]xs.append(x)ys.append(y)return np.array(xs), np.array(ys)seq_length = 10
X_train, y_train = create_sequences(adjClose[:-300], seq_length)
X_test, y_test = create_sequences(adjClose[-300:], seq_length)# 转换为PyTorch张量
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)# 创建数据加载器
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)class StockPriceLSTM(nn.Module):def __init__(self, input_size, hidden_size, num_layers, output_size):super(StockPriceLSTM, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layersself.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)self.fc = nn.Linear(hidden_size, output_size)def forward(self, x):h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)out, _ = self.lstm(x, (h0, c0))out = self.fc(out[:, -1, :])return outinput_size = 1 # 每个时间步的输入特征数
hidden_size = 128
num_layers = 2
output_size = 1model = StockPriceLSTM(input_size, hidden_size, num_layers, output_size)# 定义损失函数和优化器
device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
model.to(device)# 训练模型
num_epochs = 500
model.train()
for epoch in tqdm(range(num_epochs)):total_loss = 0for inputs, targets in train_loader:inputs = inputs.unsqueeze(-1) # 添加一个维度以匹配LSTM输入targets = targets.unsqueeze(-1)inputs = inputs.to(device)targets = targets.to(device)outputs = model(inputs)loss = criterion(outputs, targets)total_loss += loss.item()optimizer.zero_grad()loss.backward()optimizer.step()if (epoch+1) % 100 == 0:print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}')# 预测股价
model.eval()
predictions = []
with torch.no_grad():for inputs in X_test:inputs = inputs.reshape(1, -1).unsqueeze(-1) # 调整维度以匹配LSTM输入inputs = inputs.to(device)outputs = model(inputs)predictions.append(outputs.squeeze(-1).cpu().numpy().tolist()[0])plt.figure(figsize=(15, 6))
# 绘制训练数据
plt.plot(df.iloc[:-300].index, df.iloc[:-300]['Adj Close'], label='Training Data')
# 绘制实际股价
plt.plot(df.iloc[-290:].index, y_test, label='Actual Price', color='skyblue')
# 绘制预测股价,需要注意这里实际预测的结果为最后290条,因为测试数据的前10条并没有被模型预测到
plt.plot(df.iloc[-290:].index, predictions, label='Predicted Price', color='red', linestyle='--')
plt.xlabel('Date')
plt.ylabel('Price($)')
plt.legend()
plt.show()