欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > tcp接发json字符串

tcp接发json字符串

2025/4/3 6:19:10 来源:https://blog.csdn.net/you_get_me_there/article/details/146605034  浏览:    关键词:tcp接发json字符串

因工作需要对接硬件设备,需要通过tcp协议接收发送字符串,而字符串里面全是json字符串,登陆用json对象发送,心跳也用json发送,设备检测到信号后自动推送的也是json字符串,只要登陆后心跳就要每过10秒发送一次,而信号的推送则是在登陆后的任意时间发生.每个json与json之间没有换行符分割,所以让deepseek帮着写了能读取嵌套json的解析代码.

因为是个小工具,因此选择python编写,核心代码如下:

import socket 
import json 
import time 
import threading class TCPClient:def __init__(self, host='127.0.0.1', port=60, fun_exec=None):print(f"host={host} port={port}")self.host  = host self.port  = port self.client_socket  = None self.heartbeat_interval  = 10  # 心跳间隔10秒 self.is_running  = False self.buffer  = "" self.fun_exec = fun_execdef connect(self):"""建立TCP连接并发送登录信息"""try:# 创建TCP套接字 self.client_socket  = socket.socket(socket.AF_INET,  socket.SOCK_STREAM)# 设置超时时间(单位:秒)self.client_socket.settimeout(100)# 连接服务器 try:print('开始连接')self.client_socket.connect((self.host,  self.port)) print(f"已连接") except socket.timeout: print("连接超时,请检查网络或服务器状态")return Falseexcept Exception as e:print(f"连接失败: {str(e)}")return False# 发送登录信息 login_data = {"cmd": "1","username": "账号","password": "密码"}self._send_json(login_data)return True except Exception as e:print(f"连接失败: {str(e)}")return False def _send_json(self, data):"""发送JSON数据"""try:json_str = json.dumps(data)# print(f"发送信息:{data}")self.client_socket.sendall(json_str.encode('utf-8')) except Exception as e:print(f"发送数据失败: {str(e)}")self.reconnect() def _heartbeat_task(self):"""心跳维持线程"""if 'all' == global_leida_log:print('开始发送心跳')while self.is_running: heartbeat_data = {"cmd": "2"}self._send_json(heartbeat_data)time.sleep(self.heartbeat_interval) def _process_message(self, message):"""处理接收到的JSON消息"""try:data = json.loads(message) print(f"收到服务器响应: {data}")self.fun_exec(data)except json.JSONDecodeError:print("收到无效的JSON数据")def reconnect(self):"""断线重连机制"""self.stop() print("尝试重新连接...")while True:if self.connect(): self.start() break time.sleep(5) def start(self):"""启动客户端服务"""self.is_running  = True # 启动心跳线程 threading.Thread(target=self._heartbeat_task, daemon=True).start()# 启动接收线程 threading.Thread(target=self._receive_handler, daemon=True).start()def stop(self):"""停止客户端"""self.is_running  = False if self.client_socket: try:self.client_socket.close() except:pass def _receive_handler(self): """接收数据处理器"""# print('开始接收数据:')while self.is_running: try: data = self.client_socket.recv(4096)  if not data: print("连接已断开") self.reconnect()  break self.buffer  += data.decode('utf-8')  # 处理缓冲区中的数据 while self.buffer:  try: # 尝试解析 JSON 对象 obj, end_index = self._parse_json(self.buffer)  if obj is not None: # 处理解析后的 JSON 对象 self._process_message(json.dumps(obj))  # 移除已处理的数据 self.buffer  = self.buffer[end_index:].lstrip()  else: break except json.JSONDecodeError: break except Exception as e: print(f"发生错误: {e}") def _parse_json(self, data): stack = [] start_index = None for i, char in enumerate(data): if char == '{': if not stack: start_index = i stack.append(char)  elif char == '}': if stack: stack.pop()  if not stack: try: json_obj = json.loads(data[start_index:i  + 1]) return json_obj, i + 1 except json.JSONDecodeError: pass return None, 0 def temp_fun(d):print(d)if __name__ == "__main__":client = TCPClient(host='192.168.0.6', port=60, fun_exec=temp_fun)if client.connect(): try:client.start() # 保持主线程运行 while True:time.sleep(1) except KeyboardInterrupt:client.stop() 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词