
A Small Kafka Hands-On Project


Before starting, follow the earlier article to download and set up Kafka, then bring it up: start ZooKeeper first, then the Kafka broker.
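If you are using the standard binary distribution, starting the two servers from the Kafka root directory looks roughly like this (a sketch assuming the default config files; on Windows, use the equivalent .bat scripts under bin\windows):

# start ZooKeeper first (listens on 2181 by default)
bin/zookeeper-server-start.sh config/zookeeper.properties

# then, in another terminal, start the Kafka broker (listens on 9092 by default)
bin/kafka-server-start.sh config/server.properties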

Project layout:

package kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer
 */
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "${kafka.topic.name}")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());
    }
}

Consumer code

This defines a simple Kafka consumer: it uses Spring's Kafka listener to subscribe to the configured topic and process the messages it receives.

  • @Component
    Marks the class as a Spring component so that it is managed by the Spring container and picked up by component scanning.

  • @Slf4j
    A Lombok annotation that generates a logger field named log for the class, so you can call log.info() and friends without declaring a logger instance yourself.

  • @KafkaListener
    A Spring Kafka annotation that marks a method as a Kafka message listener.

    • topics: the Kafka topic to listen to. The placeholder ${kafka.topic.name} means the actual topic name is read from the configuration file (application.properties or application.yml). A variant that sets the consumer group directly on the annotation is sketched after this list.

  • Automatic listening
    When a new message is published to the topic, Spring automatically invokes the method annotated with @KafkaListener.

  • Parameter ConsumerRecord<?, ?> record
    Represents a single consumed record, carrying the message metadata (topic name, partition, offset) as well as the payload.

    • record.topic(): the name of the Kafka topic the message belongs to.
    • record.offset(): the message's offset (in Kafka, every message in a partition has a unique offset).
    • record.value(): the message content.

  • log.info
    Uses the logger to print the details of the received message, including:

    • the topic the message came from,
    • the offset, which locates the message within its partition,
    • the message content.
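As an aside, @KafkaListener does not have to inherit the global spring.kafka.consumer.group-id: it also accepts a groupId attribute. A minimal sketch of such a variant (the class name and group id below are made up for illustration; it also logs the partition):

package kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical variant of the consumer above: overrides the consumer group
// on the annotation instead of inheriting spring.kafka.consumer.group-id.
@Component
@Slf4j
public class GroupedKafkaConsumer {

    @KafkaListener(topics = "${kafka.topic.name}", groupId = "my-listener-group")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic={}, partition={}, offset={}, message={}",
                record.topic(), record.partition(), record.offset(), record.value());
    }
}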
package kafka;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

/**
 * Kafka producer
 */
@Component
@Slf4j
public class KafkaProducer implements CommandLineRunner {

    @Resource
    KafkaTemplate kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topic;

    /**
     * Send a Kafka message
     *
     * @param string the message payload
     */
    public void send(String string) {
        ListenableFuture future = kafkaTemplate.send(topic, JSONObject.toJSONString(string));
        // future.addCallback(o -> log.info("kafka message sent: " + string), throwable -> log.error("kafka send failed: " + string));
    }

    /**
     * Produce test data once the container has finished starting
     */
    @Override
    public void run(String... args) {
        for (int i = 0; i < 10; i++) {
            send("hello world" + i);
        }
    }
}

Producer code

  • KafkaTemplate
    The core Spring Kafka class for sending messages to Kafka, injected here via the @Resource annotation.

  • @Value("${kafka.topic.name}")
    Injects the Kafka topic name from the Spring configuration file (application.properties or application.yml).

  • kafkaTemplate.send(topic, message)
    Sends the message to the given topic through the KafkaTemplate.

  • JSONObject.toJSONString(string)
    Serializes the message to JSON using Alibaba's fastjson library.

  • ListenableFuture
    Kafka sends are asynchronous: send() returns a ListenableFuture that can be used to observe the outcome of the send.

  • Logging (commented out)
    The addCallback method registers separate callbacks for success and failure (a working sketch follows this list):

    • on success, log the result: log.info("kafka message sent: " + string)
    • on failure, log the error: log.error("kafka send failed: " + string)

  • run(String... args)
    The CommandLineRunner method, invoked automatically once the Spring container has finished starting.

    • Sending messages in a loop
      On startup it sends 10 test messages to the Kafka topic: hello world0, hello world1, and so on.
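For reference, here is a working sketch of that commented-out callback, assuming spring-kafka 2.x, where send() returns a ListenableFuture (in spring-kafka 3.x it returns a CompletableFuture instead). The class below is hypothetical and only restates the producer's send path:

package kafka;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

// Sketch: a producer that logs the broker-confirmed offset on success
// and the exception on failure, via ListenableFuture.addCallback.
@Component
@Slf4j
public class CallbackAwareProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(
                result -> log.info("kafka message sent: topic={}, offset={}",
                        result.getRecordMetadata().topic(), result.getRecordMetadata().offset()),
                ex -> log.error("kafka send failed: " + message, ex));
    }
}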

## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.name=mqtt_location_data
#
#spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=your_database_name

Configuration file: application.properties
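One implicit dependency of this config: the topic mqtt_location_data has to exist, or the broker must auto-create it (auto.create.topics.enable is true by default). If you would rather declare it explicitly, spring-kafka 2.3+ can create it at startup from a NewTopic bean; a minimal sketch:

package kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Optional: declare the topic so Spring's KafkaAdmin creates it at startup.
// For a single local broker, 1 partition / replication factor 1 is enough.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic mqttLocationDataTopic() {
        return TopicBuilder.name("mqtt_location_data")
                .partitions(1)
                .replicas(1)
                .build();
    }
}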

Run output:

2024-12-26 01:32:30.839  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:00.850  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 01:33:00.855  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2024-12-26 01:33:00.970  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.971  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:00.980  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:15.874  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.875  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 3: {consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 01:33:15.879  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.880  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 01:33:15.881  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 01:33:15.887  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 01:33:15.888  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=10, message="hello world0"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=11, message="hello world1"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=12, message="hello world2"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=13, message="hello world3"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=14, message="hello world4"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=15, message="hello world5"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=16, message="hello world6"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=17, message="hello world7"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=18, message="hello world8"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=19, message="hello world9"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=20, message="hello world0"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=21, message="hello world1"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=22, message="hello world2"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=23, message="hello world3"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=24, message="hello world4"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=25, message="hello world5"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=26, message="hello world6"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=27, message="hello world7"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=28, message="hello world8"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=29, message="hello world9"
2024-12-26 04:36:36.611  INFO 22936 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-test-1, groupId=test] Error sending fetch request (sessionId=1770952277, epoch=389) to node 0:
org.apache.kafka.common.errors.DisconnectException: null
2024-12-26 04:36:36.612  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 04:36:36.727  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 04:36:36.731 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-26 04:36:36.731  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}: The coordinator is not aware of this member.
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-26 04:36:36.733  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions lost: [mqtt_location_data-0]
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions revoked: [mqtt_location_data-0]
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:36.734  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with stale Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error
2024-12-26 04:36:36.735  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-26 04:36:36.735  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:37.543  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.543  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 5: {consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 04:36:37.553  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.554  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 04:36:37.554  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 04:36:37.557  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 04:36:37.589  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]
2024-12-30 22:44:08.518  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-30 22:44:09.620  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-30 22:44:09.630  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions lost: [mqtt_location_data-0]
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions revoked: [mqtt_location_data-0]
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.636 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-30 22:44:09.636  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}: The coordinator is not aware of this member.
2024-12-30 22:44:09.637  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again
2024-12-30 22:44:09.640  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-30 22:44:09.640  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.663  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.663  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 7: {consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd=Assignment(partitions=[mqtt_location_data-0])}
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-30 22:44:09.672  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-30 22:44:10.200  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]
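A note on the noisier part of this output: hours after the ten messages were consumed, the log shows UNKNOWN_MEMBER_ID errors, failed offset commits, and repeated rebalances. That pattern usually means the coordinator stopped seeing the consumer as alive for a while (here, plausibly because the local broker or the machine was suspended between runs); the consumer then rejoins and resumes from the committed offset (30), so nothing is lost. If you hit the same warnings because message processing is genuinely slow, the log's own advice applies; in Spring Boot the two knobs it names can be set roughly like this (the values are only illustrative):

spring.kafka.consumer.properties.max.poll.interval.ms=600000
spring.kafka.consumer.max-poll-records=100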
