欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 焦点 > ElasticSearch第28讲:MySQL数据同步ES的常用思路及在网络货运项目中的使用

ElasticSearch第28讲:MySQL数据同步ES的常用思路及在网络货运项目中的使用

2025/2/4 20:44:26 来源:https://blog.csdn.net/qq_28959087/article/details/144201704  浏览:    关键词:ElasticSearch第28讲:MySQL数据同步ES的常用思路及在网络货运项目中的使用

MySQL数据同步ES的常用思路及在网络货运项目中的使用

在实际项目开发中,我们经常将MySQL作为业务数据库,ES作为查询数据库,用来实现读写分离,缓解MySQL数据库的查询压力,应对海量数据的复杂查询。这其中有一个很重要的问题,就是如何实现MySQL数据库和ES的数据同步,本文是ElasticSearch第28讲,和大家聊聊MySQL和ES数据同步的各种方案,以及在网络货运项目中的使用,在实际项目中,使用ES支持复杂查询,并使用canal解除业务系统和ES的耦合。

文章目录

  • MySQL数据同步ES的常用思路及在网络货运项目中的使用
    • 1、背景
    • 2、解决方案
      • 2.1、MySQL和ES各自的特点
        • 1、为什么选用MySQL
        • 2、为什么选用 ES
    • 问题1:数据存储为何选择ES ?
      • 2.2、数据同步方案
        • 1、同步双写
        • 2、异步双写(MQ方式)
        • 3、基于MySQL表定时扫描同步
        • 4、基于Binlog实时同步✅
    • 3、canal简介
      • 3.1、canal 简介
      • 3.2、使用注意事项
      • 3.3、canal适用场景
      • 3.4、重要版本更新说明
      • 3.5、canal消息示例
    • 问题1:如何判断canal服务端与MySQL连接是否成功?
    • 问题2:如何判断canal客户端与canal服务端连接是否成功?
    • 问题3:ES时间相比MySQL时间早8个小时?
    • 问题4:打开MySQL binlog后,对性能有什么影响?
    • 4、货运平台ES数据同步实战
      • 4.1、系统架构图
      • 4.2、业务架构图
      • 4.3、表结构设计
      • 4.4、项目涉及到的技术点
      • 4.5、核心功能
      • 4.6、项目代码及测试用例
    • 5、数据迁移同步工具选型
    • 问题1:Elasticsearch如何修改表结构?
    • 问题2:Elasticsearch的准实时性?
    • 问题3:Elasticsearch可能丢数据?
    • 问题4:Elasticsearch分页查询深分页问题?
    • 6、总结

1、背景

网络货运平台已经比较成熟,提供了给货源方提供找司机的交易匹配方案;其中包含这几个角色:货主、承运人(司机、车队长)、监管机构、平台。

其主要流程大体如下

  • 1、司机入驻平台,需要上传身份证,驾驶员信息,车辆信息;
  • 2、接下来货主在平台发布货源,然后司机进行抢单或者接单或者转让运单,货主也对这笔运单进行调价处理;
  • 3、在完成运单后,需要上报信息给平台,然后货主将应付款项支付给司机,且资金流水需要上报给平台,并将佣金支付给平台;
  • 4、监管机构需监管运单整体生命周期,读取驾驶员信息,车辆信息,订单信息和流水信息。

司机要想接单,需要进入货源大厅,对货主发布货源按重量,价格,距离,时间等多维度进行分页查询,随着业务系统的不断迭代,数据库的数据量也随着逐渐增大,就导致业务系统日常运行情况下很多接口查询请求缓慢,甚至读取超时 ,为了整体优化项目,提高系统性能 希望在系统中对于部分业务充分利用缓存操作或使用ElasticSearch,造成了如下几个问题

  • 1、对多维度数据分页查询,MySQL join查询性能低下,需要对货主发布货源按重量,价格,距离,时间等维度进行分页查询;
  • 2、数据如何同步到ES,同步双写存在数据一致性问题;

2、解决方案

  • 对于多维度数据分页查询,MySQL join查询性能低下问题,接入搜索中间件ES

  • 使用geo_point数据类型实现距离排序

  • 对于数据如何同步到ES问题,使用如2.2所述方案完成;

    采用定时任务实现MySQL全量数据dump ES,采用canal服务实现MySQL增量数据dump ES

2.1、MySQL和ES各自的特点

1、为什么选用MySQL

MySQL 在关系型数据库历史上并没有特别优势的位置,Oracle/DB2/PostgreSQL(Ingres) 三老比 MySQL 开发早了 20 来年,但是乘着 2000 年的互联网东风,LAMP 架构得到迅速的使用,特别在中国,大部分新兴企业的 IT 系统主数据沉淀于 MySQL 中。

  • 核心特点:开源免费、高并发、稳定、支持事务、支持SQL查询
  • 高并发能力:MySQL 内核特征特别适合高并发简单 SQL 操作 ,链接轻量化(线程模式),优化器、执行器、事务引擎相对简单粗暴,存储引擎做得比较细致
  • 稳定性好:主数据库最大的要求就是稳定、不丢数据,MySQL 内核特征反倒让其特点鲜明,从而达到很好的稳定性,主备系统也很早就 ready,应对崩溃情况下的快速切换,innodb 存储引擎也保障了 MySQL 下盘稳定
  • 操作便捷:良好、便捷的用户体验(相比 PostgreSQL),让应用开发者非常容易上手 ,学习成本较低
  • 开源生态:MySQL 是一款开源产品,让上下游厂商围绕其构建工具相对简单,HAproxy、分库分表中间件让其实用性大大加强,同时开源的特质让其有大量的用户
