
Spark: installing a Hadoop cluster on CentOS, setting up a Spark standalone cluster, and writing PySpark/Java/Scala code that uses the cluster

2024/11/7 18:59:19  Source: https://blog.csdn.net/hebian1994/article/details/143467301

Master web UI: hadoop100:8080

History server: hadoop100:18080

HDFS web UI: http://hadoop100:9870/dfshealth.html#tab-overview
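The history server at hadoop100:18080 only shows applications if the event-log directory referenced by the jobs below (hdfs://hadoop100:9820/directory) actually exists and the history server reads from it. A minimal sketch of those steps, assuming $SPARK_HOME is set and the history server has not been configured yet:

```shell
# Create the HDFS directory that spark.eventLog.dir points at
hdfs dfs -mkdir -p hdfs://hadoop100:9820/directory

# Tell the history server to read the same directory
echo "spark.history.fs.logDirectory hdfs://hadoop100:9820/directory" >> $SPARK_HOME/conf/spark-defaults.conf

# Start the history server (UI listens on port 18080)
$SPARK_HOME/sbin/start-history-server.sh
```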

1. Install a Hadoop cluster on CentOS

Upload the data file to HDFS.
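The jobs below read hdfs://hadoop100:9820/input/flights.csv, so the file must be uploaded first. A sketch of the upload, assuming flights.csv is in the current local directory (the local path is an assumption):

```shell
# Create the input directory on HDFS
hdfs dfs -mkdir -p /input

# Upload the local CSV into it
hdfs dfs -put flights.csv /input/

# Verify the upload
hdfs dfs -ls /input
```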

2. Install the Spark standalone cluster

Check which Python version the bundled pyspark uses, then install Anaconda, create a virtual environment with that Python version, and install the matching pyspark package into it.
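The driver's Python interpreter and pyspark package should match the cluster side. A sketch of the steps above, assuming Spark 3.5.0 and Python 3.8 (read the actual versions off your installation rather than copying these):

```shell
# Check the Spark version and which Python the bundled pyspark reports
$SPARK_HOME/bin/pyspark --version

# Create a conda environment with the matching interpreter version
conda create -n spark python=3.8 -y
conda activate spark

# Install the pyspark package matching the cluster's Spark version
pip install pyspark==3.5.0
```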

3. PySpark code (Python)

In PyCharm, select the remote Python interpreter, then write the PySpark code:

import time
from datetime import datetime

from pyspark.sql import SparkSession

# Current timestamp, used to make the application name unique
current_time_str = datetime.now().strftime("%Y%m%d%H%M%S")
print(current_time_str)

# Create a SparkSession pointed at the standalone master, with event
# logging enabled so the job shows up in the history server
spark = SparkSession.builder \
    .appName(f"Demo{current_time_str}") \
    .master("spark://192.168.111.100:7077") \
    .config("spark.pyspark.python", "python") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs://hadoop100:9820/directory") \
    .getOrCreate()

# Read the CSV file from HDFS
flights_df = spark.read.csv("hdfs://hadoop100:9820/input/flights.csv",
                            header=True, inferSchema=True)

# Filter, then count flights per airline
result_f = flights_df.filter(flights_df["FLIGHT_NUMBER"] > 98)
result = result_f.groupBy("AIRLINE").count().orderBy("AIRLINE")
print(result.collect())

# time.sleep(2000)  # uncomment to keep the app alive and inspect the Spark UI

4. Java version

Write the code locally, upload it to the server, build a jar, and submit it.
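"Build a jar and submit it" can be sketched as follows. The jar name follows from the artifactId/version in the pom.xml below; the server-side path is an assumption:

```shell
# Build the shaded (uber) jar on the machine that has the sources
mvn clean package

# Copy it to the server
scp target/demo_java_spark-1.0-SNAPSHOT.jar root@hadoop100:/opt/jobs/

# Submit it to the standalone master (Main-Class is set by the shade plugin,
# so no --class flag is needed)
spark-submit --master spark://192.168.111.100:7077 /opt/jobs/demo_java_spark-1.0-SNAPSHOT.jar
```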

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>demo_java_spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
            </plugin>
            <!-- Maven Shade Plugin: builds an uber jar with Main-Class set -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <transformers>
                                <!-- Set the Main-Class in the jar manifest -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>SparkApp</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

SparkApp.java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.text.SimpleDateFormat;
import java.util.Date;

public class SparkApp {
    public static void main(String[] args) {
        // Current timestamp, used to make the application name unique
        String currentTimeStr = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        System.out.println(currentTimeStr);

        // Create a SparkSession pointed at the standalone master,
        // with event logging enabled for the history server
        SparkSession spark = SparkSession.builder()
                .appName("Demo" + currentTimeStr)
                .master("spark://192.168.111.100:7077")
                .config("spark.eventLog.enabled", "true")
                .config("spark.eventLog.dir", "hdfs://hadoop100:9820/directory")
                .getOrCreate();

        // Read the CSV file from HDFS
        Dataset<Row> flightsDf = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("hdfs://hadoop100:9820/input/flights.csv");

        // Filter, then count flights per airline
        Dataset<Row> resultF = flightsDf.filter(flightsDf.col("FLIGHT_NUMBER").gt(98));
        Dataset<Row> result = resultF.groupBy("AIRLINE").count().orderBy("AIRLINE");

        // Print the result
        result.show();

        // Keep the application alive so the Spark UI can be inspected
        try {
            Thread.sleep(2000 * 1000); // 2000 seconds
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Stop the SparkSession
        spark.stop();
    }
}

5. Scala version

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>demo_java_spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- maven-compiler-plugin is not needed here:
                 scala-maven-plugin compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven Shade Plugin: builds an uber jar with Main-Class set -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <transformers>
                                <!-- Set the Main-Class in the jar manifest -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>scala.SparkApp</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

SparkApp.scala

package scala

import org.apache.spark.sql.SparkSession

import java.text.SimpleDateFormat
import java.util.Date

object SparkApp {
  def main(args: Array[String]): Unit = {
    // Current timestamp, used to make the application name unique
    val currentTimeStr = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
    println(currentTimeStr)

    // Create a SparkSession pointed at the standalone master,
    // with event logging enabled for the history server
    val spark = SparkSession.builder()
      .appName(s"Demo$currentTimeStr")
      .master("spark://192.168.111.100:7077")
      .config("spark.eventLog.enabled", "true")
      .config("spark.eventLog.dir", "hdfs://hadoop100:9820/directory")
      .getOrCreate()

    // Read the CSV file from HDFS
    val flightsDf = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://hadoop100:9820/input/flights.csv")

    // Filter, then count flights per airline
    val resultF = flightsDf.filter(flightsDf("FLIGHT_NUMBER") > 98)
    val result = resultF.groupBy("AIRLINE").count().orderBy("AIRLINE")

    // Show the result
    result.show()

    // Keep the application alive so the Spark UI can be inspected
    Thread.sleep(2000 * 1000) // 2000 seconds

    // Stop the SparkSession
    spark.stop()
  }
}
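The Scala jar is built and copied to the server the same way as the Java one. If the shade plugin's Main-Class were not set, the entry point could instead be named explicitly at submit time; a sketch, with the jar path as an assumption:

```shell
spark-submit \
  --class scala.SparkApp \
  --master spark://192.168.111.100:7077 \
  /opt/jobs/demo_java_spark-1.0-SNAPSHOT.jar
```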
