加载中...

「Gym 课程笔记 05」经典控制 - Mountain Car 山地车(连续动作)


0x00 问题描述

Mountain Car ContinuousMountain Car 是一样的题型,区别在于 Mountain Car Continuous 的动作空间是连续的,而 DQN 算法也不再适用于连续动作问题。

0x10 问题解读

Mountain Car Continuous 页面的描述中,可以得到以下信息:

0x11 环境说明

首先看到这个这个表,它的含义是:

  1. 在 python 通过以下语句可以创建 MountainCarContinuous(版本 v0)的预设环境:
    • import gymnasium
    • env = gymnasium.make("MountainCarContinuous-v0")
  2. 而在这个预设环境中:
    • 执行 env.action_space 可以得到动作空间(Action Space)为 Box(-1.0, 1.0, (1,), float32)
    • 执行 env.observation_space 可以得到观测空间(Observation Space)为 Box([-1.2 -0.07], [0.6 0.07], (2,), float32)

0x12 动作空间

文档中给出的动作空间为 [-1.0, 1.0] 的连续空间,表示施加在小车上的力,正值表示向着目标的方向、负值表示背向目标的方向。

这是目前为止首次遇到的连续动作空间问题,之前使用的 DQN 算法并不适用此问题,因此下文会换一种解题方法

0x13 观测空间

观测空间与 Mountain Car 是一样的,observation_space 的 2 个独立的观察值分别代表:

  • 0: Cart Position: 小车位置,即 x 坐标轴的范围
  • 1: Cart Velocity: 小车速度

0x14 奖励

目标是尽快到达右侧山顶的旗帜,因此在结束回合前,它的每一步都会受到惩罚值、只有当到达旗帜时才会收到 +100 的奖励值。

可以不用关注默认的奖励,因为默认的奖励方案并不适用;需要按照 Mountain Car 的奖励方案重新设计

0x15 初始状态

初始状态中,小车位置为 [-0.6, -0.4] 之间的随机值,小车速度总是为 0。

0x16 回合终止

如果发生以下情况之一,则回合终止:

  • 终止: 小车位置 >= 0.45 (到达右侧山顶的旗帜)
  • 中止: 回合步数大于 999

训练时不建议使用 999,因为事实上 200 步以内已经可以完成,999 不但训练慢、还可能会导致过度训练。

0x20 解题过程

但是前面已经提过, DQN 算法并不适用连续动作空间的问题:

这是因为在 DQN 中、智能体可以在给定有限的离散空间中选择一个动作,然后学习一个动作价值函数(Q 函数),为每个状态-动作对赋予一个价值,通过比较和优化有限的动作产生的 Q 值、指导智能体做出决策。

但在连续动作空间中,动作的可能性是无限的,这使得直接使用 DQN 进行动作选择变得不可行。对于连续动作空间,需要一种能够生成连续动作值的方法,而不是从预先定义的动作集合中选择。

0x21 算法选型

早在 Acrobot 的笔记中我们就提到过,基于贪婪策略的 Q-learning 系列算法中,针对 “离散” 和 “连续” 的问题有不同的适用算法:

状态空间 动作空间 算法 备注
离散 离散 Q-learning 基础算法,使用 Q-table 存储每种状态动作所对应的 Q 值估计
连续 离散 DQN 为了处理高维的连续状态空间,使用深度神经网络近似处理 Q-table
连续 连续 DDPG 结合了策略梯度方法和 Q-learning 的思想,使用 Actor-Critic 框架:其中 Actor 负责生成动作,Critic 评估这些动作
连续 连续 TD3 引入双 Q 学习和延迟策略更新,解决 DDPG 中过估计动作值的问题

在这里我选择了 TD3 算法。

0x22 构建 Actor 网络模型

Actor 负责生成动作,其代码为:

class Actor(nn.Module):
    def __init__(self, obs_size, action_size, max_action):
        super(Actor, self).__init__()
        self.layer1 = nn.Linear(obs_size, 400) 
        self.layer2 = nn.Linear(400, 300)
        self.layer3 = nn.Linear(300, action_size) 
        self.max_action = max_action

    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        x = self.max_action * torch.tanh(self.layer3(x))
        return x

比对之前的训练代码可知,Actor 的神经网络模型和 DQN 的神经网络模型几乎毫无差别:

  1. 网络模型输入为状态张量 obs_size,输出为动作张量 action_size
  2. 唯一的区别是前向传播函数 forward 的最后一步做了从“离散”映射到“连续”的处理

forward 的最后一步很关键:

  1. 通过 tanh 函数把网络输出的离散值归一化,落在 [-1, 1] 的标准区间
  2. 然后乘以实际动作空间的上限值 max_action,把标准区间映射回去实际的动作区间

tanh 对应数学上三角函数的 tan

0x23 构建 Critic 网络模型

Critic 评估 Actor 所生成的动作,其代码为:

