我们一般处理的数据都是结构化的数据,结构化的数据最好使用SQL来解决。
sparkCore就是对RDD的操作
sparkSql就是对dataframe的操作
SQL语句
DSL算子
获取dataframe的方式
1、直接通过文件
2、通过Rdd获取
3、使用spark.createDataFrame
data = [("Tom", 20), ("Jerry", 18)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)当为一元组时
data = [(471,)]
columns = ["userId"]
userDf = spark.createDataFrame(data, columns)
wordcount案例
hadoop spark
hive hadoop spark spark
hue hbase hbase hue hue
hadoop sparkhive hadoop spark spark
hue hbase hbase hue hue
hadoop sparkhive hadoop spark spark
hue hbase hbase hue hue
hadoop spark
sparkSql写法
import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.master("local[2]").appName("第一个sparksql案例").config("spark.sql.shuffle.partitions",2).getOrCreate()# 将一个文件变成dataFrame,然后创建一个临时表df = spark.read.text("../../data/wordcount/input/data.txt")df.createOrReplaceTempView("wordcount")# 开始写sparkSqlspark.sql("""with t1 as ( select trim(word) word,1 i from wordcount lateral view explode(split(value," ")) words as word )select word,sum(i) sumCount from t1 where word != "" group by word""").show()#show 的使用: 第一个参数是展示的条数 默认为20行# 第二个参数truncate 默认为True ,表示若显示的数据过长就会折叠起来spark.stop()
DSL操作的写法
类似于RDD的编程方式:调用算子函数来实现处理
流程:直接调用DataFrame的DSL函数进行处理原生DSL函数【将SQL语法变成了函数】:select、where、groupBy、orderBy、limit、count、agg
import osfrom pyspark.sql import SparkSession
from pyspark.sql import functions as Fif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.master("local[2]").appName("第一个sparksql案例").config("spark.sql.shuffle.partitions",2).getOrCreate()# 将一个文件变成dataFrame,然后创建一个临时表df = spark.read.text("../../data/wordcount/input/data.txt")df.createOrReplaceTempView("wordcount")# 打印表结构df.printSchema()# DSL操作 df.select(F.explode(F.split("value", " ")).alias("word")) \.where("trim(word) !=''").groupby("word").count().orderBy("count", ascending=False).show()spark.stop()