欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > Java技术栈 —— Spark入门(二)之实时WordCount

Java技术栈 —— Spark入门(二)之实时WordCount

2025/2/25 1:29:34 来源:https://blog.csdn.net/weixin_44327736/article/details/140025908  浏览:    关键词:Java技术栈 —— Spark入门(二)之实时WordCount

Java技术栈 —— Spark入门(二)

  • 一、kafka
    • 1.1 创建topic
    • 1.2 准备input与查看output
  • 二、spark
    • 2.1 spark下的程序文件
    • 2.2 用spark-submit提交作业

参考文章:

参考文章或视频链接
[1] 《Kafka + Spark Stream实时WordCount》

实验环境:
假设你的用户为root,以下软件安装路径为/opt

软件版本
spark: 3.5.2 (scala 2.12)
kafka: 3.8.0 (scala 2.13)

实验结构图

在这里插入图片描述

一、kafka

1.1 创建topic

# 创建input
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test.wordcount.input --partitions 1 --replication-factor 1
# 创建output
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test.wordcount.output --partitions 1 --replication-factor 1

1.2 准备input与查看output

# 打开两个terminal终端
# 准备键盘输入作为prodcuer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.wordcount.input
# 在屏幕上查看输出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.wordcount.output

二、spark

2.1 spark下的程序文件

# coding=utf-8
# /opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka-wordcount.py
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as FbootstrapServers = "localhost:9092"spark = SparkSession\.builder\.appName("StructuredKafkaWordCount")\.getOrCreate()# 基于来自kafka的数据流,创建dataframe
lines = spark\.readStream\.format("kafka")\.option("kafka.bootstrap.servers", bootstrapServers)\.option("subscribe", "test.wordcount.input")\.option("failOnDataLoss", False)\.option("group.id", "wordcount-group3")\.load()\.selectExpr("CAST(value AS STRING)")# 将单行数据拆分,转成多行数据
words = lines.select(explode(split(lines.value, ' ')).alias('word')
)# 对单词进行分组,并计算总数
wordCounts = words.groupBy('word').count()# 将两列数据合并成单列数据
wordCounts = wordCounts.select(F.concat(F.col("word"), F.lit("|"), F.col("count").cast("string")).alias("value"))# 测试时,可以不将结果写入kafka,直接输出到控制台
# query = wordCounts \
#     .writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .start()# 将结果输出到 test.wordcount.output
query = wordCounts \.writeStream \.format('kafka') \.outputMode('update') \.option("kafka.bootstrap.servers", bootstrapServers) \.option('checkpointLocation', '/spark/job-checkpoint') \.option("topic", "test.wordcount.output") \.start()query.awaitTermination()

2.2 用spark-submit提交作业

# 提交Spark作业,这个过程需要保证网络畅通,会将一些依赖下载到/root/.ivy2/jars目录下
$SPARK_HOME/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,\
org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka-wordcount.py

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词