欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 25/1/5 算法笔记<强化学习> MPC,交叉熵法,PETS算法

25/1/5 算法笔记<强化学习> MPC,交叉熵法,PETS算法

2025/4/1 3:23:48 来源:https://blog.csdn.net/yyyy2711/article/details/144940430  浏览:    关键词:25/1/5 算法笔记<强化学习> MPC,交叉熵法,PETS算法

MPC

一个棋手下棋,会根据当前的局势来推演落子几步可能发生的局势,然后选择局势最好的一种情况来决定当前落子位置。

模型预测控制方法MPC,就是这样一种迭代的、基于模型的控制方法。值得注意的是MPC中不存在一个显示的策略。具体而言就是MPC在每次采取动作是,首先会生成一些候选动作序列,然后根据当前状态来确定每一条候选序列能得到多好的结果,最终选择结果最好的那条动作序列的第一个动作来执行。因此在MPC方法中主要在两个过程中迭代,一个是根据历史数据学习环境模型P(s,a),二是在和真实环境的交互过程中用环境模型来选择动作。

生成候选序列的动作,称为 打靶 。

交叉熵方法

交叉熵方法CEM是一种进化策略方法,它的核心思想是维护一个带参数的分布,根据每次采样的结果来更新分布中的参数,使分布中能获得较高的奖励的动作序列的概率比较高,相比于随机打靶法,交叉熵法能够利用之前采样得到的比较好的结果,在一定程度上减少采样到较差动作的概率,从而使算法更加高效,对于一个与连续动作交互的环境,每次交互时交叉熵方法的做法如下:

        for 次数e = 1->E do

                从分布P(A)中选取N条动作序列A1...AN

                对于每条动作序列A1...AN,用环境模型评估累计奖励

                根据评估结果保留M条最优的动作序列Ai1,...AiM

                用这些动作序列Ai1,...AiM更新分布P(A)

                end for

        计算所有最优动作序列的第一个动作的均值,作为当前按时刻采取的动作

我们可以使用如下的代码来实现交叉熵方法其中将采用截断正态分布。

import numpy as np
from scipy.stats import truncnorm
import gym
import torch
import torch.nn as nn
import torch.nn/functional as F
import collections
import matplotlib.pyplot as plt
class CEM:def __init__(self,n_sequence,elite_ratio,fake_env,upper_bound,lower_bound):self.n_sequence = n.sequenceself.elite_ratio = elite_ratioself.upper_bound = upper_boundself.lower_bound = lower_boundself.fake_env = fake_envdef optimizer(self,state,init_mean,init_var):mean,var = init_mean,init_varx = truncnorm(-2,2,loc=np.zeros_like(mean),scale = np.ones_like(var))state = np.tile(state,(self.n_sequence,1))for _ in range(5):lb_dist,ub_dist = mean - self.lower_bound,self.upper_bound - meanconstrained_var = np.minimum(np.minimum(np.square(lb_dist/2),np.square(ub_dist/2)),var)#生成动作序列action_sequences = [X.rvs() for _ in range(self.n_sequences)]*np.sqrt(constrained_var)+mean#计算每条动作序列的累计奖励returns = self.fake_env.propagate(state,action_sequences)[:,0]#选取累计奖励最高的若干条动作序列elites = action_sequences[np.argsort(returns)][-int(self.elite_ratio * self.n_sequence):]new_mean = np.mean(elites,axis = 0)new_var = np.var(elites,axis = 0)#更新动作序列分布mean = 0.1*mean+0.9*new_meanvar = 0.1*var*0.9*new_varreturn mean

PETS算法

带有轨迹采样的概率集成PETS 是一种使用MPC的基于强化学习算法。在PETS中,环境模型采用集成学习的方法,即会构建多个环境模型,然后用多个环境模型来进行预测,最后使用CEM进行模型预测控制。

在PETS算法中,环境模型构建会同时考虑两种不确定性(偶然不确定性--系统中存在的随机引起的)(认知不确定性--由“见”过的数据较少所导致的自身的不足引起的)。首先,我们定义模型输出为一个高斯分布,用来捕捉偶然不确定性。

在此基础上,我们再用集成学习来捕捉认知不确定性。具体而言,我们构建B个网络框架一样的神经网络。它们的输入都是状态动作对,输出都是下一个状态的高斯分布的均值向量和协方差矩阵。但是它们的参数采用不同的随机初始化方法,并且每次训练时,会从真实数据中随机采样不同的数据来训练。

有了环境模型后,MPC算法会用其来预测奖励和下一个状态,具体来说,每一次预测会从B个模型中挑选一个来进行预测,因此一条轨迹的采样会用到多个环境模型

