文章目录
- 1. 流式词频统计
- 1.1 Spark Streaming编程步骤
- 1.2 流式词频统计项目
- 1.2.1 创建项目
- 1.2.2 添加项目依赖
- 1.2.3 修改源目录
- 1.2.4 添加scala-sdk库
- 1.2.5 创建日志属性文件
- 1.3 创建词频统计对象
- 1.4 利用nc发送数据
- 1.5 启动应用,查看结果
- 2. 编程模型的基本概念
- 2.1 数据源依赖的配置
- 2.2 SparkConf概述
- 2.3 StreamingContext概述
- 2.4 初始化StreamingContext详解
- 3. 离散化数据流
- 3.1 DStream概述
- 3.2 DStream内部原理
- 4. 基本数据源
- 4.1 Spark Streaming支持两种数据源
- 4.2 Spark Streaming 资源分配注意事项
- 4.3 网络端口使用单个接收器获取数据
- 1. 获取程序入口
- 2. 业务代码编写
- 3. 启动实时流程序
- 4. 查看程序运行结果
- 5. 基本DStream转换操作
- 6. DStream输出操作
1. 流式词频统计
- 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用
nc
工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。
1.1 Spark Streaming编程步骤
- 添加SparkStreaming相关依赖
- 获取程序入口接收数据
- 对数据进行业务处理
- 获取最终结果
- 启动程序等待程序执行结束
1.2 流式词频统计项目
1.2.1 创建项目
- 设置项目基本信息
- 单击【Create】按钮,生成项目基本骨架
1.2.2 添加项目依赖
- 在
pom.xml
文件里添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
- 刷新项目依赖
1.2.3 修改源目录
-
将
java
修改为scala
-
在
pom.xml
里设置源目录
1.2.4 添加scala-sdk库
- 在项目结构对话里添加
- 单击【Add to Modules】菜单项
- 单击【OK】按钮以后,就可以在
scala
里创建Scala Class
了
1.2.5 创建日志属性文件
- 在
resources
里创建log4j2.properties
文件
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
1.3 创建词频统计对象
- 创建
net.huawei.streaming
包
- 在
net.huawei.streaming
包里创建SparkStreamingWordCount
对象
package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
- 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(
bigdata1:9999
)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。
1.4 利用nc发送数据
- 在
bigdata1
节点利用nc
发送数据,执行命令:nc -lp 9999
1.5 启动应用,查看结果
- 启动
SparkStreamingWordCount
对象,在bigdata1
节点上输入数据,在控制台查看词频统计结果
- 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用
updateStateByKey
或mapWithState
实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。
2. 编程模型的基本概念
2.1 数据源依赖的配置
- 对于Spark Streaming Core API中不存在的数据源(例如Kafka/Kinesis)获取数据,必须要在工程中引入数据源相关的依赖:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-10_2.12 |
Kinesis | spark-streaming-kinesis-asl_2.12 [Amazon Software License] |
- 需要注意的是引入的依赖要和环境保护一致,例如
spark-streaming-kafka-0-10_2.12
这个是 scala 的版本,要和工程的 scala 版本保持一致。
2.2 SparkConf概述
SparkConf
是 Apache Spark 中用于管理应用程序配置的核心类。它通过键值对的形式设置和存储配置参数,如 Master URL、应用程序名称、资源分配等。SparkConf
支持从系统属性加载配置,并允许动态设置和获取参数。其配置优先级高于默认配置文件,确保应用程序在不同环境中灵活运行。SparkConf
还提供克隆和调试功能,便于配置管理和问题排查。
2.3 StreamingContext概述
StreamingContext
是 Apache Spark Streaming 的核心类,负责流式数据处理的初始化和管理。它通过SparkContext
和SparkConf
配置运行环境,支持多种数据源(如 Socket、文件、队列等),并提供启动、停止和检查点功能。StreamingContext
还允许用户定义数据处理逻辑,并通过监听器监控作业状态,确保流式应用的可靠性和高效性。
2.4 初始化StreamingContext详解
// 创建SparkConf对象,2个线程,本地运行
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")
// 创建StreamingContext对象,10秒一个批次
val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
- conf对象是创建任何spark组件的入口,使用new SparkConf()可以构建对象。
- SparkConf实例对象管理Spark应用程序的配置,所有的配置采用key-value键值对的方式进行配置,代码中设置的参数优先级别大于配置文件中的配置。
- 如果想做单元测试的话,可以使用new SparkConf(false)的方式跳过加载外部的配置文件和系统属性(默认会加载spark和java系统所有配置信息)。
- 将SparkConf对象传递给 SparkContext 实例之后,SparkConf对象将被克隆,用户将无法再对其进行修改。Spark不支持在运行时修改配置。
- setMaster()可以设置Master运行模式,本地测试一般使用local方式,local[*]这种方式一般不推荐使用,默认使用当前机器所有的core,线上环境建使用spark-submit的方式进行资源配置,防止编写硬编码的程序。
- setAppName()设置程序运行时的名字,本地测试和集群线上环境都是通用的。
- 任何一个流式计算程序都需要一个StreamingContext实例,创建当前实例一般需要2个参数,一个是conf配置信息,另外一个是多长时间处理一批数据的参数,Seconds(num),num的单位是秒,具体num设置多少,取决于集群处理数据的能力。
3. 离散化数据流
3.1 DStream概述
- SparkStreaming数据模型:DStream是Spark Streaming中最基本最重要的一个抽象概念(可以把DStream简单的理解为一个类,类中可以保存数据并且提供一系列操作数据的方法)。
- DStream数据结构含义:DStream表示为一个连续的数据流,内部由一系列的数据组成,这些数据既可以是从数据源接收到的数据,也可以是从数据源接收到的数据经过transform操作转换后的数据。
3.2 DStream内部原理
- 从本质上来说一个DStream是由一系列连续的RDDs组成,DStream中的每一个RDD包含了一个batch的数据,每个batch的数据都包含了特定时间间隔的数据(也可以理解为DStream中的每个RDD都是包含了特定时间间隔的数据)
DStream转换本质 | SparkStreaming默认参数 |
---|---|
DStream类中存在数据和对数据的操作方法(函数),如果对DStream中的数据使用函数进行操作的话,本质是对其内部的RDD进行操作的,这些底层RDD转换由Spark引擎进行计算。DStream相关操作隐藏了大部分这些细节,并为开发人员提供了更高级别的API以方便它们 | 在SparkStreaming中默认200ms会形成一个数据块(无论是否接收到数据),当到达StreamingContext实例设置的批次时间时会触发业务逻辑算子处理这些数据 |
- DStream上做的所有操作在底层都会转换成RDD的操作,每一个时间批次的数据都会被算子与算子之间的血缘关系链中的所有算子进行处理。
4. 基本数据源
4.1 Spark Streaming支持两种数据源
- 基本数据源:通过网络端口获取数据、监控文件系统
- 扩展数据源:Kafka、Kinesis 等第三方数据存储系统
- 注意:从扩展数据源中获取数据时,Spark Streaming 中没有直接支持的 API,需要额外引入依赖
4.2 Spark Streaming 资源分配注意事项
- Spark Streaming 是长期运行的任务,需要占用一定的内存和 CPU 核。
- 在分配资源时,必须确保 CPU 核数大于接收数据的核数。
- 如果 CPU 核数不足,可能会导致所有 CPU 核都用于接收数据,而没有 CPU 核来处理数据。
4.3 网络端口使用单个接收器获取数据
- 在正式编写程序之前,需要准备一个对外开放的端口来传输数据。可以使用
nc -lp port
命令开放一个端口用于传输数据。启动实时流程序后,从port
获取数据形成 DStream。需要注意的是,在启动实时流程序前,必须提前开放一个传输数据的端口。
1. 获取程序入口
- 导入相关类
- 设置日志打印级别
- 构建
SparkConf
实例对象
- 构建
SparkContext
实例对象
- 构建
StreamingContext
实例对象,每间隔5
秒处理一批数据
2. 业务代码编写
- 测试目的:从网络端口获取数据
- 业务代码功能:仅打印从网络端口获取的数据
3. 启动实时流程序
4. 查看程序运行结果
- 启动
nc
,准备输入数据
- 启动
SparkStreamingNetcat
对象,在bigdata1
节点通过nc
输入数据
- 查看控制台输出结果