查询符合条件的最新数据
- 1.需求说明
- 2.代码实现
- 2.1 配置信息
- 2.2 映射对象
- 2.3 代码实现
- 3.算法分析
1.需求说明
业务上有大量从硬件采集到的数据通过Kafka入库GreenPlum数据库,虽然数据表已进行分区,每个分区少的有100+万条多的时候有1000+万条记录
,现在有一个接口要获取最新的20条数据用来展示,即便是从单个分区上查询由于需要全量数据排序,时间长的时候需要7~8秒
,这个时候就考虑直接从Kafka获取最新数据。
2.代码实现
2.1 配置信息
这里只贴出使用到的配置信息。
# kafka的服务地址
spring:kafka:bootstrap-servers: 127.0.0.1:xxxx
# tableName与topic的映射
tableNameKafkaTopic:mapping: "{\"table_name\":\"topic_name\"}"
<!-- 用到了这个依赖里的时间工具 将字符串时间转换成 Date 这个方法也可以自己写 -->
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.6.6</version>
</dependency>
2.2 映射对象
由于Kafka内的字段跟数据库的字段名称不同,这里要创建映射关系(仅保留几个字段用来说明问题)。
@Data
@ApiModel(value = "数据封装对象", description = "用于对Kafka内的数据进行封装")
public class DataRes implements Serializable {@ApiModelProperty(name = "LOCATION", value = "设备位置")@JsonProperty(value = "LOCATION")private String location;@ApiModelProperty(name = "IP", value = "设备ID")@JsonProperty(value = "IP")private String equip;@ApiModelProperty(name = "TME", value = "创建时间")@JsonProperty(value = "TME")private String tme;@ConstructorProperties({"LOCATION", "IP", "TME"})public DataGsmRes(String location, String ip, String tme) {this.location = location;this.equip = ip;this.tme = tme;}
}
Kafka的记录信息:
{"LOCATION":"河南郑州","IP":"xxxx","TME":"2022-01-12 15:29:55"}
接口返回的数据:
{"location": "河南郑州","equip": "xxxx","tme": "2022-01-12 15:29:55"
}
2.3 代码实现
为了简洁删掉了一些业务相关的代码
。
@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${tableNameKafkaTopic.mapping}")private String tableNameKafkaTopicMapping;@Overridepublic BaseResult<PageEntity<Map>> queryNewest(Map mapParam) {// 参数解析(根据tableName获取对应的Kafka主题)String tableName = MapUtils.getString(mapParam, "table_name", "");if (StringUtils.isBlank(tableName)) {return BaseResult.getInstance(101, "数据源参数table_name不能为空!");}// 获取equip信息用来筛选数据JSONObject jsonParam = new JSONObject();try {String paramStr = MapUtils.getString(mapParam, "param_json", "");JSONObject json = JSONObject.parseObject(paramStr);if (json != null) {for (String key : json.keySet()) {jsonParam.put(key.toLowerCase(), json.get(key));}}} catch (Exception e) {return BaseResult.getInstance(102, "请求参数param_json非JSON格式");}Object equip = jsonParam.get("equip");if (equip == null || StringUtils.isBlank(equip.toString())) {return BaseResult.getInstance(101, "请求参数param_json内的equip不能为空!");}List<String> equipList = Arrays.asList(equip.toString().split(","));// 从Kafka获取的符合条件的数据条数 equipKey用于筛选数据 timeKey用于排序String equipKey = "IP";String timeKey = "TME";int pageSize = MapUtils.getInteger(mapParam, "pageSize");int querySize = 1000;int queryTime = 50;int queryTotal = querySize * queryTime;// 结果数据封装List<Map> rows = new ArrayList<>();List<Map> rowsSorted;// 从Kafka获取最新数据Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);props.put("group.id", "queryNewest");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("max.poll.records", querySize);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 查询主题数据Object topicName = JSONObject.parseObject(tableNameKafkaTopicMapping).get(tableName);if (topicName != null && StringUtils.isNotBlank(topicName.toString())) {TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0);List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);consumer.assign(topicPartitionList);consumer.seekToEnd(topicPartitionList);// 获取当前最大偏移量long currentPosition = consumer.position(topicPartition);int recordsCount;try {for (int i = 1; i <= queryTime; i++) {long seekOffset = currentPosition - i * querySize;consumer.seek(topicPartition, seekOffset > 0 ? seekOffset : 0);ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));recordsCount = records.count();for (ConsumerRecord<String, String> record : records) {queryTotal--;Map map = JSONObject.parseObject(record.value());String ip = MapUtils.getString(map, equipKey);if (equipList.size() == 0 || equipList.contains(ip)) {rows.add(map);}}// 获取数据(达到 pageSize 或 queryTotal 或消息队列无数据 即停止查询)if (rows.size() >= pageSize || queryTotal <= 0 || recordsCount <= 0) {break;}}} finally {consumer.close();// 重新排序String finalTimeKey = timeKey;rowsSorted = rows.stream().sorted(Comparator.comparingLong(row -> -DateUtil.parse(row.get(finalTimeKey).toString(), "yyyy-MM-dd HH:mm:ss").getTime())).collect(Collectors.toList());}} else {return BaseResult.getInstance(301, "不存在" + tableName + "对应的KafkaTopic!");}// 结果封装(截取pageSize个结果并映射key值)List<Map> subList;if (rowsSorted.size() > pageSize) {subList = rowsSorted.subList(0, pageSize);} else {subList = rowsSorted;}List<Map> res = new ArrayList<>();PageEntity<Map> pageEntity = new PageEntity<>();// 重新封装Kafka数据(字段值映射)res = subList.stream().map(item -> JSONObject.parseObject(JSON.toJSONString(JSONObject.parseObject(item.toString(), DataRes.class)), Map.class)).collect(Collectors.toList());} pageEntity.setTotal(res.size());pageEntity.setRows(res);return BaseResult.getInstance(pageEntity);}
3.算法分析
- 筛选的是查询时最大偏移量向前
queryTotal
条数据,这个可以根据业务进行调整。 - 重新封装Kafka数据的算法实际上是修改map对象key的方法。
max.poll.records
参数明确了每次poll
的记录数便于统计。- 特别注意:当前代码适用的是
Partition
只有1️⃣个的情况,多个分区的情况需要先查询分区数,再轮询获取每个分区的最新数据,合并后重新排序。
// 仅查询了1个分区
TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0);
// 获取主题的分区列表
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName.toString());
//Partition(topic = gp_gsmdata, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])