欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > 大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据

大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据

2024/10/25 11:23:39 来源:https://blog.csdn.net/w776341482/article/details/141664705  浏览:    关键词:大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • Flink 基本介绍
  • 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构

在这里插入图片描述
在这里插入图片描述

再次回到最初的起点,Hello Word Count!

Flink 流处理 (Stream Processing)

定义

流处理是指对持续不断的数据流进行实时处理。Flink 的流处理模式非常适合处理持续产生的数据,例如来自传感器、日志记录系统或金融交易的数据流。

核心概念

  • 无界数据流:流处理通常处理无界数据流,即数据流没有明确的结束点,持续不断地产生。
  • 事件时间:Flink 支持基于事件时间的处理,能够处理乱序和延迟数据,使得处理更加精确。事件时间指的是数据在其产生源头的时间。
  • 窗口操作:在流处理过程中,通常需要将数据按时间窗口(如滑动窗口、滚动窗口、会话窗口)进行分组,以便执行聚合或其他操作。
  • 状态管理:Flink 支持有状态的流处理,这意味着处理每条数据时,可以记住之前的数据状态。例如,在流中计算一个累积的总和或频率。

Flink 批处理 (Batch Processing)

定义

批处理是指对静态的、有界的数据集进行处理。这种处理通常用于一次性的大规模数据分析,如定期的业务报告生成、数据转换和加载任务。

核心概念

  • 有界数据集:批处理通常处理有界数据集,即数据集是固定大小的,有明确的开始和结束点。
  • 任务并行化:在批处理模式下,Flink 会将数据集划分为多个子任务,并行执行这些任务,以加快处理速度。
  • DataSet API:Flink 的 DataSet API 提供了一组高层次的操作符,用于对批数据集执行各种操作,如映射(map)、过滤(filter)、联接(join)和聚合(aggregate)。

单词统计(批数据)

需求说明

统计一个文件中各个单词出现的次数,把统计结果输出到文件

  • 读取数据源
  • 处理数据源
  • 将读取到的数据源文件中的每一行根据空格切分
  • 将切分好的每个单词拼接1
  • 根据单词聚合(将相同的单词放到一起)
  • 累加相同的单词(单词后面的1进行累加)
  • 保存处理结果

导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.11.1</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.11.1</version><scope>provided</scope></dependency></dependencies></project>

编写代码

package icu.wzk;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {String inPath = "word-count/word-count.txt";String outPath = "word-count/word-count-result.csv";// 获取Flink批处理执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 读取文件中的内容DataSet<String> text = env.readTextFile(inPath);// 对数据进行处理DataSet<Tuple2<String, Integer>> dataSet = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String word : line.split(" ")) {collector.collect(new Tuple2<>(word, 1));}}}).groupBy(0).sum(1);dataSet.writeAsCsv(outPath, "\n", " ", FileSystem.WriteMode.OVERWRITE).setParallelism(1);// 触发执行程序env.execute("Word Count");}}

测试数据

Stateful Computations over Data Streams
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Correctness guarantees
Exactly-once state consistency
Event-time processing
Sophisticated late data handling
SQL on Stream & Batch Data
DataStream API & DataSet API
ProcessFunction (Time & State)
Flexible deployment
High-availability setup
Savepoints

运行测试

在这里插入图片描述

结果数据

查看 word-count/word-count-result.csv 打开即可看到以下内容:

Stateful 1
any 1
common 1
computations 2
on 1
setup 1
state 1
streams. 1
unbounded 1
& 3
Data 2
DataStream 1
High-availability 1
for 1
perform 1
run 1
to 1
Event-time 1
Flexible 1
Sophisticated 1
framework 1
is 1
scale. 1
Exactly-once 1
ProcessFunction 1
Stream 1
a 1
been 1
handling 1
in 1
late 1
processing 2
Batch 1
DataSet 1
at 2
bounded 1
consistency 1
deployment 1
distributed 1
engine 1
has 1
API 2
Apache 1
Flink 2
SQL 1
Streams 1
all 1
designed 1
over 2
Computations 1
Savepoints 1
and 3
data 2
environments, 1
in-memory 1
speed 1
stateful 1
(Time 1
Correctness 1
State) 1
cluster 1
guarantees 1

单词统计(流数据)

需求说明

Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5秒)的数据进行聚合统计,每隔1秒汇总计算一次,并且把时间窗口内计算结果打印出来。

编写代码

Server部分

编写一个Socket服务,提供一定的数据流。

package icu.wzk;import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;public class WordCountServer {public static void main(String[] args) throws IOException, InterruptedException {String ip = "localhost";int port = 9999;Random random = new Random();ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);serverSocket.bind(address);Socket socket = serverSocket.accept();OutputStream outputStream = socket.getOutputStream();PrintWriter writer = new PrintWriter(outputStream, true);for (int i = 0; i < 1000; i ++) {int number = random.nextInt(100);System.out.println(number);writer.println(number);Thread.sleep((random.nextInt(900) + 100));}socket.close();serverSocket.close();}}

Flink部分

连接到上述的Server部分

package icu.wzk;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WordCount2 {public static void main(String[] args) throws Exception {String ip = "localhost";int port = 9999;// 获取 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取 Socket 输入数据DataStreamSource<String> textStream = env.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {out.collect(new Tuple2<>(word, 1));}}});SingleOutputStreamOperator<Tuple2<String, Integer>> word = wordCount.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {@Overridepublic Object getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).timeWindow(Time.seconds(5), Time.seconds(1)).sum(1);// 输出并运行word.print();env.execute("Word Count");}}

观察结果

Server部分

35
18
84
72
24
51
15
13
65
98
55
68
22
84
17

Flink部分

3> (35,1)
4> (18,1)
3> (35,1)
5> (84,1)
4> (18,1)
6> (72,1)
3> (35,1)
5> (84,1)
5> (24,1)
3> (35,1)
6> (72,1)
4> (18,1)
7> (51,1)
5> (24,1)
5> (84,1)
4> (15,1)
6> (72,1)
7> (51,1)
3> (35,1)
4> (15,1)
4> (18,1)

运行结果过程截图如下所示:
在这里插入图片描述

过程总结

  • 获得一个执行环境
  • 加载、创建 初始化环境
  • 指定数据操作的算子
  • 指定结果数据存放位置
  • 调用Execute触发执行程序

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正的触发执行程序。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com