2、为什么选用 ES

业务需要支持对Json格式数据查询,解决MySQL对json格式数据查询性能不足的缺点

业务需要支持对商品名称、商品型号分词查询,解决MySQL不支持分词查询的缺点

业务需要支持对多张表多维数据分页查询,解决MySQL Join查询性能低下的缺点

ES 几个显著的特点,能够有效补足 MySQL 在企业级数据操作场景的缺陷,而这也是我们将其选择作为下游数据源重要原因

  • 核心特点:支持分词检索,多维筛选性能好,支持海量数据查询
  • 文本搜索能力:ES 是基于倒排索引实现的搜索系统,配合多样的分词器,在文本模糊匹配搜索上表现得比较好,业务场景广泛;
  • 多维筛选性能好:亿级规模数据使用宽表预构建(消除 join),配合全字段索引,使 ES 在多维筛选能力上具备压倒性优势,而这个能力是诸如 CRM, BOSS, MIS 等企业运营系统核心诉求,加上文本搜索能力,独此一家;
  • 开源和商业并行:ES 开源生态非常活跃,具备大量的用户群体,同时其背后也有独立的商业公司支撑,而这让用户根据自身特点有了更加多样、渐进的选择

问题1:数据存储为何选择ES ?

方案1:HBase 其设计初衷并不是用来做复杂查询,即使可以做到,效率也不高

方案2:MongoDB 团队没有人熟悉MongoDB。运维人员也没有MongoDB的运维经验

方案3:ES ✅

2.2、数据同步方案

1、同步双写

考虑到网货业务刚成立,并没有海量数据,而且有巡检工具比对MySQL与ES数据一致性,因此采用了同步双写方案。

这是一种最为简单的方式,在将数据写到 MySQL 时,同时将数据写到 ES。

在这里插入图片描述

伪代码:见demo1

/*** 新增商品*/
@Transactional(rollbackFor = Exception.class)
public void addGoods(GoodsDto goodsDto) {//1、保存MysqlGoods goods = new Goods();BeanUtils.copyProperties(goodsDto,goods);GoodsMapper.insert();//2、保存ESIndexRequest indexRequest = new IndexRequest("goods_index","_doc");indexRequest.source(JSON.toJSONString(goods), XContentType.JSON);indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);highLevelClient.index(indexRequest);
}

优点

1、业务逻辑简单
2、实时性高

缺点

1、 硬编码,有需要写入MySQL的地方都需要添加写入ES的代码;

2、 业务强耦合;

3、 存在双写失败丢数据风险 (有巡检工具);

4、 性能较差:本来MySQL的性能不是很高,再加一个ES,系统的性能必然会下降。

Tips:上面说的双写失败风险,包括以下几种:

1) ES系统不可用;

2) 程序和 ES 之间的网络故障;

3) 程序重启,导致系统来不及写入ES等。

针对这种情况,有数据强一致性要求的,就必须双写放到事务中来处理,而一旦用上事务,则性能下降更加明显。

2、异步双写(MQ方式)

针对多数据源写入的场景,可以借助MQ实现异步的多源写入,这种情况下各个源的写入逻辑互不干扰,不会由于单个数据源写入异常或缓慢影响其他数据源的写入,虽然整体写入的吞吐量增大了,但是由于MQ消费是异步消费,所以不适合实时业务场景。

在这里插入图片描述

优点

1、性能高

2、不易出现数据丢失问题,主要基于MQ消息的消费保障机制,比如ES宕机或者写入失败,还能重新消费MQ消息。

3、多源写入之间相互隔离,便于扩展更多的数据源写入

缺点

1、硬编码问题,接入新的数据源需要实现新的消费者代码

2、系统复杂度增加:引入了消息中间件

3、可能出现延时问题:MQ是异步消费模型,用户写入的数据不一定可以马上看到,造成延时。

3、基于MySQL表定时扫描同步

上面两种方案中都存在硬编码问题,也就是有任何对MySQL进行增删改查的地方要么植入ES代码,要么替换为MQ代码,代码的侵入性太强

如果对实时性要求不高的情况下,可以考虑用定时器来处理,具体步骤如下:

1、数据库的相关表中增加一个字段为 timestamp 的字段,任何crud操作都会导致该字段的时间发生变化;

2、原来程序中的CURD操作不做任何变化;

3、增加一个定时器程序,让该程序按一定的时间周期扫描指定的表,把该时间段内发生变化的数据提取出来;

4、逐条写入到ES中。

如下图所示:
在这里插入图片描述

该方案的典型实现是借助logstash实现数据同步,其底层实现原理就是根据配置定期使用sql查询新增的数据写入ES中,实现数据的增量同步

具体实现可以参考这篇文章:通过Logstash实现mysql数据定时增量同步到ES

在这里插入图片描述

优点

1、不改变原来代码,没有侵入性、没有硬编码;

2、没有业务强耦合,不改变原来程序的性能;

3、Worker代码编写简单不需要考虑增删改查;

缺点

1、时效性较差,由于是采用定时器根据固定频率查询表来同步数据,尽管将同步周期设置到秒级,也还是会存在一定时间的延迟

