1、概述
1)TimerService 按 key 和时间戳消除重复的定时器,即每个 key 和时间戳最多有一个定时器,如果为同一时间戳注册了多个计时器,那么 onTimer() 方法将只被调用一次;
2)Flink 会同步 onTimer() 和 processElement() 的调用,无需担心同时修改状态;
3)当应用程序从故障中恢复或从保存点启动时,本应在恢复前启动的检查点中的处理时间定时器将立即启动;
4)合并定时器
对于1秒(事件或处理时间)的定时器精度,可以将目标时间四舍五入到整秒,定时器最多会提前1秒触发,但不会晚于要求的毫秒精度,每个键和秒最多有一个计时器。
long coalescedTime = (ctx.timerService().currentProcessingTime() + timeout) / 1000 1000; ctx.timerService().registerProcessingTimeTimer(coalescedTime);long coalescedTime = ctx.timerService().currentWatermark() + 1; ctx.timerService().registerEventTimeTimer(coalescedTime);
2、完整代码示例
package com.xu.flink.datastream.day11;import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;/*** 1、TimerService 按 key 和时间戳消除重复的定时器,即每个 key 和时间戳最多有一个定时器,如果为同一时间戳注册了多个计时器,那么 onTimer() 方法将只被调用一次;* <p>* 2、Flink 会同步 onTimer() 和 processElement() 的调用,无需担心同时修改状态;* <p>* 3、当应用程序从故障中恢复或从保存点启动时,本应在恢复前启动的检查点中的处理时间定时器将立即启动;* <p>* 4、合并定时器* <p>* 对于1秒(事件或处理时间)的定时器精度,可以将目标时间四舍五入到整秒,定时器最多会提前1秒触发,但不会晚于要求的毫秒精度,每个键和秒最多有一个计时器* <p>* long coalescedTime = (ctx.timerService().currentProcessingTime() + timeout) / 1000 * 1000; ctx.timerService().registerProcessingTimeTimer(coalescedTime);** long coalescedTime = ctx.timerService().currentWatermark() + 1; ctx.timerService().registerEventTimeTimer(coalescedTime);*/
public class _04_OnTimerProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> input = env.socketTextStream("localhost", 8888);// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);/*** 间隔输入两个 a** 注册的定时器时间为=>1718685731000,当前的 timestamp=>null* 注册的定时器时间为=>1718685732000,当前的 timestamp=>null** 定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>1718685731000* 2> (a,-a-a)** 定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>1718685732000* 2> (a,-a-a)** ---------------------------------------------------------------------------------------------** 连续输入两个 a** 注册的定时器时间为=>1718685781000,当前的 timestamp=>null* 注册的定时器时间为=>1718685781000,当前的 timestamp=>null** 定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>1718685781000** 2> (a,-a-a-a-a)*/input.keyBy(e -> e).process(new KeyedProcessFunction<String, String, Tuple2<String, String>>() {private ListState<String> listState;@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("open 方法被调用");ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("list-state", String.class);listState = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void close() throws Exception {System.out.println("close 方法被调用");}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<String, String, Tuple2<String, String>>.OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {String currentKey = ctx.getCurrentKey();long currentWatermark = ctx.timerService().currentWatermark();System.out.println("定时器触发时,当前的 Key=>" + currentKey + ",当前的 Watermark=>" + currentWatermark + ",当前的 timestamp=>" + timestamp);String res = "";for (String word : listState.get()) {res += "-" + word;}out.collect(new Tuple2<>(currentKey, res));}@Overridepublic void processElement(String input, KeyedProcessFunction<String, String, Tuple2<String, String>>.Context context, Collector<Tuple2<String, String>> collector) throws Exception {listState.add(input);long coalescedTime = (context.timerService().currentProcessingTime() + 5000) / 1000 * 1000;Long timestamp = context.timestamp();System.out.println("注册的定时器时间为=>" + coalescedTime + ",当前的 timestamp=>" + timestamp);context.timerService().registerProcessingTimeTimer(coalescedTime);}}).print();env.execute();}
}
3、测试用例
间隔输入两个 a注册的定时器时间为=>1718685731000,当前的 timestamp=>null注册的定时器时间为=>1718685732000,当前的 timestamp=>null定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>17186857310002> (a,-a-a)定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>17186857320002> (a,-a-a)---------------------------------------------------------------------------------------------连续输入两个 a注册的定时器时间为=>1718685781000,当前的 timestamp=>null注册的定时器时间为=>1718685781000,当前的 timestamp=>null定时器触发时,当前的 Key=>a,当前的 Watermark=>-9223372036854775808,当前的 timestamp=>17186857810002> (a,-a-a-a-a)