1、部署
# 解压
[appuser@localhost app]$ tar -zxvf kafka_2.13-3.0.0.tgz# 修改配置文件(具体配置见下文)
[appuser@localhost app]$ cd kafka_2.13-3.0.0
[appuser@localhost kafka_2.13-3.0.0]$ vim config/server.properties # 在所有集群节点启动Kafka
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties# 创建topic
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-topics.sh --bootstrap-server 10.20.10.1:9092 --create --topic test --partitions 3 --replication-factor 3# 查询Topic
[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-topics.sh --bootstrap-server [2106:410:a10::911]:9092 --list# 查询消费组
#[appuser@localhost kafka_2.13-3.0.0]$ bin/kafka-consumer-groups.sh --bootstrap-server [2106:410:a10::911]:9092 --list
2、server.properties
kafka_2.13-3.0.0/config/server.properties
############################# Server Basics #############################
#必须为每个代理设置一个唯一的整数
broker.id=1 ############################# Socket Server Settings #############################
#套接字服务器侦听的地址。如果未配置,它将从 java.net.InetAddress.getCanonicalHostName()获取。
#listeners=PLAINTEXT://10.200.34.5:9092
listeners=PLAINTEXT://[2106:410:a10::911]:9092 #代理生产和消费的主机端口。如果未设置,则在配置时使用“listeners”的值。否则,它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。
# advertised.listeners=PLAINTEXT://10.20.10.1:9092
advertised.listeners=PLAINTEXT://[2106:410:a10::911]:9092 #服务器用于从网络接收请求并向网络发送响应的线程数(默认是3),接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3#消息从内存中写入磁盘是时候使用的线程数量(默认是8)
num.io.threads=8 #套接字服务器使用的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes=102400#套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.receive.buffer.bytes=102400#套接字服务器将接受的请求的最大大小(防止OOM)
socket.request.max.bytes=104857600############################# Log Basics #############################
#日志
log.dirs=/app/kafka_2.13-3.0.0/logs#每个topic的默认日志分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件,建议broker少的话,默认就几个broker 就设置成几个分区
num.partitions=1 #在启动时用于日志恢复和在关闭时刷新的每个数据目录的线程数。对于数据目录位于RAID阵列中的安装,建议增加此值。
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings #############################
#“__consumer_offsets”和“__transaction_state” topic内部的组元数据的复制因子。对于除开发测试之外的任何其他内容,建议使用大于1的值以确保可用性
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1#是否允许自动创建 Topic。
# auto.create.topics.enable=false
#是否允许 Unclean Leader 选举。
# unclean.leader.election.enable=false
#是否允许定期进行 Leader 选举。
auto.leader.rebalance.enable=false############################# Log Flush Policy (日志刷新政策) #############################
#消息立即写入文件系统,但默认情况下我们只有fsync()才能同步
#懒惰的操作系统缓存。以下配置控制将数据刷新到磁盘。
#这里有一些重要的权衡:
#1。持久性:如果您不使用复制,则可能会丢失未刷新的数据。
#2。延迟:当刷新确实发生时,非常大的刷新间隔可能会导致延迟峰值,因为会有大量数据需要刷新。
#3。吞吐量:冲洗通常是最昂贵的操作,并且小的冲洗间隔可能导致过多的搜索。
#以下设置允许配置刷新策略以在一段时间后刷新数据或每N条消息(或两者)。这可以在全局范围内完成,并在每个主题的基础上进行覆盖。#强制刷新数据到磁盘之前要接受的消息数
#log.flush.interval.messages=10000#强制刷新之前消息可以在日志中停留的最长时间
#log.flush.interval.ms=1000############################# Log Retention Policy(日志保留政策) #############################
#以下配置控制日志段的处理。政策可以设置为在一段时间后或在累积给定大小后删除段。只要满足这些条件*either*,就会删除一个段。删除总是发生从日志的末尾开始。# segment文件保留的最长时间,默认保留7天(168小时),超时将被删除,也就是说7天之前的数据将被清理掉
log.retention.hours=168# 日志的基于大小的保留策略。消息数量大于改值将删除最旧的数据。函数独立于log.retenation.h。
log.retention.bytes=1073741824#Kafka 调用cleanupLogs的时间间隔 ,该函数对所有 Logs 执行清理操作,(目前不确定 Logs 对应的是 Topic 还是 Partition,目测应当是 Partition)
log.cleanup.interval.mins=2# 日志段文件的最大大小。达到此大小时,将创建新的日志段。
log.segment.bytes=104857600#检查日志段以查看是否可以删除日志段的时间间隔
log.retention.check.interval.ms=300000############################# Zookeeper #############################
#zookeeper集群的地址,可以是多个,多个之间用逗号分割
zookeeper.connect=10.20.11.1:2181,10.20.11.2:2181,10.20.11.3:2181 #设置连接Zeekeeper超时时间
zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings(组协调员设置) #############################
#以下配置指定GroupCoordinator将延迟初始消费者重新平衡的时间(以毫秒为单位)。当新成员加入组时,重新平衡将进一步延迟group.initial.rebalance.delay.ms的值,最多为max.poll.interval.ms。默认值为3秒。我们将此覆盖为0,因为它为开发和测试提供了更好的开箱即用体验。但是,在生产环境中,默认值3秒更合适,因为这有助于避免在应用程序启动期间不必要且可能很昂贵的重新平衡。
group.initial.rebalance.delay.ms = 0s