1. 并行度(Parallelism)
1.1 什么是并行度?
- 基本概念:
在 Flink 中,每个操作(算子)都可以被拆分成多个“子任务”(subtasks),这些子任务可以同时在不同的线程、机器或容器上运行。这种同时运行的方式就称为 并行处理。 - 类比:
想象你有一个大任务需要做,比如在一大堆苹果中挑出红色的苹果。你可以自己一个人做,也可以叫上好几个人一起帮忙。如果你叫了 3 个人,那么总共就有 4 个人在同时挑苹果,这里就相当于并行度为 4。 - 在 Flink 中:
如果一个算子的并行度设置为 2,那么这个算子就会被分成 2 个子任务同时运行,从而提高处理速度。
1.2 如何设置并行度?
有几种方法可以设置并行度:
-
代码中设置:
对于某个具体算子:stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这表示这个
map
操作会被分成 2 个并行子任务。 -
全局设置:
在创建执行环境时:env.setParallelism(2);
这样,默认情况下,所有算子都会使用 2 个并行任务。
-
提交任务时设置:
在使用命令提交作业时加上参数-p
,例如:bin/flink run -p 2 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
-
配置文件设置:
在flink-conf.yaml
中:parallelism.default: 2
如果代码和提交参数中都没有设置并行度,则采用配置文件中的默认值。
2. 算子链(Operator Chain)
2.1 什么是算子链?
- 基本概念:
在数据处理过程中,数据需要经过多个算子(例如map
、filter
等)处理。如果这些算子之间的处理关系是一对一的(即一个算子处理完的数据直接传给下一个算子,而不需要重新分区或打乱顺序),它们可以被“链”在一起执行,这样就称为 算子链。 - 类比:
想象你在做一道菜,需要经过洗菜、切菜、炒菜三个步骤。如果你能把这三个步骤连在一起连续进行,就不需要每一步都停下来,这样能提高做菜的效率。在 Flink 中,把多个一对一的操作合并到同一个线程中运行,就能减少线程切换和数据在不同线程间传递的开销,从而提高效率。
示例说明
假设我们有一个简单的 Flink 流处理任务,它从 Socket 读取数据,然后依次执行以下操作:
- Map:将每个字符串转换成一个二元组(单词, 1)。
- Filter:过滤出以字母 “A” 开头的单词。
- KeyBy 和 Sum:按单词分组并求和。
在默认情况下,Flink 会对 Map 和 Filter 这两个一对一的算子进行链式合并,也就是说,它们会被打包到同一个任务(Task)中,由同一个线程执行。这可以减少线程切换和数据在不同任务之间传递的开销,从而提升性能。
代码示例
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class OperatorChainExample {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从 Socket 读取数据(例如:localhost 9999 端口)DataStream<String> text = env.socketTextStream("localhost", 9999);// 执行 Map、Filter、KeyBy 和 Sum 操作DataStream<Tuple2<String, Integer>> result = text// Map 算子:将输入字符串转换成 (单词, 1).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}})// Filter 算子:只保留以 "A" 开头的单词.filter(new FilterFunction<Tuple2<String, Integer>>() {@Overridepublic boolean filter(Tuple2<String, Integer> value) throws Exception {return value.f0.startsWith("A");}})// keyBy 和 sum 通常会引入重分区(shuffle),这里单独作为一个阶段.keyBy(value -> value.f0).sum(1);// 打印结果result.print();// 执行任务env.execute("Operator Chain Example");}
}
具体说明
-
Map 与 Filter 的链合
- 在上述代码中,
map()
和filter()
都是简单的、一对一的算子,它们的输入与输出之间没有重分区或者数据打乱。 - Flink 会自动将这两个算子合并到同一个任务中运行,这就是 算子链。合并后,数据从
map()
输出后会直接传递给filter()
,而不需要经过额外的数据传输步骤。
- 在上述代码中,
-
KeyBy 与 Sum 的阶段
- 当你调用
keyBy()
时,会引入 重分区,因为需要根据 key 把数据重新分组。重分区通常会单独成为一个阶段,因此keyBy()
之后的sum()
可能会运行在不同的任务中,而不与前面的算子链合并。
- 当你调用
-
手动调整算子链
-
如果你希望
map()
或filter()
单独运行(例如为了调试或隔离资源),可以使用disableChaining()
:.map(new MapFunction<String, Tuple2<String, Integer>>() { ... }) .disableChaining() // 禁用后,该算子就不会与后续算子链合并
-
或者使用
startNewChain()
来从当前算子开始新的链:.map(new MapFunction<String, Tuple2<String, Integer>>() { ... }) .startNewChain() // 从此算子开始,新链不与前面的链合并
-
- 算子链:通过将多个一对一算子合并到同一个任务中运行,可以减少线程切换和数据传输的开销,从而提高处理效率。
- 自动与手动控制:Flink 默认会自动合并满足条件的算子链,但你可以通过
disableChaining()
或startNewChain()
手动控制合并情况。
2.2 什么时候使用算子链?
-
默认情况:
Flink 会自动对满足条件的一对一算子进行链式合并,从而优化执行效率。 -
手动控制:
如果你希望某个算子单独运行,不参与合并,可以使用:.map(word -> Tuple2.of(word, 1L)).disableChaining();
如果你希望从某个算子开始新链,也可以使用:
.map(word -> Tuple2.of(word, 1L)).startNewChain();
小结
- 并行度 让我们可以同时处理多个数据片段,就像叫多个人同时帮忙完成一项任务一样,可以大大提高处理速度。
- 算子链 则是优化处理过程的技术,把多个简单操作合并在一起执行,减少中间传递和线程切换的开销,从而提高整体效率。
通过这两个机制,Flink 能够高效地处理大规模数据流,并在集群环境下发挥出更强的并行处理能力。