欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > ros中地面站和无人机跨平台数据传递,使用 UDP 进行跨平台传输(python代码)

ros中地面站和无人机跨平台数据传递,使用 UDP 进行跨平台传输(python代码)

2025/2/12 5:19:15 来源:https://blog.csdn.net/zhangzhao147/article/details/142201408  浏览:    关键词:ros中地面站和无人机跨平台数据传递,使用 UDP 进行跨平台传输(python代码)

ros中无人车和无人机跨平台数据传递,使用 UDP 进行跨平台传输(python代码)

  • 1.数据介绍
    • 1.1 自定义数据类型
    • 1.2 文件夹结构
    • 1.3 CMakeLists.txt修改
    • 1.4 package.xml
  • 2.UDP 发送代码(ground_udp_publisher.py)
  • 2.1代码解释
  • 3.UDP 接受代码(ground_udp_subscriber.py)
  • 4.编译和运行
    • 4.2 启动 ROS 节点(启动无人车和无人机的 UDP 发布和订阅节点。)
  • 5.数据生成与udp格式发布(合为一个节点的示例)

核心流程: 地面站订阅 ground_data_topic 话题 → 收到 ROS 消息 → 序列化为 UDP 数据包 → 通过 UDP
发送给无人机

1.数据介绍

1.1 自定义数据类型

1_GroundObject.msg 文件的内容应为:

float64 UTC_time  # in ms
int32 VehicleNumber
VEHICLE_OBJ_COOP[] VehicleObjectCOOP

VEHICLE_OBJ_COOP.msg 文件的内容应为:

int32 class  # 0:unknown 1:car 2:pedestrian
int32 ismove
int32 vehicleclass  # 0:HM 1:LM 3:objects
float64 lon  # longitude
float64 lat  # latitude

1.2 文件夹结构

your_ros_workspace/
├── src/
│   ├── your_package/
│   │   ├── msg/
│   │   │   ├── 1_GroundObject.msg
│   │   │   ├── VEHICLE_OBJ_COOP.msg
│   │   ├── src/
│   │   │   ├── ground_udp_publisher.py
│   │   │   ├── ground_udp_subscriber.py
│   │   ├── CMakeLists.txt
│   │   ├── package.xml

1.3 CMakeLists.txt修改

在 CMakeLists.txt 中,需要确保自定义消息的生成和依赖性已经添加。

cmake_minimum_required(VERSION 3.0.2)
project(your_package)find_package(catkin REQUIRED COMPONENTSrospystd_msgsmessage_generation
)add_message_files(FILES1_GroundObject.msgVEHICLE_OBJ_COOP.msg
)generate_messages(DEPENDENCIESstd_msgs
)catkin_package(CATKIN_DEPENDS message_runtime std_msgs
)include_directories(${catkin_INCLUDE_DIRS}
)

1.4 package.xml

在 package.xml 中,添加对 message_generation 和 message_runtime 的依赖:

<package format="2"><name>your_package</name><version>0.0.0</version><description>UDP communication between vehicle and drone</description><maintainer email="your_email@example.com">Your Name</maintainer><license>BSD</license><buildtool_depend>catkin</buildtool_depend><build_depend>message_generation</build_depend><build_depend>rospy</build_depend><exec_depend>message_runtime</exec_depend><exec_depend>rospy</exec_depend>
</package>

2.UDP 发送代码(ground_udp_publisher.py)

#!/usr/bin/env python
import rospy
import socket
from your_package.msg import 1_GroundObject, VEHICLE_OBJ_COOP# 创建 UDP socket
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_ip = "无人机的IP地址"  # 目标无人机的IP
udp_port = 12345  # 目标无人机的端口号def send_udp_message(msg):# 将自定义消息序列化并发送data = f"{msg.UTC_time},{msg.VehicleNumber}," + ",".join([f"{obj.class},{obj.ismove},{obj.vehicleclass},{obj.lon},{obj.lat}" for obj in msg.VehicleObjectCOOP])udp_sock.sendto(data.encode(), (udp_ip, udp_port))def ros_to_udp():rospy.init_node('ground_udp_publisher')rospy.Subscriber("ground_data_topic", 1_GroundObject, send_udp_message)  # 订阅 ROS 话题rospy.spin()if __name__ == '__main__':try:ros_to_udp()except rospy.ROSInterruptException:passfinally:udp_sock.close()

2.1代码解释

socket:Python 的网络通信库,提供了 UDP 和 TCP 的接口。

#创建一个 UDP socket。AF_INET 指的是 IPv4 协议,SOCK_DGRAM 指的是 UDP 协议。
socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

