欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > Filnk并行度和算子链

Filnk并行度和算子链

2025/3/26 0:22:18 来源:https://blog.csdn.net/znjy111/article/details/146478351  浏览:    关键词:Filnk并行度和算子链

1. 并行度(Parallelism)

1.1 什么是并行度?

  • 基本概念:
    在 Flink 中,每个操作(算子)都可以被拆分成多个“子任务”(subtasks),这些子任务可以同时在不同的线程、机器或容器上运行。这种同时运行的方式就称为 并行处理
  • 类比:
    想象你有一个大任务需要做,比如在一大堆苹果中挑出红色的苹果。你可以自己一个人做,也可以叫上好几个人一起帮忙。如果你叫了 3 个人,那么总共就有 4 个人在同时挑苹果,这里就相当于并行度为 4。
  • 在 Flink 中:
    如果一个算子的并行度设置为 2,那么这个算子就会被分成 2 个子任务同时运行,从而提高处理速度。

1.2 如何设置并行度?

有几种方法可以设置并行度:

  • 代码中设置:
    对于某个具体算子:

    stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
    

    这表示这个 map 操作会被分成 2 个并行子任务。

  • 全局设置:
    在创建执行环境时:

    env.setParallelism(2);
    

    这样,默认情况下,所有算子都会使用 2 个并行任务。

  • 提交任务时设置:
    在使用命令提交作业时加上参数 -p,例如:

    bin/flink run -p 2 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
    
  • 配置文件设置:
    flink-conf.yaml 中:

    parallelism.default: 2
    

    如果代码和提交参数中都没有设置并行度,则采用配置文件中的默认值。

2. 算子链(Operator Chain)

2.1 什么是算子链?

  • 基本概念:
    在数据处理过程中,数据需要经过多个算子(例如 mapfilter 等)处理。如果这些算子之间的处理关系是一对一的(即一个算子处理完的数据直接传给下一个算子,而不需要重新分区或打乱顺序),它们可以被“链”在一起执行,这样就称为 算子链
  • 类比:
    想象你在做一道菜,需要经过洗菜、切菜、炒菜三个步骤。如果你能把这三个步骤连在一起连续进行,就不需要每一步都停下来,这样能提高做菜的效率。在 Flink 中,把多个一对一的操作合并到同一个线程中运行,就能减少线程切换和数据在不同线程间传递的开销,从而提高效率。

示例说明

假设我们有一个简单的 Flink 流处理任务,它从 Socket 读取数据,然后依次执行以下操作:

  1. Map:将每个字符串转换成一个二元组(单词, 1)。
  2. Filter:过滤出以字母 “A” 开头的单词。
  3. KeyBySum:按单词分组并求和。

在默认情况下,Flink 会对 MapFilter 这两个一对一的算子进行链式合并,也就是说,它们会被打包到同一个任务(Task)中,由同一个线程执行。这可以减少线程切换和数据在不同任务之间传递的开销,从而提升性能。

代码示例

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class OperatorChainExample {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从 Socket 读取数据(例如:localhost 9999 端口)DataStream<String> text = env.socketTextStream("localhost", 9999);// 执行 Map、Filter、KeyBy 和 Sum 操作DataStream<Tuple2<String, Integer>> result = text// Map 算子:将输入字符串转换成 (单词, 1).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}})// Filter 算子:只保留以 "A" 开头的单词.filter(new FilterFunction<Tuple2<String, Integer>>() {@Overridepublic boolean filter(Tuple2<String, Integer> value) throws Exception {return value.f0.startsWith("A");}})// keyBy 和 sum 通常会引入重分区(shuffle),这里单独作为一个阶段.keyBy(value -> value.f0).sum(1);// 打印结果result.print();// 执行任务env.execute("Operator Chain Example");}
}

具体说明

  1. Map 与 Filter 的链合

    • 在上述代码中,map()filter() 都是简单的、一对一的算子,它们的输入与输出之间没有重分区或者数据打乱。
    • Flink 会自动将这两个算子合并到同一个任务中运行,这就是 算子链。合并后,数据从 map() 输出后会直接传递给 filter(),而不需要经过额外的数据传输步骤。
  2. KeyBy 与 Sum 的阶段

    • 当你调用 keyBy() 时,会引入 重分区,因为需要根据 key 把数据重新分组。重分区通常会单独成为一个阶段,因此 keyBy() 之后的 sum() 可能会运行在不同的任务中,而不与前面的算子链合并。
  3. 手动调整算子链

    • 如果你希望 map()filter() 单独运行(例如为了调试或隔离资源),可以使用 disableChaining()

      .map(new MapFunction<String, Tuple2<String, Integer>>() { ... })
      .disableChaining() // 禁用后,该算子就不会与后续算子链合并
      
    • 或者使用 startNewChain() 来从当前算子开始新的链:

      .map(new MapFunction<String, Tuple2<String, Integer>>() { ... })
      .startNewChain() // 从此算子开始,新链不与前面的链合并
      
  • 算子链:通过将多个一对一算子合并到同一个任务中运行,可以减少线程切换和数据传输的开销,从而提高处理效率。
  • 自动与手动控制:Flink 默认会自动合并满足条件的算子链,但你可以通过 disableChaining()startNewChain() 手动控制合并情况。

2.2 什么时候使用算子链?

  • 默认情况:
    Flink 会自动对满足条件的一对一算子进行链式合并,从而优化执行效率。

  • 手动控制:
    如果你希望某个算子单独运行,不参与合并,可以使用:

    .map(word -> Tuple2.of(word, 1L)).disableChaining();
    

    如果你希望从某个算子开始新链,也可以使用:

    .map(word -> Tuple2.of(word, 1L)).startNewChain();
    

小结

  • 并行度 让我们可以同时处理多个数据片段,就像叫多个人同时帮忙完成一项任务一样,可以大大提高处理速度。
  • 算子链 则是优化处理过程的技术,把多个简单操作合并在一起执行,减少中间传递和线程切换的开销,从而提高整体效率。

通过这两个机制,Flink 能够高效地处理大规模数据流,并在集群环境下发挥出更强的并行处理能力。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词