Yesterday we walked through the environment code of a V-REP project on GitHub; today we analyze its reinforcement learning code.
GitHub link:
https://github.com/deep-reinforcement-learning-book/Chapter16-Robot-Learning-in-Simulation.
First, the libraries are imported:
import math
import random
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal  # normal (Gaussian) distribution
Set the multiprocessing start method:
torch.multiprocessing.set_start_method('forkserver', force=True)
When using the torch.multiprocessing module, the way child processes are started matters a lot, especially when CUDA is involved. The default start method ('fork') can cause problems, because CUDA does not cope well with process forking.
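As a minimal, hedged sketch of the safe pattern (assuming a Unix-like system where 'forkserver' is available): pick the start method once, inside the __main__ guard, before any CUDA work or child process is created.

import torch
import torch.multiprocessing as mp

def child(rank):
    # each worker gets its own CUDA context instead of inheriting a forked one
    print('worker', rank, 'sees CUDA:', torch.cuda.is_available())

if __name__ == '__main__':
    mp.set_start_method('forkserver', force=True)  # avoids fork-after-CUDA-init problems
    p = mp.Process(target=child, args=(0,))
    p.start()
    p.join()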
from IPython.display import clear_output  # IPython display utilities
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import display

from sawyer_grasp_env_boundingbox import GraspEnv  # the custom environment from yesterday's post
import argparse  # command-line argument parsing
import time      # time-related utilities
import pickle    # serialization / deserialization
import torch.multiprocessing as mp                 # multiprocessing support
from torch.multiprocessing import Process          # multiprocessing worker
from multiprocessing import Process, Manager       # shared multiprocessing utilities
from multiprocessing.managers import BaseManager   # for creating a custom shared-object manager
Select GPU or CPU depending on availability, and print the current device:
GPU = True
device_idx = 0
if GPU:
    device = torch.device("cuda:" + str(device_idx) if torch.cuda.is_available() else "cpu")
else:
    device = torch.device("cpu")
print(device)
The argparse module parses the command line so the user can choose between training (--train) and testing (--test):
parser = argparse.ArgumentParser(description='Train or test neural net motor controller.')
parser.add_argument('--train', dest='train', action='store_true', default=False)
parser.add_argument('--test', dest='test', action='store_true', default=False)  # add the command-line arguments
args = parser.parse_args()  # parse the command line
Setting up the experience replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity  # maximum number of transitions the buffer can hold
        self.buffer = []          # storage list
        self.position = 0         # index of the next slot to write

    def push(self, state, action, reward, next_state, done):  # store one transition
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)  # grow the list until it reaches capacity, so the index below is always valid
        # store (state, action, reward, next_state, done)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        # advance the write index, wrapping around so the buffer acts as a ring
        self.position = int((self.position + 1) % self.capacity)
The sample function randomly draws a batch of experience from the buffer for training:
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)  # batch is a list of transitions
        state, action, reward, next_state, done = map(np.stack, zip(*batch))  # stack for each element
        ''' the * serves as unpack: sum(a,b) <=> batch=(a,b), sum(*batch) ;
        zip: a=[1,2], b=[2,3], zip(a,b) => [(1, 2), (2, 3)] ;
        the map serves as mapping the function on each list element: map(square, [2,3]) => [4,9] ;
        np.stack((1,2)) => array([1, 2])
        '''
        return state, action, reward, next_state, done
The __len__ and get_length functions:
    def __len__(self):  # cannot work in multiprocessing case, len(replay_buffer) is not available in proxy of manager!
        return len(self.buffer)

    def get_length(self):
        return len(self.buffer)
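A quick usage sketch of the buffer (dummy transitions; the state and action sizes here are placeholders, not the real Sawyer dimensions):

import numpy as np

buffer = ReplayBuffer(capacity=1000)
for _ in range(5):
    s  = np.random.randn(8)            # placeholder state
    a  = np.random.uniform(-1, 1, 3)   # placeholder action
    s2 = np.random.randn(8)
    buffer.push(s, a, 0.0, s2, False)

states, actions, rewards, next_states, dones = buffer.sample(batch_size=4)
print(states.shape, actions.shape)     # (4, 8) (4, 3) thanks to np.stack in sample()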
The action-normalization wrapper
class NormalizedActions(gym.ActionWrapper):
    def _action(self, action):
        low = self.action_space.low
        high = self.action_space.high
        action = low + (action + 1.0) * 0.5 * (high - low)
        action = np.clip(action, low, high)
        return action

    def _reverse_action(self, action):
        low = self.action_space.low
        high = self.action_space.high
        action = 2 * (action - low) / (high - low) - 1
        action = np.clip(action, low, high)
        return action
The _action method maps a normalized action back into the original action space; _reverse_action maps an original action into the normalized space.
Why this design? (A worked numeric example follows the list.)
- Linear mapping: a linear map preserves the relative scale of actions and does not distort their distribution, which matters for RL training because a nonlinear map would add unnecessary complexity.
- Boundary alignment: the minimum and maximum of the original action space map exactly to -1 and 1, so the full normalized range is used.
- Invertibility: the normalization and de-normalization formulas are inverses of each other, so it is easy to convert between the two spaces, which is convenient for implementation and debugging.
- Numerical stability: keeping actions in [-1, 1] avoids very large or very small values and improves numerical stability.
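A worked round-trip example of the two mappings (toy bounds, not the environment's real joint limits):

import numpy as np

low, high = np.array([0.0]), np.array([4.0])

a_norm = np.array([0.5])                            # normalized action in [-1, 1]
a_env  = low + (a_norm + 1.0) * 0.5 * (high - low)  # _action:         -> [3.0]
a_back = 2 * (a_env - low) / (high - low) - 1       # _reverse_action: -> [0.5]
print(a_env, a_back)                                # the linear maps are exact inverses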
Setting up the value network
It represents the expected cumulative return the agent can obtain in the future starting from state s.
class ValueNetwork(nn.Module):
    def __init__(self, state_dim, hidden_dim, init_w=3e-3):
        super(ValueNetwork, self).__init__()
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, hidden_dim)
        self.linear4 = nn.Linear(hidden_dim, 1)
        # initialize the output layer's weights and biases uniformly within a small range
        self.linear4.weight.data.uniform_(-init_w, init_w)
        self.linear4.bias.data.uniform_(-init_w, init_w)

    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = self.linear4(x)
        return x
Why initialize the weights?
Weight initialization matters a great deal for training neural networks (a small sketch follows the list):
- Avoiding vanishing or exploding gradients: weights that start out too large or too small can make gradients vanish or explode during backpropagation; a suitable initialization mitigates this.
- Faster convergence: a good initialization lets the network converge to a good solution more quickly.
- Breaking symmetry: if all weights start at the same value, the neurons learn identical features and performance suffers; random initialization breaks the symmetry so each neuron can learn something different.
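As a small illustrative sketch (not from the source): initializing only the output layer with a tiny uniform range, as the networks in this script do with init_w=3e-3, keeps the initial value estimates close to zero, so early training targets are not dominated by random network noise.

import torch
import torch.nn as nn

head = nn.Linear(512, 1)
init_w = 3e-3
head.weight.data.uniform_(-init_w, init_w)  # small uniform init, same recipe as above
head.bias.data.uniform_(-init_w, init_w)

x = torch.randn(4, 512)
print(head(x))   # outputs are close to 0 at the start of training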
The soft Q network
Its core idea is to add entropy regularization on top of classic Q-learning, which encourages the policy to keep exploring.
class SoftQNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, init_w=3e-3):
        super(SoftQNetwork, self).__init__()
        self.linear1 = nn.Linear(num_inputs + num_actions, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, 1)
        self.linear4.weight.data.uniform_(-init_w, init_w)
        self.linear4.bias.data.uniform_(-init_w, init_w)

    def forward(self, state, action):
        x = torch.cat([state, action], 1)  # the dim 0 is number of samples
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = self.linear4(x)
        return x
The input is the concatenation of the state s and the action a.
The policy network
The policy network directly learns the policy, i.e. which action a the agent should take in a given state s; the policy can be deterministic or stochastic.
class PolicyNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, action_range=1., init_w=3e-3, log_std_min=-20, log_std_max=2):
        super(PolicyNetwork, self).__init__()
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, hidden_size)

        self.mean_linear = nn.Linear(hidden_size, num_actions)
        self.mean_linear.weight.data.uniform_(-init_w, init_w)
        self.mean_linear.bias.data.uniform_(-init_w, init_w)

        self.log_std_linear = nn.Linear(hidden_size, num_actions)
        self.log_std_linear.weight.data.uniform_(-init_w, init_w)
        self.log_std_linear.bias.data.uniform_(-init_w, init_w)

        self.action_range = action_range
        self.num_actions = num_actions

    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = F.relu(self.linear4(x))
        mean = self.mean_linear(x)
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
        return mean, log_std
The evaluate function
It evaluates the policy network at a given state and returns the sampled action together with its log-probability.
    def evaluate(self, state, epsilon=1e-6):
        '''generate sampled action with state as input wrt the policy network'''
        mean, log_std = self.forward(state)
        std = log_std.exp()  # no clip in evaluation, clip affects gradients flow

        normal = Normal(0, 1)
        z = normal.sample()
        action_0 = torch.tanh(mean + std * z.to(device))  # TanhNormal distribution as actions; reparameterization trick
        action = self.action_range * action_0
        log_prob = Normal(mean, std).log_prob(mean + std * z.to(device)) - torch.log(1. - action_0.pow(2) + epsilon) - np.log(self.action_range)
        # both dims of normal.log_prob and -log(1-a**2) are (N, dim_of_action);
        # the Normal.log_prob outputs the same dim of input features instead of 1 dim probability,
        # needs sum up across the features dim to get 1 dim prob; or else use Multivariate Normal.
        log_prob = log_prob.sum(dim=1, keepdim=True)
        return action, log_prob, z, mean, log_std
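Written out, the log-probability computed above is the change-of-variables correction for the tanh squashing (my notation, matching the code line by line):

log π(a|s) = log N(u; μ, σ) − log(1 − tanh(u)² + ε) − log(action_range)

with z ~ N(0, 1), u = μ + σ·z, and a = action_range · tanh(u), summed over the action dimensions, which is exactly what log_prob.sum(dim=1, keepdim=True) does.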
The action-selection function
    def get_action(self, state, deterministic):
        # convert the input state to a PyTorch tensor with a batch dimension
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        # forward pass through the policy network to get the action mean and log std
        mean, log_std = self.forward(state)
        std = log_std.exp()              # convert log std to std
        normal = Normal(0, 1)            # standard normal distribution
        z = normal.sample().to(device)   # sample a random value from it
        action = self.action_range * torch.tanh(mean + std * z)  # squash and scale the sampled action
        action = self.action_range * torch.tanh(mean).detach().cpu().numpy()[0] if deterministic else action.detach().cpu().numpy()[0]
        return action
The random-action function
It samples an action from a uniform distribution over [-1, 1] and scales it to the action range, which is used for initial exploration.
    def sample_action(self,):
        a = torch.FloatTensor(self.num_actions).uniform_(-1, 1)
        return self.action_range * a.numpy()
The SAC trainer class
This is the core component: it trains the agent with the SAC algorithm. Its constructor sets up the networks, losses, and optimizers:
class SAC_Trainer():
    def __init__(self, replay_buffer, hidden_dim, action_range):
        self.replay_buffer = replay_buffer  # experience replay buffer
        # soft Q networks
        self.soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        self.soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        # target Q networks
        self.target_soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        self.target_soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        # policy network
        self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim, action_range).to(device)
        # entropy temperature, used to automatically tune the strength of entropy regularization
        self.log_alpha = torch.zeros(1, dtype=torch.float32, requires_grad=True, device=device)
        print('Soft Q Network (1,2): ', self.soft_q_net1)
        print('Policy Network: ', self.policy_net)

        # initialize the target networks with the same parameters as the online networks
        for target_param, param in zip(self.target_soft_q_net1.parameters(), self.soft_q_net1.parameters()):
            target_param.data.copy_(param.data)
        for target_param, param in zip(self.target_soft_q_net2.parameters(), self.soft_q_net2.parameters()):
            target_param.data.copy_(param.data)

        # loss functions
        self.soft_q_criterion1 = nn.MSELoss()
        self.soft_q_criterion2 = nn.MSELoss()

        # learning rates
        soft_q_lr = 3e-4
        policy_lr = 3e-4
        alpha_lr = 3e-4

        # optimizers
        self.soft_q_optimizer1 = optim.Adam(self.soft_q_net1.parameters(), lr=soft_q_lr)
        self.soft_q_optimizer2 = optim.Adam(self.soft_q_net2.parameters(), lr=soft_q_lr)
        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=policy_lr)
        self.alpha_optimizer = optim.Adam([self.log_alpha], lr=alpha_lr)
The update function
    def update(self, batch_size, reward_scale=10., auto_entropy=True, use_demons=False, target_entropy=-2, gamma=0.99, soft_tau=1e-2):
        # sample state, action, reward, next_state, done from the replay buffer
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)

        if use_demons == True:  # load pre-collected demonstration data from file and merge it with the current batch
            data_file = open('./demons_data/demon_data.pickle', "rb")
            demons_data = pickle.load(data_file)
            state_, action_, reward_, next_state_, done_ = map(np.stack, zip(*demons_data))
            state = np.concatenate((state, state_), axis=0)
            action = np.concatenate((action, action_), axis=0)
            reward = np.concatenate((reward, reward_), axis=0)
            next_state = np.concatenate((next_state, next_state_), axis=0)
            done = np.concatenate((done, done_), axis=0)

        # convert the data to PyTorch tensors and move them to the device
        state = torch.FloatTensor(state).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        action = torch.FloatTensor(action).to(device)
        reward = torch.FloatTensor(reward).unsqueeze(1).to(device)  # reward is single value, unsqueeze() to add one dim to be [reward] at the sample dim;
        done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(device)

        # predicted Q values from the two online Q networks
        predicted_q_value1 = self.soft_q_net1(state, action)
        predicted_q_value2 = self.soft_q_net2(state, action)
        # evaluate the policy network on the current and next states
        new_action, log_prob, z, mean, log_std = self.policy_net.evaluate(state)
        new_next_action, next_log_prob, _, _, _ = self.policy_net.evaluate(next_state)
        # reward normalization
        reward = reward_scale * (reward - reward.mean(dim=0)) / (reward.std(dim=0) + 1e-6)  # normalize with batch mean and std; plus a small number to prevent numerical problem

        # update the entropy coefficient
        # alpha = 0.0  # trade-off between exploration (max entropy) and exploitation (max Q)
        if auto_entropy is True:
            alpha_loss = -(self.log_alpha * (log_prob + target_entropy).detach()).mean()
            self.alpha_optimizer.zero_grad()
            alpha_loss.backward()
            self.alpha_optimizer.step()
            self.alpha = self.log_alpha.exp()
        else:
            self.alpha = 1.
            alpha_loss = 0
- Demonstration data is usually generated by an expert policy and is used to guide the agent's learning.
- Merging the data: the demonstrations are concatenated with the current batch of experience, forming a larger training set that can make learning more efficient.
The role of the entropy coefficient alpha:
alpha controls the trade-off between exploration and exploitation. Concretely, alpha is the coefficient of the entropy-regularization term, so it decides how much the randomness of the policy weighs in the optimization objective.
SAC adds an entropy term to its objective:
J(π) = Σ_t E[ r(s_t, a_t) + α · H(π(·|s_t)) ]
where H(π(·|s_t)) is the entropy of the policy π in state s_t and α is the entropy-regularization coefficient.
- When α is large, the algorithm leans towards exploration (a high-entropy policy).
- When α is small, it leans towards exploitation (a low-entropy policy).
A minimal sketch of the temperature update in isolation follows below.
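The sketch (illustrative values only; it assumes a 5-dimensional action space, so target_entropy = -5, matching the -1*action_dim heuristic used later in the worker function):

import torch

log_alpha = torch.zeros(1, requires_grad=True)
alpha_optimizer = torch.optim.Adam([log_alpha], lr=3e-4)

log_prob = torch.randn(128, 1) - 3.0   # stand-in for log pi(a|s) from the policy
target_entropy = -5.0

alpha_loss = -(log_alpha * (log_prob + target_entropy).detach()).mean()
alpha_optimizer.zero_grad()
alpha_loss.backward()
alpha_optimizer.step()
alpha = log_alpha.exp()                # the new temperature used in the Q and policy losses
print(alpha)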
Training the Q function
By training the Q function, the agent learns which actions are more valuable in a given state.
        # Training Q Function
        # compute the target Q value; take the smaller of the two target Q estimates to reduce overestimation
        target_q_min = torch.min(self.target_soft_q_net1(next_state, new_next_action), self.target_soft_q_net2(next_state, new_next_action)) - self.alpha * next_log_prob
        target_q_value = reward + (1 - done) * gamma * target_q_min  # if done==1, only reward
        # Q network losses; .detach() cuts the computation graph so gradients do not flow into the target
        q_value_loss1 = self.soft_q_criterion1(predicted_q_value1, target_q_value.detach())  # detach: no gradients for the variable
        q_value_loss2 = self.soft_q_criterion2(predicted_q_value2, target_q_value.detach())
        # parameter update: backpropagation computes the gradients of the losses w.r.t. the network parameters
        self.soft_q_optimizer1.zero_grad()  # clear the optimizer's gradient buffers
        q_value_loss1.backward()            # compute gradients
        self.soft_q_optimizer1.step()       # update parameters
        self.soft_q_optimizer2.zero_grad()
        q_value_loss2.backward()
        self.soft_q_optimizer2.step()
The target Q value represents the expected cumulative return the agent can obtain after taking action a in the current state s.
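Spelled out, the target computed in the code above is (my notation; r̃ is the scaled, batch-normalized reward and Q̄ denotes the target networks):

y = r̃ + γ · (1 − done) · ( min(Q̄₁(s′, a′), Q̄₂(s′, a′)) − α · log π(a′|s′) ),  with a′ sampled from π(·|s′)

and each online Q network is regressed onto y with an MSE loss.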
Training the policy network
        # Training Policy Function
        # take the smaller of the two Q estimates to reduce overestimation
        predicted_new_q_value = torch.min(self.soft_q_net1(state, new_action), self.soft_q_net2(state, new_action))
        policy_loss = (self.alpha * log_prob - predicted_new_q_value).mean()
        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()
Soft update of the target Q networks
        # Soft update the target value net
        for target_param, param in zip(self.target_soft_q_net1.parameters(), self.soft_q_net1.parameters()):
            target_param.data.copy_(  # copy data value into target parameters
                target_param.data * (1.0 - soft_tau) + param.data * soft_tau
            )
        for target_param, param in zip(self.target_soft_q_net2.parameters(), self.soft_q_net2.parameters()):
            target_param.data.copy_(  # copy data value into target parameters; the core update formula
                target_param.data * (1.0 - soft_tau) + param.data * soft_tau
            )
        return predicted_new_q_value.mean()
SAC uses two independent Q networks to reduce the overestimation problem, and maintains a corresponding target network for each of them.
- The two Q networks act as a check on each other, so the bias of a single Q network does not dominate training (see the toy example below).
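A toy illustration of why the min of the two critics helps (the numbers are made up): if the true value of a state-action pair is 1.0 but the two noisy critics estimate 1.3 and 0.9, a single critic would bootstrap from the optimistic 1.3, while the clipped double-Q target stays conservative.

import torch

q1 = torch.tensor([1.3])    # optimistic estimate
q2 = torch.tensor([0.9])    # pessimistic estimate
target = torch.min(q1, q2)  # 0.9: the overestimation does not compound through bootstrapping
print(target)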
Saving and loading the model
    def save_model(self, path):
        torch.save(self.soft_q_net1.state_dict(), path + '_q1')  # have to specify different path name here!
        torch.save(self.soft_q_net2.state_dict(), path + '_q2')
        torch.save(self.policy_net.state_dict(), path + '_policy')

    def load_model(self, path):
        # load the saved parameters into the networks
        self.soft_q_net1.load_state_dict(torch.load(path + '_q1'))
        self.soft_q_net2.load_state_dict(torch.load(path + '_q2'))
        self.policy_net.load_state_dict(torch.load(path + '_policy'))
        # switch the networks to evaluation mode for inference/testing
        self.soft_q_net1.eval()
        self.soft_q_net2.eval()
        self.policy_net.eval()
The worker function for sampling data and training the model
It runs as a separate process, in parallel with the other workers.
def worker(id, sac_trainer, rewards_queue, replay_buffer, max_episodes, max_steps, batch_size, explore_steps, \
           update_itr, AUTO_ENTROPY, DETERMINISTIC, USE_DEMONS, hidden_dim, model_path, headless):
    print(sac_trainer, replay_buffer)  # sac_trainer instances are not the same, but all networks and optimizers in them are the same; the replay buffer is the same one.
    env = GraspEnv(headless=headless)  # the custom environment class that simulates the grasping task
    action_dim = env.action_space.shape[0]
    state_dim = env.observation_space.shape[0]
    frame_idx = 0
    for eps in range(max_episodes):
        episode_reward = 0
        state = env.reset()
        # re-initialize the environment every 20 episodes to avoid accumulated simulation problems
        if eps % 20 == 0 and eps > 0:
            env.reinit()
        for step in range(max_steps):
            # once the current step count exceeds the exploration steps, use the policy network to generate actions
            if frame_idx > explore_steps:
                action = sac_trainer.policy_net.get_action(state, deterministic=DETERMINISTIC)
            else:
                action = sac_trainer.policy_net.sample_action()
            try:
                next_state, reward, done, _ = env.step(action)
            except KeyboardInterrupt:
                print('Finished')
                sac_trainer.save_model(model_path)
            # store the transition in the replay buffer
            replay_buffer.push(state, action, reward, next_state, done)
            # update the current state
            state = next_state
            episode_reward += reward
            frame_idx += 1  # total step counter

            # once the buffer holds more samples than the batch size, start updating the model
            if replay_buffer.get_length() > batch_size:
                for i in range(update_itr):
                    # update the SAC networks
                    _ = sac_trainer.update(batch_size, reward_scale=10., auto_entropy=AUTO_ENTROPY, use_demons=USE_DEMONS, target_entropy=-1. * action_dim)

            # save the model every 10 episodes
            if eps % 10 == 0 and eps > 0:
                sac_trainer.save_model(model_path)
            if done:
                break
        print('Episode: ', eps, '| Episode Reward: ', episode_reward)
        rewards_queue.put(episode_reward)
    sac_trainer.save_model(model_path)
    env.shutdown()
Sharing the Adam optimizer state across processes
In multi-process training, several processes may read and update the optimizer state (the momentum and squared-gradient terms) at the same time. To keep that state consistent across processes, it has to be placed in shared memory.
def ShareParameters(adamoptim):
    ''' share parameters of Adam optimizers for multiprocessing '''
    for group in adamoptim.param_groups:
        for p in group['params']:
            state = adamoptim.state[p]  # get the optimizer state for this parameter
            state['step'] = 0  # optimizer step counter
            state['exp_avg'] = torch.zeros_like(p.data)     # first-moment (momentum) term
            state['exp_avg_sq'] = torch.zeros_like(p.data)  # second-moment (squared-gradient) term
            # move the tensors into shared memory so that all processes access and modify the same data
            state['exp_avg'].share_memory_()
            state['exp_avg_sq'].share_memory_()
Function that plots the reward curve and saves it as an image
def plot(rewards):
    clear_output(True)
    # plt.figure(figsize=(20,5))
    plt.plot(rewards)
    plt.savefig('sac_multi.png')
    # plt.show()
    plt.clf()
The main block
if __name__ == '__main__':
    replay_buffer_size = 1e6  # size of the replay buffer
    BaseManager.register('ReplayBuffer', ReplayBuffer)  # register the replay buffer class so the manager can create a shared instance of it
    manager = BaseManager()  # create the manager object that owns the shared objects
    manager.start()          # start the manager so it can create and serve shared objects
    replay_buffer = manager.ReplayBuffer(replay_buffer_size)  # create the shared replay buffer (a proxy object)
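A minimal, hedged sketch of the BaseManager pattern used here, with a toy Counter class standing in for ReplayBuffer: register() exposes the class, and manager.Counter() returns a proxy through which every worker process operates on the same underlying object.

from multiprocessing import Process
from multiprocessing.managers import BaseManager

class Counter:
    def __init__(self):
        self.n = 0
    def add(self):
        self.n += 1
    def value(self):
        return self.n

def work(counter):
    for _ in range(100):
        counter.add()

if __name__ == '__main__':
    BaseManager.register('Counter', Counter)
    manager = BaseManager()
    manager.start()
    counter = manager.Counter()   # shared proxy, analogous to manager.ReplayBuffer(...)
    ps = [Process(target=work, args=(counter,)) for _ in range(4)]
    [p.start() for p in ps]
    [p.join() for p in ps]
    print(counter.value())        # 400: all workers updated the same object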
Define the hyperparameters
    # hyper-parameters for RL training, no need for sharing across processes
    max_episodes = 500000
    max_steps = 30
    explore_steps = 0
    batch_size = 128
    update_itr = 1            # number of update iterations after each environment step
    AUTO_ENTROPY = True       # automatically tune the entropy coefficient
    DETERMINISTIC = False     # use a deterministic policy
    USE_DEMONS = False        # use demonstration data
    hidden_dim = 512          # hidden layer size of the networks
    model_path = './model/sac_multi'
    num_workers = 6
    headless = True           # run the environment in headless mode
Create the trainer instance used to train with the SAC algorithm
    sac_trainer = SAC_Trainer(replay_buffer, hidden_dim=hidden_dim, action_range=action_range)
    if args.train:
        # sac_trainer.load_model(model_path)  # optionally load a pre-trained model

        # share the global network parameters across processes
        sac_trainer.soft_q_net1.share_memory()
        sac_trainer.soft_q_net2.share_memory()
        sac_trainer.target_soft_q_net1.share_memory()
        sac_trainer.target_soft_q_net2.share_memory()
        sac_trainer.policy_net.share_memory()
        sac_trainer.log_alpha.share_memory_()
        # share the optimizer states
        ShareParameters(sac_trainer.soft_q_optimizer1)
        ShareParameters(sac_trainer.soft_q_optimizer2)
        ShareParameters(sac_trainer.policy_optimizer)
        ShareParameters(sac_trainer.alpha_optimizer)

        rewards_queue = mp.Queue()  # multiprocessing queue; workers put episode rewards here and the main process reads them to log/plot the training curve
        processes = []
        rewards = []

        for i in range(num_workers):
            process = Process(target=worker, args=(i, sac_trainer, rewards_queue, replay_buffer, max_episodes, max_steps, \
                              batch_size, explore_steps, update_itr, AUTO_ENTROPY, DETERMINISTIC, USE_DEMONS, hidden_dim, model_path, headless))
            process.daemon = True  # daemonize the worker so it exits automatically when the main process ends
            processes.append(process)  # keep the process handles for later management
        [p.start() for p in processes]  # start all worker processes

        while True:  # collect rewards
            r = rewards_queue.get()
            if r is not None:
                rewards.append(r)
            else:
                break
            if len(rewards) % 20 == 0 and len(rewards) > 0:
                # plot(rewards)
                np.save('reward_log', rewards)

        [p.join() for p in processes]  # wait for all processes to finish
        sac_trainer.save_model(model_path)
Test mode
    if args.test:
        env = GraspEnv(headless=False, control_mode='joint_velocity')  # for visualizing in test
        # paths of the pre-trained models
        trained_model_path1 = './model/trained_model/augmented_dense_reward/sac_multi'  # pre-trained model with augmented dense reward
        trained_model_path2 = './model/trained_model/dense_reward/sac_multi'  # pre-trained model with dense reward
        sac_trainer.load_model(model_path)  # new model after training

        for eps in range(30):
            state = env.reset()
            episode_reward = 0
            for step in range(max_steps):
                # use the policy network to generate actions
                action = sac_trainer.policy_net.get_action(state, deterministic=DETERMINISTIC)
                next_state, reward, done, _ = env.step(action)
                episode_reward += reward
                state = next_state
            print('Episode: ', eps, '| Episode Reward: ', episode_reward)
        env.shutdown()
That wraps up today's SAC code analysis.
In my view, the keys to SAC are:
1. The entropy bonus added to the objective.
2. Separate networks for the actor and the critic, with the critic made of two Q functions to reduce estimation bias.
3. It is off-policy, so it can sample training data from the replay buffer, which makes learning more stable.
4. The entropy coefficient is tuned automatically during training.