个人的毕业设计项目,仅供学习交流使用
一、项目基本介绍
1. 项目简介
随着互联网和计算机技术的快速发展,计算机行业的招聘需求日益增多。各大招聘平台(如智联招聘、前程无忧、Boss直聘等)上发布了大量的招聘信息,但这些数据分散在不同的平台,且数据量大、格式不统一。为了更好地了解计算机行业的招聘行情,帮助求职者、企业和就业市场官方人员更好地掌握行业动态,我设计并实现了一个基于大数据技术的计算机行业招聘数据分析系统,代码已上传至GitHub。
该系统通过对各大招聘平台的招聘数据进行爬取、清洗、分析和可视化,帮助用户从多个维度了解计算机行业的招聘现状。系统支持不同角色的用户(如求职者、企业、就业市场官方人员等)登录,并根据角色展示不同的数据分析结果。通过可视化的图表(如饼状图、地图、词云图等),用户可以直观地了解招聘市场的趋势、薪资水平、技术要求等信息。
2. 技术栈
-
数据爬取与采集
-
Python:用于编写爬虫脚本,从各大招聘平台(如智联招聘、前程无忧、Boss直聘)爬取招聘数据。
-
Flume:用于将爬取到的数据从本地文件系统上传到HDFS(Hadoop分布式文件系统)中,实现数据的实时采集。
-
-
数据清洗与预处理
-
Spark Core:用于对爬取到的原始数据进行清洗和预处理,统一数据格式(如薪资、城市、学历要求等),并将多个平台的数据合并为一个数据集。
-
正则表达式:用于处理文本数据中的不规范格式(如薪资单位不统一、城市名称不一致等)。
-
-
数据存储
-
HDFS:用于存储清洗后的原始数据,支持大规模数据的分布式存储。
-
Hive:用于将清洗后的数据加载到数据仓库中,创建贴源数据表和明细宽表,支持按月份分区存储数据。
-
MySQL:用于存储分析后的结果数据(如平台招聘量、城市招聘分布、技术要求权重等),供可视化模块使用。
-
-
数据分析
-
Spark SQL:用于对Hive中的数据进行统计分析,生成各类指标表(如平均薪资、招聘数量等)。
-
Scala:用于编写Spark SQL程序,实现复杂的数据分析逻辑。
-
-
数据可视化
-
Spring Boot:用于构建后端服务,提供RESTful API接口,供前端调用。
-
MyBatis:用于与MySQL数据库进行交互,实现数据的增删改查操作。
-
ECharts:用于前端数据可视化,支持多种图表类型(如饼状图、地图、词云图、折线图等)。
-
HTML/CSS/JavaScript:用于构建前端页面,展示数据分析结果。
-
Bootstrap:用于前端页面的样式设计,确保页面的响应式布局。
-
Ajax:用于实现前后端的异步数据交互,提升用户体验。
-
-
任务调度
- Azkaban:用于定时调度数据清洗与数据分析任务,确保系统每月自动更新数据。
-
其他工具
-
Git:用于版本控制,管理项目代码。
-
Tomcat:用于部署Spring Boot项目,提供Web服务。
-
Linux:项目运行在Linux服务器上,使用Shell脚本管理项目运行流程。
-
技术栈总结
模块 | 技术栈 |
---|---|
数据爬取与采集 | Python、八爪鱼、Flume |
数据清洗与预处理 | Spark Core、正则表达式 |
数据存储 | HDFS、Hive、MySQL |
数据分析 | Spark SQL、Scala |
数据可视化 | Spring Boot、MyBatis、ECharts、HTML/CSS/JavaScript、Bootstrap、Ajax |
任务调度 | Azkaban |
其他工具 | Git、Tomcat、Linux |
3. 项目功能
-
招聘平台数据量分析模块
-
功能描述:统计各大招聘平台(如智联招聘、前程无忧、Boss直聘)的招聘数据量,展示各平台的招聘量占比。
-
可视化方式:使用饼状图展示各平台的招聘量分布。
-
用户价值:帮助用户了解哪些平台的招聘信息更丰富,便于求职者选择合适的平台投递简历。
-
-
招聘数据地域分析模块
-
功能描述:统计不同城市、不同岗位的招聘数量和平均薪资,展示招聘热点城市。
-
可视化方式:使用中国地图展示招聘分布情况,地图上的波纹大小表示招聘数量,提示框展示具体数量和平均薪资。
-
用户价值:帮助求职者了解哪些城市的招聘机会更多,薪资水平更高;帮助企业了解各城市的招聘竞争情况。
-
-
招聘数据学历经验要求分析模块
-
功能描述:统计不同学历、不同经验要求的招聘数量和平均薪资,分析学历和经验对薪资的影响。
-
可视化方式:使用折线图展示招聘数量变化,使用柱状图展示平均薪资变化。
-
用户价值:帮助求职者了解不同学历和经验要求的岗位薪资水平,便于制定职业规划;帮助企业了解市场对学历和经验的需求。
-
-
招聘数据技术要求权重分析模块
-
功能描述:统计不同岗位的技术要求出现频次,分析各技术的市场需求。
-
可视化方式:使用词云图展示技术要求的权重,字体大小表示技术需求的频次。
-
用户价值:帮助求职者了解哪些技术更受欢迎,便于提升技能;帮助企业了解市场对技术的需求。
-
-
招聘数据不同行业薪资分析模块
-
功能描述:统计不同行业的平均薪资和招聘数量,展示各行业的薪资水平。
-
可视化方式:使用排行榜图展示各行业的招聘数量和薪资水平。
-
用户价值:帮助求职者了解哪些行业的薪资水平更高;帮助企业了解各行业的招聘竞争情况。
-
-
用户登录与用户管理模块
-
功能描述:
- 注册与登录:用户可以通过邮箱注册账号,选择角色(求职者、企业、就业市场官方人员等),登录后根据角色查看不同的数据分析结果。
- 用户管理:系统管理员可以查看所有用户信息,并对用户进行删除操作。
-
用户价值:确保系统的安全性,不同角色的用户只能查看与其相关的数据分析结果。
-
-
数据时效性与系统健壮性
-
数据时效性:系统采用离线数据分析技术,每月进行一次数据采集与分析,确保数据的准确性和可靠性。
-
系统健壮性:在注册和登录时,系统会对用户输入的信息进行校验(如邮箱格式、密码正确性等),确保系统的安全性。
-
二、项目开发流程分析
1. 开发流程
下面对上述开发流程图进行详细解释:
(1)数据爬取阶段:爬取三大招聘网络平台的数据以CSV格式保存到本地Windows系统上,成果为三个CSV文件。
(2)数据采集阶段:将三个CSV上传到Linux虚拟机上,使用Flume采集技术将数据从Linux虚拟机上传到HDFS文件存储系统上。
(3)数据清洗与预处理阶段:使用Spark Core技术将HDFS上的三个平台的CSV数据清洗并预处理后,合成一个CSV文件,保存到HDFS上。
(4)数据存储阶段:创建一个Hive贴源数据表,将HDFS上经清洗阶段完成的一个CSV文件数据以月份分区加载到Hive数据表中,并创建Hive的明细宽表,将贴源数据表中的月份拆分得到年和月,将贴源数据表的数据加载到明细宽表中。
(5)数据分析阶段:使用Spark SQL On Hive 将Hive上的数据依据功能需求进行分析处理,形成多张指标表,存储到MySQL上。
(6)数据可视化阶段:使用Spring Boot和MyBatis等后端技术将数据进行格式化并提供数据接口给前端。前端使用HTML、Ajax、jQuery、ECharts、Mustache、CSS、JavaScript、Bootstrap等技术将后端传来的数据以功能需求中所要求的可视化图表展示到网站上。网站使用不同角色不同展示内容的方案。
(7)Azkaban任务调度阶段:需要对数据清洗与预处理、数据分析两个阶段进行定时的任务调度。
2. 技术架构
三、项目代码分析
1. 项目代码架构
项目的代码架构主要分为以下几个模块,每个模块对应不同的功能阶段。以下是项目在Linux系统中的文件夹结构展示:
- crow:数据爬取模块,包含爬虫脚本和爬取到的原始数据。
- collect:数据采集模块,负责将爬取到的数据上传到HDFS。
- clean:数据清洗与预处理模块,对原始数据进行清洗和格式化。
- analyze:数据存储与分析模块,将清洗后的数据存储到Hive中,并进行统计分析。
- show:数据可视化模块,将分析结果通过前端页面展示。
2. 数据分析各阶段代码与配置分析
(1)数据爬取阶段和采集阶段:
本阶段的文件夹为crow文件夹和collect文件夹
- crow:包含爬虫脚本和爬取到的原始数据。
- collect:包含Flume配置文件和数据采集脚本。
实现步骤:
- 数据爬取:使用Python编写的爬虫脚本从招聘平台(如智联招聘、前程无忧、Boss直聘)爬取数据,保存为CSV文件。
- 数据采集:
- 将爬取到的数据上传到
collect/origin
文件夹。 - 运行
collect.sh
脚本,启动Flume监听程序,将数据从monitor
文件夹上传到HDFS。 - 运行
zhaopin-crow.sh
脚本,使用Java程序将origin
文件夹中的数据缓慢写入monitor
文件夹,模拟数据流的生成。
- 将爬取到的数据上传到
- 逻辑:
- Flume:负责实时监控数据变化,并将数据上传到HDFS。
- Java程序:模拟数据流的生成,确保Flume能够持续采集数据。
Flume配置文件(zhaopin.conf)
# 1、进程中的source、channel、sink的别名和个数dataCollect.sources = s1 s2 s3
dataCollect.channels = c1 c2 c3
dataCollect.sinks = k1 k2 k3# 2、配置source数据源
dataCollect.sources.s1.type = exec
dataCollect.sources.s1.command = tail -F /opt/zhaopin/collect/monitor/51job.csvdataCollect.sources.s2.type = exec
dataCollect.sources.s2.command = tail -F /opt/zhaopin/collect/monitor/boss.csvdataCollect.sources.s3.type = exec
dataCollect.sources.s3.command = tail -F /opt/zhaopin/collect/monitor/zlzp.csv# 3、配置channel管道
dataCollect.channels.c1.type = memory
dataCollect.channels.c1.capacity = 30000
dataCollect.channels.c1.transactionCapacity = 10000
dataCollect.channels.c1.byteCapacity = 6000000dataCollect.channels.c2.type = memory
dataCollect.channels.c2.capacity = 30000
dataCollect.channels.c2.transactionCapacity = 10000
dataCollect.channels.c2.byteCapacity = 6000000dataCollect.channels.c3.type = memory
dataCollect.channels.c3.capacity = 30000
dataCollect.channels.c3.transactionCapacity = 10000
dataCollect.channels.c3.byteCapacity = 6000000# 4、配置sink下沉地
dataCollect.sinks.k1.type = hdfs
#指定采集的数据存储到HDFS的哪个目录下
dataCollect.sinks.k1.hdfs.path = hdfs://HadoopCluster/zhaopin/%Y%m
#上传文件的前缀
dataCollect.sinks.k1.hdfs.filePrefix = 51job
#上传文件的后缀
dataCollect.sinks.k1.hdfs.fileSuffix = .csv
#是否按照时间滚动文件夹
dataCollect.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
dataCollect.sinks.k1.hdfs.roundValue = 24
#重新定义时间单位
dataCollect.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
dataCollect.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
dataCollect.sinks.k1.hdfs.batchSize = 10000
#设置文件类型,可支持压缩
dataCollect.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
dataCollect.sinks.k1.hdfs.rollInterval = 180
#设置每个文件的滚动大小
dataCollect.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
dataCollect.sinks.k1.hdfs.rollCount = 0
#最小冗余数
dataCollect.sinks.k1.hdfs.minBlockReplicas = 1dataCollect.sinks.k2.type = hdfs
#指定采集的数据存储到HDFS的哪个目录下
dataCollect.sinks.k2.hdfs.path = hdfs://HadoopCluster/zhaopin/%Y%m
#上传文件的前缀
dataCollect.sinks.k2.hdfs.filePrefix = boss
#上传文件的后缀
dataCollect.sinks.k2.hdfs.fileSuffix = .csv
#是否按照时间滚动文件夹
dataCollect.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
dataCollect.sinks.k2.hdfs.roundValue = 24
#重新定义时间单位
dataCollect.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
dataCollect.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
dataCollect.sinks.k2.hdfs.batchSize = 10000
#设置文件类型,可支持压缩
dataCollect.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
dataCollect.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
dataCollect.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
dataCollect.sinks.k2.hdfs.rollCount = 0
#最小冗余数
dataCollect.sinks.k2.hdfs.minBlockReplicas = 1dataCollect.sinks.k3.type = hdfs
#指定采集的数据存储到HDFS的哪个目录下
dataCollect.sinks.k3.hdfs.path = hdfs://HadoopCluster/zhaopin/%Y%m
#上传文件的前缀
dataCollect.sinks.k3.hdfs.filePrefix = zlzp
#上传文件的后缀
dataCollect.sinks.k3.hdfs.fileSuffix = .csv
#是否按照时间滚动文件夹
dataCollect.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
dataCollect.sinks.k3.hdfs.roundValue = 24
#重新定义时间单位
dataCollect.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
dataCollect.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
dataCollect.sinks.k3.hdfs.batchSize = 10000
#设置文件类型,可支持压缩
dataCollect.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
dataCollect.sinks.k3.hdfs.rollInterval = 30
#设置每个文件的滚动大小
dataCollect.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
dataCollect.sinks.k3.hdfs.rollCount = 0
#最小冗余数
dataCollect.sinks.k3.hdfs.minBlockReplicas = 1# 5、配置agent中的source、sink、channel的关联
dataCollect.sources.s1.channels = c1
dataCollect.sinks.k1.channel = c1dataCollect.sources.s2.channels = c2
dataCollect.sinks.k2.channel = c2dataCollect.sources.s3.channels = c3
dataCollect.sinks.k3.channel = c3
Flume配置文件定义了数据采集的流程,主要包括:
- Source:监听
monitor
文件夹中的CSV文件变化。 - Channel:使用内存通道缓存数据。
- Sink:将数据上传到HDFS,按月份分区存储。
(2)数据清洗与预处理阶段:
本阶段的文件夹为clean文件夹,包含数据清洗脚本和清洗后的数据。
实现步骤
- 运行
zhaopin.sh
脚本,启动Spark Core程序。 - 从HDFS中读取原始数据,进行清洗和预处理。
- 将清洗后的数据保存到HDFS的指定目录中。
清洗与预处理的Scala项目结构:
代码(以zhaopinClean.scala为例)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, functions}import java.util.Calendar//将三个清洗后的数据源简单聚合
object zhaopinClean {def main(args: Array[String]): Unit = {//spark配置文件对象val sparkConf:SparkConf = new SparkConf().setAppName("zhaopinClean")val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()import sparkSession.implicits._val sc = sparkSession.sparkContext// 分别获取数据源val rddBoss : RDD[String] = sc.textFile(s"hdfs://HadoopCluster/zhaopin/${CollectDate.getCollectDate()}/boss/part-00000")val rddZlzp : RDD[String] = sc.textFile(s"hdfs://HadoopCluster/zhaopin/${CollectDate.getCollectDate()}/zlzp/part-00000")val rdd51job : RDD[String] = sc.textFile(s"hdfs://HadoopCluster/zhaopin/${CollectDate.getCollectDate()}/51job/part-00000")val rdd:RDD[String] = rddBoss.union(rddZlzp).union(rdd51job)// 定义schema信息val schemaString = "jobName salary jobCity experienceRequirement educationRequirement jobRequirement platform"// 对schema信息按空格进行分割// 最终fileds里包含了7个StructFieldval fields = schemaString.split(" ")// 字段类型,字段名称判断是不是为空.map(fieldName => StructField(fieldName, StringType, nullable = true))val schema = StructType(fields)// 把schema信息作用到RDD上// 形成Row类型的RDDval rowRDD = rdd.map(_.split(",")).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6)))// 通过SparkSession创建一个DataFrame// 传进来一个rowRDD和schema,将schema作用到rowRDD上val zhaopinDF = sparkSession.createDataFrame(rowRDD, schema)//对岗位名称的按“-”展开操作(三个清洗后的数据源以这种格式的岗位名称存在)val zhaopinDF1= zhaopinDF.withColumn("jobName",functions.explode(functions.split(functions.col("jobName"),"-"))).coalesce(1)//聚合并写入HDFS的一个文件中zhaopinDF1.write.mode(SaveMode.Overwrite).csv(s"hdfs://HadoopCluster/zhaopin/${CollectDate.getCollectDate()}/zhaopin")}
}
- 数据读取:从HDFS中读取三个平台的招聘数据。
- 数据清洗:
- 统一岗位名称格式(如将“Java开发工程师-高级”拆分为“Java开发工程师”和“高级”)。
- 统一薪资格式(如将“5-10k”转换为“7k”)。
- 统一城市名称格式(如将“北京·海淀”转换为“北京”)。
- 数据合并:将三个平台的数据合并为一个数据集,并保存到HDFS。
(3)数据存储与分析阶段:
本阶段的文件夹为analyze文件夹,包含Hive SQL脚本和Spark分析程序。
实现步骤
-
数据存储:
-
运行
hive.sh
脚本,将清洗后的数据加载到Hive表中。 -
zhaopin.sql创建明细宽表,按年份和月份分区存储数据。
-
hive.sh:
#!/bin/bash lastmonth=`date -d "last-month" "+%Y%m"` hive --hiveconf lastmonth=${lastmonth} -f /opt/zhaopin/analyze/zhaopin.sql
-
zhaopin.sql:
create database if not exists zhaopin;use zhaopin;create external table if not exists zhaopin_table(jobName string,salary string,jobCity string,experienceRequirement string,educationRequirement string,jobRequirement string,platform string)partitioned by (collect_date string) row format delimited fields terminated by ",";load data inpath "hdfs://HadoopCluster//zhaopin/${hiveconf:lastmonth}/zhaopin/*" into table zhaopin_table partition(collect_date="${hiveconf:lastmonth}");create table if not exists zhaopin_table_detail(jobName string,salary string,jobCity string,experienceRequirement string,educationRequirement string,jobRequirement string,platform string,collect_year string,collect_month string)partitioned by (collect_date string) row format delimited fields terminated by ",";insert into table zhaopin_table_detail partition(collect_date="${hiveconf:lastmonth}")selectjobName ,salary ,jobCity ,experienceRequirement ,educationRequirement ,jobRequirement ,platform ,substr(collect_date, 1, 4) ,substr(collect_date, 5, 2)from zhaopin_table where collect_date="${hiveconf:lastmonth}";
-
-
数据分析:
-
运行
analyze.sh
脚本,启动Spark SQL程序。 -
对Hive表中的数据进行分析,生成各类指标表(如平均薪资、招聘数量等)。
-
将分析结果存储到MySQL中,供可视化模块使用。
-
analyze.sh:
#!/bin/bash spark-submit --class zhaopinAnalyze --master yarn /opt/zhaopin/analyze/zhaopin-analyze-1.0-SNAPSHOT.jar
-
Spark SQL程序代码(zhaopinAnalyze.scala)
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, functions}object zhaopinAnalyze {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("SparkSQL")val ss = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()//要放在第一个salary_avg_generate(ss)platform_analyze_generate(ss)name_city_generate(ss)experience_analyze_generate(ss)education_analyze_generate(ss)name_technology_generate(ss)ss.stop()}/*** 5-10k--->7* @param ss*/def salary_avg_generate(ss:SparkSession)={val dataFrameorgin = ss.sql("select * from zhaopin.zhaopin_table_detail")val rdd:RDD[Row] = dataFrameorgin.rddval rdd1:RDD[(String, String, String, String, String)] = rdd.map(row => {(row.getAs[String](0), row.getAs[String](1), row.getAs[String](2),row.getAs[String](3), row.getAs[String](4))})val rdd2:RDD[(String, String, String, String, String)] = rdd1.filter(line=>{if (line._2.contains("-")) trueelse false})val rdd3:RDD[(String, String, String, String, String)] = rdd2.map((line: (String, String, String, String, String)) => {val strings = line._2.dropRight(1).split("-")val salary: Int = ((strings(0).toInt + strings(1).toInt) / 2).toInt(line._1, salary.toString, line._3, line._4, line._5)})val frame = ss.createDataFrame(rdd3)frame.createOrReplaceTempView("salary_temp")}def experience_analyze_generate(ss:SparkSession)={experience_temp(ss)val dataFrame = ss.sql("select _1 experienceRequirement, round(avg(_2),2) avg_salary, sum(_3) num from experience_temp group by _1")val tableName="experience_analyze"put_mysql(dataFrame, tableName)}/*** 将经验5-10年转变为5条数据,分别是5年、6年。。。* @param ss*/def experience_temp(ss:SparkSession)={val dataFrameorgin = ss.sql("select _4 , round(avg(_2),2), count(*) from salary_temp group by _4")
// dataFrameorgin.show(20)val rdd:RDD[Row] = dataFrameorgin.rddval rdd1:RDD[(String, String, String)] = rdd.map(row => {(row.getAs[String](0), row.getAs[String](1), row.getAs[String](2))})val rdd2:RDD[(String, String, String)] = rdd1.filter(line=>{if (line._1.contains("-") && line._1.contains("年")) trueelse if ("经验不限".equals(line._1)) trueelse if ("10年以上".equals(line._1)) trueelse if ("1年以内".equals(line._1)) trueelse if ("在校/应届".equals(line._1)) trueelse false})val rdd3:RDD[(String, String, String)] = rdd2.map((line: (String, String, String)) => {var experience = line._1if (line._1.contains("-")) {val strings = line._1.dropRight(1).split("-")val newexperience = new StringBuffer("")val start = strings(0).toIntval end = strings(1).toIntfor( a <- start to end){newexperience.append(a+"-")}experience = newexperience.toString.dropRight(1)}(experience, line._2,line._3)})// rdd3.foreach(f=>{println(f.toString())})val structType = StructType(Array(StructField("_1", DataTypes.StringType),StructField("_2", DataTypes.DoubleType),StructField("_3", DataTypes.LongType)))val rdd4:RDD[Row] = rdd3.map(touple3 => {Row(touple3._1, touple3._2, touple3._3)})val dataFrame1:DataFrame = ss.createDataFrame(rdd4, structType)
// dataFrame1.show()val dataFrame= dataFrame1.withColumn("_1",functions.explode(functions.split(functions.col("_1"),"-"))).coalesce(1)dataFrame.show()dataFrame.createOrReplaceTempView("experience_temp")}def education_analyze_generate(ss:SparkSession)={val dataFrame = ss.sql("select _5 educationRequirement, round(avg(_2),2) avg_salary, count(*) num from salary_temp where _5 != 'null' group by _5")val tableName="education_analyze"put_mysql(dataFrame, tableName)}def name_technology_generate(ss:SparkSession)={val dataFrameorgin = ss.sql("select jobName, jobRequirement from zhaopin.zhaopin_table_detail")val dataFrame:DataFrame = dataFrameorgin.withColumn("jobRequirement", functions.explode(functions.split(functions.col("jobRequirement"), "-"))).coalesce(1)dataFrame.createOrReplaceTempView("technology_temp")val frame = ss.sql("select jobName, jobRequirement, count(*) num from technology_temp group by jobName, jobRequirement")val tableName="name_technology"put_mysql(frame, tableName)}def name_city_generate(ss:SparkSession)={val dataFrame = ss.sql("select _1 jobName, _3 jobCity, round(avg(_2),2) avg_salary, count(*) num from salary_temp group by _1, _3")val tableName="name_city"put_mysql(dataFrame, tableName)}def platform_analyze_generate(ss:SparkSession)={val dataFrame = ss.sql("select platform, count(*) num from zhaopin.zhaopin_table_detail group by platform")val tableName="platform_analyze"put_mysql(dataFrame, tableName)}def put_mysql (dataFrame: DataFrame, tableName: String) ={val url = "jdbc:mysql://192.168.88.101:3306/zhaopin?characterEncoding=UTF-8"val prop = new java.util.Propertiesprop.setProperty("user","root")prop.setProperty("password","123456")dataFrame.write.mode(SaveMode.Append).option("driver","com.mysql.jdbc.Driver").jdbc(url, tableName, prop)}}
- 薪资计算:将薪资区间(如“5-10k”)转换为中间值(如“7k”)。
- 经验分析:将经验要求(如“3-5年”)拆分为单个年份(如“3年”、“4年”、“5年”)。
- 学历分析:统计不同学历要求的招聘数量和平均薪资。
- 技术要求分析:统计不同岗位的技术要求出现频次,生成词云图数据。
- 数据存储:将分析结果存储到MySQL中。
3. 可视化网页代码分析
本阶段的文件夹为show文件夹
本阶段的实现步骤为运行zhaopin-show.sh可执行文件。使用Spring Boot+jQuery+ECharts程序将保存在MySQL数据库中的表数据以图表形式展示到可视化系统上来。zhaopin-show.sh中运行SpringBoot项目的运行主类,项目在java文件夹下的zhaopin-show文件夹下,项目就是简单的对于MySQL数据库中的结构化数据的简单展示,用ECharts做的展示。
4. Azkaban任务调度阶段实现
本阶段的实现步骤及结果如下所述。
(1)编写project.flow文件。编写Azkaban任务流,将数据清洗与预处理阶段和数据存储与分析阶段的任务按照依赖关系写入任务流。
(2)将project.flow文件和保存着Azkaban版本信息的azkaban.project文件打包成为压缩包。
(3)将压缩包文件上传到Azkaban项目中,生成任务调度流。
(4)设置调度规则。每月首日凌晨一点对上月的数据进行调度。