2、对数据库有一定的轮询压力,一种改进方法是将轮询放到压力不大的从库上

4、基于Binlog实时同步✅

使用 canal 监听 binlog 变更,配置需要监听的字段,然后投递到MQ中,供业务方使用,ES收到消息后,dump数据

上面三种方案要么有代码侵入,要么有硬编码,要么有延迟,那么有没有一种方案既能保证数据同步的实时性又没有代码侵入呢?当然有,可以利用MySQL的binlog来进行同步。其实现原理如下:

在这里插入图片描述

具体步骤如下

  • 1) 读取MySQL的binlog日志,获取指定表的日志信息;
  • 2) 将读取的信息转为MQ;
  • 3) 编写一个MQ消费程序;
  • 4) 不断消费MQ,每消费完一条消息,将消息写入到ES中。

优点

1、没有代码侵入、没有硬编码;

2、原有系统不需要任何变化,没有感知;

3、性能高;

4、业务解耦,不需要关注原来系统的业务逻辑。

缺点

1、构建 Binlog 系统复杂;

2、如果采用MQ消费解析的binlog信息,也会像方案二一样存在MQ延时的风险。

业界目前较为流行的方案:使用canal监听binlog同步数据到es

这篇文章在2021年9月发表,基于 canal 1.1.4 mysql 5.6.33 elasticsearch6.4.2 比较旧了

3、canal简介

canal官网 https://github.com/alibaba/canal/releases

3.1、canal 简介

canal ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。说白了就是,根据MySQL的binlog日志进行增量同步数据。要理解canal的原理,就要先了解mysql的主从复制原理:

1、所有的create update delete操作都会进入MySQL master节点

2、master节点会生成binlog文件,每次操作 MySQL 数据库就会记录到binlog文件中

3、slave节点会订阅master节点的binlog文件,以增量备份的形式同步数据到slave数据

canal原理就是伪装成MySQL的从节点,从而订阅master节点的binlog日志,主要流程为:

1、canal服务端向MySQL的master节点传输 dump 协议

2、mysql的master节点接收到dump请求后推送binlog日志给canal服务端,解析binlog对象(原始为byte流)转成Json格式

3、canal客户端通过TCP协议或MQ形式监听canal服务端,同步数据到ES

3.2、使用注意事项

image-20241202162141779

  • 幂等控制

    • 业务系统消费canal数据同步消息时需自行进行幂等控制,避免因MQ重复投递消息导致业务异常
  • 异常处理

    • 业务系统消费canal数据同步消息时需捕获业务处理异常,并记录日志,必要时设置监控报警,不要对外抛出异常,抛出异常也会被捕获,并不会触发MQ消息回滚

    • canal同步任务生效后,需要配置该topic、cid的监控告警项,监控分为两个维度:消息积压量、消息消费延迟,监控告警配置详见文档

      • rocketmq告警规则配置

  • 处理效率

    • sdk拉取一批MQ消息(batchSize条MQ消息)后,需要在60秒内处理完,如无法处理完,MQ会触发回滚。因canal数据同步的顺序性特性,sdk不会跳过该批MQ消息,也不会将其放到队列末尾,而是重新将该批消息推送给MQ消费者,造成死循环消费同一批消息,引起数据同步阻塞。所以业务系统需合理评估batchSize值,确保能在60秒内处理完拉取的MQ消息。对于处理过程较慢的业务系统,建议采用异步方式处理同步数据,可采用CountDownLatch方式或者 MQ队列转发方式 来提升同步消息处理速度。1.1.0-RELEASE版本sdk包可传入参数调整该时间,最长可调整为600秒
    • 理想情况下是否能做到SDK自动感知业务消费处理速度,做背压处理,即在消费迟缓的情况下自动调整batchSize,努力保证消费正常进行,而不是一直重复投递

3.3、canal适用场景

常见场景1:更新缓存

在商品中心用于使用canal监听类目、品牌、店铺、SPU、厂家商品等数据变更,然后更新缓存

通过canal更新缓存

场景2:抓取业务数据新增变化表,用于制作拉链表

如果某店铺发布一个商品,需要将受影响的类目统计起来,然后定时读取通知ES dump 店铺类目索引数据

场景3:抓取业务表的新增变化数据,用于制作实时统计

拿到新增变化商品数据,然后通过canal通知ES同步商品索引数据

3.4、重要版本更新说明

canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:

  • 整体性能测试&优化,提升了150%. #726 参考: Performance

  • 原生支持prometheus监控 #765 Prometheus QuickStart

  • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart

  • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart

  • 原生支持docker镜像 #801 参考: Docker QuickStart

canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide

2021年9月 v1.1.6

3.5、canal消息示例

