  • I. 前言
  • II. 数据集说明
  • III. 模型
    • 3.1 GCN
    • 3.2 GraphSAGE
    • 3.3 GAT
  • IV. 训练与测试
  • V. 实验结果

pip install torch_scatter torch_sparse torch_cluster torch_spline_conv torch-geometric -f https://data.pyg.org/whl/torch-1.10.0+cu113.html




II. 数据集说明


其中,PEMS04是由307个探测器(节点数)每隔5分钟采集一次数据,共采集59天产生的交通流量数据;PEMS08是由170个探测器每隔5分钟采集一次,共采集62天产生的数据。每个探测器每次采集的数据包含三个维度的特征,分别为:流量、平均速度和平均占有率。因此,数据集的格式应该为一个矩阵,大小为num * num_nodes * 3,其中PSMS04的num=59*24*12=16992num_nodes=307,而PEMS08的num=62*24*12=17856num_nodes=170。PEMS03和PEMS07两个数据集中只包含流量这一个变量,二者的的大小分别为26208*358*1进和28224*883*1



def nn_seq(args):seq_len = args.seq_lenbatch_size, pred_len = args.batch_size, args.pred_lenroot_path = os.path.abspath(os.path.dirname(os.getcwd()))file_name = args.file_namedata_path = root_path + "/data/" + file_name + "/"npz = np.load(data_path + file_name + ".npz")data = npz['data']  # lens num_nodes, in_featsprint(data.shape)# data = data[:2000]# 3表示:车流量、平均车速、平均车道占用率num_nodes = data.shape[1]# 加载邻接矩阵adj_data = pd.read_csv(data_path + file_name + ".csv")adj_data = adj_data[["from", "to"]].values.tolist()# 找出最大最小值all_elements = [element for row in adj_data for element in row]all_elements = list(set(all_elements))all_elements.sort()print(len(all_elements) == num_nodes)node_dict = dict(zip(all_elements, [x for x in range(num_nodes)]))# print(max_val, min_val)adj = torch.zeros((num_nodes, num_nodes))for src, dst in adj_data:src = node_dict[src]dst = node_dict[dst]adj[src, dst] = adj[dst, src] = 1## splittrain = data[:int(len(data) * 0.6)]val = data[int(len(data) * 0.6):int(len(data) * 0.8)]test = data[int(len(data) * 0.8):]# 归一化 要求在站点内部,对按照时间列进行归一化scalers = []for idx in range(num_nodes):cur_train = train[:, idx, :]cur_val = val[:, idx, :]cur_test = test[:, idx, :]scaler = MinMaxScaler()train[:, idx, :] = scaler.fit_transform(cur_train)val[:, idx, :] = scaler.transform(cur_val)test[:, idx, :] = scaler.transform(cur_test)scalers.append(scaler)def process(dataset, step_size, shuffle):# dataset: num num_nodes dimseq = []for i in tqdm(range(0, len(dataset) - seq_len - pred_len + 1, step_size)):x = dataset[i:i + seq_len]y = dataset[i + seq_len:i + seq_len + pred_len]# tensorx = torch.FloatTensor(x)y = torch.FloatTensor(y)seq.append((x, y))seq = MyDataset(seq)seq = DataLoader(dataset=seq, batch_size=batch_size, shuffle=shuffle, num_workers=0, drop_last=False)return seqDtr = process(train, step_size=1, shuffle=True)Val = process(val, step_size=1, shuffle=True)Dte = process(test, step_size=pred_len, shuffle=False)return Dtr, Val, Dte, adj, scalers


III. 模型


图卷积网络(Graph Convolutional Network,GCN)是最早提出的图神经网络之一,GCN通过在图的邻域内进行信息聚合来学习节点的低维表示。具体来说,GCN利用了拉普拉斯矩阵的特征值分解,通过图傅里叶变换将卷积操作转换为频域上的滤波操作。GCN的核心公式为:
h ( l + 1 ) = σ ( D ~ − 1 2 A ~ D ~ − 1 2 h ( l ) W ( l ) ) h^{(l+1)} = \sigma(\tilde{D}^{-\frac{1}{2}} \tilde{A} \tilde{D}^{-\frac{1}{2}} h^{(l)} W^{(l)}) h(l+1)=σ(D~21A~D~21h(l)W(l))

