
Flink Real-Time Data Warehouse (10) [Building the DWS Layer (4): Creating the Trade Domain Summary Tables]

2024/10/24 Source: https://blog.csdn.net/m0_64261982/article/details/141017854

Preface

Today we finish the remaining two metrics in the trade domain of the DWS layer; this should only take a morning, since both requirements use essentially the same techniques as yesterday's.

1. Trade Domain Province-Granularity Order Window Summary Table

1.1 Approach

This requirement is fairly simple. province_id is a field of the order table, and we already kept it in the DWD-layer order detail transaction fact table, so we only need to read that table. Because the order detail fact table is built from the order pre-processing table, which is produced by a left join with the activity and coupon tables, late data from that left join can again cause duplicate records. This still does not matter here: we do not use any field from those two tables, so keeping only the first record per order detail id is enough to deduplicate. The processing steps are listed below, followed by a sketch of the job skeleton the later snippets assume.

  • Consume dwd_trade_order_detail
  • Convert to a JSON stream
  • First deduplication (keep only the first of the late-arriving duplicates)
  • Convert to a JavaBean stream
  • Extract the event time (from create_time) and set the watermark
  • Key by the grain (province_id)
  • Open windows (again a 10 s tumbling window)
  • Aggregate (again incremental aggregation + full-window function)
  • Associate the province dimension to fill in province_name
  • Write out to ClickHouse
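
The code sections below start at TODO 2, so here is a minimal sketch of the job skeleton (TODO 1) that they assume. Checkpointing and state-backend settings are omitted and would follow the project's other DWS jobs; the parallelism value is an assumption.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DwsTradeProvinceOrderWindow {
    public static void main(String[] args) throws Exception {
        // TODO 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumption: set to the partition count of the Kafka topic being consumed
        env.setParallelism(1);

        // TODO 2 .. TODO 10 are shown step by step in the sections that follow

        env.execute("DwsTradeProvinceOrderWindow");
    }
}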

1.2 Code Implementation

1.2.1 Create the ClickHouse Table and Its Entity Class

As before, the fields are the grain + the window start/end times + the measures:

create table if not exists dws_trade_province_order_window
(
    stt           DateTime,
    edt           DateTime,
    province_id   String,
    province_name String,
    order_count   UInt64,
    order_amount  Decimal(38, 20),
    ts            UInt64
) engine = ReplacingMergeTree(ts)
  partition by toYYYYMMDD(stt)
  order by (stt, edt, province_id);

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeProvinceOrderWindow {
    // Window start time
    String stt;
    // Window end time
    String edt;
    // Province ID
    String provinceId;
    // Province name
    @Builder.Default
    String provinceName = "";
    // Cumulative order count
    Long orderCount;
    // Set of order IDs, used to count distinct orders
    @TransientSink
    Set<String> orderIdSet;
    // Cumulative order amount
    Double orderAmount;
    // Timestamp
    Long ts;
}
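
The @TransientSink annotation above is not defined in this article; it marks fields (such as orderIdSet) that exist only for the computation and must be skipped when the bean is written to ClickHouse. If your project does not already have it, a minimal marker annotation like the following sketch is enough:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marker annotation: fields carrying it are ignored by the ClickHouse sink
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}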

1.2.2 Read the Order Detail Fact Table and Convert It to a JSON Stream

// TODO 2. Read the Kafka topic dwd_trade_order_detail
String groupId = "dws_trade_province_order_window";
DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));

// TODO 3. Convert to a JSON stream
SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
    @Override
    public void flatMap(String value, Collector<JSONObject> out) throws Exception {
        try {
            JSONObject jsonObject = JSONObject.parseObject(value);
            out.collect(jsonObject);
        } catch (Exception e) {
            // Dirty data could also be routed to a side output instead
            e.printStackTrace();
        }
    }
});
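
MyKafkaUtil is a project helper not shown in this article. Below is a minimal sketch, assuming it simply wraps a FlinkKafkaConsumer with a string schema; the broker address hadoop102:9092 is an assumption, and the real utility may use a custom deserialization schema that tolerates null (tombstone) messages.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class MyKafkaUtil {
    // Assumption: broker list of the Kafka cluster used by this project
    private static final String BOOTSTRAP_SERVERS = "hadoop102:9092";

    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", groupId);
        // SimpleStringSchema fails on null messages; a custom schema could map them to "" instead
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }
}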

1.2.3 First Deduplication (Late Data from the Upstream left join)

Because our data does not depend on the right-hand tables of the left join, keeping the first record for each key is enough:

// TODO 4. First deduplication (key by order_detail_id)
KeyedStream<JSONObject, String> keyedByIdStream = jsonDS.keyBy(json -> json.getString("id"));
SingleOutputStreamOperator<JSONObject> filterDS = keyedByIdStream.filter(new RichFilterFunction<JSONObject>() {

    // Holds a marker once the first record has passed; later records for the same key
    // (late data replayed by the upstream left join) are dropped
    private ValueState<String> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public boolean filter(JSONObject value) throws Exception {
        String data = state.value();
        if (data == null) {
            state.update("1"); // any non-null marker will do
            return true;
        }
        return false;
    }
});

1.2.4 Convert to a JavaBean Stream and Set the Watermark

When converting to the JavaBean stream, we take create_time as the ts field so it can be extracted as the event time below (even though it will later be overwritten with the system time to act as the version column of the ClickHouse table):

// TODO 5. Convert to a JavaBean stream
SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderDS = filterDS.map(new MapFunction<JSONObject, TradeProvinceOrderWindow>() {
    @Override
    public TradeProvinceOrderWindow map(JSONObject value) throws Exception {
        HashSet<String> orderIds = new HashSet<>();
        orderIds.add(value.getString("order_id"));
        return TradeProvinceOrderWindow.builder()
                .orderAmount(value.getDouble("split_total_amount"))
                .orderIdSet(orderIds)
                .ts(DateFormatUtil.toTs(value.getString("create_time"), true))
                .provinceId(value.getString("province_id"))
                .build();
    }
});

// TODO 6. Extract the event time and set the watermark
SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderWithWmDS = provinceOrderDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<TradeProvinceOrderWindow>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeProvinceOrderWindow>() {
                    @Override
                    public long extractTimestamp(TradeProvinceOrderWindow element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));
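
DateFormatUtil is another project helper used here (toTs) and in the window function below (toYmdHms). A minimal sketch follows, assuming toTs(dtStr, true) parses "yyyy-MM-dd HH:mm:ss" into epoch milliseconds and toYmdHms formats milliseconds back; the GMT+8 offset is an assumption that should match the cluster's time zone.

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateFormatUtil {
    private static final DateTimeFormatter DTF_FULL = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter DTF_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // isFull = true: "yyyy-MM-dd HH:mm:ss"; isFull = false: "yyyy-MM-dd"
    public static Long toTs(String dtStr, boolean isFull) {
        LocalDateTime dt = isFull
                ? LocalDateTime.parse(dtStr, DTF_FULL)
                : LocalDate.parse(dtStr, DTF_DATE).atStartOfDay();
        return dt.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
    }

    // Format epoch milliseconds as "yyyy-MM-dd HH:mm:ss" for the stt/edt columns
    public static String toYmdHms(long ts) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.of("GMT+8")).format(DTF_FULL);
    }
}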

1.2.5 Key, Window, and Aggregate

Key by the grain and sum the measure fields; when the window closes, fill in the window start and end times and set the timestamp field to the current system time:

// TODO 7. Key by province, open windows, and aggregate (on the watermarked stream)
SingleOutputStreamOperator<TradeProvinceOrderWindow> reduceDS = provinceOrderWithWmDS
        .keyBy(TradeProvinceOrderWindow::getProvinceId)
        .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
        .reduce(new ReduceFunction<TradeProvinceOrderWindow>() {
            @Override
            public TradeProvinceOrderWindow reduce(TradeProvinceOrderWindow value1, TradeProvinceOrderWindow value2) throws Exception {
                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
                return value1;
            }
        }, new WindowFunction<TradeProvinceOrderWindow, TradeProvinceOrderWindow, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window, Iterable<TradeProvinceOrderWindow> input, Collector<TradeProvinceOrderWindow> out) throws Exception {
                TradeProvinceOrderWindow next = input.iterator().next();
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setTs(System.currentTimeMillis());
                next.setOrderCount((long) next.getOrderIdSet().size());
                out.collect(next);
            }
        });

1.2.6 Associate the Dimension Table dim_base_province and Write Out to ClickHouse

// TODO 8. Associate the dimension table dim_base_province
SingleOutputStreamOperator<TradeProvinceOrderWindow> resultDS = AsyncDataStream.unorderedWait(
        reduceDS,
        new DimAsyncFunction<TradeProvinceOrderWindow>("DIM_BASE_PROVINCE") {
            @Override
            public String getKey(TradeProvinceOrderWindow input) {
                return input.getProvinceId();
            }

            @Override
            public void addAttribute(TradeProvinceOrderWindow pojo, JSONObject dimInfo) {
                pojo.setProvinceName(dimInfo.getString("NAME"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 9. Write out to ClickHouse (sink the stream that already carries province_name)
resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_province_order_window values (?,?,?,?,?,?,?)"));

// TODO 10. Launch the job
env.execute("DwsTradeProvinceOrderWindow");
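
DimAsyncFunction is the project's generic asynchronous dimension-lookup function (the same one used in the previous DWS jobs) and is not shown in this article. The sketch below only illustrates its shape: it assumes a hypothetical DimUtil.getDimInfo(table, key) helper that reads the dimension row from Phoenix/HBase; the real class would typically add a Redis cache and proper executor shutdown.

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {

    private final String tableName;
    private transient ExecutorService executor;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    // Extract the primary key used to look up the dimension row
    public abstract String getKey(T input);

    // Copy the needed dimension attributes onto the stream element
    public abstract void addAttribute(T pojo, JSONObject dimInfo);

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
        executor.submit(() -> {
            // DimUtil.getDimInfo is a hypothetical helper querying Phoenix/HBase by key
            JSONObject dimInfo = DimUtil.getDimInfo(tableName, getKey(input));
            if (dimInfo != null) {
                addAttribute(input, dimInfo);
            }
            resultFuture.complete(Collections.singletonList(input));
        });
    }
}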

2. Trade Domain Trademark-Category-User-Granularity Refund Window Summary Table

2.1 Approach

This time we deal with refunds. The data of the DWD-layer refund transaction fact table comes only from the refund records filtered out of ODS plus the dictionary table (which is joined twice, once for the refund type and once for the refund reason), so there is no left-join duplication to handle here. The steps are:

  • Read data from dwd_trade_order_refund
  • Convert to a JavaBean stream (String -> JSON -> JavaBean)
  • Associate the dimension table sku_info to fill in tm_id and category3_id
  • Extract the event time and generate the watermark
  • Key by the grain, open windows, and aggregate
  • Associate the remaining dimension tables
    • base_trademark for the trademark name
    • base_category3, base_category2, base_category1 for their ids and names
  • Write out to ClickHouse

2.2 Code Implementation

2.2.1 Create the ClickHouse Table and Its JavaBean

create table if not exists dws_trade_trademark_category_user_refund_window
(
    stt            DateTime,
    edt            DateTime,
    trademark_id   String,
    trademark_name String,
    category1_id   String,
    category1_name String,
    category2_id   String,
    category2_name String,
    category3_id   String,
    category3_name String,
    user_id        String,
    refund_count   UInt64,
    ts             UInt64
) engine = ReplacingMergeTree(ts)
  partition by toYYYYMMDD(stt)
  order by (stt, edt, trademark_id, trademark_name, category1_id,
            category1_name, category2_id, category2_name, category3_id, category3_name, user_id);

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeTrademarkCategoryUserRefundBean {
    // Window start time
    String stt;
    // Window end time
    String edt;
    // Trademark ID
    String trademarkId;
    // Trademark name
    String trademarkName;
    // First-level category ID
    String category1Id;
    // First-level category name
    String category1Name;
    // Second-level category ID
    String category2Id;
    // Second-level category name
    String category2Name;
    // Third-level category ID
    String category3Id;
    // Third-level category name
    String category3Name;
    // Set of order IDs, used to count distinct refund orders
    @TransientSink
    Set<String> orderIdSet;
    // sku_id, only needed to look up the sku_info dimension
    @TransientSink
    String skuId;
    // User ID
    String userId;
    // Refund count
    Long refundCount;
    // Timestamp
    Long ts;
}

2.2.2 Read the Refund Transaction Fact Table and Convert the Format

// TODO 2. Read the Kafka topic dwd_trade_order_refund
String groupId = "dws_trade_trademark_category_user_refund_window";
DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_refund", groupId));

// TODO 3. Convert to a JavaBean stream
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> tmCateUserDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, TradeTrademarkCategoryUserRefundBean>() {
    @Override
    public void flatMap(String value, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
        try {
            JSONObject jsonObject = JSONObject.parseObject(value);
            HashSet<String> orderIds = new HashSet<>();
            orderIds.add(jsonObject.getString("order_id"));
            out.collect(TradeTrademarkCategoryUserRefundBean.builder()
                    .userId(jsonObject.getString("user_id"))
                    .ts(DateFormatUtil.toTs(jsonObject.getString("create_time"), true))
                    .skuId(jsonObject.getString("sku_id"))
                    .orderIdSet(orderIds)
                    .build());
        } catch (Exception e) {
            // Dirty data could also be routed to a side output instead
            e.printStackTrace();
        }
    }
});

2.2.3 Associate the Dimension Table sku_info

Associate the dimension table sku_info to fill in the tm_id and category3_id fields:

// TODO 4. Associate the dimension table sku_info to fill in tm_id and category3_id
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withSkuCateUserIdDS = AsyncDataStream.unorderedWait(
        tmCateUserDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_SKU_INFO") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getSkuId();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
                pojo.setTrademarkId(dimInfo.getString("TM_ID"));
            }
        },
        60 * 5, TimeUnit.SECONDS);

2.2.4 Set the Watermark, Then Key, Window, and Aggregate

// TODO 5. Set the watermark
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withWmDS = withSkuCateUserIdDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<TradeTrademarkCategoryUserRefundBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeTrademarkCategoryUserRefundBean>() {
                    @Override
                    public long extractTimestamp(TradeTrademarkCategoryUserRefundBean element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));

// TODO 6. Key by the grain (category3_id, trademark_id, user_id), open windows, and aggregate
// An explicit KeySelector is used instead of a lambda so Flink can infer the Tuple3 key type
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceDS = withWmDS
        .keyBy(new KeySelector<TradeTrademarkCategoryUserRefundBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> getKey(TradeTrademarkCategoryUserRefundBean value) throws Exception {
                return Tuple3.of(value.getCategory3Id(), value.getTrademarkId(), value.getUserId());
            }
        })
        .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
        .reduce(new ReduceFunction<TradeTrademarkCategoryUserRefundBean>() {
            @Override
            public TradeTrademarkCategoryUserRefundBean reduce(TradeTrademarkCategoryUserRefundBean value1, TradeTrademarkCategoryUserRefundBean value2) throws Exception {
                value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                return value1;
            }
        }, new WindowFunction<TradeTrademarkCategoryUserRefundBean, TradeTrademarkCategoryUserRefundBean, Tuple3<String, String, String>, TimeWindow>() {
            @Override
            public void apply(Tuple3<String, String, String> key, TimeWindow window, Iterable<TradeTrademarkCategoryUserRefundBean> input, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
                TradeTrademarkCategoryUserRefundBean next = input.iterator().next();
                next.setTs(System.currentTimeMillis());
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setRefundCount((long) next.getOrderIdSet().size());
                out.collect(next);
            }
        });

2.2.5 Associate the Remaining Dimension Tables and Write the Data to ClickHouse

// TODO 7. Associate the remaining dimension tables
// TODO 7.1 Associate base_trademark
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithTmDS = AsyncDataStream.unorderedWait(
        reduceDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_TRADEMARK") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getTrademarkId();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setTrademarkName(dimInfo.getString("TM_NAME"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 7.2 Associate base_category3
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(
        reduceWithTmDS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY3") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory3Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory3Name(dimInfo.getString("NAME"));
                pojo.setCategory2Id(dimInfo.getString("CATEGORY2_ID"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 7.3 Associate base_category2
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(
        reduceWithCate3DS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY2") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory2Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory2Name(dimInfo.getString("NAME"));
                pojo.setCategory1Id(dimInfo.getString("CATEGORY1_ID"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 7.4 Associate base_category1
SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(
        reduceWithCate2DS,
        new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY1") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getCategory1Id();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory1Name(dimInfo.getString("NAME"));
            }
        },
        100, TimeUnit.SECONDS);

// TODO 8. Write out to ClickHouse
reduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction(
        "insert into dws_trade_trademark_category_user_refund_window values(?,?,?,?,?,?,?,?,?,?,?,?,?)"));

// TODO 9. Launch the job
env.execute("DwsTradeTrademarkCategoryUserRefundWindow");
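
ClickHouseUtil.getSinkFunction, used by both jobs, is another project helper not shown in this article. The sketch below is one way it could be implemented with Flink's JdbcSink: it fills the ? placeholders from the bean's fields in declaration order and skips fields annotated with @TransientSink, which is why the bean's writable fields must line up with the table's columns. The JDBC URL, driver, and batch settings are assumptions.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;

public class ClickHouseUtil {
    // Assumption: URL and driver of the ClickHouse instance used by this project
    private static final String URL = "jdbc:clickhouse://hadoop102:8123/gmall";
    private static final String DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";

    public static <T> SinkFunction<T> getSinkFunction(String sql) {
        return JdbcSink.sink(
                sql,
                (statement, obj) -> {
                    // Fill the placeholders from the bean fields in declaration order,
                    // skipping fields annotated with @TransientSink
                    Field[] fields = obj.getClass().getDeclaredFields();
                    int skipped = 0;
                    for (int i = 0; i < fields.length; i++) {
                        Field field = fields[i];
                        if (field.getAnnotation(TransientSink.class) != null) {
                            skipped++;
                            continue;
                        }
                        field.setAccessible(true);
                        try {
                            statement.setObject(i + 1 - skipped, field.get(obj));
                        } catch (IllegalAccessException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(5)
                        .withBatchIntervalMs(1000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(URL)
                        .withDriverName(DRIVER)
                        .build());
    }
}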

Summary

With this, the DWS layer is complete. Overall, the logic of each layer is quite similar; the real difficulty is the abstract understanding of real-time data. Next comes the ADS layer; let's knock it out today!
