欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > 分布式微服务系统架构第117集:Kafka发送工具,标准ASCII

分布式微服务系统架构第117集:Kafka发送工具,标准ASCII

2025/4/25 3:29:24 来源:https://blog.csdn.net/qq_36232611/article/details/147434846  浏览:    关键词:分布式微服务系统架构第117集:Kafka发送工具,标准ASCII

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

  • 每分钟输出报警频率指标(TPS、QPS)

  • 超过阈值时,自动报警(比如推送到运维)

  • 异步批量提交 Kafka(提升吞吐)

  • 限流/熔断(防止疯狂报警拖垮系统)

📈 1. 每分钟输出报警发送频率(TPS、QPS)

加一个定时器(比如用 ScheduledExecutorService),每分钟统计一次:

private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private static AtomicLong kafkaSendCounter = new AtomicLong(0);@PostConstruct
public void initKafkaMetrics() {scheduler.scheduleAtFixedRate(() -> {long count = kafkaSendCounter.getAndSet(0);logger.info("==> 报警Kafka 1分钟发送量: {}, TPS: {}", count, count / 60.0);}, 1, 1, TimeUnit.MINUTES);
}

然后每次 sendAlarmMsg 时增加一下:

kafkaSendCounter.incrementAndGet();

🚨 2. 超过阈值自动报警(比如推送到运维系统)

比如设置每分钟最大1万条,超过就报警:

