欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > 58、Flink 的处理时间定时器代码示例

58、Flink 的处理时间定时器代码示例

2024/11/30 10:39:06 来源:https://blog.csdn.net/m0_50186249/article/details/140005397  浏览:    关键词:58、Flink 的处理时间定时器代码示例

1、概述

1)TimerService 按 key 和时间戳消除重复的定时器,即每个 key 和时间戳最多有一个定时器,如果为同一时间戳注册了多个计时器,那么 onTimer() 方法将只被调用一次;

2)Flink 会同步 onTimer() 和 processElement() 的调用,无需担心同时修改状态;

3)当应用程序从故障中恢复或从保存点启动时,本应在恢复前启动的检查点中的处理时间定时器将立即启动;

4)合并定时器

对于1秒(事件或处理时间)的定时器精度,可以将目标时间四舍五入到整秒,定时器最多会提前1秒触发,但不会晚于要求的毫秒精度,每个键和秒最多有一个计时器。

long coalescedTime = (ctx.timerService().currentProcessingTime() + timeout) / 1000  1000; ctx.timerService().registerProcessingTimeTimer(coalescedTime);long coalescedTime = ctx.timerService().currentWatermark() + 1; ctx.timerService().registerEventTimeTimer(coalescedTime);

2、完整代码示例

package com.xu.flink.datastream.day11;import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;/*** 1、TimerService 按 key 和时间戳消除重复的定时器,即每个 key 和时间戳最多有一个定时器,如果为同一时间戳注册了多个计时器,那么 onTimer() 方法将只被调用一次;* <p>* 2、Flink 会同步 onTimer() 和 processElement() 的调用,无需担心同时修改状态;* <p>* 3、当应用程序从故障中恢复或从保存点启动时,本应在恢复前启动的检查点中的处理时间定时器将立即启动;* <p>* 4、合并定时器* <p>* 对于1秒(事件或处理时间)的定时器精度,可以将目标时间四舍五入到整秒,定时器最多会提前1秒触发,但不会晚于要求的毫秒精度,每个键和秒最多有一个计时器* <p>* long coalescedTime = (ctx.timerService().currentProcessingTime() + timeout) / 1000 * 1000; ctx.timerService().registerProcessingTimeTimer(coalescedTime);** long coalescedTime = ctx.timerService().currentWatermark() + 1; ctx.timerService().registerEventTimeTimer(coalescedTime);*/
public class _04_OnTimerProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> input = env.socketTextStream("localhost", 8888);// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);/*** 间隔输入两个 a** 注册的定时器时间为=>1718685731000,当前的 timestamp=>null* 注册的定时器时间为=>1718685732000,当前的 timestamp=>null** 定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>1718685731000* 2> (a,-a-a)** 定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>1718685732000* 2> (a,-a-a)** ---------------------------------------------------------------------------------------------** 连续输入两个 a** 注册的定时器时间为=>1718685781000,当前的 timestamp=>null* 注册的定时器时间为=>1718685781000,当前的 timestamp=>null** 定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>1718685781000** 2> (a,-a-a-a-a)*/input.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, String>>() {private ListState<String> listState;@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("open 方法被调用");ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("list-state", String.class);listState = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void close() throws Exception {System.out.println("close 方法被调用");}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<String, String, Tuple2<String, String>>.OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {String currentKey = ctx.getCurrentKey();long currentWatermark = ctx.timerService().currentWatermark();System.out.println("定时器触发时,当前的 Key=>" + currentKey + ",当前的 Watermark=>" + currentWatermark + ",当前的 timestamp=>" + timestamp);String res = "";for (String word : listState.get()) {res += "-" + word;}out.collect(new Tuple2<>(currentKey, res));}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, String>>.Context context, Collector<Tuple2<String, String>> collector) throws Exception {listState.add(input);long coalescedTime = (context.timerService().currentProcessingTime() + 5000) / 1000 * 1000;Long timestamp = context.timestamp();System.out.println("注册的定时器时间为=>" + coalescedTime + ",当前的 timestamp=>" + timestamp);context.timerService().registerProcessingTimeTimer(coalescedTime);}}).print();env.execute();}
}

3、测试用例

		  间隔输入两个 a注册的定时器时间为=>1718685731000,当前的 timestamp=>null注册的定时器时间为=>1718685732000,当前的 timestamp=>null定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>17186857310002> (a,-a-a)定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>17186857320002> (a,-a-a)---------------------------------------------------------------------------------------------连续输入两个 a注册的定时器时间为=>1718685781000,当前的 timestamp=>null注册的定时器时间为=>1718685781000,当前的 timestamp=>null定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>17186857810002> (a,-a-a-a-a)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com