欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > 强化学习的常用策略浅析

强化学习的常用策略浅析

2025/3/26 15:56:53 来源:https://blog.csdn.net/yuange1666/article/details/146462109  浏览:    关键词:强化学习的常用策略浅析

目录

DDPG(深度确定性策略梯度)

DPPO(分布式近端策略优化)

simply_PPO(简单近端策略优化)

discrete_DPPO(离散动作的分布式近端策略优化)


DDPG(深度确定性策略梯度)

算法概述

DDPG 是一种适用于连续动作空间的无模型强化学习算法,结合了深度 Q 网络(DQN)和确定性策略梯度(DPG)的思想。它使用两个神经网络:一个是策略网络(Actor),用于生成动作;另一个是价值网络(Critic),用于评估动作的价值。

代码结构与功能

超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。

Actor 类:_build_net 方法:构建策略网络,输出确定性动作。

learn 方法:更新策略网络的参数,并定期更新目标网络。

choose_action 方法:根据当前状态选择动作。

add_grad_to_graph 方法:将 Critic 网络计算的梯度添加到策略网络的更新中。

Critic 类:_build_net 方法:构建价值网络,输出状态 - 动作对的价值。

learn 方法:更新价值网络的参数,并定期更新目标网络。

Memory 类:用于存储和采样经验数据。

训练循环

在每个回合中,智能体与环境交互,收集经验数据并存储到内存中。当内存达到一定容量时,开始从内存中采样数据进行训练。

代码示例

# 初始化环境
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)# 定义状态和动作维度
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bound = env.action_space.high# 创建会话
sess = tf.Session()# 初始化 Actor 和 Critic 网络
actor = Actor(sess, action_dim, action_bound, LR_A, REPLACE_ITER_A)
critic = Critic(sess, state_dim, action_dim, LR_C, GAMMA, REPLACE_ITER_C, actor.a, actor.a_)# 将 Critic 计算的梯度添加到 Actor 网络中
actor.add_grad_to_graph(critic.a_grads)# 初始化全局变量
sess.run(tf.global_variables_initializer())# 创建经验回放内存
M = Memory(MEMORY_CAPACITY, dims=2 * state_dim + action_dim + 1)# 训练循环
for i in range(MAX_EPISODES):
    s = env.reset()
    ep_reward = 0for j in range(MAX_EP_STEPS):if RENDER:
            env.render()# 选择动作并添加探索噪声
        a = actor.choose_action(s)
        a = np.clip(np.random.normal(a, var), -2, 2)
        s_, r, done, info = env.step(a)# 存储经验数据
        M.store_transition(s, a, r / 10, s_)# 开始学习if M.pointer > MEMORY_CAPACITY:
            var *= .9995  # 衰减探索噪声
            b_M = M.sample(BATCH_SIZE)
            b_s = b_M[:, :state_dim]
            b_a = b_M[:, state_dim: state_dim + action_dim]
            b_r = b_M[:, -state_dim - 1: -state_dim]
            b_s_ = b_M[:, -state_dim:]# 更新 Critic 网络
            critic.learn(b_s, b_a, b_r, b_s_)# 更新 Actor 网络
            actor.learn(b_s)
        s = s_
        ep_reward += rif j == MAX_EP_STEPS - 1:print('Episode:', i, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var)if ep_reward > -300:
                RENDER = Truebreak

DPPO(分布式近端策略优化)

算法概述

DPPO 是一种基于近端策略优化(PPO)的分布式算法,用于处理连续动作空间。它通过多个并行的工作线程收集经验数据,然后由一个全局的 PPO 网络进行更新。

代码结构与功能

超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。

PPO 类:_build_anet 方法:构建策略网络,输出动作的概率分布。

update 方法:更新策略网络和价值网络的参数。

choose_action 方法:根据当前状态选择动作。

get_v 方法:获取当前状态的价值。

Worker 类:每个工作线程负责与环境交互,收集经验数据并将其存储到队列中。

训练循环

多个工作线程并行运行,收集经验数据。当队列中的数据达到一定数量时,全局的 PPO 网络从队列中取出数据进行更新。

代码示例

# 初始化全局 PPO 网络
GLOBAL_PPO = PPO()
# 创建事件对象
UPDATE_EVENT, ROLLING_EVENT = threading.Event(), threading.Event()
UPDATE_EVENT.clear()  # 初始时不进行更新
ROLLING_EVENT.set()  # 开始收集数据
# 创建工作线程
workers = [Worker(wid=i) for i in range(N_WORKER)]
# 初始化全局计数器和奖励记录
GLOBAL_UPDATE_COUNTER, GLOBAL_EP = 0, 0
GLOBAL_RUNNING_R = []
# 创建协调器和队列
COORD = tf.train.Coordinator()
QUEUE = queue.Queue()
# 启动工作线程
threads = []
for worker in workers:
    t = threading.Thread(target=worker.work, args=())
    t.start()
    threads.append(t)
