欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 国际 > kafka和Flume的整合

kafka和Flume的整合

2024/11/17 4:34:50 来源:https://blog.csdn.net/m0_57764570/article/details/143723212  浏览:    关键词:kafka和Flume的整合

目录

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

 3、测试

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 2、测试


 

一、Kafka作为Source 【数据进入到kafka中,抽取出来】

 1、在我的flume的conf文件夹下,有个myconf文件夹:

2、 创建一个flume脚本文件: kafka-memory-logger.conf

flume学习网站:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了 (liyifeng.org)

# 来到这个目录下
cd /opt/installs/flume/conf/myconf
# 创建一个conf文件
vi kafka-memory-logger.conf

在kafka-memory-logger.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sources.r1.kafka.topics = bigdata

a1.sources.r1.kafka.consumer.group.id = text7

a1.sources.r1.batchSize = 100

a1.sources.r1.batchDurationMillis = 2000

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sinks.k1.maxBytesToLog = 128

 3、测试

启动一个消息生产者,向topic中发送消息,启动flume,接收消息

  • 启动一个消息生产者,向topic中发送消息:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic bigdata

  • 启动flume,接收消息 
flume-ng agent -n a1 -c ../ -f kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、kafka作为Sink 【数据从别的地方抽取到kafka里面】 

 

 

 

1、创建一个flume脚本文件:flume-kafka-sink.conf

 在flume-kafka-sink.conf文件中写入:

a1.sources = r1

a1.channels = c1

a1.sinks=k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.sources.r1.type = netcat

a1.sources.r1.bind = bigdata01

a1.sources.r1.port = 44444

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = bigdata

a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

 2、测试

启动:

flume-ng agent -n a1 -c ../ -f flume-kafka-sink.conf -Dflume.root.logger=INFO,console

 使用telnet命令,向端口发送消息:

yum -y install telnettelnet bigdata01 44444

 

 在窗口不断的发送文本数据,数据被抽取到了kafka中,如何获取kafka数据呢?使用消费者:

kafka-console-consumer.sh --topic bigdata --bootstrap-server bigdata01:9092

 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com