欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > kylin v10 + argo + ascend 310p多机多卡 pytorch distributed 训练

kylin v10 + argo + ascend 310p多机多卡 pytorch distributed 训练

2025/4/29 16:52:02 来源:https://blog.csdn.net/u011564831/article/details/147563326  浏览:    关键词:kylin v10 + argo + ascend 310p多机多卡 pytorch distributed 训练

最近接了个模型训练编排多机多卡的改造需求,要求使用argo  dag task启动多个节点,同时多个节点能实现 torch.distributed.launch 这样多机多卡的训练模式

简述技术

 torch.distributed.launch命令介绍


我们在训练分布式时候,会使用到 torch.distributed.launch
可以通过命令,来打印该模块提供的可选参数 python -m torch.distributed.launch --help

usage: launch.py [-h] [--nnodes NNODES] [--node_rank NODE_RANK]
                [--nproc_per_node NPROC_PER_NODE] [--master_addr MASTER_ADDR] [--master_port MASTER_PORT] 
                [--use_env] [-m] [--no_python] [--logdir LOGDIR]
                training_script ...
 

torch.ditributed.launch参数解析(终端运行命令的参数):

  • nnodes:节点的数量,通常一个节点对应一个主机,方便记忆,直接表述为主机
  • node_rank:节点的序号,从0开始
  • nproc_per_node:一个节点中显卡的数量
  • master_addr:master节点的ip地址,也就是0号主机的IP地址,该参数是为了让 其他节点 知道0号节点的位,来将自己训练的参数传送过去处理
  • master_port:master节点的port号,在不同的节点上master_addr和master_port的设置是一样的,用来进行通信


torch.ditributed.launch相关环境变量解析(代码中os.environ中的参数):

  • WORLD_SIZE:os.environ[“WORLD_SIZE”]所有进程的数量
  • LOCAL_RANK:os.environ[“LOCAL_RANK”]每张显卡在自己主机中的序号,从0开始
  • RANK:os.environ[“RANK”]进程的序号,一般是1个gpu对应一个进程

多机上训练步骤看下图就明白了

假设我连接了两个 GPU。一个在 Device0 上,另一个在 Device1 上。

在 CPU 上存储一个非常大的数组(单个设备/GPU 无法容纳),例如 X = [1,2,3,4,5,6]。

将数组的一部分广播到 GPU 设备 0 和 GPU 设备 1。0 和 1 分别包含该数组的不同块。GPU0 的 Inds = [0,1] GPU0 的数据 = [1,2] GPU1 的 Inds = [2,3] GPU1 的数据 = [2,3]

在 GPU0 和 GPU1 上分别运行一个进程。为此,一个简单的 Add() 函数即可完成。

根据需要(对于 GPU 获取的 inds),使用 GPU 数据更新 CPU 版本。这时我可能会使用 reduce 从设备获取两个张量。我可能会将其存储在一个键值字典中,其中键是设备 ID(0 代表 GPU 0,1 代表 GPU 1),并将 inds 和数据存储在一个元组中。然后,我需要更新 CPU 版本并再次运行整个过程。

分布式训练镜像

基础镜像使用

swr.cn-south-1.myhuaweicloud.com/ascendhub/ascend-pytorch:24.0.RC2-A2-2.1.0-ubuntu20.04

使用这个train.py 作为训练入口

