目录
DDPG(深度确定性策略梯度)
DPPO(分布式近端策略优化)
simply_PPO(简单近端策略优化)
discrete_DPPO(离散动作的分布式近端策略优化)
DDPG(深度确定性策略梯度)
算法概述
DDPG 是一种适用于连续动作空间的无模型强化学习算法,结合了深度 Q 网络(DQN)和确定性策略梯度(DPG)的思想。它使用两个神经网络:一个是策略网络(Actor),用于生成动作;另一个是价值网络(Critic),用于评估动作的价值。
代码结构与功能
超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。
Actor 类:_build_net 方法:构建策略网络,输出确定性动作。
learn 方法:更新策略网络的参数,并定期更新目标网络。
choose_action 方法:根据当前状态选择动作。
add_grad_to_graph 方法:将 Critic 网络计算的梯度添加到策略网络的更新中。
Critic 类:_build_net 方法:构建价值网络,输出状态 - 动作对的价值。
learn 方法:更新价值网络的参数,并定期更新目标网络。
Memory 类:用于存储和采样经验数据。
训练循环
在每个回合中,智能体与环境交互,收集经验数据并存储到内存中。当内存达到一定容量时,开始从内存中采样数据进行训练。
代码示例
# 初始化环境
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)# 定义状态和动作维度
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bound = env.action_space.high# 创建会话
sess = tf.Session()# 初始化 Actor 和 Critic 网络
actor = Actor(sess, action_dim, action_bound, LR_A, REPLACE_ITER_A)
critic = Critic(sess, state_dim, action_dim, LR_C, GAMMA, REPLACE_ITER_C, actor.a, actor.a_)# 将 Critic 计算的梯度添加到 Actor 网络中
actor.add_grad_to_graph(critic.a_grads)# 初始化全局变量
sess.run(tf.global_variables_initializer())# 创建经验回放内存
M = Memory(MEMORY_CAPACITY, dims=2 * state_dim + action_dim + 1)# 训练循环
for i in range(MAX_EPISODES):
s = env.reset()
ep_reward = 0for j in range(MAX_EP_STEPS):if RENDER:
env.render()# 选择动作并添加探索噪声
a = actor.choose_action(s)
a = np.clip(np.random.normal(a, var), -2, 2)
s_, r, done, info = env.step(a)# 存储经验数据
M.store_transition(s, a, r / 10, s_)# 开始学习if M.pointer > MEMORY_CAPACITY:
var *= .9995 # 衰减探索噪声
b_M = M.sample(BATCH_SIZE)
b_s = b_M[:, :state_dim]
b_a = b_M[:, state_dim: state_dim + action_dim]
b_r = b_M[:, -state_dim - 1: -state_dim]
b_s_ = b_M[:, -state_dim:]# 更新 Critic 网络
critic.learn(b_s, b_a, b_r, b_s_)# 更新 Actor 网络
actor.learn(b_s)
s = s_
ep_reward += rif j == MAX_EP_STEPS - 1:print('Episode:', i, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var)if ep_reward > -300:
RENDER = Truebreak
DPPO(分布式近端策略优化)
算法概述
DPPO 是一种基于近端策略优化(PPO)的分布式算法,用于处理连续动作空间。它通过多个并行的工作线程收集经验数据,然后由一个全局的 PPO 网络进行更新。
代码结构与功能
超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。
PPO 类:_build_anet 方法:构建策略网络,输出动作的概率分布。
update 方法:更新策略网络和价值网络的参数。
choose_action 方法:根据当前状态选择动作。
get_v 方法:获取当前状态的价值。
Worker 类:每个工作线程负责与环境交互,收集经验数据并将其存储到队列中。
训练循环
多个工作线程并行运行,收集经验数据。当队列中的数据达到一定数量时,全局的 PPO 网络从队列中取出数据进行更新。
代码示例
# 初始化全局 PPO 网络
GLOBAL_PPO = PPO()
# 创建事件对象
UPDATE_EVENT, ROLLING_EVENT = threading.Event(), threading.Event()
UPDATE_EVENT.clear() # 初始时不进行更新
ROLLING_EVENT.set() # 开始收集数据
# 创建工作线程
workers = [Worker(wid=i) for i in range(N_WORKER)]
# 初始化全局计数器和奖励记录
GLOBAL_UPDATE_COUNTER, GLOBAL_EP = 0, 0
GLOBAL_RUNNING_R = []
# 创建协调器和队列
COORD = tf.train.Coordinator()
QUEUE = queue.Queue()
# 启动工作线程
threads = []
for worker in workers:
t = threading.Thread(target=worker.work, args=())
t.start()
threads.append(t)
# 启动 PPO 更新线程
threads.append(threading.Thread(target=GLOBAL_PPO.update,))
threads[-1].start()
# 等待所有线程结束
COORD.join(threads)
# 绘制奖励变化曲线并进行测试
plt.plot(np.arange(len(GLOBAL_RUNNING_R)), GLOBAL_RUNNING_R)
plt.xlabel('Episode'); plt.ylabel('Moving reward'); plt.ion(); plt.show()
env = gym.make('Pendulum-v1')
while True:
s = env.reset()for t in range(300):
env.render()
s = env.step(GLOBAL_PPO.choose_action(s))[0]
simply_PPO(简单近端策略优化)
算法概述
simply_PPO 是一种简化的近端策略优化算法,用于处理连续动作空间。它使用一个策略网络和一个价值网络,通过裁剪代理目标函数来更新策略网络的参数。
代码结构与功能
超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。
PPO 类:_build_anet 方法:构建策略网络,输出动作的概率分布。
update 方法:更新策略网络和价值网络的参数。
choose_action 方法:根据当前状态选择动作。
get_v 方法:获取当前状态的价值。
训练循环
在每个回合中,智能体与环境交互,收集经验数据并存储到缓冲区中。当缓冲区中的数据达到一定数量时,开始从缓冲区中采样数据进行训练。
代码示例
# 初始化环境
env = gym.make('Pendulum-v1').unwrapped
# 初始化 PPO 网络
ppo = PPO()
# 记录所有回合的奖励
all_ep_r = []
# 训练循环
for ep in range(EP_MAX):
s = env.reset()
buffer_s, buffer_a, buffer_r = [], [], []
ep_r = 0for t in range(EP_LEN):
env.render()# 选择动作
a = ppo.choose_action(s)
s_, r, done, _ = env.step(a)# 存储经验数据
buffer_s.append(s)
buffer_a.append(a)
buffer_r.append((r + 8) / 8) # 归一化奖励
s = s_
ep_r += r# 更新 PPO 网络if (t + 1) % BATCH == 0 or t == EP_LEN - 1:
v_s_ = ppo.get_v(s_)
discounted_r = []for r in buffer_r[::-1]:
v_s_ = r + GAMMA * v_s_
discounted_r.append(v_s_)
discounted_r.reverse()
bs, ba, br = np.vstack(buffer_s), np.vstack(buffer_a), np.array(discounted_r)[:, np.newaxis]
buffer_s, buffer_a, buffer_r = [], [], []
ppo.update(bs, ba, br)if ep == 0:
all_ep_r.append(ep_r)else:
all_ep_r.append(all_ep_r[-1] * 0.9 + ep_r * 0.1)print('Ep: %i' % ep,"|Ep_r: %i" % ep_r,("|Lam: %.4f" % METHOD['lam']) if METHOD['name'] == 'kl_pen' else '',)
# 绘制奖励变化曲线
plt.plot(np.arange(len(all_ep_r)), all_ep_r)
plt.xlabel('Episode'); plt.ylabel('Moving averaged episode reward'); plt.show()
discrete_DPPO(离散动作的分布式近端策略优化)
算法概述
discrete_DPPO 是一种适用于离散动作空间的分布式近端策略优化算法。它通过多个并行的工作线程收集经验数据,然后由一个全局的 PPO 网络进行更新。
代码结构与功能
超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。
PPONet 类:_build_anet 方法:构建策略网络,输出动作的概率分布。
update 方法:更新策略网络和价值网络的参数。
choose_action 方法:根据当前状态选择动作。
get_v 方法:获取当前状态的价值。
Worker 类:每个工作线程负责与环境交互,收集经验数据并将其存储到队列中。
训练循环
多个工作线程并行运行,收集经验数据。当队列中的数据达到一定数量时,全局的 PPO 网络从队列中取出数据进行更新。
代码示例
# 初始化全局 PPO 网络
GLOBAL_PPO = PPONet()
# 创建事件对象
UPDATE_EVENT, ROLLING_EVENT = threading.Event(), threading.Event()
UPDATE_EVENT.clear() # 初始时不进行更新
ROLLING_EVENT.set() # 开始收集数据
# 创建工作线程
workers = [Worker(wid=i) for i in range(N_WORKER)]
# 初始化全局计数器和奖励记录
GLOBAL_UPDATE_COUNTER, GLOBAL_EP = 0, 0
GLOBAL_RUNNING_R = []
# 创建协调器和队列
COORD = tf.train.Coordinator()
QUEUE = queue.Queue()
# 启动工作线程
threads = []
for worker in workers:
t = threading.Thread(target=worker.work, args=())
t.start()
threads.append(t)
# 启动 PPO 更新线程
threads.append(threading.Thread(target=GLOBAL_PPO.update,))
threads[-1].start()
# 等待所有线程结束
COORD.join(threads)
# 绘制奖励变化曲线并进行测试
plt.plot(np.arange(len(GLOBAL_RUNNING_R)), GLOBAL_RUNNING_R)
plt.xlabel('Episode'); plt.ylabel('Moving reward'); plt.ion(); plt.show()
env = gym.make('CartPole-v0')
while True:
s = env.reset()for t in range(1000):
env.render()
s, r, done, info = env.step(GLOBAL_PPO.choose_action(s))if done:break