beforeColumns {index: 0sqlType: 4name: "id"isKey: trueupdated: falseisNull: falsevalue: "24"mysqlType: "int"
}
beforeColumns {index: 1sqlType: 12name: "name"isKey: falseupdated: falseisNull: falsevalue: "\345\260\217\346\260\264_v2"mysqlType: "varchar(45)"
}
beforeColumns {index: 2sqlType: 12name: "email"isKey: falseupdated: falseisNull: falsevalue: "110@qq.com"mysqlType: "varchar(45)"
}
beforeColumns {index: 3sqlType: 12name: "gender"isKey: falseupdated: falseisNull: falsevalue: "\347\224\267"mysqlType: "varchar(32)"
}
beforeColumns {index: 4sqlType: 4name: "phone_number"isKey: falseupdated: falseisNull: falsevalue: "110"mysqlType: "int"
}
beforeColumns {index: 5sqlType: 12name: "creator"isKey: falseupdated: falseisNull: falsevalue: "\347\256\241\347\220\206\345\221\2306"mysqlType: "varchar(255)"
}
beforeColumns {index: 6sqlType: 12name: "modifier"isKey: falseupdated: falseisNull: falsevalue: "\347\256\241\347\220\206\345\221\2306"mysqlType: "varchar(255)"
}
beforeColumns {index: 7sqlType: 93name: "create_time"isKey: falseupdated: falseisNull: falsevalue: "2024-10-28 22:50:22"mysqlType: "datetime"
}
beforeColumns {index: 8sqlType: 93name: "update_time"isKey: falseupdated: falseisNull: falsevalue: "2024-10-28 22:50:22"mysqlType: "datetime"
}
afterColumns {index: 0sqlType: 4name: "id"isKey: trueupdated: falseisNull: falsevalue: "24"mysqlType: "int"
}
afterColumns {index: 1sqlType: 12name: "name"isKey: falseupdated: trueisNull: falsevalue: "\345\260\217\346\260\264_v3"mysqlType: "varchar(45)"
}
afterColumns {index: 2sqlType: 12name: "email"isKey: falseupdated: falseisNull: falsevalue: "110@qq.com"mysqlType: "varchar(45)"
}
afterColumns {index: 3sqlType: 12name: "gender"isKey: falseupdated: falseisNull: falsevalue: "\347\224\267"mysqlType: "varchar(32)"
}
afterColumns {index: 4sqlType: 4name: "phone_number"isKey: falseupdated: falseisNull: falsevalue: "110"mysqlType: "int"
}
afterColumns {index: 5sqlType: 12name: "creator"isKey: falseupdated: falseisNull: falsevalue: "\347\256\241\347\220\206\345\221\2306"mysqlType: "varchar(255)"
}
afterColumns {index: 6sqlType: 12name: "modifier"isKey: falseupdated: falseisNull: falsevalue: "\347\256\241\347\220\206\345\221\2306"mysqlType: "varchar(255)"
}
afterColumns {index: 7sqlType: 93name: "create_time"isKey: falseupdated: falseisNull: falsevalue: "2024-10-28 22:50:22"mysqlType: "datetime"
}
afterColumns {index: 8sqlType: 93name: "update_time"isKey: falseupdated: falseisNull: falsevalue: "2024-10-28 22:50:22"mysqlType: "datetime"
}

问题1:如何判断canal服务端与MySQL连接是否成功?

instance.properties 为与mysql连接相关配置

看example.log文件,看连接是否成功

2024-11-17 22:51:00.502 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2024-11-17 22:51:00.541 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position binlog.000013:4:1731854433000
2024-11-17 22:51:01.042 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000013,position=4,serverId=1,gtid=<null>,timestamp=1731854433000] cost : 526ms , the next step is binlog dump

问题2:如何判断canal客户端与canal服务端连接是否成功?

canal.properties 为与canal 服务端连接相关配置

看canal.log文件,是否有异常信息

问题3:ES时间相比MySQL时间早8个小时?

ES 使用的UTC 时间,而MySQL使用 UTC + 8

可以暂不处理,知道有这个问题即可。

问题4:打开MySQL binlog后,对性能有什么影响?

打开binlog不会影响查询性能,只会影响写入性能(insert,update,delete),一般情况下,在读写均衡的数据库中,开启binlog后对性能影响不超过10%

如何打开binlog配置?

log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式,虽然Canal支持各种模式,但是想用otter,必须用ROW模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

4、货运平台ES数据同步实战

4.1、系统架构图

image-20241202234847224

4.2、业务架构图

image-20241202235704371

场景1:商品搜索/dump 使用ElasticSearch 重点

背景:之前在业务中通过顺序MQ发送消息给ES,在业务高峰期时存在消息堆积问题,使用canal监听MySQL binlog变更,然后同步给ES

代码见Spring-demos 270-demo11 todo

场景2:网货-货源大厅支持 ElasticSearch 重点

背景:需要实现货源大厅滚动查询搜索需求,支持按与送箱点的距离正序排列

可以使用 geo_distance 排序来根据与指定坐标的距离对结果进行排序。建立索引时需要使用啥数据类型?

见Spring-demos 270-demo8

  • UTC时间确认,判断是否多了8小时

4.3、表结构设计

承运人表

# 账户表,主要用来记录角色信息,账户余额,冻结余额,目前共5条数据,两承运人(保存了经纬度),一个货主,一平台,一个监管机构
CREATE TABLE `account` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',`name` varchar(16) NOT NULL COMMENT '名称',`account_no` varchar(64) NOT NULL COMMENT '账户编号',`role` tinyint DEFAULT NULL COMMENT '1-承运人 2-货主 3-平台 4-监管',`balance` bigint NOT NULL DEFAULT '0' COMMENT '余额',`plate_no` varchar(64) DEFAULT NULL COMMENT '车牌号 沪A98765挂',`plate_color` varchar(32) DEFAULT NULL COMMENT '车牌颜色 1黄牌 2绿牌 3黄绿牌',`vehicle_type` varchar(32) DEFAULT NULL COMMENT '车辆类型 1小型轿车 2轻型箱式货车 101重型低平板半挂车',`owner` varchar(64) DEFAULT NULL COMMENT '车辆所有人',`register_date` date DEFAULT NULL COMMENT '行驶证注册日期',`lon` decimal(18,8) DEFAULT NULL COMMENT '经度',`lat` decimal(18,8) DEFAULT NULL COMMENT '纬度',`is_deleted` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '0-正常 1-删除',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb3 COMMENT='司机信息表';

