欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 国际 > 【Spark中创建RDD的两种方式】Spark中如何获取sc对象、以及创建RDD的两种方式

【Spark中创建RDD的两种方式】Spark中如何获取sc对象、以及创建RDD的两种方式

2024/11/8 18:29:18 来源:https://blog.csdn.net/lzhlizihang/article/details/143431930  浏览:    关键词:【Spark中创建RDD的两种方式】Spark中如何获取sc对象、以及创建RDD的两种方式

文章目录

  • 一、Spark如何获取sc对象
    • 1、windons 本地模式获取sc对象
    • 2、linux 集群模式获取sc对象
  • 二、创建RDD的两种方式
    • 1、并行化一个已存在的集合
    • 2、读取外部共享存储系统


一、Spark如何获取sc对象

不论是本地测试还是集群模式,都需要指定 JAVA_HOME 和 HADOOP_HOME 路径

最好下载Anaconda,使用Anaconda 进行Python 的部署
如果下载了 Anaconda ,需要指定其下面的 python 环境的路径

需要在 Anaconda 中下载 pyspark

1、windons 本地模式获取sc对象

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/devs/javajdk/jdk8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/learn_tools/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'D:/learn_apps/anaconda/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/learn_apps/anaconda/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)# 可以打印sc验证是否获取成功print(sc)# 使用完后,记得关闭sc.stop()

2、linux 集群模式获取sc对象

集群模式需要额外指定 Master 所在位置

import os
import timefrom pyspark import SparkContext, SparkConf
import sysif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = '/opt/installs/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = '/opt/installs/hadoop'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = '/opt/installs/anaconda3/bin/python3'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/installs/anaconda3/bin/python3'# 获取sc 对象conf = SparkConf().setMaster("spark://node01:7077").setAppName("wordcount单词统计")sc = SparkContext(conf=conf)print(sc)# 关闭scsc.stop()

二、创建RDD的两种方式

1、并行化一个已存在的集合

方法:parallelize (并行的意思)
将一个集合转换为RDD

# 方式一:将一个已存在的集合转换为RDD
# 创建一个列表:会在Driver内存中构建
data = [1,2,3,4,5,6,7,8,9,10]
# 将列表转换为RDD:将在多个Executor内存中实现分布式存储, numSlices用于指定分区数,所谓的分区就是分为几份,每一份放在一台电脑上
list_rdd = sc.parallelize(data,numSlices=2)
# 打印这个RDD的内容
list_rdd.foreach(lambda x: print(x))

2、读取外部共享存储系统

方法:textFile、wholeTextFile、newAPIHadoopRDD等
读取外部存储系统的数据转换为RDD

# 方式二:读取外部系统
# 读取文件的数据变成RDD,minPartitions用于指定最小分区数
file_rdd =sc.textFile("../datas/function_data/filter.txt", minPartitions=2)
# 输出文件的内容
file_rdd.foreach(lambda line: print(line))

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com