一 平台搭建与运维
任务一:大数据平台搭建
子任务一:Hadoop安装配置
修改主机名
hostnamectl set-hostname master && bash   # 在 master 上执行
hostnamectl set-hostname slave1 && bash   # 在 slave1 上执行
hostnamectl set-hostname slave2 && bash   # 在 slave2 上执行
配置hosts文件
vim /etc/hosts
172.18.2.44 master
172.18.14.185 slave1
172.18.14.184 slave2
1.解压hadoop和jdk。
mkdir -p /opt/software/
mkdir -p /opt/module/
cp /root/software/package/* /opt/software/
tar -zxvf /opt/software/jdk-8u191-linux-x64.tar.gz -C /opt/module/
tar -zxvf /opt/software/hadoop-3.1.3.tar.gz -C /opt/module/
2. JDK 环境变量并使其生效。
vim /etc/profile
export JAVA_HOME=/opt/module/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile
免密登录
ssh-keygen
ssh-copy-id -i /root/.ssh/id_rsa.pub master
ssh-copy-id -i /root/.ssh/id_rsa.pub slave1
ssh-copy-id -i /root/.ssh/id_rsa.pub slave2
分发jdk及配置文件
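scp -r /opt/module/jdk1.8.0_191/ root@slave1:/opt/module/
scp -r /opt/module/jdk1.8.0_191/ root@slave2:/opt/module/
scp /etc/profile root@slave1:/etc/profile
scp /etc/profile root@slave2:/etc/profile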
修改 hadoop-env.sh 文件,添加 jdk 环境变量
echo "export JAVA_HOME=/opt/module/jdk1.8.0_191" >> /opt/module/hadoop-3.1.3/etc/hadoop/hadoop-env.sh
vim core-site.xml
fs.defaultFS
hdfs://master:9000
hadoop.tmp.dir
/opt/module/hadoop-3.1.3/data
hadoop.security.authorization
true
vim hdfs-site.xml
dfs.namenode.http-address
master:9870
dfs.replication
3
dfs.permissions.enabled
true
dfs.permissions.superusergroup
root
vim mapred-site.xml
mapreduce.framework.name
yarn
yarn.app.mapreduce.am.env
HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
mapreduce.map.env
HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
mapreduce.reduce.env
HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
vim yarn-site.xml
yarn.nodemanager.aux-services
mapreduce_shuffle
yarn.resourcemanager.hostname
master
yarn.resourcemanager.bind-host
master
yarn.resourcemanager.webapp.address
0.0.0.0:0
yarn.resourcemanager.webapp.https.address
0.0.0.0:0
yarn.nodemanager.webapp.address
0.0.0.0:0
yarn.nodemanager.webapp.https.address
0.0.0.0:0
yarn.resourcemanager.scheduler.address
0.0.0.0:0
yarn.nodemanager.env-whitelist
JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME
vim workers
master
slave1
slave2
分发hadoop
scp -r /opt/module/hadoop-3.1.3/ root@slave1:/opt/module/
scp -r /opt/module/hadoop-3.1.3/ root@slave2:/opt/module/
配置hadoop环境变量(vim /etc/profile 追加以下内容,保存后执行 source /etc/profile)
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=$(hadoop classpath)
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
分发环境变量
scp /etc/profile root@slave1:/etc/
scp /etc/profile root@slave2:/etc/
格式化文件系统
hdfs namenode -format
启动集群
start-all.sh
子任务二:flume安装配置
解压
tar -xzvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
vim /etc/profile
export FLUME_HOME=/opt/module/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
flume-ng version
修改配置文件
mv /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh.template /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh
echo "export JAVA_HOME=/opt/module/jdk1.8.0_191" >> /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh
vim /opt/module/apache-flume-1.9.0-bin/conf/flume.conf
# 定义Flume代理名称
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# 定义输入源
agent1.sources.source1.type = exec
agent1.sources.source1.command = cat /opt/module/hadoop-3.1.3/logs/hadoop-root-namenode-master.log
# 定义输出目标
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://master:9000/tmp/flume/
agent1.sinks.sink1.hdfs.filePrefix = log-
agent1.sinks.sink1.hdfs.fileSuffix = .txt
agent1.sinks.sink1.hdfs.rollInterval = 120
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 0
# 定义通道
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# 将输入源与输出目标连接起来
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
解决flume和hadoop包冲突
mv /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar_back
cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/apache-flume-1.9.0-bin/lib/
启动flume传输日志
cd /opt/module/apache-flume-1.9.0-bin
bin/flume-ng agent -n agent1 -c conf/ -f conf/flume.conf -Dflume.root.logger=INFO,console
hadoop fs -cat /tmp/flume/*
子任务三:Flink on Yarn安装配置
解压配置环境变量
tar -xvf /opt/software/flink-1.14.0-bin-scala_2.12.tgz -C /opt/module/
vim /etc/profile
export FLINK_HOME=/opt/module/flink-1.14.0
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
追加 hadoop 配置
echo "
fs.hdfs.hadoopconf: $HADOOP_HOME/etc/hadoop
classloader.check-leaked-classloader: false
log4j.logger.org.apache.flink: DEBUG, console" >> /opt/module/flink-1.14.0/conf/flink-conf.yaml
注意需要将yarn-site.xml文件拷贝到flink的conf目录下,添加如下内容,重启hadoop。
vim /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
拷贝yarn-site.xml到flink中
cp /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml /opt/module/flink-1.14.0/conf/
分发hadoop配置文件并重启hadoop集群
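scp /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml root@slave1:/opt/module/hadoop-3.1.3/etc/hadoop/
scp /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml root@slave2:/opt/module/hadoop-3.1.3/etc/hadoop/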
stop-all.sh
start-all.sh
离开安全模式
hdfs dfsadmin -safemode leave
运行wordcount
flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar
任务二:数据库配置维护
子任务一:数据库配置
sql
初始化数据库
/usr/sbin/mysqld --initialize-insecure --console --user=mysql
开启MySQL服务
systemctl start mysqld
登录MySQL
mysql -uroot -p123456
查看 mysql 库下的所有表
show tables in mysql;
修改登录权限
select user,host from mysql.user;
update mysql.user set host='%' where user='root' and host='localhost';
flush privileges;
创建新用户并授权
create user 'user01'@'localhost' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'user01'@'localhost';
flush privileges;
子任务二:创建相关表
sql
创建数据库
create database test01;
use test01;
创建数据表
CREATE TABLE hotel (
Id INT PRIMARY KEY,
hotel_name VARCHAR(255),
City VARCHAR(255),
Province VARCHAR(255),
Level VARCHAR(255),
room_num INT,
Score FLOAT,
shopping VARCHAR(255)
);
CREATE TABLE comment (
Id INT PRIMARY KEY,
Name VARCHAR(255),
commentator VARCHAR(255),
Score FLOAT,
comment_time TIMESTAMP,
Content TEXT
);
子任务三:维护数据表
mysql
导入本地数据
source /root/eduhq/04shoping/hotel_all_data.sql;
source /root/eduhq/04shoping/comment_all_data.sql;
删除id为25的hotel_all数据
delete from hotel_all where id =25;
在comment_all表中将id为30的评分改为5
update comment_all set score=5 where id=30;
二 数据获取与处理
任务一:数据获取与清洗
子任务一:数据获取
python
vim M2-T1-S1-1.py
import pandas as pd

# Load the raw shopping dataset and print it.
df = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
print(df)
子任务二:数据清洗
python
vim 1.py
删除库存小于10和大于10000的数据
import pandas as pd

# Drop rows whose stock (库存) is below 10 or above 10000. Both bounds are
# kept: between() is inclusive on both ends by default, matching >= / <=.
data1 = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
data1 = data1[data1['库存'].between(10, 10000)]
data1.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop1.csv', encoding='utf8', index=False)
print('删除库存小于 10 或库存大于 10000 的数据已经保存~')
python
vim 2.py 有问题***************
import pandas as pd

# Drop listings whose name (名称) mentions "刷单" or "捡漏" (regex alternation).
# na=False treats a missing name as "no match", so such rows are kept.
# Without it the mask holds NaN and the `~` inversion raises a TypeError.
data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
filtered_data = data[~data['名称'].str.contains('刷单|捡漏', na=False)]
filtered_data.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop2.csv', encoding='utf8', index=False)
python
vim 3.py
import pandas as pd

# Drop listings whose name (名称) contains "女装".
# na=False keeps rows with a missing name instead of putting NaN in the mask,
# which would make the `~` inversion raise a TypeError.
data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
filtered_data = data[~data['名称'].str.contains('女装', na=False)]
filtered_data.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop3.csv', index=False, encoding='utf8')
python
vim 4.py 手机价格为区间数据的,设置为价格区间的平均数
import pandas as pd
def mean_price(x):
    """Convert one price cell to a float.

    A range such as ``'1999-2999'`` becomes the mean of its two bounds, each
    rounded to 2 decimals before averaging. A single value is rounded to
    2 decimals.

    Non-string cells can appear when pandas parsed the column as numbers.
    They are rounded directly instead of failing on the ``'-' in x`` test.
    """
    if not isinstance(x, str):
        return round(float(x), 2)
    if '-' in x:
        low, high = x.split('-')
        return (round(float(low), 2) + round(float(high), 2)) / 2
    return round(float(x), 2)
def main():
    """Replace every price in shopping.csv by its numeric (mean) value and save shop4.csv."""
    data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
    data['价格'] = data['价格'].apply(mean_price)
    data.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop4.csv', encoding='utf8', index=False)
    print('平均价格数据已保存~')
# Run the conversion only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
任务二:数据标注
python
vim 1.py
统计正向评论条数:grep -c 正向 model_comment.csv
import pandas as pd
from snownlp import SnowNLP
# Read the comment dataset; adjust the path/encoding if the input file differs.
df = pd.read_csv('/root/eduhq/04shoping/model_comment.csv', encoding='utf-8')
# Sentiment labelling rule.
def get_sentiment_label(sentiment):
    """Map a sentiment score to a label.

    Scores >= 0.6 -> '正向' (positive); scores in (0.4, 0.6) -> '中性'
    (neutral); scores <= 0.4 -> '负向' (negative).
    """
    if sentiment >= 0.6:
        return '正向'
    if sentiment > 0.4:
        return '中性'
    return '负向'
# Label every comment's sentiment. Rows are collected in a list and the
# DataFrame is built once; growing it with .loc row by row is quadratic.
rows = []
for index, row in df.iterrows():
    comment = row['评论信息']
    label = get_sentiment_label(SnowNLP(comment).sentiments)
    rows.append([index + 1, row['手机品牌'], comment, label, index + 1])
# NOTE(review): '编号' appears twice (first and last column); confirm the
# intended name of the fifth column.
standard_df = pd.DataFrame(rows, columns=['编号', '手机品牌', '评论信息', '情感倾向', '编号'])

# Save the labelled result as a UTF-8 CSV and as an Excel sheet.
standard_df.to_csv('/root/eduhq/04shoping/M2/T2/S1/model_sen.csv', encoding='utf-8', index=False)
standard_df.to_excel('/root/eduhq/04shoping/M2/T2/S1/model_sen.xlsx', sheet_name='sheet1', index=False)
任务三:数据统计
子任务一:HDFS文件上传下载
上传文件
hdfs dfs -mkdir /input/
hdfs dfs -put mobile.txt /input/
hdfs dfs -cp /input/mobile.txt /user/hive/warehouse/
子任务二:处理异常数据
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency>
</dependencies>
java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
/**
- 清除数据中分隔符混乱的,多于11个字段的数据
*/
public class Mobile_ETL {
public static class MyMapper extends Mapper<LongWritable, Text,Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取行数据
String line = value.toString();
// 切分数据
String[] split = line.split(“\t”);
// 按照字段分隔符切分,过滤数据字段长度大于11的数据
if (split.length <= 11){
context.write(new Text(line),NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
// 设置程序可通过主机名访问hdfs
System.setProperty(“HADOOP_USER_NAME”,“root”);
conf.set(“dfs.client.use.datanode.hostname”, “true”);
conf.set(“fs.defaultFS”,“hdfs://bigdata:8020”);// HDFS集群中NameNode的URI,获取DistributedFileSystem实例);
// 自动补全log4j日志
BasicConfigurator.configure();
// 创建job实例
Job job = Job.getInstance(conf);
// 配置job
job.setJarByClass(Mobile_ETL.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path(“/input/mobile.txt”));
Path path = new Path(“/mr/output/result/”);
// 文件对象实例
FileSystem fileSystem = path.getFileSystem(conf);
if (fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
// 提交应用程序
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
子任务三:数据统计
java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
import java.util.*;
/*******************************
-
@Author: 叶明君
-
@DateTime: 2024/10/16 10:54
-
TODO:根据user_impression这一字段,统计买家对商家销售的手机商品的印象,结果按照印象数降序排序,将结果写入 HDFS
********************************/
public class Mobile_IM {
public static class MyMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取行数据
String line = value.toString();
// 切分数据
String[] split = line.split(“\t”);
// System.out.println(split.length);
if (split.length == 11){String[] user_impressions = split[6].split(" ");for (String string : user_impressions) {// 获取user_impression字段数据context.write(new Text(string),new IntWritable(1));}}}
}
public static class MyReducer extends Reducer<Text, IntWritable,Text, IntWritable>{private TreeMap<Integer, List<String>> sortedMap;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {sortedMap = new TreeMap<>(Collections.reverseOrder());}@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}// 根据值将键添加到 TreeMap 中if (sortedMap.containsKey(sum)) {sortedMap.get(sum).add(key.toString());} else {List<String> keysList = new ArrayList<>();keysList.add(key.toString());sortedMap.put(sum, keysList);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {// 输出按值降序排列的结果for (Map.Entry<Integer, List<String>> entry : sortedMap.entrySet()) {int value = entry.getKey();List<String> keysList = entry.getValue();for (String key : keysList) {context.write(new Text(key), new IntWritable(value));}}}
}
public static void main(String[] args) throws Exception{Configuration conf = new Configuration();// 设置程序可通过主机名访问hdfsSystem.setProperty("HADOOP_USER_NAME","root");conf.set("dfs.client.use.datanode.hostname", "true");conf.set("fs.defaultFS","hdfs://bigdata:8020");// HDFS集群中NameNode的URI,获取DistributedFileSystem实例);// 自动补全log4j日志BasicConfigurator.configure();// 创建job实例Job job = Job.getInstance(conf);// 配置jobjob.setJarByClass(Mobile_IM.class);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setNumReduceTasks(1);FileInputFormat.setInputPaths(job,new Path("/input/mobile.txt"));Path path = new Path("/impress_count/");// 文件对象实例FileSystem fileSystem = path.getFileSystem(conf);if (fileSystem.exists(path)){fileSystem.delete(path,true);}FileOutputFormat.setOutputPath(job,path);// 提交应用程序System.exit(job.waitForCompletion(true) ? 0:1);
}
}
三 业务数据分析与可视化
任务一:数据分析与可视化
子任务一:数据分析
python
vim M3-T1-S1-1.py
import pandas as pd

df = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
# Brand name: first space-separated token of the product name (名称).
df['brand'] = df['名称'].apply(lambda x: x.split(' ')[0])
# Listings per brand, sorted ascending (most frequent brands at the end).
brand_counts = df['brand'].value_counts().sort_values()
print(brand_counts)

# The ten most frequent brands are the last ten of the ascending order.
top_brands = brand_counts.tail(10)

print("前十名品牌:")
print(top_brands)
python
M3-T1-S1-2.py
import pandas as pd

df = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
# Brand name: first space-separated token of the product name.
df['brand'] = df['名称'].apply(lambda x: x.split(' ')[0])
# Feature count: number of space-separated tokens after the brand.
df['fea'] = df['名称'].apply(lambda x: len(x.split(' ')) - 1)
# Total feature count per brand.
fea_count = df.groupby(['brand'])['fea'].sum()
# Sort ascending and print the six largest totals (positional slice).
fea_count_sort = fea_count.sort_values()
print(fea_count_sort.iloc[-6:])
python
M3-T1-S1-3.py
import pandas as pd

df = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
# Brand name: first space-separated token of the product name.
df['brand'] = df['名称'].apply(lambda x: x.split(' ')[0])
# Total sales volume per brand. count() would only count each brand's
# listings, not its sales (销量).
brand_sales = df.groupby('brand')['销量'].sum()
brand_sales_sort = brand_sales.sort_values()
# Five best-selling brands: the end of the ascending order.
print(brand_sales_sort.iloc[-5:])
子任务二:数据可视化
python
1.py
import pandas as pd
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese labels
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign with SimHei

df = pd.read_csv('/root/eduhq/04shoping/shopping.csv')

# Split prices into 9 ranges and sum the sales (销量) of each range.
# pd.cut bins are right-closed, e.g. (0, 1000] is labelled '<1000'.
bins = [0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 100000]
group_names = ['<1000', '1000-2000', '2000-3000', '3000-4000', '4000-5000',
               '5000-6000', '6000-7000', '7000-8000', '>8000']
# For a price range such as '1999-2999' the lower bound is used. astype(str)
# keeps the .str accessor working if pandas parsed the column as numbers.
df['价格'] = df['价格'].astype(str).str.split('-').str[0].astype(float)
df['价格区间'] = pd.cut(df['价格'], bins=bins, labels=group_names)
# observed=False keeps every price range, including empty ones.
df_grouped = df.groupby('价格区间', observed=False)['销量'].sum().reset_index()

# Bar chart of total sales per price range.
plt.bar(df_grouped['价格区间'].astype(str), df_grouped['销量'])
plt.title('不同价格区间手机销售情况')
plt.xlabel('价格区间')
plt.xticks(rotation=45)
plt.ylabel('销量')
plt.tight_layout()  # keep the rotated tick labels inside the saved image
plt.savefig('/root/eduhq/04shoping/M3/T1/S2/1.png')
python
2.py
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv('/root/eduhq/04shoping/M3/T1/S2/shopping1.csv')
# Brand name: first space-separated token of the product name.
df['brand'] = df['名称'].str.split(' ').str[0]
# Total sales per (region, brand).
brand_counts = df.groupby(['地区', 'brand'])['销量'].sum().reset_index()

# Chinese font and minus-sign rendering.
plt.rcParams.update({'font.sans-serif': ['SimHei'], 'axes.unicode_minus': False})

# One pie chart per region, stacked vertically. The grid is sized from the
# data instead of being fixed at two rows, so no region is silently dropped
# by zip(). With two regions this gives the original 2x1 grid, 10x10 inches.
areas = brand_counts['地区'].unique()
n_rows = max(len(areas), 1)
fig, axes = plt.subplots(n_rows, 1, figsize=(10, 5 * n_rows), squeeze=False)
for area, ax in zip(areas, axes[:, 0]):
    data = brand_counts[brand_counts['地区'] == area]
    data.set_index('brand')['销量'].plot(kind='pie', autopct='%1.1f%%', ax=ax)
    ax.set_title(f'{area}手机品牌销量占比')
plt.savefig('/root/eduhq/04shoping/M3/T1/S2/2.png')
任务二:业务分析与方案设计
子任务一:业务分析
python
import pandas as pd
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese labels
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign with SimHei

df = pd.read_csv('/root/eduhq/04shoping/M3/T2/S1/model_sen1.csv')
# Strip surrounding double quotes from the comment text.
df['评论信息'] = df['评论信息'].str.strip('"')
# Comments per (brand, sentiment); missing combinations become 0.
grouped = df.groupby(['手机品牌', '情感倾向']).size().unstack(fill_value=0)

# Sentiment counts of one brand (raises KeyError if the brand is absent).
brand = 'Xiaomi Redmi'
brand_data = grouped.loc[brand]

plt.figure(figsize=(12, 10))
brand_data.plot(kind='line', marker='o')
plt.title(f'Sentiment Analysis for {brand}')
plt.xticks(rotation=30)
# The x axis holds the sentiment categories (the unstacked 情感倾向 values), not dates.
plt.xlabel('Sentiment')
plt.ylabel('Count')
plt.savefig('/root/eduhq/04shoping/M3/T2/S1/3.png')
子任务二:报表分析
mkdir -p /root/eduhq/04shoping/M3/T2/S2/