运单表

# 记录运单详情,运单状态及对应时间,点位经纬度信息,承运人信息,货主信息等
CREATE TABLE `order_waybill` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',`waybill_no` bigint unsigned NOT NULL COMMENT '运单编号',`goods_name` varchar(128) NOT NULL COMMENT '货物名称',`goods_weight` decimal(10,3) DEFAULT NULL COMMENT '货物毛重 单位吨',`waybill_state` int unsigned NOT NULL DEFAULT '0' COMMENT '运单状态 0已创建 10已发布 20已接单 40已送达 60已完成 99已取消',`waybill_audit_state` int unsigned NOT NULL DEFAULT '0' COMMENT '运单审核状态 0待审核 1已审核',`transport_type` tinyint unsigned NOT NULL COMMENT '运单类型 10门到门 11短驳',`container_type` varchar(20) NOT NULL COMMENT '箱型',`make_time` datetime NOT NULL COMMENT '做箱时间',`loading_addr` varchar(255) NOT NULL COMMENT '装货点地址',`loading_lon` decimal(18,8) DEFAULT NULL COMMENT '装货点经度',`loading_lat` decimal(18,8) DEFAULT NULL COMMENT '装货点纬度',`receipt_addr` varchar(255) NOT NULL COMMENT '收货点地址',`price` decimal(18,2) NOT NULL COMMENT '外发价格,结给司机的运费,单位元',`consigner_addr` varchar(64) DEFAULT NULL COMMENT '货主地址',`consigner_name` varchar(128) NOT NULL COMMENT '托运人,个人时为姓名,企业时为公司名称',`driver_addr` varchar(64) DEFAULT NULL COMMENT '司机地址',`driver_name` varchar(64) DEFAULT NULL COMMENT '司机姓名',`plate_no` varchar(64) DEFAULT NULL COMMENT '车辆车牌号',`plate_color` varchar(32) DEFAULT NULL COMMENT '车辆车牌颜色 1黄牌 2绿牌 3黄绿牌',`publish_time` datetime DEFAULT NULL COMMENT '发单时间',`confirm_time` datetime DEFAULT NULL COMMENT '接单时间',`receipt_time` datetime DEFAULT NULL COMMENT '收货时间',`need_data_sync` tinyint DEFAULT '0' COMMENT '是否需要数据同步 0-不需要 1-需要',`create_user` bigint unsigned NOT NULL COMMENT '创建人',`create_user_name` varchar(128) NOT NULL DEFAULT '' COMMENT '创建人姓名',`update_user` bigint unsigned DEFAULT NULL COMMENT '更新人',`update_user_name` varchar(128) DEFAULT '' COMMENT '更新人姓名',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `waybill_no` (`waybill_no`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb3 COMMENT='运单主表';

运单ES mapping

// 运单mapping,使用geo_point标识地理信息
{"settings": {"index": {"search": {"slowlog": {"threshold": {"fetch": {"warn": "3s"},"query": {"warn": "3s"}}}},"refresh_interval": "1s","number_of_shards": "5","analysis": {"analyzer": {"fuzzy_match_analyzer": {"max_token_length": "1","type": "standard"}}},"number_of_replicas": "1"}},"mappings": {"dynamic": "false","properties": {"id": {"type": "long"},"waybillNo": {"type": "long"},"goodsName": {"search_analyzer": "ik_smart","analyzer": "ik_max_word","type": "text"},"goodsWeight": {"type": "float"},"waybillState": {"type": "integer"},"waybillAuditState": {"type": "integer"},"transportType": {"type": "integer"},"containerType": {"type": "keyword"},"makeTime": {"type": "date"},"loadingAddr": {"type": "text"},"loadingLocation": {"type": "geo_point"},"receiptAddr": {"type": "text"},"price": {"type": "float"},"consignerAddr": {"type": "text"},"consignerName": {"type": "text"},"driverAddr": {"type": "text"},"driverName": {"type": "text"},"plateNo": {"type": "keyword"},"plateColor": {"type": "keyword"},"publishTime": {"type": "date"},"confirmTime": {"type": "date"},"receiptTime": {"type": "date"},"createUser": {"type": "long"},"createUserName": {"type": "text"},"updateUser": {"type": "long"},"updateUserName": {"type": "text"},"createTime": {"type": "date"},"updateTime": {"type": "date"}}},"aliases": {"order-waybill-read-v1": {},"order-waybill-write-v1": {}}
}

4.4、项目涉及到的技术点

  1. 基础环境 jdk11
  2. MySQL 9.0
  3. Canal.deployer-1.1.4
  4. RocketMQ-all-5.2.0-bin-release
  5. ElasticSearch-7.12
  6. SpringBoot2.6 集成 ElasticSearch 做CRUD
  7. SpringBoot2.6 集成 ElasticSearch 做复杂查询

4.5、核心功能

承运人表补充经纬度字段

