欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 锐评 > Python创建多个线程分别启动http、WebSocket服务

Python创建多个线程分别启动http、WebSocket服务

2024/10/25 18:31:58 来源:https://blog.csdn.net/weixin_38126537/article/details/142856007  浏览:    关键词:Python创建多个线程分别启动http、WebSocket服务

我的计划是启动主程序后新建3个独立的线程,一个线程执行PLC读取,一个线程启动工艺测试(含http服务),另外一个线程启动WebSocket。

新增 /lib/PlcReader.py

# 执行 PLC 读取类
# 读取 PLC 配置文件
# 定时(每秒)读取PLC值,发送到内存模块中,并通过 WebSocket 发送给所有连接客户端import timeclass PlcReader(object):def __init__(self, job_name):self.jobName = job_nameself.readPlcValueConfigList = []def run(self):print("PLC读取模块启动")self.read_config()# 定时读取 PLC 的值while True:self.read_plc_value()time.sleep(1)def read_config(self):print("读取 PLC 配置文件")self.readPlcValueConfigList = [{"name": "aaa"}]def refresh_config(self):print("更新 PLC 配置文件")self.readPlcValueConfigList = []def read_plc_value(self):print("读取 PLC 值")# 如有没有读取配置列表,则不读取if not self.readPlcValueConfigList or len(self.readPlcValueConfigList) == 0:print("如有没有读取配置列表")returnprint("去更新内存中的PLC值,并通过WebSocket发送给所有连接客户端")

新增/lib/ProcessTester.py

# 工艺测试主程序
# 提供初始化工艺
# 提供开始执行
# 提供暂定执行
# 提供结束执行(紧急停止可调用)
# 提供手动调节
# 对外提供 http 调用服务,通过 http 调用来执行工艺的测试过程(初始化工艺、开始执行、暂停执行、结束执行、手动调节)import http.server
import socketserver
class HTTPRequestHandler(http.server.BaseHTTPRequestHandler):def do_GET(self):self.send_response(200)self.send_header('Content-type', 'text/html')self.end_headers()message = "Hello, World!"self.wfile.write(bytes(message, "utf8"))returndef do_POST(self):self.send_response(200)self.send_header('Content-type', 'text/html')self.end_headers()message = "Hello, World2!"self.wfile.write(bytes(message, "utf8"))returnclass ProcessTester:def __init__(self, port):self.handler = HTTPRequestHandlerself.port = portdef start(self):with socketserver.TCPServer(("", self.port), self.handler) as httpd:print("HTTP 服务启动在端口", self.port)httpd.serve_forever()

/lib/WebSocketServer.py

import tornado.web
import tornado.httpserver
import tornado.ioloop
import tornado.websocketclass WebSocketHandler(tornado.websocket.WebSocketHandler):def open(self):print('WebSocket opened')def on_message(self, message):print('WebSocket message received {}'.format(message))self.write_message(message)def on_close(self):print('WebSocket closed')def check_origin(self, origin):return Trueclass WebSocketServer:def __init__(self, port):self.port = portdef start(self):application = tornado.web.Application([(r'/websocket', WebSocketHandler)])http_server = tornado.httpserver.HTTPServer(application)http_server.listen(self.port)tornado.ioloop.IOLoop.instance().start()

最外层新增Main.py

# 启动三个线程
# 1. PLC读取模块
# 2. 执行工艺试验模块
# 3. WebSocket通讯模块import threading
import timefrom lib.PlcReader import PlcReader
from lib.ProcessTester import ProcessTester
from lib.WebSocketServer import WebSocketServerplcReader = PlcReader("PlcReader")
processTester = ProcessTester(8080)
wsServer = WebSocketServer(9001)if __name__ == "__main__":print("启动程序")# 创建线程对象thread1 = threading.Thread(target=plcReader.run)thread2 = threading.Thread(target=processTester.start)thread3 = threading.Thread(target=wsServer.start)# 启动线程thread1.start()thread2.start()thread3.start()time.sleep(10)plcReader.refresh_config()print("aaa")

编辑完了之后,启动Main.py

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com