其中, A ~ \tilde{A} A~是带有自环的邻接矩阵, D ~ \tilde{D} D~是对应的度矩阵, h ( l ) h^{(l)} h(l) 是第 l l l层的隐藏状态, W ( l ) W^{(l)} W(l)是权重矩阵, σ \sigma σ是激活函数。

GraphSAGE是由Hamilton等人在2017年提出的,旨在解决大规模图上的节点表示学习问题。GraphSAGE通过采样节点的邻居,并在局部邻域内进行信息聚合,从而生成节点表示。GraphSAGE支持多种聚合方法,包括 Mean Aggregator、LSTM Aggregator 和 Max Pooling Aggregator。GraphSAGE的核心公式为:
h i ( k + 1 ) = σ ( W f ( h i ( k ) , { h j ( k ) ∣ j ∈ N ( i ) } ) ) h_i^{(k+1)} = \sigma(W f(h_i^{(k)}, \{h_j^{(k)} | j \in \mathcal{N}(i)\})) hi(k+1)=σ(Wf(hi(k),{hj(k)jN(i)}))

其中 h i ( k ) h_i^{(k)} hi(k)是第 k k k层节点 i i i的隐藏状态, N ( i ) \mathcal{N}(i) N(i)是节点 i i i的邻居集合, f f f是聚合函数。GraphSAGE通过多层聚合操作,能够有效地捕捉节点的局部结构信息。

图注意力网络GAT(Graph Attention Networks)通过整合注意力机制实现了对图中不同邻居节点的动态加权。其主要创新之处在于,为每个邻接节点分配一个注意力得分,从而使模型可以聚焦于那些更为重要的邻近节点。GAT的核心公式为:
h i ( l + 1 ) = σ ( ∑ j ∈ N ( i ) α i j W h j ( l ) ) h_i^{(l+1)} = \sigma \left( \sum_{j \in \mathcal{N}(i)} \alpha_{ij} W h_j^{(l)} \right) hi(l+1)=σ jN(i)αijWhj(l)

其中 α i j \alpha_{ij} αij是节点 i i i j j j之间的注意力分数, W W W是权重矩阵。GAT通过自注意力机制,能够更好地捕捉节点之间的关系。


在进行模型讲解之前,先规定一下模型的输入和输出维度。在本文中,模型的输入尺寸为:batch_size * seq_len * num_nodes * in_feats,表示每个站点的多变量历史数据,输出为batch_size * pred_len * num_nodes * in_feats,表示多个站点未来的多变量数据。

3.1 GCN


