
Kafka Basics (continuously updated~)

2024/11/30 · Source: https://blog.csdn.net/m0_74969835/article/details/141858250

# broker.id must be unique within the Kafka cluster
broker.id=0
# the IP and port this broker listens on for client connections
listeners=PLAINTEXT://192.168.65.60:9092
# directory where Kafka stores its message log files
log.dir=/usr/local/data/kafka-logs
# ZooKeeper connection address
zookeeper.connect=192.168.65.60:2181

# start the broker as a daemon (background) process
./kafka-server-start.sh -daemon ../config/server.properties

# create a topic named test with one partition and one replica
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test

# list existing topics
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181

# consume new messages (from the latest offset)
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test

# consume from the beginning of the log
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test

# messages are persisted under the log directory, one folder per topic-partition:
/usr/local/kafka/data/kafka-logs/<topic>-<partition>/00000000.log

# consume as part of consumer group testGroup
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup --topic test

# two more groups; each group receives the full message stream independently
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup1 --topic test

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup2 --topic test

# describe a consumer group: shows CURRENT-OFFSET, LOG-END-OFFSET and LAG per partition
./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup

# create a topic with two partitions
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 2 --topic test1

# For a three-broker cluster, copy server.properties twice and adjust each copy
# (values below are for the third broker, server2.properties):
# broker.id: 0 / 1 / 2
broker.id=2
# ports: 9092 / 9093 / 9094
listeners=PLAINTEXT://192.168.65.60:9094
# log dirs: kafka-logs / kafka-logs-1 / kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2

./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties

# create a topic with 2 partitions, each replicated across 3 brokers
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

# describe the topic to inspect partitions, leaders, and replica status
./kafka-topics.sh --describe --zookeeper 172.16.253.35:2181 --topic my-replicated-topic

# produce to the replicated topic
./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic

# consume it from the beginning as group testGroup1
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

package com.qf.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Set up the producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
        // serialize the message key from String to bytes
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // serialize the message value from String to bytes
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 2. Create the producer client with these settings
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // 3. Create the message; the key determines which partition it goes to,
        //    the value is the actual payload
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");
        // 4. Send the message and print the returned metadata
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("sync send result: topic-" + metadata.topic()
                + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
        producer.close(); // flush and release the client
    }
}

// Synchronous send: get() blocks until the broker acknowledges the message
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("sync send result: topic-" + metadata.topic()
        + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());

// 5. Asynchronous send: the callback fires once the broker acknowledges
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("send failed: " + exception.getMessage());
        }
        if (metadata != null) {
            System.out.println("async send result: topic-" + metadata.topic()
                    + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
        }
    }
});

// acks=1: the leader persists the message before acknowledging
props.put(ProducerConfig.ACKS_CONFIG, "1");
/* Failed sends are retried (default retry interval 100ms). Retries improve
   delivery reliability but can also cause duplicate sends, e.g. after a
   network glitch, so the consumer side should process messages idempotently. */
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// retry interval
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
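For stronger delivery guarantees than acks=1, a minimal sketch (both settings are standard kafka-clients producer options available in the 2.4.1 client used above; the min.insync.replicas value mentioned in the comment is an assumed broker/topic-side setting):

// Sketch: trade latency for durability. Assumes the topic has
// replication.factor >= 2 and the broker/topic sets min.insync.replicas >= 2.
props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas to persist
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // broker deduplicates producer retries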

// total memory the producer may use to buffer unsent records (32 MB)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

// records bound for the same partition are grouped into batches of up to 16 KB
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// wait up to 10 ms for a batch to fill before sending it
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
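Batching pairs naturally with compression, which the producer applies per batch; a hedged sketch (compression.type is a standard producer setting; the codec choice here is illustrative):

// Sketch: compress whole batches; brokers store them compressed and
// consumers decompress transparently. lz4 is a common low-overhead codec.
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");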

package com.qf.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MySimpleConsumer {
    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
        // consumer group name
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 1. Create the consumer client
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 2. Subscribe to the topic list
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // 3. poll() is a long poll that fetches messages
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // 4. Print each message
                System.out.printf("received: partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

// whether to auto-commit offsets; defaults to true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// interval between automatic offset commits
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

// disable auto-commit to manage offsets manually
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

while (true) {
    // poll() is a long poll that fetches messages
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("received: partition = %d, offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
    // all fetched messages have been processed
    if (records.count() > 0) {
        // Synchronous manual commit: blocks until the offsets are committed.
        // Usually preferred, since there is rarely any logic to run after the commit.
        consumer.commitSync();
    }
}

while (true) {
    // poll() is a long poll that fetches messages
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("received: partition = %d, offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
    if (records.count() > 0) {
        // Asynchronous manual commit: does not block, so the thread can continue
        // with other work; failures surface in the callback.
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    System.err.println("Commit failed for " + offsets);
                    exception.printStackTrace();
                }
            }
        });
    }
}
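Both commit calls also accept an explicit offset map, which allows committing per partition instead of committing everything poll() returned; a minimal sketch (note the committed value is the next offset to consume, hence the +1; needs java.util.Collections and org.apache.kafka.common.TopicPartition imports):

// Sketch: commit a specific offset for one partition after processing `record`
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));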

// max records fetched per poll; tune to match how fast the consumer processes
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

while (true) {
    // poll() is a long poll that fetches messages
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("received: partition = %d, offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
}

// max records fetched per poll; tune to match how fast the consumer processes
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// If two polls are more than 30s apart, Kafka considers the consumer too slow,
// kicks it out of the group, and reassigns its partitions (a rebalance).
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

// interval at which the consumer sends heartbeats to the broker
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// If no heartbeat arrives within 10 seconds, the broker removes the consumer
// from the group and rebalances its partitions to other consumers.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

// consume a specific partition directly (no group-managed assignment)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

// consume from the beginning of the partition
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

// consume from a specific offset
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
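Symmetrically, a consumer can jump straight to the tail of a partition; a minimal sketch using the standard seekToEnd API:

// Sketch: skip the backlog and only consume messages produced from now on
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToEnd(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));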

// Consume from a point in time: resolve, for every partition, the offset of the
// first message whose timestamp is >= the target time (here, one hour ago)
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
    map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
    TopicPartition key = entry.getKey();
    OffsetAndTimestamp value = entry.getValue();
    if (key == null || value == null) continue;
    long offset = value.offset();
    System.out.println("partition-" + key.partition() + "|offset-" + offset);
    // seek to the offset resolved from the timestamp
    consumer.assign(Arrays.asList(key));
    consumer.seek(key, offset);
}

// Where a new consumer group starts when it has no committed offset:
// "earliest" replays the partition from its first offset; the default "latest"
// only consumes messages produced after the group first connects.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094
    producer:
      # a value > 0 makes the client resend records whose send failed
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # serialization for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # RECORD: commit after the listener (ListenerConsumer) handles each record
      # BATCH: commit after the listener handles each batch returned by poll()
      # TIME: commit after a batch is handled, once more than TIME has elapsed
      #   since the last commit
      # COUNT: commit after a batch is handled, once at least COUNT records
      #   have been processed
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after a batch is handled and Acknowledgment.acknowledge()
      #   has been called
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge()
      #   is called; this is the mode generally used
      ack-mode: MANUAL_IMMEDIATE
  redis:
    host: 172.16.253.21

package com.qf.kafka.spring.boot.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/msg")
public class MyKafkaController {
    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String sendMessage() {
        // send(topic, partition, key, value)
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message!");
        return "send success!";
    }
}
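kafkaTemplate.send() also returns a future, so delivery results can be handled without blocking the request thread; a minimal sketch against the spring-kafka 2.x API (needs org.springframework.util.concurrent.ListenableFuture and org.springframework.kafka.support.SendResult imports; in spring-kafka 3.x the method returns a CompletableFuture instead):

// Sketch: react to the send result asynchronously
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message!");
future.addCallback(
        result -> System.out.println("sent, offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));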

package com.qf.kafka.spring.boot.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
    @KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        // manually commit the offset (ack-mode is MANUAL_IMMEDIATE)
        ack.acknowledge();
    }
}
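The listener above handles one record at a time; spring-kafka can also deliver a whole poll() batch to the listener method. A hedged sketch (assumes the container is switched to batch mode, e.g. spring.kafka.listener.type: batch, which the YAML above does not set; needs a java.util.List import):

// Sketch: batch consumption with one manual commit per batch
@KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup2")
public void listenBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    ack.acknowledge(); // commit once for the whole batch
}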

@KafkaListener(groupId = "testGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "3") // concurrency = number of consumers started in this group,
                      // i.e. the parallelism; keep it <= the total partition count
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    // manually commit the offset
    ack.acknowledge();
}

# Kafka-Eagle environment variables (typically appended to the shell profile)
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
