欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 资讯 > sparkSql——wordcount案例

sparkSql——wordcount案例

2024/12/22 1:23:28 来源:https://blog.csdn.net/weixin_52642840/article/details/144457984  浏览:    关键词:sparkSql——wordcount案例

我们一般处理的数据都是结构化的数据,结构化的数据最好使用SQL来解决。

sparkCore就是对RDD的操作

sparkSql就是对dataframe的操作

          SQL语句

          DSL算子

获取dataframe的方式

1、直接通过文件
2、通过Rdd获取
3、使用spark.createDataFrame
data = [("Tom", 20), ("Jerry", 18)] 
columns = ["name", "age"]  
df = spark.createDataFrame(data, columns)当为一元组时
data = [(471,)]
columns = ["userId"]
userDf = spark.createDataFrame(data, columns)

wordcount案例

hadoop spark
hive hadoop spark  spark
hue hbase hbase hue  hue
hadoop sparkhive hadoop  spark spark
hue hbase  hbase hue hue
hadoop sparkhive hadoop spark  spark
hue hbase hbase  hue hue
hadoop spark

sparkSql写法

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.master("local[2]").appName("第一个sparksql案例").config("spark.sql.shuffle.partitions",2).getOrCreate()# 将一个文件变成dataFrame,然后创建一个临时表df = spark.read.text("../../data/wordcount/input/data.txt")df.createOrReplaceTempView("wordcount")# 开始写sparkSqlspark.sql("""with t1 as ( select  trim(word) word,1 i from wordcount lateral view explode(split(value," ")) words as word )select word,sum(i) sumCount from t1 where word != "" group by word""").show()#show 的使用: 第一个参数是展示的条数 默认为20行# 第二个参数truncate 默认为True ,表示若显示的数据过长就会折叠起来spark.stop()

DSL操作的写法 

类似于RDD的编程方式:调用算子函数来实现处理

流程:直接调用DataFrame的DSL函数进行处理原生DSL函数【将SQL语法变成了函数】:select、where、groupBy、orderBy、limit、count、agg

 

import osfrom pyspark.sql import SparkSession
from pyspark.sql import functions as Fif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.master("local[2]").appName("第一个sparksql案例").config("spark.sql.shuffle.partitions",2).getOrCreate()# 将一个文件变成dataFrame,然后创建一个临时表df = spark.read.text("../../data/wordcount/input/data.txt")df.createOrReplaceTempView("wordcount")# 打印表结构df.printSchema()# DSL操作 df.select(F.explode(F.split("value", " ")).alias("word")) \.where("trim(word) !=''").groupby("word").count().orderBy("count", ascending=False).show()spark.stop()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com