class GraphConvolution(Module):"""Simple GCN layer, similar to https://arxiv.org/abs/1609.02907"""def __init__(self, in_features, out_features, bias=True):super(GraphConvolution, self).__init__()self.in_features = in_featuresself.out_features = out_featuresself.weight = Parameter(torch.FloatTensor(in_features, out_features))if bias:self.bias = Parameter(torch.FloatTensor(out_features))else:self.register_parameter('bias', None)self.reset_parameters()def reset_parameters(self):stdv = 1. / math.sqrt(self.weight.size(1))self.weight.data.uniform_(-stdv, stdv)if self.bias is not None:self.bias.data.uniform_(-stdv, stdv)def forward(self, input, adj):support = torch.mm(input, self.weightoutput = torch.spmm(adj, support)if self.bias is not None:return output + self.biaselse:return outputdef __repr__(self):return self.__class__.__name__ + ' (' \+ str(self.in_features) + ' -> ' \+ str(self.out_features) + ')'

可以看到,GCN的本质就是将归一化后的邻接矩阵和节点特征矩阵执行矩阵乘法,即(num_nodes, num_nodes) * (num_nodes, feats) -> (num_nodes, feats)

因此,对于大小为batch_size * seq_len * num_nodes * in_feats的输入,可以直接对后两个维度进行计算。代码如下:

class GCNConv(nn.Module):def __init__(self, in_features, out_features, bias=True):super(GCNConv, self).__init__()self.in_features = in_featuresself.out_features = out_featuresself.weight = Parameter(torch.FloatTensor(in_features, out_features))if bias:self.bias = Parameter(torch.FloatTensor(out_features))else:self.register_parameter('bias', None)self.reset_parameters()def reset_parameters(self):stdv = 1. / math.sqrt(self.weight.size(1))self.weight.data.uniform_(-stdv, stdv)if self.bias is not None:self.bias.data.uniform_(-stdv, stdv)def forward(self, x, adj):support = torch.matmul(x, self.weight)# 输入的数据是x = b s n d, adj = n * noutput = torch.einsum("tn,bsnd->bstd", adj, support)   # bsndif self.bias is not None:output + self.biasreturn output

具体来讲,首先将batch_size * seq_len * num_nodes * in_feats利用self.weight变成batch_size * seq_len * num_nodes * out_feats,然后再与归一化后的邻接矩阵相乘,这里用到了torch.einsum()函数来指定参与计算的维度。


class GCN(torch.nn.Module):def __init__(self, args):super(GCN, self).__init__()self.args = argsself.conv1 = GCNConv(args.in_feats, args.h_feats)self.conv2 = GCNConv(args.h_feats, args.out_feats)self.dropout = 0.5self.fcs = nn.ModuleList()for _ in range(args.in_feats):self.fcs.append(nn.Sequential(nn.Linear(args.seq_len * args.out_feats, args.out_feats),nn.ReLU(),nn.Linear(args.out_feats, args.pred_len)))def forward(self, x, adj):# bsndx = F.dropout(x, self.dropout, training=self.training)x = F.elu(self.conv1(x, adj))x = self.conv2(x, adj)# b s n d  --> b s n 3x = x.permute(0, 2, 1, 3)  # bnsdx = torch.flatten(x, start_dim=2)  # bn s*dpred = []for idx in range(self.args.in_feats):sub_pred = self.fcs[idx](x)   # b n pred_lenpred.append(sub_pred)  # b pred_len 3pred = torch.stack(pred, dim=-1)  # b n pred_len 3# 变成和y一样的维度,即b pred_len num_node 3pred = pred.permute(0, 2, 1, 3)return pred

该模型由2个GCN层和一个预测层组成。输入batch_size * seq_len * num_nodes * in_feats(以下简称bsni)经过两层GCN变成bsno。接着,为了预测所有站点的多个变量,采用多任务学习中的思路,每个变量使用一个线性层进行预测。

预测时,首先将bsno的进行维度交换变成bnso,与LSTM等模型类似,可以将所有时刻的隐状态展开变成一个bn(s*d),然后使用多个线性层得到多个bn(pred_len),然后将多个变量的预测值拼接变成bn(pred_len)(in_feats)。最后,为了与真实值的batch_size * pred_len * num_nodes * in_feats相匹配,需要交换1和2两个维度。


def normalize_adj(adj):"""归一化邻接矩阵,适用于图卷积网络(GCN)。:param adj: 原始邻接矩阵,形状为 (N, N):return: 归一化后的邻接矩阵,形状为 (N, N)"""# 添加自环adj = adj + torch.eye(adj.size(0))# 计算度矩阵 Ddegree = adj.sum(1)# 计算 D 的逆平方根d_inv_sqrt = torch.pow(degree, -0.5)d_inv_sqrt[torch.isinf(d_inv_sqrt)] = 0  # 防止出现无穷大# 构建 D 的逆平方根矩阵d_mat_inv_sqrt = torch.diag(d_inv_sqrt)# 归一化邻接矩阵adj_normalized = d_mat_inv_sqrt @ adj @ d_mat_inv_sqrtreturn adj_normalized

3.2 GraphSAGE


class SAGEConv(nn.Module):def __init__(self, in_features, out_features):super(SAGEConv, self).__init__()self.in_features = in_featuresself.out_features = out_featuresself.proj = nn.Linear(in_features, out_features)self.out_proj = nn.Linear(2 * out_features, out_features)def forward(self, x, adj):# 假设有多个站点support = self.proj(x)# 输入的数据是x = b s n d, adj = n * n# 邻居平均 设定一个很小的正数epseps = torch.tensor(1e-8)# 计算每一行的和,并确保不会除以零row_sums = adj.sum(dim=1, keepdim=True)row_sums = torch.max(row_sums, eps)# 对每一行进行规范化normalized_adj = adj / row_sumsoutput = torch.einsum("tn,bsnd->bstd", normalized_adj, support)   # bsnd# catcat_x = torch.cat((support, output), dim=-1)  # bsn 2dz = self.out_proj(cat_x)# normz_norm = z.norm(p=2, dim=-1, keepdim=True)z_norm = torch.where(z_norm == 0, torch.tensor(1.).to(z_norm), z_norm)z = z / z_normreturn z


class GraphSAGE(torch.nn.Module):def __init__(self, args):super(GraphSAGE, self).__init__()self.args = argsself.conv1 = SAGEConv(args.in_feats, args.h_feats)self.conv2 = SAGEConv(args.h_feats, args.out_feats)self.dropout = 0.5self.fcs = nn.ModuleList()for _ in range(args.in_feats):self.fcs.append(nn.Sequential(nn.Linear(args.seq_len * args.out_feats, args.out_feats),nn.ReLU(),nn.Linear(args.out_feats, args.pred_len)))def forward(self, x, adj):# bsnd# x = F.dropout(x, self.dropout, training=self.training)x = F.relu(self.conv1(x, adj))x = self.conv2(x, adj)# b s n d  --> b s n 3x = x.permute(0, 2, 1, 3)  # bnsdx = torch.flatten(x, start_dim=2)  # bn s*dpred = []for idx in range(self.args.in_feats):sub_pred = self.fcs[idx](x)   # b n pred_lenpred.append(sub_pred)  # b pred_len 3pred = torch.stack(pred, dim=-1)  # b n pred_len 3pred = pred.permute(0, 2, 1, 3)return pred

3.3 GAT


class GraphAttentionLayer(nn.Module):"""Simple GAT layer, similar to https://arxiv.org/abs/1710.10903"""def __init__(self, in_features, out_features, dropout, alpha, concat=True):super(GraphAttentionLayer, self).__init__()self.dropout = dropoutself.in_features = in_featuresself.out_features = out_featuresself.alpha = alphaself.concat = concatself.W = nn.Parameter(torch.empty(size=(in_features, out_features)))nn.init.xavier_uniform_(self.W.data, gain=1.414)self.a = nn.Parameter(torch.empty(size=(2*out_features, 1)))nn.init.xavier_uniform_(self.a.data, gain=1.414)self.leakyrelu = nn.LeakyReLU(self.alpha)def forward(self, h, adj):Wh = torch.mm(h, self.W) # h.shape: (N, in_features), Wh.shape: (N, out_features)e = self._prepare_attentional_mechanism_input(Wh)zero_vec = -9e15*torch.ones_like(e)attention = torch.where(adj > 0, e, zero_vec)attention = F.softmax(attention, dim=1)attention = F.dropout(attention, self.dropout, training=self.training)h_prime = torch.matmul(attention, Wh)if self.concat:return F.elu(h_prime)else:return h_primedef _prepare_attentional_mechanism_input(self, Wh):# Wh.shape (N, out_feature)# self.a.shape (2 * out_feature, 1)# Wh1&2.shape (N, 1)# e.shape (N, N)Wh1 = torch.matmul(Wh, self.a[:self.out_features, :])Wh2 = torch.matmul(Wh, self.a[self.out_features:, :])# broadcast adde = Wh1 + Wh2.Treturn self.leakyrelu(e)def __repr__(self):return self.__class__.__name__ + ' (' + str(self.in_features) + ' -> ' + str(self.out_features) + ')'

上述代码使用了broadcast add技巧来得到每个节点与其他所有节点的权重,然后再使用adj来将不存在边的权重变成一个很小的负数。


class GraphAttentionLayer(nn.Module):def __init__(self, in_features, out_features, dropout, alpha, concat=True):super(GraphAttentionLayer, self).__init__()self.dropout = dropoutself.in_features = in_featuresself.out_features = out_featuresself.alpha = alphaself.concat = concatself.W = nn.Parameter(torch.empty(size=(in_features, out_features)))nn.init.xavier_uniform_(self.W.data, gain=1.414)self.a = nn.Parameter(torch.empty(size=(2*out_features, 1)))nn.init.xavier_uniform_(self.a.data, gain=1.414)self.leakyrelu = nn.LeakyReLU(self.alpha)def forward(self, h, adj):# bsnd nnWh = torch.matmul(h, self.W)e = self._prepare_attentional_mechanism_input(Wh)  # bsnn# 掩码操作mask = (adj == 0)# 广播掩码矩阵mask = mask.unsqueeze(0).unsqueeze(0)mask = mask.expand_as(e)# 应用掩码e[mask] = -9e15e = F.softmax(e, dim=1)e = F.dropout(e, self.dropout, training=self.training)h_prime = torch.einsum("bstn,bsnd->bstd", e, Wh)  # bsndif self.concat:return F.elu(h_prime)else:return h_primedef _prepare_attentional_mechanism_input(self, Wh):# Wh.shape (bsz, seq_len, N, out_feature)# self.a.shape (2 * out_feature, 1)# Wh1&2.shape (N, 1)# e.shape (bsz, seq_len, N, N)Wh1 = torch.matmul(Wh, self.a[:self.out_features, :])Wh2 = torch.matmul(Wh, self.a[self.out_features:, :])# broadcast add# 只是最后两个维度相加e = Wh1 + Wh2.permute(0, 1, 3, 2)return self.leakyrelu(e)def __repr__(self):return self.__class__.__name__ + ' (' + str(self.in_features) + ' -> ' + str(self.out_features) + ')'


  1. 其一,执行broadcast add时候,只是后两个维度进行操作(e = Wh1 + Wh2.permute(0, 1, 3, 2)),即bsnd+bsdn。
  2. 得到attention矩阵大小为bsnn,而不是二维的nn。因此,同样需要进行广播来实现掩码操作。


class GAT(torch.nn.Module):def __init__(self, args):super(GAT, self).__init__()self.args = argsalpha = 0.2self.dropout = args.dropoutself.conv1 = GraphAttentionLayer(args.in_feats,args.h_feats,dropout=self.dropout, alpha=alpha, concat=False)self.conv2 = GraphAttentionLayer(args.h_feats,args.out_feats,dropout=self.dropout, alpha=alpha, concat=False)self.fcs = nn.ModuleList()for _ in range(args.in_feats):self.fcs.append(nn.Sequential(nn.Linear(args.seq_len * args.out_feats, args.out_feats),nn.ReLU(),nn.Linear(args.out_feats, args.pred_len)))def forward(self, x, adj):# bsndx = F.dropout(x, self.dropout, training=self.training)x = F.elu(self.conv1(x, adj))x = self.conv2(x, adj)# b s n d  --> b s n 3x = x.permute(0, 2, 1, 3)  # bnsdx = torch.flatten(x, start_dim=2)  # bn s*dpred = []for idx in range(self.args.in_feats):sub_pred = self.fcs[idx](x)   # b n pred_lenpred.append(sub_pred)  # b pred_len 3pred = torch.stack(pred, dim=-1)  # b n pred_len 3# 变成和y一样的维度,即b pred_len num_node 3pred = pred.permute(0, 2, 1, 3)return pred


class GAT(nn.Module):def __init__(self, args):super(GAT, self).__init__()self.args = argsself.dropout = args.dropoutalpha = 0.2self.attentions = nn.ModuleList()for _ in range(args.heads):layer = GraphAttentionLayer(args.in_feats, args.h_feats, dropout=self.dropout, alpha=alpha, concat=True)self.attentions.append(layer)self.out_att = GraphAttentionLayer(args.h_feats * args.heads,args.out_feats,dropout=self.dropout, alpha=alpha, concat=False)# fcself.fcs = nn.ModuleList()for _ in range(args.in_feats):self.fcs.append(nn.Sequential(nn.Linear(args.seq_len * args.out_feats, args.out_feats),nn.ReLU(),nn.Linear(args.out_feats, args.pred_len)))def forward(self, x, adj):x = F.dropout(x, self.dropout, training=self.training)x = torch.cat([att(x, adj) for att in self.attentions], dim=-1)x = F.dropout(x, self.dropout, training=self.training)x = self.out_att(x, adj)# b s n d  --> b s n 3x = x.permute(0, 2, 1, 3)  # bnsdx = torch.flatten(x, start_dim=2)  # bn s*dpred = []for idx in range(self.args.in_feats):sub_pred = self.fcs[idx](x)  # b n pred_lenpred.append(sub_pred)  # b pred_len 3pred = torch.stack(pred, dim=-1)  # b n pred_len 3pred = pred.permute(0, 2, 1, 3)return pred

IV. 训练与测试


def train(args, Dtr, Val, adj, path, model_type):if model_type == "gcn":adj = normalize_adj(adj)model = GCN(args).to(device)elif model_type == "sage":model = GraphSAGE(args).to(device)elif model_type == "gat":model = GAT(args).to(device)else:raise ValueError("model_type has to be one of ('gcn', 'sage', 'gat')")adj = adj.to(device)loss_function = nn.MSELoss().to(device)optimizer = torch.optim.Adam(model.parameters(), lr=args.lr,weight_decay=args.weight_decay)scheduler = StepLR(optimizer, step_size=args.step_size, gamma=args.gamma)# trainingmin_epochs = 2best_model = Nonemin_val_loss = np.Inffor epoch in tqdm(range(args.epochs)):model.train()train_loss = []for (seq, label) in Dtr:optimizer.zero_grad()seq = seq.to(device)label = label.to(device)  # b pred_len num_node 3pred = model(seq, adj)  # b pred_len num_node 3# print(label.shape, pred.shape)loss = loss_function(pred, label)loss.backward()optimizer.step()train_loss.append(loss.item())scheduler.step()# validationval_loss = get_val_loss(args, model, Val, adj)if epoch + 1 >= min_epochs and val_loss < min_val_loss:min_val_loss = val_lossbest_model = copy.deepcopy(model)state = {'model': best_model.state_dict()}torch.save(state, path + '/models/' + model_type + '.pkl')print('epoch {:03d} train_loss {:.8f} val_loss {:.8f}'.format(epoch, np.mean(train_loss), val_loss))state = {'model': best_model.state_dict()}torch.save(state, path + '/models/' + model_type + '.pkl')


def test(args, Dte, adj, path, model_type, scalers):if model_type == "gcn":adj = normalize_adj(adj)model = GCN(args).to(device)elif model_type == "sage":model = GraphSAGE(args).to(device)elif model_type == "gat":model = GAT(args).to(device)else:raise ValueError("model_type has to be one of ('gcn', 'sage', 'gat')")model.load_state_dict(torch.load(path + '/models/' + model_type + '.pkl')['model'])adj = adj.to(device)y, pred = [], []for seq, label in Dte:seq = seq.to(device)y.append(label)sub_pred = model(seq, adj)  # b pred_len num_node 3pred.append(sub_pred.cpu())#y = torch.concat(y, dim=0)y = y.view(-1, y.size(2), y.size(3)).numpy()pred = torch.concat(pred, dim=0)  # num num_node 3pred = pred.view(-1, pred.size(2), pred.size(3)).numpy()# flatten# scalerfor idx in range(y.shape[1]):y[:, idx, :] = scalers[idx].inverse_transform(y[:, idx, :])pred[:, idx, :] = scalers[idx].inverse_transform(pred[:, idx, :])for idx in range(y.shape[1]):cur_y, cur_pred = y[:, idx, :], pred[:, idx, :]# 输出各种指标print('第{}个站点的指标为:'.format(idx + 1))maes, mses, rmses, mapes, r2s = get_metric(cur_y, cur_pred)print('mae:', maes)print('mse', mses)print('rmse:', rmses)print('mape:', mapes)print('r2:', r2s)# plotfor i in range(cur_y.shape[1]):plt.plot(cur_y[:, i], label="第{}个站点的第{}个变量的真实值".format(idx + 1, i + 1))plt.plot(cur_pred[:, i], label="第{}个站点的第{}个变量的预测值".format(idx + 1, i + 1))plt.legend()plt.show()


V. 实验结果