import os
import argparse
import sys
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP# 检查是否支持昇腾 NPU
try:import torch_npuNPU_AVAILABLE = Trueprint("NPU support detected: torch_npu is available")
except ImportError:NPU_AVAILABLE = Falseprint("NPU support not detected: torch_npu is not available")
sys.stdout.flush()# 定义简单的神经网络模型
class SimpleNet(nn.Module):def __init__(self):super(SimpleNet, self).__init__()self.fc1 = nn.Linear(10, 50)self.fc2 = nn.Linear(50, 10)def forward(self, x):x = F.relu(self.fc1(x))x = self.fc2(x)return x# 解析命令行参数
def parse_args():parser = argparse.ArgumentParser(description="Distributed Training with BladeTrain")parser.add_argument('--type', type=str, default='example', help='Type argument')parser.add_argument('--parameter', type=str, default='default', help='Parameter argument')args = parser.parse_args()print(f"Parsed arguments: type={args.type}, parameter={args.parameter}, server_address={args.server_address}, job_id={args.job_id}")sys.stdout.flush()return args# 训练函数
def train(model, optimizer, rank, epoch):model.train()print(f"Rank {rank}: Entering training loop")sys.stdout.flush()while True:  # 无限训练循环inputs = torch.randn(32, 10).to(rank)  # 批次大小 32,输入维度 10targets = torch.randn(32, 10).to(rank)  # 输出维度 10optimizer.zero_grad()outputs = model(inputs)loss = F.mse_loss(outputs, targets)loss.backward()optimizer.step()# 增加更频繁的日志输出if rank == 0:if epoch % 10 == 0:  # 每 10 步打印一次print(f"Epoch {epoch}, Loss: {loss.item():.4f}")sys.stdout.flush()epoch += 1def main():# 从环境变量获取 local_ranklocal_rank = int(os.environ['LOCAL_RANK'])print(f"Starting process on local_rank {local_rank}")sys.stdout.flush()args = parse_args()# 初始化分布式环境if NPU_AVAILABLE:# 使用昇腾 NPUprint(f"Rank {local_rank}: Setting NPU device to {local_rank}")torch_npu.npu.set_device(local_rank)device = torch.device(f'npu:{local_rank}')print(f"Rank {local_rank}: Initializing distributed group with hccl backend")dist.init_process_group(backend='hccl')  # 使用 hccl 后端print(f"Rank {local_rank}: Distributed group initialized with hccl")else:# 回退到 CPUdevice = torch.device('cpu')print(f"Rank {local_rank}: Initializing distributed group with gloo backend (CPU fallback)")dist.init_process_group(backend='gloo')print(f"Rank {local_rank}: Distributed group initialized with gloo")sys.stdout.flush()world_size = dist.get_world_size()print(f"Rank {local_rank}: World size is {world_size}")sys.stdout.flush()# 创建模型并移到设备print(f"Rank {local_rank}: Creating and moving model to device {device}")model = SimpleNet().to(device)model = DDP(model, device_ids=[local_rank] if NPU_AVAILABLE else None)print(f"Rank {local_rank}: Model wrapped with DDP")sys.stdout.flush()# 定义优化器optimizer = optim.SGD(model.parameters(), lr=0.01)print(f"Rank {local_rank}: Optimizer initialized")sys.stdout.flush()# 开始训练print(f"Rank {local_rank}: Starting training with world size {world_size}")sys.stdout.flush()train(model, optimizer, local_rank, epoch=0)# 清理分布式环境print(f"Rank {local_rank}: Cleaning up distributed environment")dist.destroy_process_group()sys.stdout.flush()if __name__ == "__main__":main()

把训练用train.py 添加到镜像里 

java 创建argo workflow的代码