代码

首先为了搭建这样一个较为复杂的模型,我们定义模型中每一层的构造,在定义时就必须考虑每一层都是一个集成。

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")class Swish(nn.Module):def __init__(self):super(Swish,self).__init__()def forward(self,x):return x*torch.sigmoid(x)def init_weights(m):def truncated_normal_init(t,mean=0.0,std=0.01):torch.nn.init.normal_(t,mean=mean,std = std)while True:cond = (t<mean-2*std)|(t>mean+2*std)if not torch.sum(cond):breakt = torch.where(cond,torch.nn.init.normal_(torch.ones(t.shape,     device = device),mean = mean,std = std),t)return tif type(m)==nn.Linear or isinstance(m,FCLayer):truncated_normal_init(m.weight,std=1/(2*np.sqrt(m._input_dim)))m.bias.data.fill_(0.0)
class FCLayer(nn.Module):#集成之后的全连接层def __init__(self,input_dim,output_dim,ensemble_size,actiivation):super(FCLayer,self).__init__()self._input_dim,self._output_dim = input_dim,output_dimself.weight = nn.Parameter(torch.Tensor(ensemble_size,input_dim,output_dim).to(device))self._activation = activationself.bias = nn.Parameter(torch.Tensor(ensemble_size,output_dim).to(device))def forward(self, x):return self._activation(torch.bmm(x, self.weight) + self.bias[:, None, :])

接着使用高斯分布的概率来定义一个集成模型

class EnsembleModel(nn.Module):#环境模型集成def __init__(self,state_dim,action_dim,ensemble_size = 5,learning_rate = le-3):super(EnsembleModel,self).__init__()#输出包括均值和方差,因此时状态与奖励维度之和的两倍self._output_dim = (state_dim + 1) * 2self._max_logvar = nn.Parameter((torch.ones((1,self._output_dim//2)).float()/2).to(device),requires_grad = False)self._min_logvar = nn.Parameter((-torch.ones((1,self._output_dim//2)).float() * 10).to(device),requires_grad=False)self.layer1 = FCLayer(state_dim + action_dim,200,ensemble_size,Swish())self.layer2 = FCLayer(200,200,ensemble_size,Swish())self.layer3 = FCLayer(200,200,ensemble_size,Swish())self.layer4 = FCLayer(200,200,ensemble_size,Swish())self.layer5 = FCLayer(200,200,ensemble_size,Swish())self.apply(init_weights) #初始化环境模型中的参数self.optimizer = torch.optim.Adam(self.parameteres(),lr = learning_rate)def forward(self,x,return_log_var = False):ret = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x)))))mean = ret[:,:,:self._output_dim//2]#在PETS算法中,将方差控制在最小值和最大值之间logvar = self._max_logvar - F.softplus(self._max_logvar - ret[:,:,self._output_dim//2:])logvar = self._min_logvar + F.softplus(logvar - self._min_logvar)return mean,logvar if return_log_var else torch.exp(logvar)def loss(self,mean,logvar,labels,use_var_loss = True):inverse_var = torch.exp(-logvar)if use_var_loss:mse_loss = torch.mean(torch.mean(torch.pow(mean - labels,2)*inverse_var,dim=-1),dim=-1)var_loss = torch.mean(torch.mean(logvar,dim=-1),dim=-1)total_loss = torch.sum(mse_loss)+torch.sum(var_loss)else:mse_loss = torch.mean(torch.pow(mean - labels,2),dim=(1,2))total_loss = torch.sum(mse_loss)return total_loss,mse_lossdef train(self,loss):self.optimizer.zero_grad()loss += 0.01*torch.sum(self._max_logvar)-0.01*torch.sum(self._min_logvar)loss.backward()self.optimizer.step()

集成模型的关键特点

  • 多个子模型的独立性:

    • 每个子模型都有自己的参数,通过 ensemble_size 维度实现并行计算。

  • 预测结果的多样性:

    • 由于每个子模型的参数不同,它们的预测结果也会有所不同,这种多样性可以提高模型的鲁棒性。

  • 不确定性估计:

    • 通过集成多个子模型的预测结果,模型可以估计预测的不确定性(方差),这在强化学习中非常重要。

接下来,定义一个EnsembleDynamicsModel类,把模型集成的训练设计得更加精细化。具体而言,我们并不会选择模型训练的轮数,而是在每次训练的时候将一部分数据单独取出来,用于验证模型的表现,在5次没有获得表现提升时就结束训练。

