欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 国际 > Pulsar客户端如何控制内存使用

Pulsar客户端如何控制内存使用

2025/1/14 22:17:56 来源:https://blog.csdn.net/N201871643/article/details/145081738  浏览:    关键词:Pulsar客户端如何控制内存使用

Pulsar客户端如何控制内存使用

一、使用场景

在实际应用中,Pulsar客户端的内存使用控制是一个重要的性能优化点。假设有一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点和优化搜索效果。以下是一个简化的代码示例:

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("search-activities").create();
try {MessageId messageId = producer.send(/* message payload here */);log.debug("Search activity messageId={}", messageId);
} catch (Exception e) {log.error("Failed to record search activity", e);
}

在这个场景中,pulsarClientproducer 支持复用,推荐这么做,这里只是为了演示写到了一起。producer.send 是阻塞方式发送消息,线程会卡在这里等待发送结果返回。在现实中,根据消息在实际业务中的需要,可以选择阻塞和非阻塞两种方式。例如,业务上对搜索请求事件并无强依赖,因此使用阻塞方式发消息不太适合,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性。因此,可以优化为非阻塞方式,将记录搜索事件放到其他线程中完成:

producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {if (ex != null) {log.error("Failed to record search activity", ex);} else {log.debug("Search activity messageId={}", msgId);}
});

在高TPS(例如单实例超过1000QPS)和大消息内容(例如100KB甚至1MB)的情况下,上述代码可能会遇到 MemoryBufferIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full

此外,如果服务与Pulsar的broker之间出现网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,还可能会遇到 ProducerQueueIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full

二、Producer的内存控制

1. 配置项分析

在构建Producer时,ProducerBuilder 中与内存使用有关的配置项包括:

  • maxPendingMessages(int maxPendingMessages):控制producer内部队列中正在发送但还没有接收到broker确认的消息数量。若队列大小超出这个限制,默认行为是抛出 ProducerQueueIsFullError 异常。可以通过设置 blockIfQueueFull=true 调整为阻塞等待队列中空出新的空间。
  • maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions):控制整个topic所有分区的总pending消息数量。最终到各个分区内部producer取 maxPendingMessages 和 maxPendingMessagesAcrossPartitions / partitions 的较小值。

2. 内存限制配置

在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用是不切实际的。因此,在 PIP-74 中,ClientBuilder 提供了一个面向整个client实例统一的内存限制配置:

ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认会抛出 MemoryBufferIsFullError 异常。若同时配置了 blockIfQueueFull=true,则当前线程会阻塞等待前面pending的消息发送完成。

3. blockIfQueueFull 配置的使用

blockIfQueueFull 配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息。然而,一旦配置为 true,不论是应用发送消息调用的是阻塞的 Producer.send 方法还是非阻塞的 Producer.sendAsync 方法都会出现阻塞等待,这可能会阻塞当前线程,对于某些业务场景是不可接受的。

4. 默认配置

PIP-120 对 2.10.0 以及之后版本的客户端中,默认启用了 memoryLimit 配置,其默认值为 64MB,同时默认禁用了 maxPendingMessagesmaxPendingMessagesAcrossPartitions 配置(默认值修改为0),并将 maxPendingMessagesAcrossPartitions 配置标记为 Deprecated

三、Consumer的内存控制

1. 配置项分析

在构造一个Consumer时,ConsumerBuilder 提供的与内存使用有关的选项包括:

  • receiverQueueSize(int receiverQueueSize):控制每个分区consumer的接收队列大小。
  • maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions):控制所有分区consumer和parent consumer的接收队列总大小。

Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用 Consumer#receive 或者 Consumer#receiveAsync 方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。

2. 自动扩展接收队列

在 PIP-74 中提出了一个新的控制Consumer内存使用的方案,即 autoScaledReceiverQueueSizeEnabled

ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

当启用这个特性后,receiverQueueSize 会从1开始呈2的指数倍增长,直至达到 receiverQueueSize 的限制或达到client的 memoryLimit 限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。

四、番外:ackTimeout 和 ackTimeoutTickTime 的配置

除了Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时 ackTimeoutackTimeoutTickTime 的配置如果不匹配,会消耗较多堆内内存。

ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

若Consumer配置了 ackTimeout 并且配置了较大的时间窗口(例如1小时或者更长),应适当调大 ackTimeoutTickTime,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若 ackTimeout 时间窗口很大,ackTimeoutTickTime 仍然使用其默认值 1s,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码 UnAckedMessageTracker.java

五、总结

  1. 使用 sendAsync 非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了 blockIfQueueFull 之后,它会在特定情况下演变成阻塞方法。
  2. 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,做读写分离,避免由于共用 memoryLimit 导致相互影响

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com