import io.argoproj.workflow.ApiClient;
import io.argoproj.workflow.ApiException;
import io.argoproj.workflow.Configuration;
import io.argoproj.workflow.apis.WorkflowServiceApi;
import io.argoproj.workflow.models.*;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.*;private Workflow createWorkflow(JobConfig jobConfig, String graph) {String cmd = jobConfig.getCmd();String image = jobConfig.getImage();logger.info("image:{}", image);String jobType = jobConfig.getJobType();Map<String, String> labels = jobConfig.getLabels();Map<String, String> map = new HashMap<>();map.put(PARAMETER_CMD, cmd);map.put(PARAMETER_IMAGE, image);map.put(PARAMETER_ENV, jobType);Map<String, Object> node_resource_map = new HashMap<>();if (graph != null) {Map<String, Object> graphMap =JSON.parseObject(graph, new TypeReference<HashMap<String, Object>>() {});Map<String, Object> resource_config = (Map<String, Object>) graphMap.get("resource_config");JSONArray jsonArray = (JSONArray) resource_config.get("nodeConfigs");String distributed_master = jobConfig.getTrainingNodes().stream().filter(TrainingNode::getIsMaster).map(TrainingNode::getNodeName).findFirst().orElse(null);// 使用 Java 8 的流遍历 jsonArrayIntStream.range(0, jsonArray.size()).forEach(i -> {try {JSONObject object = jsonArray.getJSONObject(i);String parameterNodeName = object.getString(PARAMETER_NODE_NAME);// 创建一个新的 HashMap 并放入 mapnode_resource_map.put(parameterNodeName, new HashMap<>() {{put(PARAMETER_CPU_LIMIT, String.valueOf(object.get("cpu_value")));put(PARAMETER_GPU_LIMIT, String.valueOf(object.get("gpu_value")));put(PARAMETER_MEM_LIMIT, String.valueOf(object.get("mem_value")));if (object.containsKey("gpu_mem_value")) {put(PARAMETER_GPU_MEM_LIMIT, String.valueOf(object.get("gpu_mem_value")));}if (object.containsKey("gpu_cores_value")) {put(PARAMETER_GPU_CORES_LIMIT, String.valueOf(object.get("gpu_cores_value")));}put(PARAMETER_ENV, String.join(" ","--nproc_per_node=1","--nnodes=" + jsonArray.size(),"--node_rank=" + (StringUtils.equals(distributed_master, parameterNodeName)? "0":"1"),"--master_addr=" + distributed_master,"--master_port=" +  jobConfig.getDistributed_port(),jobConfig.getMainfile()));}});}catch (Exception e) {e.printStackTrace();}});}WorkflowCreateRequest body = new WorkflowCreateRequest();Workflow workflow = getWorkflowFromTemplate(jobConfig, map, node_resource_map, labels);body.setWorkflow(workflow);try {Workflow result = apiInstance.workflowServiceCreateWorkflow(jobConfig.namespace, body);return result;} catch (Exception e) {logger.error("argo workflow api err:" + e.toString());throw new RuntimeException(e);}}... (其他方法略)private Workflow createWorkflow(JobConfig jobConfig, String graph) {String cmd = jobConfig.getCmd();String image = jobConfig.getImage();logger.info("image:{}", image);String jobType = jobConfig.getJobType();Map<String, String> labels = jobConfig.getLabels();Map<String, String> map = new HashMap<>();map.put(PARAMETER_CMD, cmd);map.put(PARAMETER_IMAGE, image);map.put(PARAMETER_ENV, jobType);Map<String, Object> node_resource_map = new HashMap<>();if (graph != null) {Map<String, Object> graphMap =JSON.parseObject(graph, new TypeReference<HashMap<String, Object>>() {});Map<String, Object> resource_config = (Map<String, Object>) graphMap.get("resource_config");JSONArray jsonArray = (JSONArray) resource_config.get("nodeConfigs");String distributed_master = jobConfig.getTrainingNodes().stream().filter(TrainingNode::getIsMaster).map(TrainingNode::getNodeName).findFirst().orElse(null);// 使用 Java 8 的流遍历 jsonArrayIntStream.range(0, jsonArray.size()).forEach(i -> {try {JSONObject object = jsonArray.getJSONObject(i);String parameterNodeName = object.getString(PARAMETER_NODE_NAME);// 创建一个新的 HashMap 并放入 mapnode_resource_map.put(parameterNodeName, new HashMap<>() {{put(PARAMETER_CPU_LIMIT, String.valueOf(object.get("cpu_value")));put(PARAMETER_GPU_LIMIT, String.valueOf(object.get("gpu_value")));put(PARAMETER_MEM_LIMIT, String.valueOf(object.get("mem_value")));if (object.containsKey("gpu_mem_value")) {put(PARAMETER_GPU_MEM_LIMIT, String.valueOf(object.get("gpu_mem_value")));}if (object.containsKey("gpu_cores_value")) {put(PARAMETER_GPU_CORES_LIMIT, String.valueOf(object.get("gpu_cores_value")));}put(PARAMETER_ENV, String.join(" ","--nproc_per_node=1","--nnodes=" + jsonArray.size(),"--node_rank=" + (StringUtils.equals(distributed_master, parameterNodeName)? "0":"1"),"--master_addr=" + distributed_master,"--master_port=" +  jobConfig.getDistributed_port(),jobConfig.getMainfile()));}});}catch (Exception e) {e.printStackTrace();}});}WorkflowCreateRequest body = new WorkflowCreateRequest();Workflow workflow = getWorkflowFromTemplate(jobConfig, map, node_resource_map, labels);body.setWorkflow(workflow);try {Workflow result = apiInstance.workflowServiceCreateWorkflow(jobConfig.namespace, body);return result;} catch (Exception e) {logger.error("argo workflow api err:" + e.toString());throw new RuntimeException(e);}}
private Workflow getWorkflowFromTemplate(JobConfig jobConfig,Map<String, String> parameter_map,Map<String, Object> node_resource_map,Map<String, String> labels) {Workflow workflow = new Workflow();workflow.setMetadata(getMetaData(jobConfig));WorkflowSpec spec = new WorkflowSpec();//spec.setArguments(getArguments(parameter_map));spec.setPodMetadata(getPodMetadata(labels));if (StringUtils.isNotBlank(jobConfig.getImage())) {spec.setHostNetwork(true);}Template mainTemplate = new Template();mainTemplate.setMetadata(new Metadata());mainTemplate.getMetadata().setLabels(labels);mainTemplate.setName("rl-main");V1Container container = new V1Container();container.setName("rl-job-container");container.setImage(StringUtils.isNotBlank(jobConfig.getImage()) ?jobConfig.getImage() : config.rayImage);container.setCommand(List.of(jobConfig.getCmd()));if (!CollectionUtils.isEmpty(jobConfig.getTrainingNodes())) {List<TrainingNode> nodes = jobConfig.getTrainingNodes();DAGTemplate dag = new DAGTemplate();List<DAGTask> dagTasks = new ArrayList<>();List<Template> templates = new ArrayList<>();for (int i = 0; i < nodes.size(); i++) {TrainingNode node = nodes.get(i);DAGTask task = new DAGTask();task.setName(node.getDagTaskName());task.setTemplate("task-template-" + (i + 1));dagTasks.add(task);Template taskTemplate = new Template();taskTemplate.setName("task-template-" + (i + 1));// 使用 V1Affinity 替换 nodeSelectorV1Affinity affinity = new V1Affinity();V1NodeAffinity nodeAffinity = new V1NodeAffinity();// 设置硬性要求 (requiredDuringSchedulingIgnoredDuringExecution)V1NodeSelectorRequirement nodeSelectorRequirement = new V1NodeSelectorRequirement();nodeSelectorRequirement.setKey("kubernetes.io/hostname");nodeSelectorRequirement.setOperator("In");nodeSelectorRequirement.setValues(Collections.singletonList(node.getNodeName()));V1NodeSelectorTerm nodeSelectorTerm = new V1NodeSelectorTerm();nodeSelectorTerm.setMatchExpressions(Collections.singletonList(nodeSelectorRequirement));nodeAffinity.setRequiredDuringSchedulingIgnoredDuringExecution(new V1NodeSelector().nodeSelectorTerms(Collections.singletonList(nodeSelectorTerm)));affinity.setNodeAffinity(nodeAffinity);taskTemplate.setAffinity(affinity);// 容器配置保持不变V1Container taskContainer = new V1Container();taskContainer.setName("container-" + (i + 1) + "-" + node.getNodeName().replace(".", "-"));taskContainer.setImage(container.getImage());Map<String, String> nodeMap = (Map<String, String>) node_resource_map.get(node.getNodeName());V1ResourceRequirements resources = getResourceLimit(nodeMap);taskContainer.setResources(resources);List<String> command = new ArrayList<>(container.getCommand());List<String> args = getDistributedArgs(nodeMap);command.add("-m");command.add("torch.distributed.launch");command.addAll(args);taskContainer.setCommand(command);taskTemplate.setContainer(taskContainer);templates.add(taskTemplate);}dag.setTasks(dagTasks);mainTemplate.setDag(dag);templates.add(mainTemplate);spec.setTemplates(templates);} else {mainTemplate.setContainer(container);spec.setTemplates(Collections.singletonList(mainTemplate));}spec.setEntrypoint("rl-main");workflow.setSpec(spec);return workflow;}

创建argo workflow

查看作业信息

container:name: container-1-192-168-110-22image: '192.168.110.125:8443/agent/ppo_torch:3.1'command:- python- '-m'- torch.distributed.launch- '--nproc_per_node=1'- '--nnodes=2'- '--node_rank=0'- '--master_addr=192.168.110.22'- '--master_port=25195'- /code/rl/BladeTrain.pyresources:limits:cpu: 512mhuawei.com/Ascend310P: '0'huawei.com/Ascend910: '0'memory: 1Ginvidia.com/gpu: '0'requests:cpu: 512mhuawei.com/Ascend310P: '0'huawei.com/Ascend910: '0'memory: 1Ginvidia.com/gpu: '0'

查看端口工作状态,看到子节点连接过来

但是一段时间后 dag task上master节点错误挂掉了

查看日志

  File "/usr/local/python3.9.2/lib/python3.9/runpy.py", line 87, in _run_codeexec(code, run_globals)File "/usr/local/python3.9.2/lib/python3.9/site-packages/torch/distributed/launch.py", line 196, in <module>main()File "/usr/local/python3.9.2/lib/python3.9/site-packages/torch/distributed/launch.py", line 192, in mainlaunch(args)File "/usr/local/python3.9.2/lib/python3.9/site-packages/torch/distributed/launch.py", line 177, in launchrun(args)File "/usr/local/python3.9.2/lib/python3.9/site-packages/torch/distributed/run.py", line 797, in runelastic_launch(File "/usr/local/python3.9.2/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 134, in __call__return launch_agent(self._config, self._entrypoint, list(args))File "/usr/local/python3.9.2/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 264, in launch_agentraise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError:
============================================================
/code/rl/BladeTrain.py FAILED
------------------------------------------------------------
Failures:<NO_OTHER_FAILURES>
------------------------------------------------------------
Root Cause (first observed failure):
[0]:time      : 2025-04-28_01:40:11host      : feitengrank      : 0 (local_rank: 0)exitcode  : 1 (pid: 225)error_file: <N/A>traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
============================================================
Error: exit status 1

问题分析

GLOO_SOCKET_IFNAME 是一个环境变量,用于指定 PyTorch 的 gloo 后端在进行分布式通信时绑定的网络接口(网卡)。让我详细解释一下它的作用和背景:

作用

GLOO_SOCKET_IFNAME 告诉 gloo 后端(一个基于 TCP 的通信库)在当前机器上使用哪个网络接口来进行分布式进程之间的通信。换句话说,它绑定了通信所使用的网卡,从而决定了通过哪个网络接口发送和接收数据。

  • 为什么需要它? 在分布式训练中,多个进程(可能运行在同一台机器或不同机器上)需要通过网络相互通信。gloo 默认会尝试自动选择一个网络接口,但如果机器有多个网卡(例如 eth0、eth1 或虚拟接口),或者网络环境复杂(如容器化环境 Kubernetes),它可能会选择错误的接口(例如回环接口 lo),导致通信失败或无法连接到其他节点。
  • Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback.  这表明 gloo 无法正确解析主节点地址(192.168.110.22),于是退回到使用回环地址(127.0.0.1)。但回环地址只能用于本地通信,无法连接到其他机器或 pod,因此造成了 connectFullMesh failed 的错误。设置 GLOO_SOCKET_IFNAME 可以强制 gloo 使用正确的网卡,避免这种问题。

它本质上是绑定使用某个网卡,而这个网卡通常关联着一个或多个 IP 地址。具体来说:

  • 你通过设置 GLOO_SOCKET_IFNAME 指定网卡的名称(例如 eth0),gloo 会使用这个网卡的 IP 地址进行通信。
  • 如果一台机器有多个网卡(例如一个内网接口 eth0: 192.168.110.x,一个外网接口 eth1: 10.0.0.x),GLOO_SOCKET_IFNAME 确保 gloo 使用你指定的那个网卡,而不是随机选择或默认回环。

ip addr 输出所有网络接口,根据 ip addr 的输出,选择正确的网卡名称(例如 enp11s0f0)

GLOO_SOCKET_IFNAME 的作用是指定 gloo 通信绑定的网卡,避免它选择错误的网络接口(比如回环)。在测试场景中设置它为正确的网卡可以解决 gloo 回退到 localhost 的问题,从而修复分布式训练的连接失败。尝试着在 Argo 工作流中加上这个环境变量,然后观察是否还有错误。

两个节点上的网卡名都需要配置

再次启动实现多机多卡运行了

目前使用的cpu训练, 通讯用的gloo , 因为我只有一台机器上装有npu的卡

那么是否可以级联cpu+npu共同训练呢?

torch_npu.npu.set_device(local_rank) 的作用

  1. 功能
    • torch_npu.npu.set_device(local_rank) 是 torch_npu 模块提供的函数,用于在昇腾 NPU(例如 Ascend 310P)上设置当前进程使用的设备。
    • local_rank 通常是分布式训练中的一个参数,表示当前进程在本地节点上的设备编号(从 0 开始)。
    • 它的作用类似于 PyTorch 中的 torch.cuda.set_device(local_rank),但专用于华为的 NPU。
  2. 具体作用
    • 设备分配:在多 NPU 环境下,确保每个进程绑定到特定的 NPU。例如,如果一台机器有 4 个 NPU,local_rank=0 的进程使用 NPU 0,local_rank=1 使用 NPU 1。
    • 上下文隔离:设置后,后续的 NPU 操作(例如张量计算)会默认在这个设备上执行,避免设备冲突。
    • 分布式训练支持:与 torch.distributed.launch 配合使用,确保每个进程在正确的设备上运行。
  3. 使用场景
    • 在代码中:
      if NPU_AVAILABLE:torch_npu.npu.set_device(local_rank)device = torch.device(f'npu:{local_rank}')dist.init_process_group(backend='hccl')
      else:device = torch.device('cpu')dist.init_process_group(backend='gloo')

    • 这里 set_device(local_rank) 是为了在 NPU 可用时,将进程绑定到特定 NPU。
  4. 是否必须?
    • 如果不调用 set_device,PyTorch 默认使用设备 0(npu:0),可能会导致多进程竞争同一个 NPU,引发错误或性能问题。
    • 在单 NPU 或非分布式环境下,可以省略,但分布式训练中通常需要。

是否可以不用 torch_npu,一台机器用昇腾 310P,另一台用 CPU?

在一台机器上使用昇腾 310P(NPU),另一台机器使用 CPU,通过 torch.distributed.launch 进行分布式训练,同时尽量避免依赖 torch_npu。得分析可行性和实现方式。

可行性分析

  • 可以实现,但有条件
    • PyTorch 的分布式训练(torch.distributed)支持异构设备(例如一台机器用 NPU,另一台用 CPU),但需要正确配置通信后端和设备。
    • torch_npu 是华为专为昇腾 NPU 设计的适配模块,如果你不用 torch_npu,需要确保 NPU 被 PyTorch 原生识别(通常不行,因为 PyTorch 原生不支持 Ascend NPU)。
    • 通信后端(backend)需要兼容两台机器:NPU 通常用 hccl(Huawei Collective Communication Library),CPU 用 gloo。混合使用需要特别处理。
    • 如果不使用 torch_npu,昇腾 310P 无法被 PyTorch 直接识别为计算设备(不像 CUDA GPU)。torch.distributed.launch 依赖一致的设备和后端配置,混合 NPU 和 CPU 会增加复杂性。

看样子不现实,因为 PyTorch 原生不认识昇腾 310P,必须用 torch_npu 支持 NPU。

所以等采购到位后再测试两台机器上都使用torch_npu进行训练

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词