欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > kafka学习-02

kafka学习-02

2024/11/30 9:56:18 来源:https://blog.csdn.net/qq_62984376/article/details/144035744  浏览:    关键词:kafka学习-02

kafka分区的分配以及再平衡:

4个:

1、Range 以及再平衡

        1)Range 分区策略原理

2)Range 分区分配策略案例

(1)修改主题 first 为 7 个分区。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

注意:分区数可以增加,但是不能减少。

一个主题,假如副本数想修改,是否可以直接修改?答案是不可以。

如果想修改,如何修改?制定计划,执行计划。

(2)这样可以由三个消费者

CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。

(3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

备注:只需要将以前的CustomProducerCallback,修改发送次数为500次即可。

2、RoundRobin(轮询) 以及再平衡

1)RoundRobin 分区策略原理

2RoundRobin 分区分配策略案例

(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代 码中修改分区分配策略为 RoundRobin。

3、Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

4、CooperativeSticky 的解释【新的kafka中刚添加的策略】

在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。

好处是:负载均衡,不好的地方是:多次平衡浪费性能。

动态平衡,在消费过程中,实施再平衡,而不是定下来,等某个消费者退出再平衡。

offset 位移[偏移量](重要)

记录消费到了哪里的这个值,就是偏移量。

记录:哪个主题,哪个分区,哪个位置。

1) 消费 offset 案例

(0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,

默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

如果不修改是无法查看offset的值的,因为这些都是加密数据。

创建一个新的主题:bigdata

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic bigdata --partitions 2 --replication-factor 2

在kafka3下执行此命令

查看消费者消费主题__consumer_offsets。 -- from-beginning 表示查看历史所有的偏移量

2) 自动提交案例:

写java代码即可

和之前的基本相同,只是加入了几个参数

        //设置自动提交偏移量 (默认就是true)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);  // 可以设置为false 之后就需要手动提交// 提交 offset 的时间周期 1000ms,默认 5sproperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
package com.bigdata.day04;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class Demo02_自动提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//设置自动提交偏移量 (默认就是true)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);  // 可以设置为false 之后就需要手动提交// 提交 offset 的时间周期 1000ms,默认 5sproperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}}

3) 手动提交案例

package com.bigdata.day04;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class Demo02_手动提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//设置自动提交偏移量 (默认就是true)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);  // 可以设置为false 之后就需要手动提交KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);// 可以指定条件提交int i = 1;while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());i++;}// 当生产者发送了10条消息后,再提交偏移量(offset)if (i==10){kafkaConsumer.commitAsync();}}}}

4) 指定分区和偏移量消费 (重要)

package com.bigdata.day04;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;public class Demo02_指定提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);//执行计划Set<TopicPartition> assignment = kafkaConsumer.assignment();while (assignment.size() == 0){// 拉去数据的代码,此处可以帮助快速构建分区方案kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候由方案了就跳出循环assignment = kafkaConsumer.assignment();}// 获取所有分区的 偏移量(offset)等于 5 以后的数据for (TopicPartition topicPartition : assignment) {kafkaConsumer.seek(topicPartition,5);}// 获取指定分区 5 以后的数据
//        kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
//
//        // 还可以这样写
//        for (TopicPartition topicPartition : assignment) {
//            if (topicPartition.partition() == 0){
//                kafkaConsumer.seek(topicPartition,5);
//            }
//        }while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}}

5) 指定时间消费

package com.bigdata.day04;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class Demo02_指定时间提交offset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test4");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> list = new ArrayList<>();list.add("bigdata");kafkaConsumer.subscribe(list);//执行计划Set<TopicPartition> assignment = kafkaConsumer.assignment();while (assignment.size() == 0){// 拉去数据的代码,此处可以帮助快速构建分区方案kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候由方案了就跳出循环assignment = kafkaConsumer.assignment();}HashMap<TopicPartition, Long> map = new HashMap<>();for (TopicPartition topicPartition : assignment) {map.put(topicPartition,System.currentTimeMillis()-60*60*1000*10);}Map<TopicPartition, OffsetAndTimestamp> timestampMap = kafkaConsumer.offsetsForTimes(map);Set<Map.Entry<TopicPartition, OffsetAndTimestamp>> entries = timestampMap.entrySet();for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : entries) {kafkaConsumer.seek(entry.getKey(),entry.getValue().offset());}while (true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record : records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}}

漏消费和重复消费

重复消费:

当我们在使用kafka消费者的时候,已经消费了数据,但是没有提交偏移量,这时就可能造成数据重复

假如我们往消费者中发送了10条数据,并且设置了手动提交,但是并没有提交,这时我们是可以看到消费者数据的,但是此时消费者中的数据没有偏移量等(如果有的话偏移量应该是10),再次消费的时候,消费者就可能接着从偏移量为0的地方开始消费,造成重复消费

漏消费:

先提交 offset 后消费,有可能会造成数据的漏消费。

在flume中使用kafka:

生产者将数据发送到kafka中,再使用flume将数据从kafka中抽取到hdfs上

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = bigdata
a1.sources.r1.kafka.consumer.group.id = donghu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp=truea1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com