欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > 分布式微服务系统架构第105集:协议,高性能下单系统示例项目

分布式微服务系统架构第105集:协议,高性能下单系统示例项目

2025/4/19 14:24:47 来源:https://blog.csdn.net/qq_36232611/article/details/147186119  浏览:    关键词:分布式微服务系统架构第105集:协议,高性能下单系统示例项目

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

// 项目结构说明(Spring Boot + Maven 简化版)/*** 目录结构:* ├── Application.java* ├── controller/OrderController.java* ├── service/OrderService.java* ├── service/RetryService.java* ├── bloom/BloomFilterManager.java* ├── kafka/OrderProducer.java* ├── kafka/OrderConsumer.java* ├── model/OrderMessage.java* ├── config/KafkaConfig.java* ├── config/MetricsConfig.java* ├── retry/RetryTaskScheduler.java*/// Application.java
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}// controller/OrderController.java
@RestController
@RequestMapping("/order")
public class OrderController {@Autowired private OrderService orderService;@PostMapping("/create")public ResponseEntity<String> createOrder(@RequestParam Long skuId) {orderService.createOrder(skuId);return ResponseEntity.ok("Order request received");}
}// service/OrderService.java
@Service
public class OrderService {@Autowired private BloomFilterManager bloomFilter;@Autowired private RedisTemplate<String, Object> redis;@Autowired private RedissonClient redisson;@Autowired private OrderProducer producer;@Autowired private MeterRegistry meterRegistry;public void createOrder(Long skuId) {Timer.Sample sample = Timer.start(meterRegistry);try {if (!bloomFilter.contains(skuId)) throw new RuntimeException("SKU not exists");String key = "sku:" + skuId;Object cached = redis.opsForValue().get(key);if (cached != null) return;RLock lock = redisson.getLock("lock:sku:" + skuId);lock.lock();try {// 模拟 DB 查询Object dbData = new Object();redis.opsForValue().set(key, dbData, 30, TimeUnit.MINUTES);CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);redis.delete(key);} catch (InterruptedException ignored) {}});} finally {lock.unlock();}// 发送 Kafka 消息producer.sendOrder(new OrderMessage(skuId, UUID.randomUUID().toString()));} finally {sample.stop(meterRegistry.timer("order.create.timer"));}}
}// bloom/BloomFilterManager.java
@Component
public class BloomFilterManager {private BloomFilter<Long> filter = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.01);@PostConstructpublic void init() {// 模拟加载数据for (long i = 1; i <= 1000; i++) filter.put(i);}public boolean contains(Long skuId) {return filter.mightContain(skuId);}
}// kafka/OrderProducer.java
@Component
public class OrderProducer {@Autowired private KafkaTemplate<String, String> kafka;public void sendOrder(OrderMessage msg) {kafka.send("order_topic", JSON.toJSONString(msg));}
}// kafka/OrderConsumer.java
@Component
public class OrderConsumer {@KafkaListener(topics = "order_topic", groupId = "order_group")public void onMessage(String message, Acknowledgment ack) {try {OrderMessage order = JSON.parseObject(message, OrderMessage.class);// 处理订单逻辑ack.acknowledge();} catch (Exception e) {// 可转发至 retry_topic,由 RetryService 定期处理}}
}// model/OrderMessage.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {private Long skuId;private String orderId;
}// config/KafkaConfig.java
@Configuration
public class KafkaConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {return new KafkaTemplate<>(pf);}
}// config/MetricsConfig.java
@Configuration
public class MetricsConfig {@Beanpublic MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {return registry -> registry.config().commonTags("app", "order-service");}
}// retry/RetryTaskScheduler.java
@Component
public class RetryTaskScheduler {@Scheduled(fixedDelay = 5000)public void retry() {// 查询 DB 或 Redis 获取重试任务,重新发送 Kafka}
}

加 Thread.currentThread().interrupt(); // 恢复中断标志

这是一个 Java 多线程编程中非常经典的问题,关键点在于 中断标志(interrupt flag)机制是协作式的,需要手动维护或检查。


✅ 一句话解释:

Thread.currentThread().interrupt(); 的作用是:在捕获 InterruptedException 异常后, “重新设置”线程的中断标志位,以便后续代码知道线程已经被中断。


🔍 背景知识:线程中断机制

在 Java 中,线程的中断(interrupt)是一种“温和的请求”,不会强制终止线程,而是通过设置一个中断标志位,让线程自己检查并响应。

