运行官方代码库中提供的Colab代码:vision-based environment(二)(3)
- 九、类 `PushTImageEnv`,继承自`PushTEnv`
- 九.1 `def __init__()`
- 九.2 `def _get_obs()`
- 九.3 `def render()`
- 总结大函数和大类意义
- 十、Env Demo
- 总结
- 十一、函数 `create_sample_indices`
- 总体说明
- 十二、函数`sample_sequence`
- 总体说明
- 十三、函数`get_data_stats`
- 总体说明
- 十四、函数`normalize_data`
- 总体说明
- 十五、函数`unnormalize_data`
- 总体说明
- 设计原因
官方项目地址:https://diffusion-policy.cs.columbia.edu/
Colab代码:vision-based environment
九、类 PushTImageEnv
,继承自PushTEnv
class PushTImageEnv(PushTEnv):
- 作用:定义一个名为
PushTImageEnv
的新类,并继承自已有的PushTEnv
类。 - 意义:此类扩展了 PushTEnv 环境,使得环境的观察输出变为图像信息(RGB 数组)及 agent 位置,而不再使用原始的数值状态。
- 示例:假设 PushTEnv 提供了物理仿真和基本控制,而此类则专注于基于图像的输入输出。
metadata = {"render.modes": ["rgb_array"], "video.frames_per_second": 10}
- 作用:为此环境定义元数据,指定渲染模式仅支持 “rgb_array”,并设置视频帧率为 10 帧/秒。
- 意义:由于该环境主要输出图像数据,所以渲染模式不再包含 “human”(显示窗口);同时,帧率控制有助于与控制频率匹配。
- 示例:调用 render(mode=“rgb_array”) 后将返回一个 RGB 数组图像,而非在屏幕显示。
九.1 def __init__()
def __init__(self,legacy=False,block_cog=None,damping=None,render_size=96):
- 作用:定义构造函数,初始化 PushTImageEnv 实例。
- 参数说明:
legacy
(bool):是否采用旧版状态设置方式,默认 False。block_cog
:指定积木(block)的重心;例如可传 (260,300) 坐标。damping
:仿真空间的阻尼系数,如 0.9;若为 None 则保持默认。render_size
(int):渲染图像大小,默认为 96,表示输出图像尺寸为 96×96 像素。
- 意义:这些参数允许使用者调整环境的初始状态和渲染细节,同时保持与父类 PushTEnv 的接口一致。
super().__init__(legacy=legacy,block_cog=block_cog,damping=damping,render_size=render_size,render_action=False)
- 作用:调用父类
PushTEnv
的构造函数,传递部分参数,并将render_action
设置为 False。 - 意义:
- 保证继承父类的所有初始化设置(例如窗口尺寸、物理空间设置、agent 与 block 的添加等)。
- 将
render_action
设为 False,表示在图像模式下不显示动作标记(后续可以手动绘制,但默认不渲染动作标记)。
- 示例:若调用时 legacy=False, block_cog=(260,300), damping=0.9, render_size=96,则父类完成基本环境构建后,本类在此基础上进行图像化扩展。
ws = self.window_size
- 作用:将父类中定义的窗口尺寸(例如 512)赋值给局部变量
ws
。 - 意义:方便后续定义观察空间时使用窗口尺寸上限。
- 示例:若 self.window_size 为 512,则 ws=512。
self.observation_space = spaces.Dict({
- 作用:重新定义环境的观察空间为一个字典,包含多个部分。
- 意义:与 PushTEnv 中单一的 Box 状态不同,此处观察空间包含图像和 agent 的位置两部分,更适合图像输入的学习任务。
'image': spaces.Box(low=0,high=1,shape=(3,render_size,render_size),dtype=np.float32),
- 作用:定义字典中的 ‘image’ 部分,观察值为一个 Box 空间。
- 参数说明:
low=0, high=1
:像素值归一化到 [0,1]。shape=(3, render_size, render_size)
:3 通道(RGB)的图像,尺寸为 (3, 96, 96)(若 render_size=96)。dtype=np.float32
:数据类型为浮点型。
- 示例:输出图像形状为 (3,96,96),每个像素数值介于 0 到 1 之间。
'agent_pos': spaces.Box(low=0,high=ws,shape=(2,),dtype=np.float32)})
- 作用:定义字典中的 ‘agent_pos’ 部分,观察值为 agent 的二维位置。
- 参数说明:
low=0, high=ws
:位置数值范围从 0 到 ws(如 512)。shape=(2,)
:二维坐标,例如 (x,y)。dtype=np.float32
:数据类型为浮点型。
- 示例:agent_pos 的值可能为 [256.0, 400.0]。
self.render_cache = None
- 作用:初始化渲染缓存变量为 None,用于存储最新渲染得到的图像。
- 意义:避免每次调用 render() 都重复渲染;如果已有缓存则直接返回,加快渲染速度。
九.2 def _get_obs()
def _get_obs(self):
- 作用:定义内部方法 _get_obs,用于获取当前环境的观察值。
- 意义:覆盖父类 _get_obs 方法,使得观察值包含图像数据和 agent 位置,符合新的 observation_space 定义。
img = super()._render_frame(mode='rgb_array')
- 作用:调用父类的 _render_frame 方法,以 “rgb_array” 模式渲染当前物理场景,得到图像。
- 示例:返回一个 NumPy 数组 img,形状可能为 (512,512,3),代表原始渲染的图像(窗口尺寸为 512×512 像素)。
- 在大多数图像处理中,标准做法是将图像数据组织成 (高度, 宽度, 通道) 的格式。例如,(512,512,3) 表示 512 行像素,每行 512 个像素,每个像素由 3 个颜色分量(红、绿、蓝)组成。
pygame.surfarray.pixels3d(canvas)
时,Pygame 会将 canvas(通常是一个 512×512 的 Surface)中的像素数据转换为一个三维 NumPy 数组。
agent_pos = np.array(self.agent.position)
- 作用:将 agent 的位置转换为 NumPy 数组。
- 示例:若 self.agent.position 为 (256, 400),则 agent_pos = np.array([256,400])。
img_obs = np.moveaxis(img.astype(np.float32) / 255, -1, 0)
- 作用:
- 将 img 数据类型转换为 float32,并归一化到 [0,1](除以 255)。
- 使用 np.moveaxis 将图像数组的通道轴从最后一维移到第一维。
- 意义:
- 归一化后适合神经网络输入;
- 将形状从 (height, width, 3) 转换为 (3, height, width),符合大多数卷积网络的输入要求。
- 示例:若 img 原始形状 (512,512,3),归一化后再 moveaxis 后,img_obs 的形状为 (3,512,512)。但由于后续 render_size 缩放,最终输入的图像尺寸可能与 render_size 相关。
obs = {'image': img_obs,'agent_pos': agent_pos}
- 作用:构造观察字典 obs,包含图像和 agent 位置。
- 示例:obs = { ‘image’: (3,512,512) 数组, ‘agent_pos’: np.array([256,400]) }。
- 意义:使观察值符合 observation_space 的定义。
# draw actionif self.latest_action is not None:
- 作用:检查是否有最新动作记录,如果有则在图像上绘制动作标记。
- 意义:提供视觉反馈,帮助调试或观察动作信息。
action = np.array(self.latest_action)
- 作用:将最新动作转换为 NumPy 数组。
- 示例:若 latest_action 为 [300,350],则 action = np.array([300,350])。
coord = (action / 512 * 96).astype(np.int32)
- 作用:
- 将动作坐标从原始窗口尺寸(512)缩放到渲染图像尺寸(96)的比例。
- 计算公式:coord = action / 512 * 96,最后转换为整数坐标。
- 示例:若 action = [256,256],则 coord = [256/51296, 256/51296] = [48,48]。
marker_size = int(8/96*self.render_size)
- 作用:计算动作标记的大小。
- 示例:render_size 默认为 96,则 marker_size = int(8/96*96) = 8。
thickness = int(1/96*self.render_size)
- 作用:计算动作标记绘制时的线宽。
- 示例:同样,若 render_size 为 96,则 thickness = int(1/96*96) = 1。
cv2.drawMarker(img, coord,color=(255,0,0), markerType=cv2.MARKER_CROSS,markerSize=marker_size, thickness=thickness)
- 作用:使用 OpenCV 在原始图像 img 上绘制一个红色(255,0,0)的十字标记,标记位置为 coord。
- 意义:直观显示当前动作目标位置,有助于调试和可视化。
- 示例:在图像中 (48,48) 处绘制一个大小为 8 像素的十字标记,线宽为 1。
self.render_cache = img
- 作用:将最终渲染的图像存入 render_cache,以便后续调用 render() 方法直接返回缓存图像。
- 意义:缓存机制可以避免重复渲染,提高效率。
return obs
- 作用:返回构造好的观察字典 obs。
- 输出:包含归一化且通道轴已调整的图像和 agent 的位置。
九.3 def render()
def render(self, mode):
- 作用:定义 render 方法,接口符合 Gym 环境的要求。
- 参数说明:mode 指定渲染模式,但在此类中仅支持 ‘rgb_array’。
assert mode == 'rgb_array'
- 作用:断言模式必须为 ‘rgb_array’,否则会抛出异常。
- 意义:确保调用者只能请求图像数据形式的输出。
if self.render_cache is None:self._get_obs()
- 作用:如果 render_cache 为空,则调用 _get_obs() 方法刷新渲染图像。
- 意义:保证每次调用 render 时都有最新的渲染结果。
return self.render_cache
- 作用:返回当前存储在 render_cache 中的图像(RGB 数组)。
- 输出:一个 NumPy 数组,形状约为 (512,512,3) 经 cv2.resize 调整为 (96,96,3)(具体取决于 _render_frame 的实现)。
在
_render_frame
方法中,代码首先创建了一个 canvas,大小为 512×512 像素,然后通过 pygame.surfarray 得到的原始图像数据的形状是 (512,512,3)(其中 3 表示 RGB 三个通道)。
- 例如,原始 canvas 的像素数据可能是一个 512×512 的图像,每个像素有 3 个颜色分量,所以形状为 (512,512,3)。
接下来,
_render_frame
中使用了cv2.resize
将这个图像缩放到(render_size, render_size, 3)
的大小。
- 如果
render_size=96
,则最终返回的图像形状为 (96,96,3)。- 这一步的目的是将原始较大图像调整为较小的尺寸,适合用于强化学习算法或快速渲染。
在
_get_obs
方法中,调用了super()._render_frame(mode='rgb_array')
得到的 img 已经是经过cv2.resize
调整后的图像(例如 (96,96,3)),但接下来使用np.moveaxis
对 img 进行通道顺序调整:img_obs = np.moveaxis(img.astype(np.float32) / 255, -1, 0)
- 这一步把图像从 (96,96,3) 变成 (3,96,96),是为了符合大多数神经网络要求“通道优先”(channels-first)的格式。
最后,
_get_obs
将处理后的img_obs
放入观察字典中,而同时将原始 img(形状 (96,96,3))存入render_cache
。当你调用
render(self, mode)
时,代码直接返回self.render_cache
,也就是_render_frame
返回的原始图像(经过cv2.resize
后的),其形状为 (96,96,3)(假设render_size=96
)。总结:
- 原始 canvas 尺寸是 512×512,所以初始获取的像素数据为 (512,512,3)。
- 经过
cv2.resize
后,最终的渲染图像变成 (render_size, render_size, 3),比如 (96,96,3)。- 在
_get_obs
中,为适应网络输入,图像又通过np.moveaxis
变成 (3,96,96)。- render 方法则直接返回
render_cache
,也就是 (96,96,3) 的图像。
总结大函数和大类意义
-
PushTImageEnv 类
- 意义:该类是 PushTEnv 的一个子类,专门用于生成图像作为观察输入。
- 输入:继承父类的参数(legacy、block_cog、damping、render_size),且强制将 render_action 设为 False,避免在图像模式下干扰视觉效果。
- 输出:观察空间为一个 Dict,包含归一化图像(形状为 (3,render_size,render_size))和 agent 位置(二维)。
- 作用:适合于使用视觉信息进行强化学习的任务,使得环境能直接为神经网络提供图像输入,同时保留关键位置数据。
-
_get_obs 方法
- 意义:负责获取当前环境状态,并将物理仿真渲染为图像,同时加入动作标记(如果存在)。
- 细节:调用父类的 _render_frame 得到原始图像,归一化、调整通道顺序后构造观察字典,并在必要时在图像上绘制红色标记。
- 输出:返回包含 ‘image’ 和 ‘agent_pos’ 的字典。
-
render 方法
- 意义:提供标准的 Gym 渲染接口,仅支持 ‘rgb_array’ 模式,直接返回渲染缓存图像。
十、Env Demo
# 0. create env object
env = PushTImageEnv()
- 作用:
- 创建一个
PushTImageEnv
环境实例,并赋值给变量env
。
- 创建一个
- 细节:
- 由于没有传入参数,因此使用构造函数默认值,如 render_size=96、legacy=False 等。
- 示例:
- 假设内部默认 window_size 为 512、render_size 为 96,创建出的 env 对象包含图像观测与 agent 位置信息。
# 1. seed env for initial state.
# Seed 0-200 are used for the demonstration dataset.
env.seed(1000)
- 作用:
- 注释说明接下来将对环境进行种子设置,用于确定初始状态。
- 注释指出演示数据集使用 0 到 200 范围内的种子,但这里示例使用的是 1000。
- 调用环境的
seed
方法,将种子设置为 1000。
- 细节:
- 内部可能使用
np.random.randint
或default_rng(1000)
来初始化随机数生成器。
- 内部可能使用
- 意义:
- 通过种子设置保证实验可重复性,环境内部的随机生成过程将固定下来。
- 示例:
- 若种子 1000,则生成的初始状态、随机位置等均可复现;注意虽然注释说 0-200 用于演示数据,但这里选用 1000,说明用户可以传入任意种子。
# 2. must reset before use
obs, info = env.reset()
- 作用:
- 注释提示在使用环境前必须调用 reset() 方法。
- 调用环境的
reset()
方法,初始化物理仿真和状态,返回初始观察值(obs)和附加信息(info)。
- 细节:
- 例如,obs 可能是一个字典:
'image'
: 一个归一化图像,形状 (3, 96, 96)(RGB 三通道);'agent_pos'
: 一个二维数组,如 [256.0, 400.0] 表示 agent 位置。
- info 中可能包含其他调试信息,如当前速度、接触点数等。
- 例如,obs 可能是一个字典:
- 意义:
- Gym 环境的标准流程要求先调用 reset() 来初始化状态。
- 示例:
- 执行后可能得到:
- obs = { ‘image’: np.array(shape=(3,96,96), dtype=float32), ‘agent_pos’: np.array([256,400], dtype=float32) }
- info = { … }(具体内容依实现而定)。
- 执行后可能得到:
# 3. 2D positional action space [0,512]
action = env.action_space.sample()
- 作用:
- 注释说明动作空间是二维位置,其数值范围为 [0,512]。
- 调用环境的动作空间
action_space
的 sample() 方法,随机生成一个合法的动作。
- 细节:
- 根据之前定义,动作空间是一个 Box(low=[0,0], high=[512,512])。
- 意义:
- 告知用户动作必须是一个二维向量,每个分量在 0 到 512 之间,对应环境的空间范围。
- 示例:
- 可能生成的动作为 np.array([300.5, 350.2]),两个浮点数均在 [0,512] 范围内。
# 4. Standard gym step method
obs, reward, terminated, truncated, info = env.step(action)
- 作用:
- 注释说明接下来将使用 Gym 标准的 step() 方法推进环境仿真一步。
- 调用环境的
step()
方法,传入之前生成的动作,并获取执行一步后的输出。
- 细节:
- 返回值分别为:
obs
: 更新后的观察值(与 reset() 返回格式相同,即字典,包含图像和 agent 位置)。reward
: 一个标量奖励,范围 [0,1]。terminated
: 布尔值,表示任务是否结束(成功)。truncated
: 布尔值,表示是否因时间步限制而中止。info
: 附加信息字典,可能包含调试信息。
- 返回值分别为:
- 意义:
- 标明这是标准强化学习接口的核心步骤:执行动作、获得新状态和奖励等。
- 示例:
- 例如,obs 可能为 { ‘image’: np.array(…), ‘agent_pos’: [260, 410] },reward 为 0.75,terminated 为 False,truncated 为 False,info 包含如接触点数等数据。
# prints and explains each dimension of the observation and action vectors
with np.printoptions(precision=4, suppress=True, threshold=5):
- 作用:
- 注释说明接下来会打印观察和动作向量的各个维度及其说明。
- 使用 NumPy 的 printoptions 设置打印选项:
precision=4
:小数打印保留 4 位;suppress=True
:禁止科学计数法显示;threshold=5
:打印时仅显示最多 5 个元素,超出部分以 … 代替。
- 意义:
- 使输出结果更整洁和易读。
- 示例:
- 若数组为 [256.123456, 400.654321, …],则可能显示为 [256.1235 400.6543 …]。
print("obs['image'].shape:", obs['image'].shape, "float32, [0,1]")print("obs['agent_pos'].shape:", obs['agent_pos'].shape, "float32, [0,512]")print("action.shape: ", action.shape, "float32, [0,512]")
- 作用:
- 打印观察字典中 ‘image’ 部分的形状,数据类型和数值范围说明。
- 打印观察字典中 ‘agent_pos’ 部分的形状和数值范围说明。
- 打印动作向量的形状、数据类型和数值范围。
- 细节:
- 例如,obs[‘image’].shape 可能为 (3, 96, 96);数据类型为 float32;所有数值归一化在 [0,1] 范围。
- 例如,obs[‘agent_pos’] 可能为一个形状为 (2,) 的数组,如 [256, 400];数据类型为 float32,数值在 0 到 512 之间。
- 例如,action.shape 可能为 (2,);说明动作是二维向量;数据类型为 float32,数值范围 [0,512]。
- 意义:
- 帮助用户确认图像数据符合预期格式,便于后续用于神经网络输入。
- 确保 agent 位置数据格式正确,范围与物理环境窗口一致。
- 验证生成的动作符合预期,为后续控制提供正确数据格式。
总结
-
Env Demo 代码总体目的:
- 创建并初始化一个基于图像观察的环境实例 PushTImageEnv;
- 设置随机种子,重置环境获取初始状态;
- 随机生成一个合法动作,并通过 step() 推进环境一步,获取新的观察和奖励;
- 最后打印观察和动作的形状及数据范围说明,帮助用户理解数据格式。
-
大函数和大类的意义:
- PushTImageEnv 类:继承 PushTEnv 并扩展为图像模式,适合视觉强化学习任务。
- seed()、reset()、step()、render():这些都是 Gym 环境的标准接口,保证环境可以与强化学习算法无缝集成。
- 观察空间:包含图像和 agent 位置,满足视觉和位置信息输入需求。
- 打印部分:调试输出,帮助理解环境返回数据的结构和范围。
十一、函数 create_sample_indices
该函数的总体目的是根据各个 episode 的结束索引(episode_ends)生成样本缓冲区索引,这些索引用于从连续数据中抽取长度为 sequence_length 的样本序列,同时支持在序列前后添加 padding 区域。
def create_sample_indices(episode_ends: np.ndarray, sequence_length: int,pad_before: int = 0, pad_after: int = 0):
- 作用:定义函数
create_sample_indices
,它接受 4 个参数。 - 参数说明:
episode_ends
: 一个 NumPy 数组,包含每个 episode 结束的索引。例如,episode_ends = np.array([100, 250, 400])
表示第一个 episode 从起始位置 0 到 100,第二个从 100 到 250,第三个从 250 到 400。sequence_length
: 整数,表示每个样本序列的目标长度。例如,sequence_length = 20
表示希望每个抽取的样本长度为 20 个时间步。pad_before
: 整数,表示允许向序列前“延伸”的步数(负数位置可选填充部分),默认 0。pad_after
: 整数,表示允许向序列后延伸的步数,默认 0。
- 意义:该函数通过遍历每个 episode,根据传入的序列长度及前后 padding 参数,生成一系列索引对(以及在样本内部的偏移量),以便后续从数据缓冲区中抽取样本。
indices = list()
- 作用:创建一个空列表
indices
,用于保存每个生成样本的索引信息。 - 示例:开始时,
indices = []
。 - 意义:后续每个样本对应一个列表项,最终转换为 NumPy 数组返回。
for i in range(len(episode_ends)):
- 作用:对
episode_ends
中的每个 episode(通过索引 i)进行循环处理。 - 示例:若
episode_ends = [100, 250, 400]
,则 i 依次取值 0、1、2。 - 意义:每个 episode 独立处理,从中生成样本索引。
start_idx = 0
- 作用:为当前 episode 设置起始索引为 0,后续可能根据 i 值进行更新。
- 示例:对于 i=0,默认 start_idx=0。
- 意义:确定本 episode 在数据缓冲区中的起始位置。
if i > 0:start_idx = episode_ends[i-1]
- 作用:如果不是第一个 episode(即 i>0),则将 start_idx 设为前一个 episode 的结束索引。
- 示例:
- 当 i = 1 时,
start_idx = episode_ends[0]
,例如 episode_ends[0]=100,则 start_idx=100。 - 当 i = 2 时,start_idx = episode_ends[1],例如 250。
- 当 i = 1 时,
- 意义:保证每个 episode 的数据片段独立,episode i 数据范围为 [start_idx, episode_ends[i])。
end_idx = episode_ends[i]
- 作用:将 end_idx 设为当前 episode 的结束索引。
- 示例:对于 i=0,end_idx = episode_ends[0] = 100;i=1,end_idx=250;i=2,end_idx=400。
- 意义:确定本 episode 的数据区间上界。
episode_length = end_idx - start_idx
- 作用:计算当前 episode 的长度,即数据个数。
- 示例:
- 对于 i=0,episode_length = 100 - 0 = 100。
- 对于 i=1,episode_length = 250 - 100 = 150。
- 对于 i=2,episode_length = 400 - 250 = 150。
- 意义:后续根据 episode 长度确定可抽取样本的起始位置范围。
min_start = -pad_before
- 作用:设定样本序列在 episode 内允许的最小起始偏移量。
- 解释:
- 如果 pad_before 为 0,则 min_start = 0;如果 pad_before > 0,则 min_start 为负数,表示样本可以从 episode 之前开始填充。
- 示例:若 pad_before = 0,则 min_start = 0;若 pad_before = 5,则 min_start = -5。
- 意义:允许在抽取序列时“借用”前面不属于本 episode 的部分,通常用于序列模型的预热。
max_start = episode_length - sequence_length + pad_after
- 作用:设定样本序列在 episode 内允许的最大起始偏移量。
- 计算说明:
- 理想情况(无 padding):最大起始索引为 episode_length - sequence_length,即确保序列长度完整。
- 再加上 pad_after,允许超出 episode 长度后部分填充。
- 示例:
- 若 episode_length = 100,sequence_length = 20,pad_after = 0,则 max_start = 100 - 20 + 0 = 80。
- 若 pad_after = 3,则 max_start = 100 - 20 + 3 = 83。
- 意义:确定在本 episode 中可选的起始偏移范围,确保样本序列最终长度为 sequence_length。
# range stops one idx before endfor idx in range(min_start, max_start+1):
- 作用:
- 提示下一个 for 循环的 range() 将从 min_start 到 max_start(含 max_start),即结束时 idx 最大为 max_start。
- 在当前 episode 内,根据允许的偏移范围,遍历所有可能的起始偏移 idx。
- 示例
- 若 min_start = 0,max_start = 80,则 idx 从 0 到 80;
- 若 min_start = -5,max_start = 83,则 idx 从 -5 到 83(共 83 - (-5) + 1 = 89 个可能值)。
- 意义:
- 明确循环区间的边界处理。
- 为每个可能的起始位置生成一个样本切片。
buffer_start_idx = max(idx, 0) + start_idx
- 作用:确定样本在原始数据缓冲区中实际的起始索引,考虑了 pad_before 的情况。
- 解释:
- 若 idx 为负,则 max(idx,0)=0,此时 buffer_start_idx = 0 + start_idx;表示序列前面缺失部分需要填充。
- 若 idx 非负,则 buffer_start_idx = idx + start_idx。
- 示例:
- 以 i=0,start_idx=0;如果 idx = -3(pad_before=3允许负起始),则 buffer_start_idx = max(-3,0) + 0 = 0。
- 如果 idx = 10,则 buffer_start_idx = 10 + 0 = 10.
- 对于 i=1,start_idx = 100;若 idx = -3,则 buffer_start_idx = 100。
- 意义:确保提取数据时不会索引到 episode 之前的部分,若负数则用填充代替。
buffer_end_idx = min(idx+sequence_length, episode_length) + start_idx
- 作用:确定样本在数据缓冲区中结束的实际索引,考虑 pad_after。
- 解释:
- idx+sequence_length 表示理想情况下样本序列结束的位置(相对 episode 开头)。
- 取 min(…, episode_length) 限制不超过当前 episode 的数据量;超出的部分需要后续 padding。
- 再加上 start_idx 转换为全局缓冲区索引。
- 示例:
- 对于 i=0,start_idx=0;若 idx=10, sequence_length=20,则 idx+sequence_length=30,若 episode_length=100,则 buffer_end_idx = 30 + 0 = 30。
- 若 idx=90,则 idx+sequence_length=110,但 episode_length=100,所以 min(110,100)=100,buffer_end_idx=100。
- 对于 i=1,start_idx=100;若 idx=10,则 buffer_end_idx = min(10+20, episode_length) + 100.
- 意义:保证生成的样本区间不会超出当前 episode 的实际数据范围。
start_offset = buffer_start_idx - (idx+start_idx)
- 作用:计算因 idx 为负(或其他原因)而造成的样本前端空缺量,即填充需要的元素数。
- 解释:
- 理想起始索引应为 idx + start_idx;实际的 buffer_start_idx 取决于 max(idx,0)。
- 差值即为负偏移补偿。
- 示例:
- 若 idx = -3, start_idx = 0,则理想起始应为 -3,但 buffer_start_idx = 0,所以 start_offset = 0 - (-3) = 3;表示前 3 个位置需填充。
- 若 idx = 10,则 start_offset = (10 + start_idx) - (10 + start_idx) = 0。
- 意义:后续用于确定样本在输出缓冲区中填充的位置。
end_offset = (idx+sequence_length+start_idx) - buffer_end_idx
- 作用:计算理想结束索引与实际 buffer_end_idx 之间的差值,表示样本后端缺失的部分。
- 示例:
- 若 idx=90, sequence_length=20, start_idx=0,则理想结束=110,但 buffer_end_idx=100(episode_length限制),则 end_offset = 110 - 100 = 10;表示后面缺少 10 个数据,需要 padding。
- 若 idx=10,则 end_offset = (10+20) - 30 = 0。
- 意义:同 start_offset 一样,用于确定在样本缓冲区中后端填充的位置。
sample_start_idx = 0 + start_offset
- 作用:计算样本数据在输出缓冲区中的起始位置,应考虑 start_offset。
- 解释:
- 如果没有缺失(start_offset=0),则样本从 0 开始;否则,从 start_offset 开始。
- 示例:
- 若 start_offset=3,则 sample_start_idx = 3;若 start_offset=0,则为 0。
- 意义:输出样本将是一个长度为 sequence_length 的数组,其中前 start_offset 个位置可能需要填充(通常为零或特殊标记)。
sample_end_idx = sequence_length - end_offset
- 作用:计算样本数据在输出缓冲区中的结束位置,考虑 end_offset。
- 解释:
- 若没有缺失(end_offset=0),则 sample_end_idx = sequence_length;否则,减去缺失部分。
- 示例:
- 若 sequence_length=20,end_offset=10,则 sample_end_idx = 20 - 10 = 10;表示样本数据只填充到第 10 个位置,其余位置需填充。
- 意义:确定样本数据中实际数据所在的区间,其余部分需要 pad 填充。
indices.append([buffer_start_idx, buffer_end_idx,sample_start_idx, sample_end_idx])
- 作用:将本次循环计算出的四个值作为一个样本索引记录追加到 indices 列表中。
- 具体含义:
buffer_start_idx
: 数据缓冲区中实际开始抽取样本的索引。buffer_end_idx
: 数据缓冲区中实际结束抽取样本的索引。sample_start_idx
: 在目标样本(长度为 sequence_length 的数组)中,实际数据的起始索引(其前面需要 padding)。sample_end_idx
: 在目标样本中,实际数据的结束索引(其后面需要 padding)。
- 示例:
- 若 idx = -3,在 episode 开始时,可能得到 [0, 20, 3, 20](表示前 3 个位置缺失,需要 pad);若 idx=10,则可能得到 [10, 30, 0, 20]。
- 意义:这些索引可以后续用来从一个大的数据缓冲区中提取固定长度的样本,同时在不足的部分进行 padding 操作。
indices = np.array(indices)
- 作用:将存储所有样本索引信息的列表转换为 NumPy 数组,便于后续数值计算与索引操作。
- 示例:
- 如果 indices 列表中有 50 个样本,则转换后的数组形状为 (50, 4)。
- 意义:NumPy 数组在批量操作时效率更高,也是许多深度学习管道的标准格式。
return indices
- 作用:将最终的 indices 数组返回给调用者。
- 输出:一个 NumPy 数组,每行包含 4 个整数,分别表示缓冲区开始、结束索引和样本内部数据开始、结束索引。
总体说明
- 大函数意义:
- 该函数用于在序列数据(通常是一段连续采样的时间序列数据)中,根据每个 episode 的结束位置生成可用于构造定长样本序列的索引信息。
- 输入:
episode_ends
:每个 episode 的结束索引(例如 [100,250,400]),决定每个 episode 的数据范围。sequence_length
:目标样本长度,如 20。pad_before
、pad_after
:允许样本开始或结束处超出 episode 范围的 padding 数量。
- 输出:
- 一个形状为 (N,4) 的 NumPy 数组,其中 N 为所有 episode 中所有可能样本的数目。每行记录了从原始数据中抽取样本的实际索引区间及在目标样本中的有效数据区间。
- 设计原因:
- 在序列建模、时间序列预测或强化学习中,经常需要将长序列数据分割成定长的样本输入,同时允许序列边界处填充缺失数据。
- 此函数通过灵活控制样本起始索引(可负、可正)和计算内部偏移,为后续构造数据批次提供准确的索引信息,从而简化后续数据抽取与 padding 操作。
十二、函数sample_sequence
该函数的总体目的是从训练数据(train_data)中提取一个定长的样本序列,同时对边界部分进行填充(padding)以保证输出长度固定为 sequence_length。训练数据是一个字典,每个键对应一个 NumPy 数组,数组第一维是时间步。函数利用传入的缓冲区索引(buffer_start_idx, buffer_end_idx)确定实际数据区间,再利用 sample_start_idx 和 sample_end_idx 指定数据在定长样本中的位置。
def sample_sequence(train_data, sequence_length,buffer_start_idx, buffer_end_idx,sample_start_idx, sample_end_idx):
- 作用:定义一个名为 sample_sequence 的函数。
- 输入参数:
train_data
: 一个字典,其中每个键对应一个 NumPy 数组,数组第一维表示时间序列数据。- 例如,train_data = { “obs”: array, “act”: array },其中 “obs” 的 shape 为 (N, D),N 是时间步数,D 是观测维度。
sequence_length
(int): 目标样本序列的长度。例如,sequence_length = 20 表示希望输出的样本序列长度为 20。buffer_start_idx
(int): 从训练数据中提取样本的起始索引(全局数据索引)。buffer_end_idx
(int): 样本在训练数据中结束索引(不包括该索引)。sample_start_idx
(int): 样本内部起始位置,即在输出定长数组中实际数据应放置的位置开始。sample_end_idx
(int): 样本内部结束位置,即实际数据在输出数组中的结束位置。
- 输出:返回一个字典,包含与 train_data 相同的键,但每个值都是经过抽样和必要填充后的长度为 sequence_length 的数组。
result = dict()
- 作用:创建一个空字典 result,用于存储最终的样本数据。
- 示例:开始时 result = {}。
- 意义:将每个键的处理结果分别存入 result,最终返回所有键对应的样本。
for key, input_arr in train_data.items():
- 作用:对字典 train_data 中的每个键 (例如 “obs”, “act”) 及对应的数组进行循环处理。
- 示例:如果 train_data = { “obs”: arr1, “act”: arr2 },则第一次循环 key=“obs”, input_arr=arr1;第二次 key=“act”, input_arr=arr2。
- 意义:保证每个数据项都经过相同的抽样与填充处理。
sample = input_arr[buffer_start_idx:buffer_end_idx]
- 作用:从输入数组 input_arr 中抽取从 buffer_start_idx 到 buffer_end_idx(不包含 buffer_end_idx)的连续数据。
- 示例:
- 假设 input_arr 的 shape 为 (100, D),buffer_start_idx = 10,buffer_end_idx = 30,则 sample = input_arr[10:30] 得到一个 shape 为 (20, D) 的子数组。
- 意义:获取当前 episode 内实际可用的数据子序列。
data = sample
- 作用:将局部变量 data 先赋值为抽取的 sample,作为默认输出数据。
- 意义:如果不需要填充,直接使用 sample 即可;后续会根据条件对 data 进行填充处理。
if (sample_start_idx > 0) or (sample_end_idx < sequence_length):
- 作用:检测样本内部的起始或结束位置是否不占满整个 sequence_length。
- 解释:
- 如果 sample_start_idx > 0,说明样本开头有部分数据缺失,需要在前面填充。
- 如果 sample_end_idx < sequence_length,说明样本结尾缺失数据,需要在后面填充。
- 示例:
- 若 sequence_length = 20,sample_start_idx = 3,sample_end_idx = 20,则前面缺 3 个位置;若 sample_end_idx = 15,则后面缺 5 个位置。
- 意义:确保输出数组的长度固定为 sequence_length,即使原始 sample 不够长或超出边界时。
data = np.zeros(shape=(sequence_length,) + input_arr.shape[1:],dtype=input_arr.dtype)
- 作用:初始化一个全零数组 data,其形状为 (sequence_length, …) 后面的维度与 input_arr 相同。
- 示例:
- 若 sequence_length = 20 且 input_arr.shape 为 (100, D)(例如 D=10),则创建 data 的 shape 为 (20, 10)。
- 意义:用于存储输出样本,同时预留空间填充边界缺失部分。dtype 与原始数据保持一致。
if sample_start_idx > 0:data[:sample_start_idx] = sample[0]
- 作用:如果 sample_start_idx > 0,表示输出样本前面部分缺失原始数据,则将 data 数组前 sample_start_idx 行全部填充为 sample 的第一个数据点。
- 示例:
- 若 sample_start_idx = 3,则 data[0:3] 均被赋值为 sample[0]。例如,若 sample[0] = [1, 2, 3, …, 10](维度为 10),则 data[0], data[1], data[2] 均为该向量。
- 意义:填充前端使得样本数据连续,通常在时间序列处理中用前一个值来填充不足部分。
if sample_end_idx < sequence_length:data[sample_end_idx:] = sample[-1]
- 作用:如果 sample_end_idx 小于 sequence_length,表示样本后部缺失数据,则将 data 从 sample_end_idx 到末尾部分填充为 sample 的最后一个数据点。
- 示例:
- 若 sequence_length = 20,sample_end_idx = 15,则 data[15:20] 均被赋值为 sample[-1]。例如,若 sample[-1] = [5, 6, 7, …, 10],则后 5 行全为此值。
- 意义:确保后端数据连续填充,避免空值干扰后续处理。
data[sample_start_idx:sample_end_idx] = sample
- 作用:将 data 数组中从 sample_start_idx 到 sample_end_idx 的部分赋值为原始的 sample 数据。
- 示例:
- 若 sample_start_idx = 3,sample_end_idx = 15,且 sample 原本 shape 为 (20, D) 中实际有效数据为 sample[?];此行将 data[3:15] 设为 sample[0:12](具体取决于之前抽取的 sample 长度与边界计算)。
- 例如,若 sample 原始数据有 17 个数据点,则 data[3:15] 会覆盖这 12 个数据点。
- 意义:保证输出数组中间部分为实际采样数据,而两端则由前后填充值补足。
result[key] = data
- 作用:将处理后的 data 数组存入 result 字典,对应当前的 key(例如 “obs” 或 “act”)。
- 示例:
- 若 key = “obs”,则 result[“obs”] = data,data 的 shape 为 (sequence_length, D)。
- 意义:对每个数据项分别生成采样结果,保留在字典中。
return result
- 作用:返回包含所有键采样数据的字典 result。
- 输出示例:
- 例如,返回 { “obs”: array(shape=(20, D)), “act”: array(shape=(20, A)) },其中 D、A 分别为原始观测和动作的维度。
总体说明
-
大函数意义:
- 目的:该函数用于从连续的训练数据中抽取固定长度(sequence_length)的样本序列,同时处理边界情况(不足部分用前/后值填充)。
- 输入:
train_data
:字典,包含多个训练数据数组。sequence_length
:目标样本长度(例如 20)。buffer_start_idx
、buffer_end_idx
:指定原始数据缓冲区中实际数据区间,如 [10, 30] 表示抽取第 10 到第 29 个数据。sample_start_idx
、sample_end_idx
:定义在输出定长样本中,实际数据应位于哪个区间,例如 [3, 20] 表示前 3 个位置需要填充,剩余部分为实际数据。
- 输出:
- 返回一个字典,其中每个键对应一个 NumPy 数组,数组第一维为 sequence_length,其它维度与原始数据保持一致,且中间部分为原始数据,边界部分由第一个或最后一个数据填充。
-
设计原因:
- 在训练序列模型(如 RNN 或 Transformer)时,经常需要对不同 episode 的数据切分成定长样本;而由于 episode 边界处数据可能不足以构成完整序列,通常采用复制首尾数据的方式进行填充。
- 该函数通过计算 buffer 内部实际数据与期望样本之间的偏移量,实现自动填充和索引映射,使得后续数据加载更为简便高效。
十三、函数get_data_stats
def get_data_stats(data):
该函数用于计算数据每个特征(最后一维)的最小值和最大值,用于后续归一化。
输入:
data
: 一个 NumPy 数组,可能是多维(例如形状为 (N, D) 或 (B, T, D)),其中最后一维为特征维度。
输出:
-
一个字典,包含两个键
'min'
和'max'
,对应每个特征的最小值和最大值,均为一维数组,其长度等于特征数。 -
作用:定义函数
get_data_stats
,接受参数data
。 -
示例:例如,传入 data.shape 为 (100, 10) 表示 100 个样本、10 个特征。
data = data.reshape(-1, data.shape[-1])
- 作用:将 data 重塑为二维数组,其中第一维为所有样本合并后的总数,第二维为特征数。
- 细节说明:
-1
表示自动计算第一维大小,data.shape[-1]
保证保留原来最后一维。
- 示例:
- 若原始 data 的形状为 (5, 20, 10)(例如 5 个 episode,每个 episode 20 个时间步,每个时间步 10 个特征),经过重塑后变为 (100, 10)。
- 若 data 原本就是二维 (100, 10),则结果不变。
stats = {'min': np.min(data, axis=0),'max': np.max(data, axis=0)}
- 作用:构造一个字典 stats,计算每个特征的最小值和最大值。
- 细节说明:
np.min(data, axis=0)
计算 data 每一列(特征)的最小值;同理np.max(data, axis=0)
计算每一列的最大值。
- 示例:
- 假设 data 为:
则按 axis=0 得到:[[1, 5, 3],[3, 2, 6],[2, 7, 4]]
- min = [1, 2, 3]
- max = [3, 7, 6]
- 结果字典为:
{'min': np.array([1,2,3]), 'max': np.array([3,7,6])}
。
- 假设 data 为:
return stats
- 作用:返回计算好的 stats 字典。
- 输出:字典包含
'min'
与'max'
数组,可用于归一化操作。
总体说明
- 功能:计算数据每个特征的最小值和最大值。
- 输入:原始数据数组(可能多维),例如 (N, D) 或 (B, T, D)。
- 输出:字典
{'min': ..., 'max': ...}
,每个键对应一个一维数组。
十四、函数normalize_data
def normalize_data(data, stats):
该函数用于将数据归一化,先将数据映射到 [0,1] 范围,再将其线性映射到 [-1,1]。
输入:
data
: 原始数据,形状任意但最后一维为特征。stats
: 由get_data_stats
得到的字典,包含'min'
和'max'
。
输出:
-
归一化后的数据数组,范围在 [-1, 1],数据类型与原始数据相同(通常为 float)。
-
作用:定义函数
normalize_data
,接受数据和统计信息。 -
示例:data 为 shape (100, 10),stats 如
{'min': [1,2,...], 'max': [3,7,...]}
。
# nomalize to [0,1]ndata = (data - stats['min']) / (stats['max'] - stats['min'])
- 作用:
- 注释,说明下一行将数据归一化到 [0,1] 区间。
- 计算数据归一化公式,将每个特征的数值映射到 [0,1] 区间。
- 细节说明:
- 对于数据中每个元素,先减去对应特征的最小值,再除以该特征的取值范围(最大值 - 最小值)。
- 示例:
- 如果某一特征的最小值为 1,最大值为 3,则对于数值 2:
n d a t a = 2 − 1 3 − 1 = 1 2 = 0.5 ndata = \frac{2 - 1}{3 - 1} = \frac{1}{2} = 0.5 ndata=3−12−1=21=0.5。 - 对于数值 1(最小值)归一化后为 0,对于数值 3(最大值)归一化后为 1。
- 如果某一特征的最小值为 1,最大值为 3,则对于数值 2:
# normalize to [-1, 1]ndata = ndata * 2 - 1
- 作用:
- 注释,说明下一行将 [0,1] 映射到 [-1,1]。
- 将 ndata 从 [0,1] 线性变换到 [-1,1]。
- 数学公式:当 ndata = 0,结果为 -1;ndata = 1,结果为 1;中间值按比例线性映射。
- 示例:
- 前面例子中归一化 0.5 经过该变换:
( 0.5 * 2 - 1 = 1 - 1 = 0 )。 - 若 ndata = 0 则结果为 -1,ndata = 1 则结果为 1。
- 前面例子中归一化 0.5 经过该变换:
return ndata
- 作用:返回归一化后的数据 ndata,其范围在 [-1,1]。
- 输出:数组形状与输入 data 相同。
总体说明
- 功能:将原始数据按特征归一化到 [0,1],再线性映射到 [-1,1]。
- 输入:原始数据及其统计信息(min、max)。
- 输出:归一化后的数据,数值在 [-1,1]。
十五、函数unnormalize_data
def unnormalize_data(ndata, stats):
该函数用于将归一化到 [-1,1] 范围的数据还原到原始数值范围。
输入:
ndata
: 已归一化到 [-1,1] 的数据。stats
: 同前面,包含每个特征的'min'
和'max'
。
输出:
-
还原后的数据,与原始数据数值范围相同。
-
作用:定义函数
unnormalize_data
,接收归一化数据和统计信息。 -
示例:ndata 为 shape (100, 10) 且数值在 [-1,1]。
ndata = (ndata + 1) / 2
- 作用:将 ndata 从 [-1,1] 映射回 [0,1]。
- 数学公式:当 ndata = -1,则 (ndata+1)/2 = 0;ndata = 1,则结果为 1。
- 示例:
- 若 ndata = 0,则 (0+1)/2 = 0.5。
- 若 ndata = -1,则 ( -1 + 1 )/2 = 0。
data = ndata * (stats['max'] - stats['min']) + stats['min']
- 作用:根据 [0,1] 数据反归一化回原始数值范围。
- 数学公式:
- 每个特征数据恢复公式:
d a t a = n d a t a × ( m a x − m i n ) + m i n data = ndata \times (max - min) + min data=ndata×(max−min)+min
- 每个特征数据恢复公式:
- 示例:
- 对于某特征,若原始最小值为 1,最大值为 3,ndata = 0.5,则
d a t a = 0.5 × ( 3 − 1 ) + 1 = 0.5 × 2 + 1 = 1 + 1 = 2 data = 0.5 \times (3-1) + 1 = 0.5 \times 2 + 1 = 1 + 1 = 2 data=0.5×(3−1)+1=0.5×2+1=1+1=2。 - 若 ndata = 0,则 data = min;ndata = 1,则 data = max。
- 对于某特征,若原始最小值为 1,最大值为 3,ndata = 0.5,则
return data
- 作用:返回还原后的数据。
- 输出:数据数组,其范围与最初的未归一化数据一致。
总体说明
- 功能:将归一化到 [-1,1] 的数据还原到原始范围。
- 输入:归一化数据及统计信息。
- 输出:还原后的数据,与原始数据范围一致。
设计原因
- 归一化与反归一化:在许多机器学习和强化学习任务中,对数据归一化有助于加速训练过程,提高数值稳定性。归一化到 [-1,1] 这种范围常见于神经网络输入。
- 统计信息的作用:通过预先计算每个特征的最小、最大值,可以对不同数据维度进行独立归一化,保证每个特征的数值尺度一致。
- 实现细节:
- 重塑数据保证无论原始数据是二维还是更高维,都只对最后一维的特征进行统计。
- 归一化公式和反归一化公式都是标准的线性映射公式,简单有效。