第一步:货源大厅功能开发,并实现db分页查询

  • 对货主发布货源按重量,价格,距离,时间进行分页查询

货主1创建运单

  • 0xfd917601078bb946bacba9a46f66012b9b4a7321 运单id 1002 价格 500
  • 提箱点:北京市朝阳区望京东路6号望京国际研发园3期h座3层

平台审核运单

  • 0xcf33a0eef94c244b2d926d3dbba2666b51332b75 审核运单1002

货源大厅

在这里插入图片描述

  • 支持运单类型查询、按货重范围查询、箱型、做箱时间范围查询
  • 支持按发布时间、价格、距离排序

司机1接单:

  • 司机1 0x0cfd803ea8323207e02479bdf8c64a20eab48ae2 接单 1002

司机1 配送运单,运单已送达

货主1 将审核运单,状态变为已完成 0xfd917601078bb946bacba9a46f66012b9b4a7321 货主1

第二步:货源大厅接入ES,通过ES实现分页查询

  • 业务逻辑实现(geo_distance),并完成单元测试

在创建运单和更新运单时,通过同步双写的方式写入ES
查询货源大厅时,构建地理距离、按状态和审核状态过滤、按货物重量范围过滤、容器类型过滤、按做箱时间范围过滤,并构建排序规则

Demo如下

NativeSearchQueryBuilder searchQuery = new NativeSearchQueryBuilder();
// 构建地理距离查询
GeoDistanceQueryBuilder geoDistanceQueryBuilder = QueryBuilders.geoDistanceQuery("loadingLocation").point(param.getLat().doubleValue(), param.getLon().doubleValue()).distance(10000, DistanceUnit.KILOMETERS) // 可以设置查询范围.geoDistance(GeoDistance.ARC);
searchQuery.withQuery(geoDistanceQueryBuilder);// 状态和审核状态过滤
BoolQueryBuilder mustBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("waybillState", WaybillStateEnum.PUBLISHED.getCode())).must(QueryBuilders.termQuery("waybillAuditState", OrderContractService.OrderAuditStatus.finish.getCode()));
searchQuery.withFilter(mustBuilder);// 货物重量范围过滤
BoolQueryBuilder weightQueryBuilder = QueryBuilders.boolQuery();
if (param.getGoodsWeightList() != null && !param.getGoodsWeightList().isEmpty()) {for (CarrierWaybillSourceListParam.GoodsWeight goodsWeight : param.getGoodsWeightList()) {weightQueryBuilder.should(QueryBuilders.rangeQuery("goodsWeight").gte(goodsWeight.getGoodsWeightStart()).lte(goodsWeight.getGoodsWeightEnd()));}searchQuery.withFilter(weightQueryBuilder);
}// 容器类型过滤
if (param.getContainerType() != null && !param.getContainerType().isEmpty()) {searchQuery.withFilter(QueryBuilders.termQuery("containerType", param.getContainerType()));
}// 做箱时间范围过滤
if (param.getMakeTimeStart() != null && param.getMakeTimeEnd() != null) {searchQuery.withFilter(QueryBuilders.rangeQuery("makeTime").gte(param.getMakeTimeStart()).lte(param.getMakeTimeEnd()));
}// 排序
SortBuilder<?> sortBuilder;
SortOrder sortOrder = "asc".equals(param.getOrderType()) ? SortOrder.ASC : SortOrder.DESC;
if ("price".equals(param.getSortType())) {sortBuilder = SortBuilders.fieldSort("price").order(sortOrder);
} else if ("distance".equals(param.getSortType())) {sortBuilder = SortBuilders.geoDistanceSort("loadingLocation", param.getLat().doubleValue(), param.getLon().doubleValue()).order(sortOrder).unit(DistanceUnit.KILOMETERS).geoDistance(GeoDistance.ARC);
} else {// 默认按publish_time排序sortBuilder = SortBuilders.fieldSort("publishTime").order(sortOrder);
}
searchQuery.withSort(sortBuilder);// 创建查询
Pageable pageable = PageRequest.of(currentPage != 0 ? currentPage - 1 : 0, pageSize);
searchQuery.withPageable(pageable);
// 承运做箱时间排序
SearchHits<OrderWaybillES> searchHits = elasticsearchRestTemplate.search(searchQuery.build(), OrderWaybillES.class);

第三步:接入canal,实现db数据变更联动ES数据更新

  • 如果db数据变更,将数据同步到ES order-waybill-v1索引

canal服务器 与mysql连接相关规则配置

Instance.properties

  • ## mysql serverId
    ## 这个id不能和目标源数据库的id一样
    canal.instance.mysql.slaveId = 1234# 数据库地址,binlog订阅开始点
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name = mysql-binlog.000005
    canal.instance.master.position = 126596922
    canal.instance.master.timestamp = # 配置备用源数据库
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name = 
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = # username/password
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8# table regex 订阅哪些表的binlog,支持正则表达式
    # canal.instance.filter.regex=.*\\..*
    canal.instance.filter.regex=db_test.order_waybill
    # table black regex
    # canal.instance.filter.black.regex=
    canal.instance.filter.field=db_test.order_waybill:id/waybill_no/goods_name/waybill_state/waybill_audit_state/make_time/loading_addr/loading_lon/loading_lat/receipt_addr/price/consigner_addr/consigner_name/driver_addr/plate_no/publish_time/confirm_time/receipt_time
    

