欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > python socket 发生UDP 和 UDPServer接受UDP实例

python socket 发生UDP 和 UDPServer接受UDP实例

2024/10/24 21:25:48 来源:https://blog.csdn.net/lyq308152569/article/details/141467731  浏览:    关键词:python socket 发生UDP 和 UDPServer接受UDP实例

python UDP 通信

socket 发送udp 示例

import socket
import time# 初始化端口
self.ip_port = (host_msg,ip_port_msg)
# 创建 socket
self.client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  
# 发送
self.client.sendto(self.msg,self.ip_port)
# 关闭 soceket
self.client.close()

UDPServer 用于接收 UDP 示例

# 继承  UDPServer 
class MyUDPServer(UDPServer):def __init__(self, server_address, RequestHandlerClass, queue):# 传递用于接受的队列self.udp_queue = queue  # UDPServer 实例化           地址          服务函数UDPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True)class Handler(BaseRequestHandler):def handle(self):self.data = self.request[0].strip()self.server.udp_queue.put(self.data)

class my_udp_server():def __init__(self,udp_queue):super().__init__()# 创建用于接受的队列self.queue = udp_queue# 创建服务,def open_udp_server(self,ip,port):  ADDR = ip, port#实例化 MyUDPServer 类self.UDPServer = MyUDPServer(ADDR, Handler, self.queue)  #创建线程,将UDPServer的serve_forever  传递进线程成中self.server_thread = threading.Thread(target=self.UDPServer.serve_forever)  #设置后台线程self.server_thread.setDaemon(True) #启动线程self.server_thread.start()   def close_udp_server(self):  #服务  shutdownself.UDPServer.shutdown()#关闭服务self.UDPServer.server_close()# 创建 队列用于接收
self.udp_queue= Queue(maxsize=5)
# 实例化 udp 服务
self.udp_server = my_udp_server(self.udp_queue)
# 打开服务
self.udp_server.open_udp_server("192.168.1.100",6000)
# 关闭服务
self.udp_server.close_udp_server()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com