class EnsembleDynamicsModel:#环境集成,加入精细化的训练def __init__(self,state_dim,action_dim,num_network = 5):self._num_network = num_networkself._state_dim,self._action_dim = state_dim,action_dimself.model = EnsembleModel(state_dim ,action_dim,ensemble_size = unm_networkself._epoch_since_last_update = 0def train(self, inputs, labels, batch_size=64, holdout_ratio=0.1, max_iter=20):# 设置训练集与验证集permutation = np.random.permutation(inputs.shape[0])inputs, labels = inputs[permutation], labels[permutation]# 计算验证集的大小num_holdout = int(inputs.shape[0] * holdout_ratio)# 划分训练集和验证集train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:]holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout]# 将验证集数据转换为 PyTorch 张量并移动到指定设备holdout_inputs = torch.from_numpy(holdout_inputs).float().to(device)holdout_labels = torch.from_numpy(holdout_labels).float().to(device)# 扩展验证集数据以匹配集成模型的数量holdout_inputs = holdout_inputs[None, :, :].repeat([self.num_network, 1, 1])holdout_labels = holdout_labels[None, :, :].repeat([self.num_network, 1, 1])#保留最好的结果self._snapshots = {i:(None,lel0)for i in range(self._num_network)}for epoch in itertools.count():#定义每一个网络的训练数据train_index = np.vstack([np.random.permutation(train_inputs.shape[0])for _ in range(self._num_network)]#所有真实数据都用来训练# 分批训练for batch_start_pos in range(0, train_inputs.shape[0], batch_size):# 获取当前批次的数据batch_index = permutation[batch_start_pos:batch_start_pos + batch_size]train_input = torch.from_numpy(train_inputs[batch_index]).float().to(device)train_label = torch.from_numpy(train_labels[batch_index]).float().to(device)# 扩展训练集数据以匹配集成模型的数量train_input = train_input[None, :, :].repeat([self._num_network, 1, 1])train_label = train_label[None, :, :].repeat([self._num_network, 1, 1])# 前向传播mean, logvar = self.model(train_input, return_log_var=True)# 计算损失loss, _ = self.model.loss(mean, logvar, train_label, use_var_loss=False)# 反向传播和优化self.model.train(loss)# 验证集评估with torch.no_grad():mean, logvar = self.model(holdout_inputs, return_log_var=True)holdout_losses = self.model.loss(mean, logvar, holdout_labels, use_var_loss=False)holdout_losses = holdout_losses.cpu()# 保存最佳模型break_condition = self._save_best(epoch, holdout_losses)# 如果满足停止条件,则结束训练if break_condition or epoch >= max_iter:breakdef _save_best(self, epoch, losses, threshold=0.1):updated = Falsefor i in range(len(losses)):current = losses[i]_, best = self._snapshots[i]# 计算改进程度improvement = (best - current) / best if best != 0 else 0# 如果改进超过阈值,则保存当前模型if improvement > threshold:self._snapshots[i] = (epoch, current)updated = True# 更新 epoch 计数self._epoch_since_last_update = 0 if updated else self._epoch_since_last_update + 1return self._epoch_since_last_update>5def predict(self, inputs, batch_size=64):"""使用集成模型进行预测。参数:inputs: 输入数据,形状为 (num_samples, input_dim)。batch_size: 批大小,默认为 64。返回:mean: 预测的均值,形状为 (num_samples, output_dim)。var: 预测的方差,形状为 (num_samples, output_dim)。"""mean, var = [], []for i in range(0, inputs.shape[0], batch_size):# 获取当前批次的数据batch_inputs = inputs[i:min(i + batch_size, inputs.shape[0])]batch_inputs = torch.from_numpy(batch_inputs).float().to(device)# 扩展输入数据以匹配集成模型的数量batch_inputs = batch_inputs[None, :, :].repeat([self._num_network, 1, 1])# 使用模型进行预测cur_mean, cur_var = self.model(batch_inputs, return_log_var=False)# 将结果保存到列表中mean.append(cur_mean.detach().cpu().numpy())var.append(cur_var.detach().cpu().numpy())return np.hstack(mean),np.hstack(var)

有了环境模型之后,我们就可以定义一个FakeEnv,主要用于实现给定状态和动作,用模型集成来进行预测,该功能会用在MPC算法中。