canal客户端与服务端连接相关规则配置

canal.properties

  • canal.id= 1000001
    canal.ip= 127.0.0.1
    canal.port= 11111
    # canal通过zk做负载均衡
    canal.zkServers=
    # flush data to zk
    canal.zookeeper.flush.period = 1000
    # flush meta cursor/parse position to file
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    ## memory store RingBuffer size, should be Math.pow(2,n)
    canal.instance.memory.buffer.size = 16384
    ## memory store RingBuffer used memory unit size , default 1kb
    canal.instance.memory.buffer.memunit = 1024 
    ## meory store gets mode used MEMSIZE or ITEMSIZE
    canal.instance.memory.batch.mode = MEMSIZE## detecing config
    canal.instance.detecting.enable = false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    canal.instance.transaction.size =  1024
    # mysql fallback connected to new master should fallback times
    canal.instance.fallbackIntervalInSeconds = 60# network config
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30# binlog filter config
    canal.instance.filter.query.dcl = false
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false# binlog format/image check
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
    canal.instance.get.ddl.isolation = false#################################################
    ######### destinations ############# 
    #################################################
    canal.destinations= example
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5canal.instance.global.mode = spring 
    canal.instance.global.lazy = false
    #canal.instance.global.manager.address = 127.0.0.1:1099
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    

创建运单通过canal同步给ES

变更运单通过canal同步给ES

删除运单通过canal同步给ES

采用拉模式,定时任务默认每秒执行一次

// canal数据同步demo
@Scheduled(fixedDelay = 1) //每隔1秒执行@Overridepublic void run() {long batchId = -1;int batchSize = 1000;try {Message message = canalConnector.getWithoutAck(batchSize);//批次idbatchId = message.getId();List<CanalEntry.Entry> entries = message.getEntries();if (batchId != -1 && !entries.isEmpty()) {entries.forEach(entry -> {//MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {//解析处理 这里entry已经是mysql中的一行数据publishCanalEvent(entry);}});}canalConnector.ack(batchId);} catch (CanalClientException e) {log.warn("canal读取数据失败", e);canalConnector.rollback(batchId);}}private void publishCanalEvent(CanalEntry.Entry entry) {//表名String tableName = entry.getHeader().getTableName();//数据库名String database = entry.getHeader().getSchemaName();CanalEntry.RowChange rowChange = null;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {log.warn("解析binlog数据失败", e);return;}//不是目标库中的,不处理if (!StringUtils.equals(DB_NAME, database)) {return;}if (!StringUtils.equals(TABLE_NAME, tableName)) {log.warn("不是目标表中的数据暂不处理");return;}// 仅处理增加/修改/删除事件CanalEntry.EventType eventType = rowChange.getEventType();rowChange.getRowDatasList().forEach(rowData -> {switch (eventType) {case INSERT:handleInsert(rowData, tableName, database);break;case UPDATE:handleUpdate(rowData, tableName, database);break;case DELETE:handleDelete(rowData, tableName, database);break;default:System.out.println("Unsupported event type: " + eventType);}});}
}

4.6、项目代码及测试用例

开发时间:开始开发:1111 结束开发:1122 系统演示:1122

  • 实现货源大厅功能开发,并实现db分页查询

    • 功能拓展:

      • 1、按月查询账户流水 ✅
      • 2、对货主发布货源按重量,价格,距离,时间进行分页查询 ✅
      • 3、额外有时间:使用websocket给司机推送运单
        • 实现将距离小于10公里的运单推荐给司机,站内信功能

      表结构设计,运单表,承运人表需要加字段

  • 实现货源大厅接入ES,通过ES实现分页查询

    • es mapping设计,业务逻辑实现(geo_distance),并完成单元测试 ✅

      采用同步的方式接入ES ✅

  • 接入canal,实现db数据变更联动ES数据更新

    • 将canal运行起来,异步接收消息,变更数据 ✅

      用在网货业务中,如果db数据变更,将数据同步到ES order-waybill-v1索引 ✅

  • 执行测试用例,并演示系统

效果:提供rest接口并演示整体功能 (不提供页面)

见这个项目 https://gitee.com/qiwenjie1993/spring-demos

5、数据迁移同步工具选型

数据迁移同步工具的选择比较多样,下表仅从 MySQL 同步 ES 这个场景下,对一些研究过的数据同步工具进行对比,用户可以根据自己的实际需要选取适合自己的产品。

技术选型:政采云使用的阿里云DTS,对于品牌数据的迁移,使用的循环读取MySQL数据,然后调用ES bulk APi 插入数据,白龙马使用定时任务吧

特性\产品CanalDTSCloudCanal
是否支持自建ES
ES对端版本支持丰富度中 支持ES6和ES7高 支持ES5,ES6和ES7中 支持ES6和ES7
嵌套类型支持join/nested/objectobjectnested/object
join支持方式基于join父子文档&反查基于宽表预构建&反查
是否支持结构迁移
是否支持全量迁移
是否支持增量迁移
数据过滤能力中 仅全量可添加where条件高 全增量阶段where条件高 全增量阶段where条件
是否支持时区转换
同步限流能力
任务编辑能力
数据源支持丰富度
架构模式订阅消费模式 需先写入消息队列直连模式直连模式
监控指标丰富度中 性能指标监控中 性能指标监控高 性能指标、资源指标监控
报警能力针对延迟、异常的电话报警针对延迟、异常的钉钉、短信、邮件报警
任务可视化创建&配置&管理能力
是否开源
是否免费是 社区版、 SAAS版免费
是否支持独立输出否 依赖云平台整体输出
是否支持SAAS化使用