class Critic(nn.Module):

    def __init__(self, obs_size, act_size):
        super(Critic, self).__init__()

        # 第一个 Critic 网络
        self.layer1 = nn.Linear(obs_size + act_size, 400)
        self.layer2 = nn.Linear(400, 300)
        self.layer3 = nn.Linear(300, 1)

        # 第二个 Critic 网络
        self.layer4 = nn.Linear(obs_size + act_size, 400)
        self.layer5 = nn.Linear(400, 300)
        self.layer6 = nn.Linear(300, 1)


    # x: obs_batch 批量的状态张量,形状为 [batch_size, obs_size], 如 32x2
    # u: act_batch 批量的动作张量,形状为 [batch_size, act_size], 如 32x1
    # “批量”指从【经验回放存储】中每次所取出用于 TD3 学习的经验数量
    def forward(self, x, u) :

        # 表示把两个张量的第 1 维拼接在一起形成一个新的张量
        # 即 xu = [ obs_size + act_size ]
        xu = torch.cat([x, u], 1)

        # 第一个 Critic 网络的前向传播
        x1 = F.relu(self.layer1(xu))
        x1 = F.relu(self.layer2(x1))
        q1 = self.layer3(x1)

        # 第二个 Critic 网络的前向传播
        x2 = F.relu(self.layer4(xu))
        x2 = F.relu(self.layer5(x2))
        q2 = self.layer6(x2)
        return q1, q2


    # 使用 Q1 网络更新 Actor 的 Q 值估计
    def Q1(self, x, u):
        xu = torch.cat([x, u], 1)
        x1 = F.relu(self.layer1(xu))
        x1 = F.relu(self.layer2(x1))
        q1 = self.layer3(x1)
        return q1

在 TD3 算法中,Critic 模型中实际定义了两个网络。

这两个网络结构完全相同,但是它们各自独立地学习和更新,以便为同一“状态-动作对”提供两个略有不同的价值估计,其作用是:

  1. 减少过度估计:采用两个网络的最小值作为目标 Q 值,减少 Q 值的过度估计倾向(过度估计会导致策略评估不准确、影响学习性能)
  2. 提高稳定性:两个独立的价值估计可以增加学习过程的稳定性(防止算法过于依赖于某个可能不准确的单一价值估计)

在这个问题中,obs_size 即观测空间的状态数量,即小车的位置和速度、共 2 个状态; act_size 即动作空间的动作数量,即施加在小车上的力、共 1 个动作。

nn.Linear(obs_size + act_size, 400) 表示 Critic 网络串接了状态和动作(2 + 1 = 3)、第一层网络形状为 3 x 400,即接受输入的张量形状为 k x 3

nn.Linear(300, 1) 表示最后一层网络形状为 300 x 1、即会输出 1 个评估值,就是对 k x 3 的 Q 价值评估。

0x24 动作噪音

在 TD3 算法中,选择下一步动作 action 后,需要为其添加噪音以平滑目标策略:

def add_noise(targs: TrainArgs, action, noise) :
    '''
    为 action 添加噪音:TD3 通过在选取的动作上添加噪声来平滑目标策略
    :params: targs 用于训练的环境和模型关键参数
    :params: action 动作张量        array or tensor
    :params: noise 噪音张量         array or tensor
    :return: 添加噪音的动作张量      tensor (维度和形状 与 action和noise 一致)
    '''
    min_action = torch.tensor(targs.env.action_space.low, device=targs.device, dtype=torch.float)
    max_action = torch.tensor(targs.env.action_space.high, device=targs.device, dtype=torch.float)
    action = (action + noise).clip(min_action, max_action) # 确保 action + noise 依然在动作空间的取值范围内
    return action

通过添加噪声,TD3 能更有效地探索环境,避免过早收敛到局部最优,保持策略的泛化能力。

通俗来说,就是使得智能体对输入的小变化不那么敏感,从而提高学习过程的稳定性。

前期在 Actor 上加以一定噪声可以鼓励对环境探索,但噪声对于训练后期算法的收敛是不利的。随着训练的进行逐步减小噪声是一个不错的选择。

0x25 TD3 算法

和 DQN 类似,在结束训练的每个回合后,TD3 都需要从经验回放存储中抽取一批历史样本进行学习:

def td3(targs: TrainArgs, total_loss, step_counter) :
    '''
    进行 TD3 学习(基于 Q 值的强化学习方法):
        这个过程是 TD3 学习算法的核心,它利用从环境中收集的经验来不断调整和优化网络,使得预测的 Q 值尽可能接近实际的 Q 值。
        通过迭代这个过程,使得神经网络逐渐学习到一个策略,该策略可以最大化累积奖励。
    :params: targs 用于训练的环境和模型关键参数
    :params: action 下一步动作
    :params: total_loss 累积损失
    :params: step_counter 步数计数器
    :return: 执行动作后观测空间返回的状态
    '''

    # 确保只有当【经验回放存储】中的样本数量超过批处理大小时,才进行学习过程
    # 这是为了确保有足够的样本来进行有效的批量学习
    if len(targs.memory) <= targs.batch_size :
        return total_loss

    # ===============================
    # 准备批量数据
    # ===============================
    # 从【经验回放存储】中随机抽取 batch_size 个样本数
    # 这种随机抽样是为了减少样本间的相关性,增强学习的稳定性和效率
    transitions = random.sample(targs.memory, targs.batch_size)

    # 解压 transitions 到单独的批次
    batch = Transition(*zip(*transitions))

    # 将每个样本的组成部分 (obs, action, reward, next_obs, done) ,拆分转换为独立的批次
    obs_batch = down_dim(torch.stack(batch.obs).to(targs.device))
    act_batch = torch.tensor(np.array(batch.action), dtype=torch.float, device=targs.device)
    reward_batch = up_dim(torch.tensor(batch.reward, dtype=torch.float, device=targs.device))
    next_obs_batch = down_dim(torch.stack(batch.next_obs).to(targs.device))
    done_batch = up_dim(torch.tensor(batch.done, dtype=torch.float, device=targs.device))

    # print(f"obs_batch: {obs_batch}")              # tensor 32x2
    # print(f"act_batch: {act_batch}")              # tensor 32x1
    # print(f"reward_batch: {reward_batch}")        # tensor 32x1
    # print(f"next_obs_batch: {next_obs_batch}")    # tensor 32x2
    # print(f"done_batch: {done_batch}")            # tensor 32x1

    # ===============================
    # 更新 Critic 网络
    # ===============================
    # 计算下一个状态的最大预测 Q 值
    with torch.no_grad():
        next_act_batch = targs.target_actor_model(next_obs_batch)  # tensor 32x1
        noise_batch = (torch.randn_like(next_act_batch) * targs.noise).clip(-targs.noise_limit, targs.noise_limit) 
        next_act_batch = add_noise(targs, next_act_batch, noise_batch)   # tensor 32x1

        # next_obs_batch 必须形状 tensor 32x2
        # next_act_batch 必须形状 tensor 32x1
        target_Q1, target_Q2 = targs.target_critic_model(next_obs_batch, next_act_batch)
        target_Q = torch.min(target_Q1, target_Q2)
        target_Q_values = reward_batch + (1 - done_batch) * targs.gamma * target_Q


    # 获得当前状态下的 Q 值(对当前状态的观测进行前向传播的结果,用于计算损失)
    # obs_batch 必须形状 tensor 32x2
    # act_batch 必须形状 tensor 32x1
    current_Q1, current_Q2 = targs.critic_model(obs_batch, act_batch)
    critic_loss = F.mse_loss(current_Q1, target_Q_values) + F.mse_loss(current_Q2, target_Q_values)
    total_loss += critic_loss.item()    # 更新累积损失
    optimize_params(targs.critic_model, targs.critic_optimizer, critic_loss)    # 参数优化


    # ===============================
    # 延迟更新 Actor 网络
    # ===============================
    # 在 TD3 中,Actor 网络的更新频率较低(例如,每 2 次 Critic 更新后更新一次 Actor),以减少策略的方差。
    if step_counter % targs.update_action_every == 0 :
        actor_loss = -targs.critic_model.Q1(obs_batch, targs.actor_model(obs_batch)).mean()
        total_loss += actor_loss.item()     # 更新累积损失
        optimize_params(targs.actor_model, targs.actor_optimizer, actor_loss)   # 参数优化

    return total_loss

这个过程和 DQN 十分相似,区别在于两点:

  1. 需要更新两个 Critic 网络,亦即前面提到的 “双 Q 学习”
  2. Actor 网络只会在隔几回合才更新一次,亦即前面提到的 “延迟策略更新”

具体实现可参考注释或 Github 源码,过程中需要注意各个张量的形状,通过升维或降维使其与定义的网络模型输入一致。

0x26 奖励重塑

奖励机制的思路和离散型的 Mountain Car 一样,把奖励函数照搬过来即可:

  • 远离旗帜的初期激励:在小车向远离目标的方向移动时,尤其是当它移动到起点左侧较远的地方时,可以给予小的正奖励(鼓励初期的 “借力” 行为)。
  • 向目标靠近时的增强激励:当小车开始向目标方向移动并接近目标时,逐渐增加奖励的数额,以强化接近旗帜的行为。
  • 停滞惩罚:如果小车的速度太低(接近于零),这意味着小车可能正处于停滞状态,此时可以给予一个小的负奖励。

0x30 解题结果

其实本题重点在于换一个支持连续动作空间的算法模型 TD3 。

剩下的问题都已经在离散型问题中解决过了,直接训练即可解题:


文章作者: EXP
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 EXP !
 上一篇
「Gym 课程笔记 06」经典控制 - Pendulum 摆锤 「Gym 课程笔记 06」经典控制 - Pendulum 摆锤
Pendulum 是控制一个倒立摆在抵抗重力作用的前提下,尽快摆动到垂直向上的位置、并尽量保持在这个状态下最多的步数。
2024-03-26
下一篇 
「Gym 课程笔记 04」经典控制 - Mountain Car 山地车(离散动作) 「Gym 课程笔记 04」经典控制 - Mountain Car 山地车(离散动作)
Mountain Car(离散)是控制一个无法直接攀登陡峭山坡的小车,必须利用山坡的反向坡度来获得足够的动量,使其尽快到达山顶。
2024-03-04
  目录