class FakeEnv:def __init__(self,model):self.model = modeldef step(self,obs,act):inputs = np.concatenate((obs,act),axis = -1)ensemble_model_means,ensemble_model_vars = self.model.predict(inputs)ensemble_model_means[:,:,1:]+=obs.numpy()ensemble_model_stds = np.sqrt(ensemble_model_vars)ensemble_samples = ensemble_model_means + np.random.normal(size=ensemble_model_means.shape) * ensemble_model_stdsnum_models,batch_size,_ = ensemble_model_means.shapemodels_to_use = np.random.choice([i for i in range(self.model._num_network)],size = batch_size)batch_inds = np.arange(0,batch_size)samples = ensemble_samples[models_to_use,batcj_inds]rewards,next_obs = samples[:,:1],samples[:,1:]return rewards,next_obsdef propagate(self,obs,actions):with torch.no_grad():obs = np.copy(obs)total_reward = np.expand_dim(np.zeros(obs.shape[0]),axis = -1)obs,actions = torch.as_tensor(obs).torch.as_tensor(actions)for i in range(actions.shape[1]):action = torch.unsqueeze(actions[:,i],1)rewards,next_obs = self.step(obs,action)total_reward += rewardsobs = torch.as_tensor(next_obs)return total_reward

接下来定义经验回放池的类Replay和Buffer。与之前章节对比,此处经验回访缓冲区会额外实现一个返回所有数据的函数。

class ReplayBuffer:def __init__(self,capacity):self.buffer = collections.deque(maxlen - capacity)def add(self,state,action,reward,next_state,done):self.buffer.append((state,action,reward,next_state,done))def size(self):return len(self.buffer)def return_all_samples(self):all_transitions = list(self.buffer)state,action,reward,next_state,done = zip(*all_transitions)return np.array(state),action,reward,np.array(next_state),done

    接下来是PETS算法的主题部分

class PETS:def __init__(self,env,replay_buffer,n_sequence,elite_ratio,plan_horizon,num_episodes):self._env = envself._env_pool = ReplayBuffer(buffer_size) #存储交互数据obs_dim = env.observation_space.shape[0]self._action_dim = env.action_space.shape[0]self._model = EnsembleDynamocsModel(obs_dim,self._action_dim)self._fake_env = FakeEnv(self._model)self.upper_bound = env.action_space.high[0]self.lower_bound = env.action_space.low[0]self._cem = CEM(n_sequence,elite_ratio,self._fake_env,self.upper_bound,self.lower_bound) #交叉熵方法的优化器self.plan_horizon = plan_horizonself.num_episodes = num_episodesdef train_model(self):env_samples = self._env_pool.return_all_samples()obs = env_samples[0]actions = np.array(env_samples[1])rewards = np.array(env_samples[2]).reshape(-1,1)next_obs = env.samples[3]inputs = np.concatenate((obs,actions),axis = -1)labels = np.concatenate((rewards,next_obs - obs),axis=-1)self._model.train(inputs,labels)def mpc(self):mean = np.tile((self.upper_bound + self.lower_bound)/2.0,self.plan_horizon)var = np.tile(np.square(self.upper_bound - self.lower_bound)/16,self.plan_horizon)obs,done,episode_return = self._env.reset(),False,0while not done:actions = self._cem.optimize(obs,mean,var)action = actions[:self._action_dim]#选取第一个动作next_obs,reward,done,_ = self._env.step(action)self._env_pool.add(obs,action,reward,next_obs,done)obs = next_obs episode_return += rewardmean = np.concatenate([np.copy(actions)[self._action_dim:],np.zeros(self._action_dim)])return episode_return def explore(self): #使用随机策略探索,收集一条轨迹的数据,并将其存储到经验回放池中obs,done,episode_return = self._env.reset(),False,0while not done:action = self._env.action_space.sample()next_obs,reward,done,_ = self._env.step(action)self._env_pool.add(obs,action,reward,next_obs,done)obs = next_obsepisode_return += rewardreturn episode_return def train(self):    #训练 MPC 控制器,包括初始的随机探索和后续的 MPC 控制。return_list = []explore_return = self.explore()#先进行随机策略的探索来手机一条序列的数据print('episode: 1,return: %d' %explore_return)return_list.append(explore_return)for i_episode in range(self.num_episodes-1):self.train_model()episode_return = self.mpc()return_list.append(episode_return)print('episode: %d,return:%d'%(i_episode+2,episode_return))return return_list

PETS算法的效果非常好,但是由于每次选取动作都要在环境模型上进行大量的模拟,因此运行速度非常慢。与SAC算法的结果进行对比可以看出,PETS算法大大提高了样本效率,在比SAC算法的环境交互次数少得多的情况下就取得了差不多的效果。

通过实践,我们可以发现模型预测控制MPC方法有着其独特的优势,例如它不需要构建和训练策略,可以更好地利用环境,可以进行更长部署的规划。但是MPC也有其局限性,例如模型在多步推演之后的准确性会大大降低,简单的控制策略对于复杂系统可能不够。MPC还有一个更为严重的问题,即每次计算动作的复杂度太大,这使得其在一些策略即使性要求较高的系统中应用就变得不太现实。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词