Spark master web UI: hadoop100:8080
Spark history server: hadoop100:18080
HDFS NameNode web UI: http://hadoop100:9870/dfshealth.html#tab-overview
1 Install a Hadoop cluster on CentOS and upload the data files to HDFS.
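A minimal sketch of the upload step, assuming the hdfs CLI is on the PATH and that the local file name flights.csv is what you have on hand; the /input target directory matches the paths the Spark jobs below read from:

import subprocess

# Create the target directory (idempotent with -p) and upload the CSV;
# -f overwrites any existing copy. Both paths are assumptions.
subprocess.run(["hdfs", "dfs", "-mkdir", "-p", "/input"], check=True)
subprocess.run(["hdfs", "dfs", "-put", "-f", "flights.csv", "/input/"], check=True)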
2 Install a Spark standalone cluster. Check which Python version the bundled pyspark uses, then create an Anaconda virtual environment with that Python version and install the matching pyspark package into it.
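To find the Python version the bundled pyspark uses, a quick check is to start the cluster's own pyspark shell on the server and print the interpreter version (a minimal sketch; the major.minor version shown is what the Anaconda environment should match):

import sys

# Run inside the bundled pyspark shell on the server.
print(sys.version)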
3 PySpark code in Python
Select a remote Python interpreter in PyCharm, then write the PySpark code:
import time
from pyspark.sql import SparkSession
from datetime import datetime

# Get the current timestamp (yyyyMMddHHmmss)
current_time_str = datetime.now().strftime("%Y%m%d%H%M%S")
print(current_time_str)

# Create the SparkSession and configure the Python environment
spark = SparkSession.builder \
    .appName(f"Demo{current_time_str}") \
    .master('spark://192.168.111.100:7077') \
    .config("spark.pyspark.python", "python") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs://hadoop100:9820/directory") \
    .getOrCreate()

# Read the CSV file from HDFS
flights_df = spark.read.csv("hdfs://hadoop100:9820/input/flights.csv", header=True, inferSchema=True)

# Filter, then group by airline and count
result_f = flights_df.filter(flights_df['FLIGHT_NUMBER'] > 98)
result = result_f.groupBy("AIRLINE").count().orderBy('AIRLINE')
print(result.collect())
# time.sleep(2000)
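Instead of collecting everything to the driver, the result can also be written back to HDFS; a sketch, where the output path is an assumption:

# Persist the aggregated counts to HDFS as CSV (path is hypothetical).
result.write.mode("overwrite").csv("hdfs://hadoop100:9820/output/airline_counts", header=True)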
4 Java code
After writing it, upload it to the server, build the jar, and run it (see the build-and-submit sketch after the code).
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>demo_java_spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
            </plugin>
            <!-- Maven Shade Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <transformers>
                                <!-- Set the Main-Class -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>SparkApp</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.text.SimpleDateFormat;
import java.util.Date;

public class SparkApp {
    public static void main(String[] args) {
        // Get the current timestamp (yyyyMMddHHmmss)
        String currentTimeStr = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        System.out.println(currentTimeStr);

        // Create the SparkSession with the application name and Spark master
        SparkSession spark = SparkSession.builder()
                .appName("Demo" + currentTimeStr)
                .master("spark://192.168.111.100:7077")
                .config("spark.eventLog.enabled", "true")
                .config("spark.eventLog.dir", "hdfs://hadoop100:9820/directory")
                .getOrCreate();

        // Read the CSV file from HDFS
        Dataset<Row> flightsDf = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("hdfs://hadoop100:9820/input/flights.csv");

        // Filter, then group by airline and count
        Dataset<Row> resultF = flightsDf.filter(flightsDf.col("FLIGHT_NUMBER").gt(98));
        Dataset<Row> result = resultF.groupBy("AIRLINE").count().orderBy("AIRLINE");

        // Print the result
        result.show();

        // Keep the application running so the Spark UI stays available
        try {
            Thread.sleep(2000 * 1000); // 2000 seconds
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Stop the SparkSession
        spark.stop();
    }
}
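A sketch of step 4's build-and-run on the server, assuming mvn and spark-submit are on the PATH and that the shaded jar name follows the pom's artifactId and version:

import subprocess

# Build the shaded jar (the shade plugin binds to the package phase).
subprocess.run(["mvn", "-q", "package"], check=True)

# Submit to the standalone master; --class matches the mainClass in the pom.
subprocess.run([
    "spark-submit",
    "--class", "SparkApp",
    "--master", "spark://192.168.111.100:7077",
    "target/demo_java_spark-1.0-SNAPSHOT.jar",
], check=True)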
5 Scala code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>demo_java_spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.5.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Scala Maven Plugin (replaces the Java maven-compiler-plugin) -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven Shade Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <transformers>
                                <!-- Set the Main-Class -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>scala.SparkApp</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
SparkApp.scala
package scala

import org.apache.spark.sql.SparkSession

import java.text.SimpleDateFormat
import java.util.Date

object SparkApp {
  def main(args: Array[String]): Unit = {
    // Get the current timestamp (yyyyMMddHHmmss)
    val currentTimeStr = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
    println(currentTimeStr)

    // Create the SparkSession with the application name and Spark master
    val spark = SparkSession.builder()
      .appName(s"Demo$currentTimeStr")
      .master("spark://192.168.111.100:7077")
      .config("spark.eventLog.enabled", "true")
      .config("spark.eventLog.dir", "hdfs://hadoop100:9820/directory")
      .getOrCreate()

    // Read the CSV file from HDFS
    val flightsDf = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://hadoop100:9820/input/flights.csv")

    // Filter, then group by airline and count
    val resultF = flightsDf.filter(flightsDf("FLIGHT_NUMBER") > 98)
    val result = resultF.groupBy("AIRLINE").count().orderBy("AIRLINE")

    // Show the result
    result.show()

    // Keep the application running so the Spark UI stays available
    Thread.sleep(2000 * 1000) // 2000 seconds

    // Stop the SparkSession
    spark.stop()
  }
}
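Packaging and submission mirror the Java sketch above; the only difference is the entry point, --class scala.SparkApp, matching the mainClass in this pom. No explicit scala-library dependency is declared because it comes in transitively through the Spark 2.12 artifacts.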