
The CUMULATE Cumulative Window in Flink SQL

Source: https://blog.csdn.net/weixin_52642840/article/details/144278123

eventTime

package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _05_flinkSql_Cumulate_eventTime {
    /**
     * Cumulative window + event time:
     * a 1-minute window that fires every 10 seconds, with a 3-second watermark.
     *
     * Sample input:
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:12:43.000"}
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:12:53.000"}
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:13:03.000"}
     * {"username":"zs","price":20,"event_time":"2023-07-18 12:13:13.000"}
     */
    public static void main(String[] args) throws Exception {
        // 1. env - prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the Kafka source table; the watermark lags event_time by 3 seconds
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` STRING,\n" +
                "  `price` INT,\n" +
                "  `event_time` TIMESTAMP(3),\n" +
                "   WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 3. Aggregate with the CUMULATE window TVF: step = 10 s, max size = 60 s
        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second, INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();

        // 4./5. sink & execute: print() above already submits the SQL job and blocks
        // on its results, so a separate env.execute() is not needed for this pure
        // Table/SQL pipeline.
    }
}
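For the sample input above, CUMULATE keeps window_start pinned at 12:12:00 while window_end advances by one 10-second step per firing (12:12:10, 12:12:20, ... up to the 60-second maximum at 12:13:00), so each emitted row accumulates over a growing slice of the minute. To try the query without a Kafka cluster, the built-in datagen connector can fabricate rows. The fragment below is a minimal sketch meant to replace the Kafka DDL in the main method above, assuming a Flink version whose datagen fills TIMESTAMP(3) columns with the current time; the table name table1_gen and all option values are illustrative, not from the original post:

tenv.executeSql("CREATE TABLE table1_gen (\n" +
        "  `username` STRING,\n" +
        "  `price` INT,\n" +
        "  `event_time` TIMESTAMP(3),\n" +
        "   WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND\n" +
        ") WITH (\n" +
        "  'connector' = 'datagen',\n" +        // no external dependencies needed
        "  'rows-per-second' = '2',\n" +        // gentle rate so output stays readable
        "  'fields.username.length' = '2',\n" + // short random usernames
        "  'fields.price.min' = '1',\n" +
        "  'fields.price.max' = '100'\n" +
        ")");

Running the same CUMULATE query against table1_gen should then print a fresh batch of cumulative rows roughly every 10 seconds.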

processTime

package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _06_flinkSql_Cumulate_processTime {
    /**
     * Cumulative window + processing time:
     * a 1-minute window that fires every 10 seconds.
     *
     * Sample input:
     * {"username":"zs","price":20}
     * {"username":"lisi","price":15}
     * {"username":"lisi","price":20}
     * {"username":"zs","price":20}
     * {"username":"zs","price":20}
     * {"username":"zs","price":20}
     * {"username":"zs","price":20}
     */
    public static void main(String[] args) throws Exception {
        // 1. env - prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the Kafka source table; event_time is a computed
        //    processing-time column, so no watermark is needed
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `username` STRING,\n" +
                "  `price` INT,\n" +
                "  `event_time` AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 3. Same CUMULATE query as the event-time version, now driven by the
        //    processing-time column
        tenv.executeSql("select \n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   username,\n" +
                "   count(1) zongNum,\n" +
                "   sum(price) totalMoney \n" +
                "   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second, INTERVAL '60' second))\n" +
                "group by window_start,window_end,username").print();

        // 4./5. sink & execute: print() submits and blocks, so env.execute() is not needed.
    }
}
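Recent Flink releases also accept named arguments for the window TVFs, which makes the step/size pair harder to mix up. A sketch of the same query in that style (same table and columns as above; hedged, since named-parameter support depends on the Flink version in use):

tenv.executeSql("select window_start, window_end, username,\n" +
        "  count(1) zongNum, sum(price) totalMoney\n" +
        "from table(CUMULATE(\n" +
        "  DATA => TABLE table1,\n" +            // the table to window over
        "  TIMECOL => DESCRIPTOR(event_time),\n" + // the time attribute column
        "  STEP => INTERVAL '10' SECOND,\n" +    // firing interval
        "  SIZE => INTERVAL '60' SECOND))\n" +   // maximum window size
        "group by window_start, window_end, username").print();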

Top-N example

Requirement: within each one-minute window, find the Top 3 pages by click count. Tumbling window (1 minute) + event time + 3-second watermark.

In batch Hive SQL the pattern is a plain ROW_NUMBER over the aggregated totals:

with t1 as (
    select page_id, sum(clicks) totalSum
    from table1
    group by page_id
), t2 as (
    select page_id, totalSum,
           row_number() over (order by totalSum desc) px
    from t1
)
select * from t2 where px <= 3

The Flink SQL version adds window_start/window_end both to the GROUP BY and to the ROW_NUMBER partition, so a separate Top 3 is produced for every minute window:

with t1 as (
    select window_start, window_end, page_id, sum(clicks) totalSum
    from table(tumble(table table1, DESCRIPTOR(event_time), INTERVAL '60' second))
    group by window_start, window_end, page_id
), t2 as (
    select window_start, window_end, page_id, totalSum,
           row_number() over (partition by window_start, window_end order by totalSum desc) px
    from t1
)
select * from t2 where px <= 3

Sample data:
{"ts": "2023-09-05 12:00:10", "page_id": 1, "clicks": 100}
{"ts": "2023-09-05 12:00:20", "page_id": 2, "clicks": 90}
{"ts": "2023-09-05 12:00:30", "page_id": 3, "clicks": 110}
{"ts": "2023-09-05 12:00:40", "page_id": 4, "clicks": 23}
{"ts": "2023-09-05 12:00:50", "page_id": 5, "clicks": 456}
{"ts": "2023-09-05 12:00:55", "page_id": 5, "clicks": 456}
// triggering record: ts 12:01:03 minus the 3-second watermark delay advances the
// watermark to 12:01:00, which closes the [12:00:00, 12:01:00) window
{"ts": "2023-09-05 12:01:03", "page_id": 5, "clicks": 456}
package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _07_flinkSql_topN {
    public static void main(String[] args) throws Exception {
        // 1. env - prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the Kafka source table with a 3-second watermark on ts
        tenv.executeSql("CREATE TABLE table1 (\n" +
                "  `page_id` INT,\n" +
                "  `clicks` INT,\n" +
                "  `ts` TIMESTAMP(3),\n" +
                "   WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // 3. Windowed Top-N: aggregate per tumbling minute, then rank within each window
        tenv.executeSql("with t1 as (\n" +
                "  select window_start, window_end, page_id, sum(clicks) totalSum\n" +
                "  from table(tumble(table table1, DESCRIPTOR(ts), INTERVAL '60' second))\n" +
                "  group by window_start, window_end, page_id\n" +
                "), t2 as (\n" +
                "  select window_start, window_end, page_id, totalSum,\n" +
                "    row_number() over (partition by window_start, window_end order by totalSum desc) px\n" +
                "  from t1\n" +
                ") select * from t2 where px <= 3").print();

        // 4./5. sink & execute: print() submits and blocks, so env.execute() is not needed.
    }
}
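To push the sample records into topic1, any Kafka producer works. Below is a minimal, hypothetical helper using the plain kafka-clients Java API; the class name SampleDataProducer is invented for illustration, and the broker list is carried over from the DDL above:

package com.bigdata.day08;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SampleDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String[] records = {
                "{\"ts\": \"2023-09-05 12:00:10\", \"page_id\": 1, \"clicks\": 100}",
                "{\"ts\": \"2023-09-05 12:00:20\", \"page_id\": 2, \"clicks\": 90}",
                "{\"ts\": \"2023-09-05 12:00:30\", \"page_id\": 3, \"clicks\": 110}",
                "{\"ts\": \"2023-09-05 12:00:40\", \"page_id\": 4, \"clicks\": 23}",
                "{\"ts\": \"2023-09-05 12:00:50\", \"page_id\": 5, \"clicks\": 456}",
                "{\"ts\": \"2023-09-05 12:00:55\", \"page_id\": 5, \"clicks\": 456}",
                // triggering record: advances the watermark past the window end
                "{\"ts\": \"2023-09-05 12:01:03\", \"page_id\": 5, \"clicks\": 456}"
        };

        // try-with-resources closes the producer after all sends are flushed
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String value : records) {
                producer.send(new ProducerRecord<>("topic1", value));
            }
            producer.flush();
        }
    }
}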
