文章目录
- 一、Spark如何获取sc对象
- 1、windons 本地模式获取sc对象
- 2、linux 集群模式获取sc对象
- 二、创建RDD的两种方式
- 1、并行化一个已存在的集合
- 2、读取外部共享存储系统
一、Spark如何获取sc对象
不论是本地测试还是集群模式,都需要指定 JAVA_HOME 和 HADOOP_HOME 路径
最好下载Anaconda,使用Anaconda 进行Python 的部署
如果下载了 Anaconda ,需要指定其下面的 python 环境的路径
需要在 Anaconda 中下载 pyspark
1、windons 本地模式获取sc对象
import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/devs/javajdk/jdk8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/learn_tools/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'D:/learn_apps/anaconda/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/learn_apps/anaconda/python.exe'# 获取 conf 对象# setMaster 按照什么模式运行,local bigdata01:7077 yarn# local[2] 使用2核CPU * 你本地资源有多少核就用多少核# appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)# 可以打印sc验证是否获取成功print(sc)# 使用完后,记得关闭sc.stop()
2、linux 集群模式获取sc对象
集群模式需要额外指定 Master 所在位置
import os
import timefrom pyspark import SparkContext, SparkConf
import sysif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = '/opt/installs/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = '/opt/installs/hadoop'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = '/opt/installs/anaconda3/bin/python3' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/installs/anaconda3/bin/python3'# 获取sc 对象conf = SparkConf().setMaster("spark://node01:7077").setAppName("wordcount单词统计")sc = SparkContext(conf=conf)print(sc)# 关闭scsc.stop()
二、创建RDD的两种方式
1、并行化一个已存在的集合
方法:parallelize (并行的意思)
将一个集合转换为RDD
# 方式一:将一个已存在的集合转换为RDD
# 创建一个列表:会在Driver内存中构建
data = [1,2,3,4,5,6,7,8,9,10]
# 将列表转换为RDD:将在多个Executor内存中实现分布式存储, numSlices用于指定分区数,所谓的分区就是分为几份,每一份放在一台电脑上
list_rdd = sc.parallelize(data,numSlices=2)
# 打印这个RDD的内容
list_rdd.foreach(lambda x: print(x))
2、读取外部共享存储系统
方法:textFile、wholeTextFile、newAPIHadoopRDD等
读取外部存储系统的数据转换为RDD
# 方式二:读取外部系统
# 读取文件的数据变成RDD,minPartitions用于指定最小分区数
file_rdd =sc.textFile("../datas/function_data/filter.txt", minPartitions=2)
# 输出文件的内容
file_rdd.foreach(lambda line: print(line))