欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > Doris的Routine Load方式消费Kafka数据进入Doris

Doris的Routine Load方式消费Kafka数据进入Doris

2024/10/26 3:34:42 来源:https://blog.csdn.net/qq_25954159/article/details/142488644  浏览:    关键词:Doris的Routine Load方式消费Kafka数据进入Doris

假设kafka已有嵌套JSON数据格式为

{"appId": "10000","platform": "YY","userId": "007","userAgent": "6","event": "login","package": "org.apache.doris","properties": {"phoneNumber": "13814516235","actionTime": "1694928000","deviceID": "device123","deviceType": "smartphone","appVersion": "1.0.0","networkType": "WiFi","os": "Android","userUID": "user123","nickname": "小王","clientIp": "192.168.1.1"},"clientIp": "10.225.36.85","timestamp": "1694928000","source": "mobileApp","sessionId": "12554122524422"
}

1、创建建表语句

CREATE TABLE test_app_dwh.rt_ods_log_app_loginout (appId VARCHAR(20) NOT NULL COMMENT "应用ID",userId VARCHAR(20) NOT NULL COMMENT "用户ID",timestamp BIGINT COMMENT "时间戳",platform VARCHAR(20) NOT NULL COMMENT "平台",userAgent VARCHAR(20) NOT NULL COMMENT "用户代理",event VARCHAR(20) NOT NULL COMMENT "事件类型",package VARCHAR(100) NOT NULL COMMENT "包名",phoneNumber VARCHAR(20) COMMENT "电话号码",actionTime BIGINT COMMENT "动作时间戳",deviceID VARCHAR(50) COMMENT "设备ID",deviceType VARCHAR(20) COMMENT "设备类型",appVersion VARCHAR(20) COMMENT "应用版本",networkType VARCHAR(20) COMMENT "网络类型",os VARCHAR(20) COMMENT "操作系统",userUID VARCHAR(50) COMMENT "用户唯一标识",nickname VARCHAR(50) COMMENT "昵称",clientIp VARCHAR(20) COMMENT "客户端IP",source VARCHAR(20) COMMENT "来源",sessionId VARCHAR(50) COMMENT "会话ID"
)
DUPLICATE KEY(appId, userId, timestamp)
DISTRIBUTED BY HASH(appId) BUCKETS 1;

2、导入命令

CREATE ROUTINE LOAD test_game_dwh.kafkajob_rt_ods_log_app_loginout ON rt_ods_log_app_loginout
COLUMNS(appId, userId, timestamp, platform, userAgent, event, package, phoneNumber, actionTime, deviceID, deviceType, appVersion, networkType, os, userUID, nickname, clientIp, source, sessionId)
PROPERTIES
("desired_concurrent_number" = "1","format" = "json","strict_mode" = "false","jsonpaths" = "[\"$.appId\",\"$.userId\",\"$.timestamp\",\"$.platform\",\"$.userAgent\",\"$.event\",\"$.package\",\"$.properties.phoneNumber\",\"$.properties.actionTime\",\"$.properties.deviceID\",\"$.properties.deviceType\",\"$.properties.appVersion\",\"$.properties.networkType\",\"$.properties.os\",\"$.properties.userUID\",\"$.properties.nickname\",\"$.properties.clientIp\",\"$.source\",\"$.sessionId\"]"
)
FROM KAFKA
("kafka_broker_list" = "ip1:9092,ip2:9092,ip3:9092","kafka_topic" = "loginout","property.group.id" = "kafka_job","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

最后kafka的数据就可以源源不断的存储到doris表里面了

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com