欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > Spark-SQL核心编程

Spark-SQL核心编程

2025/4/16 10:43:01 来源:https://blog.csdn.net/2401_82459288/article/details/147240923  浏览:    关键词:Spark-SQL核心编程

 Spark-SQL核心编程

实验内容:利用IDEA开发Spark-SQL。

实验步骤:利用IDEA开发Spark-SQL

  1. 创建子模块Spark-SQL,并添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

  1. 创建Spark-SQL的测试代码:

case class User(id:Int,name:String,age:Int)

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
    //创建SparkSession对象
    val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    //读取json文件
    val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
    df.show()
    //SQL风格语法
    df.createOrReplaceTempView("user")
    spark.sql("select * from user").show
    spark.sql("select avg(age) from user").show

    //DSL风格语法
    df.select("username","age").show()

    //RDD=>DataFrame=>DataSet
    //RDD
    val rdd1 :RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(
      List((1,"zhangsan",30),(2,"lisi",40))
    )
    //DataFrame
    val df1 :DataFrame = rdd1.toDF("id","name","age")
    df1.show()
    //DataSet
    val ds1 :Dataset[User] = df1.as[User]
    ds1.show()

    //DataSet=>DataFrame=>RDD
    val df2 =ds1.toDF()
    df2.show()

    val rdd2 :RDD[Row] = df2.rdd
    rdd2.foreach(a=>println(a.getString(1)))

    rdd1.map{
      case (id,name,age)=>User(id,name,age)
    }.toDS().show()

    val rdd3 = ds1.rdd
    rdd3.foreach(a=>println(a.age))
    rdd3.foreach(a=>println(a.id))
    rdd3.foreach(a=>println(a.name))

    spark.stop()
  }

}

自定义函数:

UDF:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")

spark.udf.register("addName",(x:String)=>"Name:"+x)

df.createOrReplaceTempView("people")
spark.sql("select addName(username),age from people").show()

spark.stop()

UDAF(自定义聚合函数)

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),

countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator

实验需求:计算平均工资

实现方式一:RDD

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object Ycc {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象

    val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")

    // 创建 SparkContext 对象

    val sc: SparkContext = new SparkContext(sparkconf)

    // 创建 RDD 并进行转换和聚合操作

    val resRDD = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40))).map {

      case (name, salary) => {

        (salary, 1)

      }

    }.reduce {

      (t1, t2) => {

        (t1._1 + t2._1, t1._2 + t2._2)

      }

    }

    // 计算平均工资

    println(resRDD._1.toDouble / resRDD._2)

    // 关闭 SparkContext 连接

    sc.stop()

  }

}  

实现方式:强类型UDAF

import org.apache.spark.SparkConf

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.Encoder

import org.apache.spark.sql.Encoders

import org.apache.spark.sql.functions

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.DataFrame

// 定义一个用于存储总和和计数的缓冲区类

case class Buff(var sum: Long, var cnt: Long)

// 自定义聚合函数类,继承自 Aggregator

class MyAverageUDAF extends org.apache.spark.sql.expressions.Aggregator[Long, Buff, Double] {

  // 初始化缓冲区

  override def zero: Buff = Buff(0, 0)

  // 合并输入值到缓冲区

  override def reduce(b: Buff, a: Long): Buff = {

    b.sum += a

    b.cnt += 1

    b

  }

  // 合并两个缓冲区

  override def merge(b1: Buff, b2: Buff): Buff = {

    b1.sum += b2.sum

    b1.cnt += b2.cnt

    b1

  }

  // 完成聚合操作,计算最终结果

  override def finish(reduction: Buff): Double = {

    reduction.sum.toDouble / reduction.cnt

  }

  // 定义缓冲区的编码器

  override def bufferEncoder: Encoder[Buff] = Encoders.product

  // 定义输出结果的编码器

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble

}

object Cyy {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象,设置应用名称和运行模式

    val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")

    // 创建 SparkSession 对象

    val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()

    import spark.implicits._

    // 创建 RDD 并存储人员姓名和工资信息

    val res: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

    // 将 RDD 转换为 DataFrame

    val df: DataFrame = res.toDF("name", "salary")

    // 在 Spark 中注册临时视图

    df.createOrReplaceTempView("user")

    // 创建自定义聚合函数实例

    var myAverage = new MyAverageUDAF

    // 在 Spark 中注册聚合函数

    spark.udf.register("avgSalary", functions.udaf(myAverage))

    // 执行 SQL 查询,计算平均工资并显示结果

    spark.sql("select avgSalary(salary) from user").show()

    // 关闭 SparkSession 连接

    spark.stop()

  }

}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词