Big Data 120 - Flink Window Mechanism: Sliding Time Windows and Session Windows, Time-Driven and Event-Driven

2024/10/25 2:21:06  Source: https://blog.csdn.net/w776341482/article/details/141917540

Please give me a follow! Thank you very much! This series is updated continuously!

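So far the series covers:

  • Hadoop (completed)
  • HDFS (completed)
  • MapReduce (completed)
  • Hive (completed)
  • Flume (completed)
  • Sqoop (completed)
  • Zookeeper (completed)
  • HBase (completed)
  • Redis (completed)
  • Kafka (completed)
  • Spark (completed)
  • Flink (in progress!)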

Chapter Contents

In the previous section we covered:

  • An overview of Flink windows
  • Flink tumbling time windows
  • Time-driven windows
  • Event-driven (count-based) windows


Sliding Time Window


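A sliding window is a more general form of the fixed (tumbling) window: it is defined by a fixed window length plus a slide interval, and when the slide equals the length it degenerates into a tumbling window. Flink's sliding time window is a commonly used windowing mechanism for streaming jobs that need to compute results over a time range at regular intervals. It keeps partitioning the data according to the specified window size and slide interval, and runs an aggregation over the data in each window.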

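Characteristics

Fixed window length; windows may overlap.

  • Because sliding windows overlap, a single event may belong to several windows.
  • Sliding windows suit periodic aggregates over a recent time range, such as moving averages or the number of users active in the last few minutes.

Key Parameters

  • Window size: the time range covered by each window, e.g. 10 seconds.
  • Slide interval: how far the window moves each time, e.g. 5 seconds. This means a new window starts every 5 seconds and each window covers 10 seconds, so every element falls into 10 / 5 = 2 windows.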
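To see why one element lands in several windows, here is a small plain-Java sketch (no Flink dependency) that derives the windows containing a given timestamp. It assumes windows aligned to multiples of the slide with zero offset, which is how Flink's built-in sliding assigners align windows by default; the class and method names (`SlidingWindowAssignment`, `assign`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {
    // Start of the latest window that contains ts, assuming window starts
    // are aligned to multiples of `slide` (zero offset).
    static long lastWindowStart(long ts, long slide) {
        return ts - Math.floorMod(ts, slide);
    }

    // Returns the [start, end) bounds of every sliding window that contains ts.
    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers ts
        for (long start = lastWindowStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10_000L;  // 10-second window (milliseconds)
        long slide = 5_000L;  // slides every 5 seconds
        for (long[] w : assign(12_000L, size, slide)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For an element at 12 000 ms this prints `[10000, 20000)` and then `[5000, 15000)`: two windows, matching size / slide.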

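Time-Driven

Scenario: every 30 seconds, count the items users purchased in the last minute. The code below uses a 10-second window that slides every 5 seconds so results show up quickly; for the scenario as stated, the sizes would be `Time.minutes(1)` and `Time.seconds(30)`.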

```java
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read text lines from a local socket (e.g. one opened with `nc -lk 9999`)
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

        // Map each line to (line, random number 0-9) and log when it arrived
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random
                                + ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        // Key by the input string, wrapped in Tuple1 because the window function uses a Tuple key
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(
                new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // 10-second processing-time windows sliding every 5 seconds
        // (explicit replacement for the deprecated keyedStream.timeWindow(size, slide))
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow =
                keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

        // MyTimeWindowFunction is the WindowFunction defined in the previous section
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute("SlidingWindow");
    }
}
```

Event-Driven (Count-Based)

```java
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

// Renamed so it can live in the same package as the time-based SlidingWindow class
public class SlidingCountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

        // Map each line to (line, random number 0-9) and log when it arrived
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random
                                + ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(
                new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // Per key: fire every 2 elements over (at most) the 3 most recent elements
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow =
                keyedStream.countWindow(3, 2);

        // MyCountWindowFuntion is the WindowFunction defined in the previous section (name kept as defined there)
        globalWindow.apply(new MyCountWindowFuntion()).print();
        env.execute("SlidingCountWindow");
    }
}
```
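`countWindow(3, 2)` on a keyed stream builds a sliding count window: for each key, the window fires every 2 elements and the window function sees at most the 3 most recent elements. Flink implements this with a `GlobalWindow`, a count trigger and a count evictor. The plain-Java simulation below (hypothetical class `CountSlidingSimulation`, single key, no Flink dependency) mimics that behavior to show which elements each firing contains.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CountSlidingSimulation {
    // Simulates countWindow(size, slide) for one key: every `slide` new elements
    // the window fires over the most recent `size` elements.
    static List<List<Integer>> simulate(List<Integer> input, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> firings = new ArrayList<>();
        int sinceLastFire = 0;
        for (int value : input) {
            buffer.addLast(value);
            sinceLastFire++;
            if (sinceLastFire >= slide) {          // like a count trigger
                sinceLastFire = 0;
                while (buffer.size() > size) {     // like a count evictor: drop the oldest
                    buffer.removeFirst();
                }
                firings.add(new ArrayList<>(buffer));
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        System.out.println(simulate(List.of(1, 2, 3, 4, 5, 6, 7), 3, 2));
    }
}
```

For the input 1 to 7 this prints `[[1, 2], [2, 3, 4], [4, 5, 6]]`; element 7 waits for the next firing, and the first firing holds only two elements because the window is not yet full.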

Session Window

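A session window groups a series of events that are separated by gaps no longer than a specified timeout, much like a session in a web application: if no new data arrives for that period, the next data starts a new window.
The session window assigner groups elements by session activity. Unlike tumbling and sliding windows, session windows do not overlap and have no fixed start or end time.
When a session window receives no elements for a fixed period of time, that is, an inactivity gap occurs, the window closes.
A session window is configured with a session gap, which defines the length of the inactive period. Once such an inactive period occurs, the current session closes and subsequent elements are assigned to a new session window.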
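Conceptually, a session window assigner gives each element its own window `[timestamp, timestamp + gap)` and merges windows that overlap, so a session keeps growing as long as new elements arrive within the gap. The following is a simplified plain-Java sketch of that merging, assuming the timestamps are already sorted in ascending order; the class `SessionGapSketch` is hypothetical and has no Flink dependency.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapSketch {
    // Groups ascending timestamps into sessions. Each element opens a window
    // [ts, ts + gap); if it overlaps the current session, the session is extended.
    static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null; // [start, end) of the session being built
        for (long ts : sortedTs) {
            if (current != null && ts <= current[1]) {
                // Arrived within the gap: extend the current session
                current[1] = Math.max(current[1], ts + gap);
            } else {
                // First element or inactivity gap exceeded: start a new session
                current = new long[]{ts, ts + gap};
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_000L, 9_000L, 25_000L, 30_000L};
        for (long[] s : sessions(timestamps, 10_000L)) {
            System.out.println("session [" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a 10-second gap, the five timestamps form two sessions, `[1000, 19000)` and `[25000, 40000)`, because nothing arrives between 19 000 ms and 25 000 ms.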

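Characteristics

  • Session windows do not overlap and have no fixed start or end time.
  • Unlike tumbling and sliding windows, a session window closes when it receives no elements for a period of time.
  • Subsequent elements are assigned to a new session window.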

Time-Driven

```java
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class SessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

        // Map each line to (line, random number 0-9) and log when it arrived,
        // so the session boundaries can be checked against the arrival times
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random
                                + ", timestamp: " + format.format(System.currentTimeMillis()));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(
                new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // A key's session closes after 10 seconds (processing time) without new elements
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window =
                keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

        // MyTimeWindowFunction is the WindowFunction defined in the previous section
        window.apply(new MyTimeWindowFunction()).print();
        env.execute("SessionWindow");
    }
}
```