private static final long MAX_KAFKA_SEND_PER_MINUTE = 10_000;@PostConstruct
public void initKafkaMetrics() {scheduler.scheduleAtFixedRate(() -> {long count = kafkaSendCounter.getAndSet(0);logger.info("==> 报警Kafka 1分钟发送量: {}, TPS: {}", count, count / 60.0);if (count > MAX_KAFKA_SEND_PER_MINUTE) {logger.error("==> 报警发送量异常!!! 超过阈值: {}", count);// TODO: 发通知到钉钉/微信机器人/运维平台sendOpsAlert("Kafka发送量异常,1分钟达到:" + count + "条!");}}, 1, 1, TimeUnit.MINUTES);
}private void sendOpsAlert(String msg) {// 模拟发送告警,可以接钉钉机器人、Prometheus AlertManager、Grafana告警logger.warn("【运维告警】{}", msg);
}

🚀 3. 异步批量提交 Kafka(提升吞吐量)

思路:不是每一条都发,而是攒一批一起发(比如100条或者100ms发一次)

可以用 LinkedBlockingQueue + 批量发送:

private static final BlockingQueue<ProducerRecord<String, String>> kafkaQueue = new LinkedBlockingQueue<>();
private static final int BATCH_SIZE = 100;
private static final int BATCH_WAIT_MS = 100;@PostConstruct
public void initKafkaBatchSender() {scheduler.scheduleWithFixedDelay(() -> {List<ProducerRecord<String, String>> batch = new ArrayList<>();kafkaQueue.drainTo(batch, BATCH_SIZE);if (!batch.isEmpty()) {batch.forEach(record -> {try {gatewayKafkaTemplate.send(record.topic(), record.key(), record.value());} catch (Exception e) {logger.error("==> 批量发送Kafka异常", e);}});logger.info("==> 批量发送Kafka {}条", batch.size());}}, 0, BATCH_WAIT_MS, TimeUnit.MILLISECONDS);
}public void asyncSendKafka(String topic, String key, String value) {kafkaQueue.offer(new ProducerRecord<>(topic, key, value));kafkaSendCounter.incrementAndGet();
}

以后调用就改成:

asyncSendKafka("cabinets-alarm", kafkaKey, jsonMessage);

真正高频发时吞吐量可以提高几倍甚至十几倍


⚡ 4. 限流+熔断(防止被报警搞死系统)

比如:1分钟超10W条就直接丢弃+熔断保护

private static final long MAX_KAFKA_QUEUE_SIZE = 100_000;public void asyncSendKafka(String topic, String key, String value) {if (kafkaQueue.size() > MAX_KAFKA_QUEUE_SIZE) {logger.error("==> Kafka报警发送队列已满,触发限流丢弃!size={}", kafkaQueue.size());return; // 丢弃这条}kafkaQueue.offer(new ProducerRecord<>(topic, key, value));kafkaSendCounter.incrementAndGet();
}

想高级一点还可以:

  • 短时间熔断(比如5分钟自动恢复)

  • 降级处理(比如记录到本地磁盘,后续补发)

1. 基本解码规则

  • 只处理以 { 开始,} 结束的完整 JSON 格式报文。

  • 允许处理嵌套{},即内部嵌套对象。

  • 数据最大只处理到xKB以内,防止异常数据撑爆内存。

  • 如果一条报文找不到开始符号 { ,或找不到合法结束符号 } ,则:

    • 直接丢弃或等待下一批数据补齐。

  • 粘包半包场景均有容错处理。

  • 0x7B(16进制)

  • 十进制:123

  • 对应 ASCII 字符:{ (左花括号)

完整表格(标准ASCII 0x00 ~ 0x7F片段,含0x7B重点标红)

16进制

10进制

字符

描述

0x00

0

NUL

空字符

0x01

1

SOH

标题开始

...

...

...

...

0x09

9

TAB

水平制表符(Tab)

0x0A

10

LF

换行

0x0D

13

CR

回车

...

...

...

...

0x20

32

空格

空格

0x21

33

!

感叹号

0x22

34

"

双引号

0x23

35

#

井号

0x24

36

$

美元符号

...

...

...

...

0x28

40

(

左小括号

0x29

41

)

右小括号

0x2A

42

*

星号

0x2B

43

+

加号

0x2C

44

,

逗号

0x2D

45

-

减号

0x2E

46

.

句号

0x2F

47

/

斜线

0x30

48

0

数字0

...

...

...

...

0x39

57

9

数字9

0x3A

58

:

冒号

0x3B

59

;

分号

0x3C

60

<

小于号

0x3D

61

=

等号

0x3E

62

>

大于号

0x3F

63

?

问号

0x40

64

@

at符号

0x41

65

A

大写字母A

...

...

...

...

0x5A

90

Z

大写字母Z

0x5B

91

[

左中括号

0x5C

92

``

反斜杠

0x5D

93

]

右中括号

0x5E

94

^

脱字符

0x5F

95

_

下划线

0x60

96

`

反引号

0x61

97

a

小写字母a

...

...

...

...

0x7A

122

z

小写字母z

🔥0x7B

123

{左花括号

0x7C

124

`

`

0x7D

125

}

右花括号

0x7E

126

~

波浪号

0x7F

127

DEL

删除字符

📋 超全ASCII对照表 (0x00 ~ 0xFF)

HEX

DEC

CHAR

描述

HEX

DEC

CHAR

描述

0x00

0

NUL

空字符(Null)

0x80

128

控制字符

0x01

1

SOH

标题开始

0x81

129

控制字符

0x02

2

STX

文本开始

0x82

130

拉丁文补充

0x03

3

ETX

文本结束

0x83

131

ƒ

拉丁文补充

0x04

4

EOT

传输结束

0x84

132

拉丁文补充

0x05

5

ENQ

请求

0x85

133

省略号

0x06

6

ACK

接收确认

0x86

134

匕首符号

0x07

7

BEL

响铃(Beep)

0x87

135

双匕首

0x08

8

BS

退格

0x88

136

ˆ

抑扬符

0x09

9

TAB

水平制表(Tab键)

0x89

137

千分号

0x0A

10

LF

换行(Line Feed)

0x8A

138

Š

拉丁补充

0x0B

11

VT

垂直制表

0x8B

139

单引号(开)

0x0C

12

FF

换页

0x8C

140

Œ

拉丁补充

0x0D

13

CR

回车(Carriage Return)

0x8D

141

控制字符

0x0E

14

SO

移出

0x8E

142

Ž

拉丁补充

0x0F

15

SI

移入

0x8F

143

控制字符

...

...

...

...

...

...

...

...

0x20

32

空格

Space

0xA0

160

不间断空格(NBSP)

0x21

33

!

感叹号

0xA1

161

¡

反感叹号

0x22

34

"

双引号

0xA2

162

¢

分币符

0x23

35

#

井号(#)

0xA3

163

£

英镑符号

0x24

36

$

美元符号

0xA4

164

¤

货币符号

...

...

...

...

...

...

...

...

0x28

40

(

左括号

0xB0

176

°

度数符号

0x29

41

)

右括号

0xB1

177

±

加减号

0x2A

42

*

星号(乘号)

0xB2

178

²

平方

0x2B

43

+

加号

0xB3

179

³

立方

...

...

...

...

...

...

...

...

🔥0x7B

123

{

左花括号

0xFB

251

û

拉丁扩展

0x7C

124

`

`

竖线

0xFC

252

ü

0x7D

125

}

右花括号

0xFD

253

ý

拉丁扩展

0x7E

126

~

波浪号

0xFE

254

þ

拉丁扩展

0x7F

127

DEL

删除符(Delete)

0xFF

255

ÿ

拉丁扩展

Netty编解码器实战小册

目录

章节

内容概览

1. Netty编解码器简介

Encoder、Decoder、Codec 全景图

2. 常用编解码器类型

自定义、内置(LengthFieldBasedFrameDecoder等)

3. 粘包/拆包问题

原因、现象、标准处理套路

4. 编解码器实战(基础版)

自定义 MessageToByteEncoder、ByteToMessageDecoder

5. 编解码器实战(进阶版)

包头包尾协议、分隔符协议、多协议动态识别

6. 性能优化技巧

零拷贝、池化ByteBuf、线程优化

7. 问题排查技巧

如何定位粘包/丢包/内存泄露


1. Netty编解码器简介

编解码器(Codec)= Encoder + Decoder

  • Encoder(编码器) :出站,把消息对象 ➔ ByteBuf。

  • Decoder(解码器) :入站,把ByteBuf ➔ 消息对象。

  • Codec(编解码器) :组合版,常用 MessageToMessageCodec

Netty内部是责任链(Pipeline)模式,编码器和解码器在 pipeline 中按顺序处理。


2. 常用编解码器类型

编解码器

描述

MessageToByteEncoder

消息 ➔ ByteBuf

ByteToMessageDecoder

ByteBuf ➔ 消息

MessageToMessageEncoder

消息 ➔ 消息

MessageToMessageDecoder

消息 ➔ 消息

CombinedChannelDuplexHandler

编解码合并器

还有Netty内置的超强编解码器:

  • LengthFieldBasedFrameDecoder ➔ 按包头长度字段自动拆包

  • DelimiterBasedFrameDecoder ➔ 按分隔符拆包

  • LineBasedFrameDecoder ➔ 按换行符拆包


3. 粘包/拆包问题

为什么会粘包拆包?

TCP本身是流式协议,不保证一条消息完整到达。

  • 发送慢,数据被切成多段(拆包

  • 多条小消息合并一起发(粘包

常见现象

  • 收到的数据长度异常(超长或超短)

  • 反序列化失败(JSON、Protobuf等解析错误)

标准处理套路

  1. 约定消息格式,比如:

  • 固定长度

  • 包头 + 包体

  • 特殊分隔符

  • 在解码器中正确处理拆包粘包逻辑。


  • 4. 编解码器实战(基础版)

    Encoder 示例:发送JSON字符串

    @ChannelHandler.Sharable
    public class JsonEncoder extends MessageToByteEncoder<Object> {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {byte[] data = new Gson().toJson(msg).getBytes(StandardCharsets.UTF_8);out.writeBytes(data);}
    }

    Decoder 示例:接收JSON字符串

    public class JsonDecoder<T> extends ByteToMessageDecoder {private final Class<T> clazz;public JsonDecoder(Class<T> clazz) {this.clazz = clazz;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {byte[] bytes = new byte[in.readableBytes()];in.readBytes(bytes);T obj = new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);out.add(obj);}
    }

    5. 编解码器实战(进阶版)

    包头包尾协议示例

    假设消息格式:
    [魔数4字节][总长度4字节][实际内容]

    • 解码器检查魔数 + 读总长度 + 组装完整包

    • 编码器打包加上魔数和长度字段

    这种就很适合防止半包/粘包。


    6. 性能优化技巧

    优化点

    实战建议

    ByteBuf

    尽量用 PooledByteBufAllocator,避免频繁GC

    零拷贝

    充分使用slice()、duplicate()

    Pipeline顺序

    编解码器要靠近 I/O 端口,提高效率

    多协议处理

    动态切换ChannelHandler


    7. 问题排查技巧

    • 抓包(wireshark、tcpdump)

    • 加日志:入站出站拦截器打印 ByteBuf 十六进制

    • 注意ByteBuf使用生命周期,避免内存泄漏(Netty内部有引用计数机制)


    🚀 附送:最常用 ASCII 字符(适合Netty调试)

    字符

    HEX

    说明

    {

    0x7B

    JSON开头

    }

    0x7D

    JSON结尾

    \r

    0x0D

    回车符

    \n

    0x0A

    换行符

    `

    `

    0x7C

    ,

    0x2C

    逗号分隔

    📘《高并发环境编解码性能优化手册》


    1. 编解码器并发基本认知

    项目

    说明

    编码(出站)

    每次发送时执行,通常是单线程串行(但需要注意共享资源)

    解码(入站)

    每次接收时执行,多线程并发(特别是Reactor线程)

    ChannelHandler

    要注意是否加了@Sharable,否则每个连接实例化一份

    总结:解码器更容易出并发问题,编码器要注意资源共享。


    2. 编解码器高并发优化核心要点

    2.1 保证线程安全

    • 如果ChannelHandler标了@Sharable ➔ 内部必须无状态或者状态线程安全

    • 尽量不要在Handler中用成员变量存储连接状态,否则多线程下必挂。

    • 推荐:用ChannelHandlerContext.channel().attr()存储每个连接独立的变量。

    AttributeKey<Long> SESSION_ID = AttributeKey.valueOf("SESSION_ID");// 设置
    ctx.channel().attr(SESSION_ID).set(sessionId);// 获取
    Long sessionId = ctx.channel().attr(SESSION_ID).get();

    2.2 避免内存拷贝

    • ByteBuf自带零拷贝特性,尽量用:

      • slice()

      • duplicate()

      • retain()/release()

    • 不要轻易用byte[] ➔ 会触发拷贝!

    比如读取消息推荐这样:

    ByteBuf buf = in.retainedSlice(start, length); // 轻量切片,不拷贝数据

    而不是:

    byte[] arr = new byte[length];
    in.readBytes(arr); // 直接拷贝到数组,GC负担大

    2.3 合理使用 PooledByteBufAllocator(池化)

    高并发下 PooledByteBufAllocator(对象池)能大幅减少GC压力。

    Netty默认在Linux/64位系统上开启,如果想手动指定:

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    2.4 拆包粘包优化处理

    常用的:

    • LengthFieldBasedFrameDecoder(基于包长度自动拆包)

    • DelimiterBasedFrameDecoder(基于分隔符拆包)

    优先推荐用LengthFieldBasedFrameDecoder,CPU更友好!

    示例(4字节包头长度):

    new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4)

    参数解释:

    • 最大帧长度

    • 包头起始位置

    • 包头长度

    • 额外字节补偿

    • 跳过包头

    2.5 小心 JSON/XML 等大对象解析

    大对象解析(如Gson、Jackson):

    • 尽量提前判断数据大小 ➔ 防止 OOM

    • 异常时丢弃,避免线程卡死

    比如你的 Decoder 里应该这样判断:

    if (byteBuf.readableBytes() > 1024 * 64) {log.error("==> 单包过大,直接丢弃");byteBuf.skipBytes(byteBuf.readableBytes());return;
    }

    防止有人恶意发送超级大包,导致内存溢出

    2.6 解码失败保护机制

    高并发下,有一条数据出问题,不能影响后续数据处理。

    所以推荐:

    • 解码失败直接 resetReaderIndex,保留数据,等待更多数据。

    • 不要在异常里直接ctx.close(),除非真确定是致命异常。

    3. 编解码性能压测指标参考

    指标

    高性能目标

    单连接TPS(事务数/秒)

    >10万

    编码平均耗时

    <50μs

    解码平均耗时

    <100μs

    内存使用增长速率

    平稳,无抖动

    GC次数

    每分钟<1次

    压测工具可以用:

    • Netty自带的EchoClient

    • wrk+自定义TCP代理

    • JMeter TCP Sampler


    4. 典型问题案例

    问题

    原因

    优化

    解码异常连锁崩溃

    未resetReaderIndex

    捕获异常后 reset

    CPU拉满

    粘包拆包逻辑死循环

    抓包分析,优化拆包算法

    GC频繁

    每次readBytes()导致大量对象创建

    用ByteBuf slice

    内存泄漏

    ByteBuf未release

    finally块中统一release

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词