欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > Flink 窗口触发器Triggers

Flink 窗口触发器Triggers

2024/10/23 19:48:38 来源:https://blog.csdn.net/happycao123/article/details/142990026  浏览:    关键词:Flink 窗口触发器Triggers

Triggers

定义:触发器决定了窗口何时被触发。在Flink中,窗口的触发是通过设置定时器来实现的。

作用:控制窗口数据的聚合时机,确保数据在适当的时间点被处理和输出。

Trigger关键方法

onElement: 当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。

onElement(T element, long timestamp, W window, TriggerContext ctx);

onEventTime:当事件时间计时器触发时调用,用于处理事件时间相关的触发逻辑。

onEventTime(long time, W window, TriggerContext ctx);

onProcessingTime :当处理时间计时器触发时调用,这里时间指机器处理时间,而不考虑时间本身的时间。见后文ProcessingTimeTrigger实现

onProcessingTime(long time, W window, TriggerContext ctx);

clear 当窗口被删除时调用,用于清理窗口的状态和定时器。

clear(W window, TriggerContext ctx);

内置Trigger

Flink提供了多种内置的触发器,以下为几种常用类型:

  • EventTimeTrigger 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立即触发窗口计算。

  • ProcessingTimeTrigger 工作原理:基于处理时长(即机器的系统时间)来触发窗口计算。当处理时间达到窗口的结束时间时,触发窗口计算。

    @Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE;}
  • CountTrigger 工作原理:根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。

  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);count.add(1L);if (count.get() >= maxCount) {count.clear();return TriggerResult.FIRE;}return TriggerResult.CONTINUE;
  • ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的结束时间小于当前的时间(事件时间或处理时间)时触发计算。适用场景:适用于需要周期性处理数据的场景,如实时监控、周期性报表等。

       if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());}

  • DeltaTrigger 工作原理:根据接入数据计算出的Delta指标是否超过指定的阈值来触发窗口计算。适用场景:适用于需要基于数据变化量进行处理的场景,如异常检测、趋势分析等。

    if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {lastElementState.update(element);return TriggerResult.FIRE;}

自定义一个Trigger

实现一个CountTrigger 窗口元素数量达到阈值时,触发计算

package com.codetonight.datastream.trigger;import org.apache.flink.streaming.api.windowing.triggers.Trigger;import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;  
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;  public class CountTrigger<T> extends Trigger<T, GlobalWindow> {  private final long countThreshold;  private long count = 0L;  public CountTrigger(long countThreshold) {  this.countThreshold = countThreshold;  }  @Override  public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {  count++;  if (count >= countThreshold) {  // 触发窗口并清除计数器count = 0;return TriggerResult.FIRE_AND_PURGE;}  return TriggerResult.CONTINUE;  }@Overridepublic TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}// 其他方法(onEventTime, onProcessingTime, onMerge, clear)可以留空或实现特定的逻辑  @Override  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {count = 0L;  }  }
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;  
import org.apache.flink.streaming.api.windowing.windows.Window;  public class FlinkGlobalWindowExample {  public static void main(String[] args) throws Exception {  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);  // 应用全局窗口和自定义触发器  DataStream<Long> result = source.keyBy(value -> 1).windowAll(GlobalWindows.create()).trigger(new CountTrigger<>(5)) // 当接收到5个元素时触发.reduce(new ReduceFunction<Long>() {  @Override  public Long reduce(Long value1, Long value2) {  return value1 + value2;  }  });  // 打印结果  result.print();  // 执行作业  env.execute("Flink Global Window Example");  }  
}

Evictor

Flink 的窗口模型允许在指定 WindowAssigner 和 Trigger 之外,还可以选择性地指定一个 Evictor。

Evictor 的功能是在触发器触发后,且在窗口函数应用之前和/或之后,从窗口中移除元素。为了实现这一功能,Evictor 接口定义了两个方法:

public interface Evictor<T, W extends Window> extends Serializable {void evictBefore(Iterable<TimestampedValue<T>> elements,int size,W window,EvictorContext evictorContext);void evictAfter(Iterable<TimestampedValue<T>> elements,int size,W window,EvictorContext evictorContext);}

通过这两个方法,Evictor 提供了在窗口生命周期中灵活控制元素保留与移除的能力。

内置Evictor

这些 Evictor 可以单独使用,也可以与 Flink 的 WindowAssigner 和 Trigger 一起使用, 以创建复杂而强大的窗口处理逻辑。通过灵活组合这些组件, Flink 用户可以处理各种实时数据流场景,包括滑动窗口、滚动窗口、会话窗口等。

CountEvictor: 功能:保留窗口中用户指定的元素数量,并从窗口缓冲区的开头丢弃剩余的元素。应用场景:当你只需要保留窗口中最新的 N 个元素时,这个 Evictor 非常有用。


DeltaEvictor: 移除逻辑代码比较清晰:

  1. 取窗口最后一个元素lastElement

  2. 所有元素与lastElement 比较计算出差值( Delta )

  3. 差值( Delta ) 超过阈值则移除

DeltaFunction用于计算两个元素之间的差值

    private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {TimestampedValue<T> lastElement = Iterables.getLast(elements);for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {TimestampedValue<T> element = iterator.next();if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())>= this.threshold) {iterator.remove();}}}

TimeEvictor: 功能:基于时间戳来移除窗口中的元素。它接受一个时间间隔(以毫秒为单位),对于给定的窗口,它会找到元素中的最大时间戳 max_ts,并移除所有时间戳小于 max_ts 减去指定时间间隔的元素。应用场景:当你希望基于时间戳来过滤窗口中的旧元素时,这个 Evictor 非常有用。

TimeEvictor evcit 方法代码逻辑,方法命名很清晰。

  1. 取窗口元素最大的时间戳 currentTime,

  2. 保留的时间戳阈值evictCutoff = currentTime -windowSize

  3. 循环遍历移除不在evictCutoff 之前的元素

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (!hasTimestamp(elements)) {return;}long currentTime = getMaxTimestamp(elements);long evictCutoff = currentTime - windowSize;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com