# 启动 PPO 更新线程
threads.append(threading.Thread(target=GLOBAL_PPO.update,))
threads[-1].start()
# 等待所有线程结束
COORD.join(threads)
# 绘制奖励变化曲线并进行测试
plt.plot(np.arange(len(GLOBAL_RUNNING_R)), GLOBAL_RUNNING_R)
plt.xlabel('Episode'); plt.ylabel('Moving reward'); plt.ion(); plt.show()
env = gym.make('Pendulum-v1')
while True:
    s = env.reset()for t in range(300):
        env.render()
        s = env.step(GLOBAL_PPO.choose_action(s))[0]

simply_PPO(简单近端策略优化)

算法概述

simply_PPO 是一种简化的近端策略优化算法,用于处理连续动作空间。它使用一个策略网络和一个价值网络,通过裁剪代理目标函数来更新策略网络的参数。

代码结构与功能

超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。

PPO 类:_build_anet 方法:构建策略网络,输出动作的概率分布。

update 方法:更新策略网络和价值网络的参数。

choose_action 方法:根据当前状态选择动作。

get_v 方法:获取当前状态的价值。

训练循环

在每个回合中,智能体与环境交互,收集经验数据并存储到缓冲区中。当缓冲区中的数据达到一定数量时,开始从缓冲区中采样数据进行训练。

代码示例

# 初始化环境
env = gym.make('Pendulum-v1').unwrapped
# 初始化 PPO 网络
ppo = PPO()
# 记录所有回合的奖励
all_ep_r = []
# 训练循环
for ep in range(EP_MAX):
    s = env.reset()
    buffer_s, buffer_a, buffer_r = [], [], []
    ep_r = 0for t in range(EP_LEN):
        env.render()# 选择动作
        a = ppo.choose_action(s)
        s_, r, done, _ = env.step(a)# 存储经验数据
        buffer_s.append(s)
        buffer_a.append(a)
        buffer_r.append((r + 8) / 8)  # 归一化奖励
        s = s_
        ep_r += r# 更新 PPO 网络if (t + 1) % BATCH == 0 or t == EP_LEN - 1:
            v_s_ = ppo.get_v(s_)
            discounted_r = []for r in buffer_r[::-1]:
                v_s_ = r + GAMMA * v_s_
                discounted_r.append(v_s_)
            discounted_r.reverse()
            bs, ba, br = np.vstack(buffer_s), np.vstack(buffer_a), np.array(discounted_r)[:, np.newaxis]
            buffer_s, buffer_a, buffer_r = [], [], []
            ppo.update(bs, ba, br)if ep == 0:
        all_ep_r.append(ep_r)else:
        all_ep_r.append(all_ep_r[-1] * 0.9 + ep_r * 0.1)print('Ep: %i' % ep,"|Ep_r: %i" % ep_r,("|Lam: %.4f" % METHOD['lam']) if METHOD['name'] == 'kl_pen' else '',)
# 绘制奖励变化曲线
plt.plot(np.arange(len(all_ep_r)), all_ep_r)
plt.xlabel('Episode'); plt.ylabel('Moving averaged episode reward'); plt.show()

discrete_DPPO(离散动作的分布式近端策略优化)

算法概述

discrete_DPPO 是一种适用于离散动作空间的分布式近端策略优化算法。它通过多个并行的工作线程收集经验数据,然后由一个全局的 PPO 网络进行更新。

代码结构与功能

超参数设置:定义了训练的最大回合数、每回合的最大步数、学习率、折扣因子等。

PPONet 类:_build_anet 方法:构建策略网络,输出动作的概率分布。

update 方法:更新策略网络和价值网络的参数。

choose_action 方法:根据当前状态选择动作。

get_v 方法:获取当前状态的价值。

Worker 类:每个工作线程负责与环境交互,收集经验数据并将其存储到队列中。

训练循环

多个工作线程并行运行,收集经验数据。当队列中的数据达到一定数量时,全局的 PPO 网络从队列中取出数据进行更新。

代码示例

# 初始化全局 PPO 网络
GLOBAL_PPO = PPONet()
# 创建事件对象
UPDATE_EVENT, ROLLING_EVENT = threading.Event(), threading.Event()
UPDATE_EVENT.clear()  # 初始时不进行更新
ROLLING_EVENT.set()  # 开始收集数据
# 创建工作线程
workers = [Worker(wid=i) for i in range(N_WORKER)]
# 初始化全局计数器和奖励记录
GLOBAL_UPDATE_COUNTER, GLOBAL_EP = 0, 0
GLOBAL_RUNNING_R = []
# 创建协调器和队列
COORD = tf.train.Coordinator()
QUEUE = queue.Queue()
# 启动工作线程
threads = []
for worker in workers:
    t = threading.Thread(target=worker.work, args=())
    t.start()
    threads.append(t)
# 启动 PPO 更新线程
threads.append(threading.Thread(target=GLOBAL_PPO.update,))
threads[-1].start()
# 等待所有线程结束
COORD.join(threads)
# 绘制奖励变化曲线并进行测试
plt.plot(np.arange(len(GLOBAL_RUNNING_R)), GLOBAL_RUNNING_R)
plt.xlabel('Episode'); plt.ylabel('Moving reward'); plt.ion(); plt.show()
env = gym.make('CartPole-v0')
while True:
    s = env.reset()for t in range(1000):
        env.render()
        s, r, done, info = env.step(GLOBAL_PPO.choose_action(s))if done:break

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词