因工作需要对接硬件设备,需要通过tcp协议接收发送字符串,而字符串里面全是json字符串,登陆用json对象发送,心跳也用json发送,设备检测到信号后自动推送的也是json字符串,只要登陆后心跳就要每过10秒发送一次,而信号的推送则是在登陆后的任意时间发生.每个json与json之间没有换行符分割,所以让deepseek帮着写了能读取嵌套json的解析代码.
因为是个小工具,因此选择python编写,核心代码如下:
import socket
import json
import time
import threading class TCPClient:def __init__(self, host='127.0.0.1', port=60, fun_exec=None):print(f"host={host} port={port}")self.host = host self.port = port self.client_socket = None self.heartbeat_interval = 10 # 心跳间隔10秒 self.is_running = False self.buffer = "" self.fun_exec = fun_execdef connect(self):"""建立TCP连接并发送登录信息"""try:# 创建TCP套接字 self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 设置超时时间(单位:秒)self.client_socket.settimeout(100)# 连接服务器 try:print('开始连接')self.client_socket.connect((self.host, self.port)) print(f"已连接") except socket.timeout: print("连接超时,请检查网络或服务器状态")return Falseexcept Exception as e:print(f"连接失败: {str(e)}")return False# 发送登录信息 login_data = {"cmd": "1","username": "账号","password": "密码"}self._send_json(login_data)return True except Exception as e:print(f"连接失败: {str(e)}")return False def _send_json(self, data):"""发送JSON数据"""try:json_str = json.dumps(data)# print(f"发送信息:{data}")self.client_socket.sendall(json_str.encode('utf-8')) except Exception as e:print(f"发送数据失败: {str(e)}")self.reconnect() def _heartbeat_task(self):"""心跳维持线程"""if 'all' == global_leida_log:print('开始发送心跳')while self.is_running: heartbeat_data = {"cmd": "2"}self._send_json(heartbeat_data)time.sleep(self.heartbeat_interval) def _process_message(self, message):"""处理接收到的JSON消息"""try:data = json.loads(message) print(f"收到服务器响应: {data}")self.fun_exec(data)except json.JSONDecodeError:print("收到无效的JSON数据")def reconnect(self):"""断线重连机制"""self.stop() print("尝试重新连接...")while True:if self.connect(): self.start() break time.sleep(5) def start(self):"""启动客户端服务"""self.is_running = True # 启动心跳线程 threading.Thread(target=self._heartbeat_task, daemon=True).start()# 启动接收线程 threading.Thread(target=self._receive_handler, daemon=True).start()def stop(self):"""停止客户端"""self.is_running = False if self.client_socket: try:self.client_socket.close() except:pass def _receive_handler(self): """接收数据处理器"""# print('开始接收数据:')while self.is_running: try: data = self.client_socket.recv(4096) if not data: print("连接已断开") self.reconnect() break self.buffer += data.decode('utf-8') # 处理缓冲区中的数据 while self.buffer: try: # 尝试解析 JSON 对象 obj, end_index = self._parse_json(self.buffer) if obj is not None: # 处理解析后的 JSON 对象 self._process_message(json.dumps(obj)) # 移除已处理的数据 self.buffer = self.buffer[end_index:].lstrip() else: break except json.JSONDecodeError: break except Exception as e: print(f"发生错误: {e}") def _parse_json(self, data): stack = [] start_index = None for i, char in enumerate(data): if char == '{': if not stack: start_index = i stack.append(char) elif char == '}': if stack: stack.pop() if not stack: try: json_obj = json.loads(data[start_index:i + 1]) return json_obj, i + 1 except json.JSONDecodeError: pass return None, 0 def temp_fun(d):print(d)if __name__ == "__main__":client = TCPClient(host='192.168.0.6', port=60, fun_exec=temp_fun)if client.connect(): try:client.start() # 保持主线程运行 while True:time.sleep(1) except KeyboardInterrupt:client.stop()