在flink中,开窗后的聚合方式有两种,增量聚合和全量聚合,前者处理速度快,不积压数据,但是拿不到窗口中全量的信息,后者积压数据但是能拿到窗口内全量的数据。两者各有利弊。
因此滚动聚合窗口和全量聚合窗口连用,即利用了滚动聚合性能好,资源占用率低的优点,又能拥有全量窗口函数中的窗口信息。
此时的增量聚合函数中只每次处理1条数据,当所有的数据在增量函数完成后,最后再经过一次窗口函数,因此窗口全量聚合算子中只有一条reduce聚合后的数据。
winodw().reduce(ReduceFunction(),ProcessWindowFunction())
// 开窗,Tumbling(10s)SingleOutputStreamOperator<TrafficPageViewBean> unionWateredKeyedReduced = unionWateredKeyed.window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<TrafficPageViewBean>() {@Overridepublic TrafficPageViewBean reduce(TrafficPageViewBean t0, TrafficPageViewBean t1) throws Exception {t1.setSvCt(t1.getSvCt() + t0.getSvCt());t1.setPvCt(t1.getPvCt() + t0.getPvCt());t1.setDurSum(t1.getDurSum() + t0.getDurSum());t1.setUjCt(t1.getUjCt() + t0.getUjCt());t1.setUvCt(t1.getUvCt() + t0.getUvCt());return t1;}}, new ProcessWindowFunction<TrafficPageViewBean, TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow>() {@Overridepublic void process(Tuple4<String, String, String, String> tp4, Context context, Iterable<TrafficPageViewBean> iterable, Collector<TrafficPageViewBean> collector) throws Exception {/*将滚动聚合窗口和全量聚合窗口连用,即利用了滚动聚合性能好,资源占用率低的优点,又能拥有全量窗口函数中的窗口信息此时的迭代器中只有1条:当所有的数据reduce完成后在经过一次窗口函数,因此窗口函数中只有一条reduce聚合后的数据*/TrafficPageViewBean reduced = iterable.iterator().next();// 组装窗口字段long start = context.window().getStart();long end = context.window().getEnd();reduced.setStt(DateFormatUtils.toYmdHms(start));reduced.setEdt(DateFormatUtils.toYmdHms(end));// 置于一个新的ts作为去重比较字段reduced.setTs(System.currentTimeMillis());// 发送出去!!!collector.collect(reduced);}});