欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > Spark-streaming核心编程

Spark-streaming核心编程

2025/4/30 2:24:48 来源:https://blog.csdn.net/2401_89907524/article/details/147476816  浏览:    关键词:Spark-streaming核心编程

1.导入依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>

<version>3.0.0</version>

</dependency>

2.编写代码

创建SparkConfStreamingContext

定义Kafka相关参数,如bootstrap serversgroup idkeyvaluedeserializer

使用KafkaUtils.createDirectStream方法创建DStream,该方法接受StreamingContext、位置策略、消费者策略等参数。

提取数据中的value部分,并进行word count计算。

启动StreamingContext并等待其终止。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectAPI {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //定义kafka相关参数

    val kafkaPara :Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG

      ->"node01:9092,node02:9092,node03:9092",

      ConsumerConfig.GROUP_ID_CONFIG->"kafka",

      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",

      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

    )

    //通过读取kafka数据,创建DStream

    val kafkaDStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](

      ssc,LocationStrategies.PreferConsistent,

      ConsumerStrategies.Subscribe[String,String](Set("kafka"),kafkaPara)

    )

    //提取出数据中的value部分

    val valueDStream :DStream[String] = kafkaDStream.map(record=>record.value())

    //wordCount计算逻辑

    valueDStream.flatMap(_.split(" "))

      .map((_,1))

      .reduceByKey(_+_)

      .print()

    ssc.start()

    ssc.awaitTermination()

  }

  }

3.运行程序

开启Kafka集群。

4.使用Kafka生产者产生数据。

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

​5运行Spark Streaming程序,接收Kafka生产的数据并进行处理。

6.查看消费进度

使用Kafka提供的kafka-consumer-groups.sh脚本查看消费组的消费进度。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词