“,”.join([…]):遍历 msg.VehicleObjectCOOP 数组中的每个对象,将每个对象的属性(class, ismove, vehicleclass, lon, lat)提取出来,构成一个逗号分隔的字符串。

#将序列化后的字符串 data 编码为字节,并通过 sendto 函数发送到 udp_ip 和 udp_port 指定的无人机地址。
udp_sock.sendto(data.encode(), (udp_ip, udp_port))
#订阅 ROS 话题 ground_data_topic,当收到消息时,调用 send_udp_message 函数。该函数会将收到的 ROS 消息通过 UDP 发送出去。
rospy.Subscriber("ground_data_topic", 1_GroundObject, send_udp_message)

3.UDP 接受代码(ground_udp_subscriber.py)

#!/usr/bin/env python
import rospy
import socket
from your_package.msg import 1_GroundObject, VEHICLE_OBJ_COOP# 创建 UDP socket
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_ip = "0.0.0.0"  # 本地IP
udp_port = 12345  # 本地监听端口
udp_sock.bind((udp_ip, udp_port))def udp_to_ros():rospy.init_node('ground_udp_subscriber')pub = rospy.Publisher('ground_data_topic', 1_GroundObject, queue_size=10)while not rospy.is_shutdown():data, addr = udp_sock.recvfrom(1024)decoded_data = data.decode().split(',')obj = 1_GroundObject()obj.UTC_time = float(decoded_data[0])obj.VehicleNumber = int(decoded_data[1])for i in range(2, len(decoded_data), 5):vehicle_obj = VEHICLE_OBJ_COOP()vehicle_obj.class = int(decoded_data[i])vehicle_obj.ismove = int(decoded_data[i+1])vehicle_obj.vehicleclass = int(decoded_data[i+2])vehicle_obj.lon = float(decoded_data[i+3])vehicle_obj.lat = float(decoded_data[i+4])obj.VehicleObjectCOOP.append(vehicle_obj)pub.publish(obj)if __name__ == '__main__':try:udp_to_ros()except rospy.ROSInterruptException:passfinally:udp_sock.close()

4.编译和运行

cd ~/your_ros_workspace
catkin_make

编译成功后,使用以下命令使环境变量生效:

source devel/setup.bash

4.2 启动 ROS 节点(启动无人车和无人机的 UDP 发布和订阅节点。)

在无人机端启动发布节点

rosrun your_package ground_udp_publisher.py

在无人车启动接受节点

rosrun your_package ground_udp_subscriber.py

这样,无人车上的 ROS 信息将通过 UDP 协议发送到无人机,无人机再将这些信息发布为 ROS 话题,进行进一步的处理。

5.数据生成与udp格式发布(合为一个节点的示例)

algorithm_and_udp_node.py

#!/usr/bin/env python
import rospy
import socket
from your_package.msg import 1_GroundObject, VEHICLE_OBJ_COOP# 创建 UDP socket
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_ip = "无人机的IP地址"  # 目标无人机的IP
udp_port = 12345  # 目标无人机的端口号def send_udp_message(msg):# 将自定义消息序列化并发送data = f"{msg.UTC_time},{msg.VehicleNumber}," + ",".join([f"{obj.class},{obj.ismove},{obj.vehicleclass},{obj.lon},{obj.lat}" for obj in msg.VehicleObjectCOOP])udp_sock.sendto(data.encode(), (udp_ip, udp_port))def algorithm_publish():pub = rospy.Publisher('ground_data_topic', 1_GroundObject, queue_size=10)rospy.Subscriber("ground_data_topic", 1_GroundObject, send_udp_message)  # 订阅并发送数据rospy.init_node('algorithm_and_udp_node')rate = rospy.Rate(10)  # 10Hz 发布频率while not rospy.is_shutdown():# 构建消息ground_msg = 1_GroundObject()ground_msg.UTC_time = rospy.Time.now().to_sec() * 1000  # 毫秒ground_msg.VehicleNumber = 5  # 示例车辆数量vehicle_obj = VEHICLE_OBJ_COOP()vehicle_obj.class = 1  # 车辆vehicle_obj.ismove = 1  # 移动状态vehicle_obj.vehicleclass = 0  # HM 类别vehicle_obj.lon = 120.123456  # 示例经度vehicle_obj.lat = 30.123456   # 示例纬度# 添加到数组中ground_msg.VehicleObjectCOOP.append(vehicle_obj)# 发布消息pub.publish(ground_msg)rate.sleep()if __name__ == '__main__':try:algorithm_publish()except rospy.ROSInterruptException:passfinally:udp_sock.close()
运行:
rosrun your_package algorithm_and_udp_node.py

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com