历史数据迁移方案

  • 使用了定时任务 + canal字段过滤规则配置
  • 将所有的历史数据加上标识 needSyncData=true,定时任务程序就会自动处理了。 并且canal只会关注过滤字段的变更,不会重复处理变更

问题1:Elasticsearch如何修改表结构?

如果想增加新的字段,Elasticsearch可以支持直接添加,

但如果想修改字段类型或者改名,ES是不支持的,修改字段的类型会导致索引失效

如果想修改字段的映射,首先需要新建一个索引,然后使用Elasticsearch的reindex功能将旧索引复制到新索引中。

直接重命名字段时,使用reindex功能会导致原来保存的旧字段名的索引数据失效,这种情况该如何解决?可以使用alias索引功能

在这里插入图片描述

不建议直接修改Elasticsearch的表结构

那如果确实有修改的需求呢?一般而言,会先保留旧的字段,然后直接添加并使用新的字段,直到新版本的代码全部稳定运行后,再找机会清理旧的不用的字段,即分成两个版本完成修改需求。

问题2:Elasticsearch的准实时性?

数据索引的整个过程涉及Elasticsearch的Shard(分片),以及Lucene Index、Segment、Document三者之间的关系等知识点

Elasticsearch的一个Shard(Elasticsearch分片的具体介绍可参考官方文档)就是一个Lucene Index(索引),每一个Lucene Index由多个Segment(段)构成,即Lucene Index的子集就是Segment,如图2-9所示。

shard(lucene index)–》segment --》Document(一条记录),Shard收到写请求时,请求会被写入Translog中,然后Document被存放在MemoeyBuffer(内存缓冲区)中,最终Translog保存所有修改记录。每隔一秒,Refresh操作被执行一次,且MemoryBuffer中的数据会被写入一个Segment,并存放在File System Cache(文件系统缓存)中,这时新的数据就可以被搜索到了。即1秒延时

  • 解法:提示用户查询的数据会有一定延时即可。

问题3:Elasticsearch可能丢数据?

Memory Buffer中的数据会被写入Segment中,此时这部分数据可被用户搜索到,但没有持久化,一旦系统宕机,数据就会丢失。解法:使用Lucene中的Commit操作

将 index.translog.durability 设置为fsync,每次Elasticsearch宕机启动后,先将主数据和Elasticsearch数据进行对比,再将Elasticsearch 缺失的数据找出来。

问题4:Elasticsearch分页查询深分页问题?

Elasticsearch的读操作流程主要分为两个阶段:Query Phase、Fetch Phase。

1)Query Phase:协调的节点先把请求分发到所有分片,然后每个分片在本地查询后建一个结果集队列,并将命令中的Document ID以及搜索分数存放在队列中,再返回给协调节点,最后协调节点会建一个全局队列,归并收到的所有结果集并进行全局排序

2)Fetch Phase:协调节点先根据结果集里的Document ID向所有分片获取完整的Document,然后所有分片返回完整的Document给协调节点,最后协调节点将结果返回给客户端。

最大结果数量为10000

在Elasticsearch查询过程中,如果search方法带有from和size参数,Elasticsearch集群需要给协调节点返回分片数*(from+size)条数据,然后在单机上进行排序,最后给客户端返回size大小的数据。如果用户确实有深度翻页的需求,使用Elasticsearch中search_after的功能也能解决,只是无法实现跳页了

在这里插入图片描述

这个search_after里的值,就是上次查询结果排序字段的结果值。

示例代码见Spring-Demos270的demo9

6、总结

本文主要对Mysql和ES进行数据同步的常见方案进行了汇总说明。

  • 同步双写是最简单的同步方式,能最大程度保证数据同步写入的实时性,最大的问题是代码侵入性太强。
  • 异步双写引入了消息中间件,由于MQ都是异步消费模型,所以可能出现数据同步延迟的问题。好处是在大规模消息同步时吞吐量更、高性能更好,便于接入更多的数据源,且各个数据源数据消费写入相互隔离互不影响。
  • 基于Mysql表定时扫描同步 ,原理是通过定时器定时扫描表中的增量数据进行数据同步,不会产生代码侵入,但由于是定时扫描同步,所以也会存在数据同步延迟问题,典型实现是采用 Logstash 实现增量同步。
  • 基于Binlog实时同步 ,原理是通过监听Mysql的binlog日志进行增量同步数据。不会产生代码侵入,数据同步的实时也能得到保障,弊端是Binlog系统都较为复杂。典型实现是采用 canal 实现数据同步。

该canal同步ES方案局限性

1)使用Elasticsearch存储查询数据时,就要接受上面列出的一些局限性:有一定延时,深度分页不能自由跳页,会有丢数据的可能性

2)主数据量越来越大后,写操作还是慢,到时还是会出问题。比如这里的工单数据,虽然已经去掉了所有外键,但是当数据量上亿的时候,插入还是会有问题。

3)主数据和ES数据不一致时,如果业务逻辑需要ES数据保持一致性呢?这里的ES数据同步到最新数据会有一定的延时,大约为2秒。某些业务场景下用户可能无法接受这个延时,特别是跟钱有关的场景。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com