  • 调用 thread.interrupt():设置中断标志为 true。

  • 某些阻塞方法(如 Thread.sleep()wait()join())会抛出 InterruptedException并自动清除中断标志(设为 false)。


🚨 为什么要恢复中断标志?

举个典型场景:

try {Thread.sleep(1000);
} catch (InterruptedException e) {// 中断标志会被清除(设为 false)// 所以需要重新设置为 true,以便上层逻辑能感知Thread.currentThread().interrupt(); // 恢复中断状态
}

如果你不加 Thread.currentThread().interrupt();,那么线程会“忘记”自己被中断过,上层逻辑或框架将无法感知到这次中断事件。


🧠 举个实际例子(不加就会出问题)

public void run() {try {while (!Thread.currentThread().isInterrupted()) {// do somethingThread.sleep(1000); // 可被中断}} catch (InterruptedException e) {// 中断了,但 sleep 会清除中断标志// 如果不恢复,这里会继续循环// 正确做法是:Thread.currentThread().interrupt(); }
}

✅ 总结

目的

说明

恢复中断状态

InterruptedException

 会清除中断标志,你要手动恢复

便于上层感知中断

保证中断信号能沿上传递,线程能优雅退出

遵循中断协议

线程设计中推荐通过检查中断状态实现协作式退出

✅ 上层如何感知线程被中断?

通过 Thread.currentThread().isInterrupted() 或响应 InterruptedException

public void outerLogic() {try {workerLogic(); // 内层调用} catch (InterruptedException e) {// ✅ 这里感知到了中断,可以做善后逻辑(释放资源、标记失败、退出等)System.out.println("Outer: Thread was interrupted, cleaning up...");// 也可以继续向上传递Thread.currentThread().interrupt(); // 再次设置中断标志,方便更上层知道}
}

✅ 内层 try-catch 如何恢复中断状态?

public void workerLogic() throws InterruptedException {try {// 可能阻塞的方法Thread.sleep(1000);} catch (InterruptedException e) {// ⚠️ 中断标志被清除了,这里必须手动恢复Thread.currentThread().interrupt(); // 恢复中断标志throw e; // ❗向上抛异常,让上层处理}
}

❗如果你不恢复中断状态,会怎么样?

中断被吃掉了,线程不会再退出:

try {Thread.sleep(1000);
} catch (InterruptedException e) {// 没有恢复中断,线程状态被重置为未中断// 线程后续逻辑继续执行,不会退出
}

这是最容易被忽略的 Bug:线程实际被中断了,但程序表现上“什么都没发生”。

✅ 上层如何感知线程被中断?

通过 Thread.currentThread().isInterrupted() 或响应 InterruptedException

public void outerLogic() {try {workerLogic(); // 内层调用} catch (InterruptedException e) {// ✅ 这里感知到了中断,可以做善后逻辑(释放资源、标记失败、退出等)System.out.println("Outer: Thread was interrupted, cleaning up...");// 也可以继续向上传递Thread.currentThread().interrupt(); // 再次设置中断标志,方便更上层知道}
}

优化点说明

类别

优化

说明

注解写法

使用 private + 注解

避免不必要的包作用域暴露,提高封装性

构造器

删除无参构造器

Spring 默认注入不需要写明空构造器

命名

管道节点命名更清晰

可方便后期动态 debug

对象重用

解码器每次 new 是合理的

编解码器 stateless 可共用,业务处理器建议 Spring 单例注入

内存优化

设置合理帧长度限制

40960

 限制消息长度,防御恶意请求,防止 OOM

JVM / Netty 层级调优建议

  1. TCP 参数调优(在 Netty 启动类中):

    .childOption(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childOption(ChannelOption.SO_REUSEADDR, true)
  2. 对象池复用:比如 ByteBuf 和业务对象可使用 FastThreadLocal 或 Recycler

  3. 线程模型优化:合理配置 Netty Boss/Worker 线程数量,比如:

    new NioEventLoopGroup(Math.max(2, Runtime.getRuntime().availableProcessors()))
  4. 关闭 Netty 的资源泄露检测(若已稳定) :

    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
  5. 避免过多日志与堆栈打印,尤其是 Idle 触发过多或帧异常抛出。

Netty 的“责任链管道”,可理解为消息处理的流水线

  1. 空闲检测:xx秒无任何读写操作,则触发 xxx 事件(可用于心跳机制)

  2. 帧解码器:以 0x7e 为分隔符拆包,避免 TCP 粘包/拆包问题

  • 最大帧长度限制为 xxx 字节,防止恶意数据造成 OOM

  • 传入多个分隔符实现灵活识别(如 0x7e、0x7e 0x7e)

  • 解码器:将原始二进制数据解析成 JT808 消息对象(下层逻辑使用 POJO)

  • 编码器:将响应对象转为二进制 JT808 协议格式并发送

  • 核心业务处理器:处理业务逻辑(登录、位置信息、心跳等)

  • 防止 TCP 粘包,按 0x7e 拆包

    粘包(Sticky Packet)和拆包(Half Packet)是 TCP 网络编程中的常见问题,特别是你在做像 JT808 协议这种基于 TCP 长连接的数据传输时,必须考虑并解决这两个问题


    🚩 一句话理解

    • 粘包: 多个消息黏在一起,一次接收收到了多个。

    • 拆包: 一个消息被拆成多段发出,需要多次接收才能完整拼回。


    🎯 为什么会出现粘包/拆包?

    因为 TCP 是流式协议,不像 UDP 一包一发,它不关心消息边界,只负责“字节流”的可靠传输。比如:

    • 应用层连续发送两条消息 → TCP 可能合并发送(粘包) 。

    • 消息太大,超过了一次 TCP 报文长度 → TCP 可能分多次发送(拆包) 。


    📦 粘包/拆包 Java 示例

    我们用 Java 的 ServerSocket + Socket 模拟:


    🧪 示例一:粘包现象

    👨‍💻 客户端连续发送两条消息:
    Socket socket = new Socket("localhost", 8080);
    OutputStream out = socket.getOutputStream();
    out.write("Hello".getBytes());
    out.write("World".getBytes());
    out.flush();
    👨‍💻 服务端一次性读取收到:
    ServerSocket serverSocket = new ServerSocket(8080);
    Socket client = serverSocket.accept();
    InputStream in = client.getInputStream();
    byte[] buffer = new byte[1024];
    int len = in.read(buffer);
    System.out.println(new String(buffer, 0, len)); 
    // 输出可能是 "HelloWorld"(粘包),看不到分隔

    🧪 示例二:拆包现象

    👨‍💻 客户端发送一条大消息:
    byte[] largeMsg = new byte[10000]; // 比 MTU 或缓冲区大
    Arrays.fill(largeMsg, (byte)'A');
    out.write(largeMsg);
    out.flush();
    👨‍💻 服务端多次才能读取完:
    byte[] buffer = new byte[1024];
    int total = 0;
    while ((len = in.read(buffer)) != -1) {total += len;// 模拟接收多次才拼完整
    }
    System.out.println("Total bytes received: " + total);

    🧩 怎么解决粘包 / 拆包?

    ✅ 方式一:使用分隔符(如 JT808 用 0x7e)

    pipeline.addLast(new DelimiterBasedFrameDecoder(40960,Unpooled.wrappedBuffer(new byte[]{0x7e})));
    • 报文结构:[0x7e][消息体][0x7e]

    • 解码器自动识别每一条完整的帧,避免粘包/拆包


    ✅ 方式二:使用固定长度帧

    pipeline.addLast(new FixedLengthFrameDecoder(20)); // 每20字节一帧

    适用于消息长度固定的协议。


    ✅ 方式三:消息头 + 长度字段

    自定义协议格式:

    | 消息头 | 长度字段 | 数据内容 |
    |  2字节 |   4字节 |   N字节 |

    你可以用 LengthFieldBasedFrameDecoder

    pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 2, 4, 0, 0));

    📌 总结对比表

    类型

    说明

    场景

    粘包

    多条消息黏一起

    TCP合并发送

    拆包

    一条消息被拆

    TCP 分片、缓存不够

    解决方案

    分隔符、定长、长度字段

    取决于协议设计

    报文结构设计(符合 JT808 协议)

    | 起始位 | 消息头(12字节) | 消息体(N字节) | CRC校验码 | 结束位 |
    |  0x7e  | 消息ID + 属性 + 终端手机号 + 流水号 | 业务数据 | 1字节 |  0x7e  |

    📦 示例:完整 Java 客户端(模拟构造 JT808 报文)

    public class JT808ClientWithCRC {public static void main(String[] args) throws Exception {Socket socket = new Socket("127.0.0.1", 9000);OutputStream out = socket.getOutputStream();byte[] body = new byte[]{0x01, 0x02, 0x03, 0x04}; // 示例消息体byte[] message = buildJT808Packet((short) 0x0200, body); // 0x0200 = 位置信息out.write(message);out.flush();System.out.println("已发送 JT808 报文: " + bytesToHex(message));socket.close();}// 构造完整 JT808 报文(含 CRC、起止位)private static byte[] buildJT808Packet(short msgId, byte[] body) {ByteArrayOutputStream baos = new ByteArrayOutputStream();baos.write(0x7e); // 起始位ByteArrayOutputStream headerAndBody = new ByteArrayOutputStream();// 消息 ID (2字节)headerAndBody.write((msgId >> 8) & 0xff);headerAndBody.write(msgId & 0xff);// 消息体属性 (2字节) - 此处只填消息长度int bodyLen = body.length;headerAndBody.write((bodyLen >> 8) & 0xff);headerAndBody.write(bodyLen & 0xff);// 终端手机号(6字节 BCD 编码)示例:12345678901byte[] phoneBCD = bcd("12345678901");headerAndBody.write(phoneBCD, 0, 6);// 流水号(2字节)headerAndBody.write(0x00);headerAndBody.write(0x01);// 消息体headerAndBody.writeBytes(body);byte[] content = headerAndBody.toByteArray();// CRC 校验(从 消息ID 到 消息体 之前的所有字节)byte crc = calcChecksum(content);baos.writeBytes(content);baos.write(crc);baos.write(0x7e); // 结束位return baos.toByteArray();}// BCD 编码(手机号等)private static byte[] bcd(String digits) {if (digits.length() % 2 != 0) {digits = "0" + digits;}byte[] result = new byte[digits.length() / 2];for (int i = 0; i < digits.length(); i += 2) {result[i / 2] = (byte) (((digits.charAt(i) - '0') << 4) | (digits.charAt(i + 1) - '0'));}return result;}// 校验码(JT808 采用异或校验)private static byte calcChecksum(byte[] bytes) {byte checksum = bytes[0];for (int i = 1; i < bytes.length; i++) {checksum ^= bytes[i];}return checksum;}private static String bytesToHex(byte[] bytes) {StringBuilder sb = new StringBuilder();for (byte b : bytes) sb.append(String.format("%02X ", b));return sb.toString();}
    }

    🔍 报文示例(打印结果)

    7E 02 00 00 04 01 23 45 67 89 01 00 01 01 02 03 04 XX 7E↑ 手机号(BCD)         ↑ CRC
    • 02 00:消息 ID(0x0200 = 位置信息)

    • 00 04:消息体长度(4字节)

    • 01 23 45 67 89 01:终端手机号(BCD 编码)

    • 00 01:流水号

    • 01 02 03 04:消息体

    • XX:CRC 校验值

    • 7E:起止标识

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词