欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 文化 > Apache Seata 新版本集成了 RocketMQ 事务消息

Apache Seata 新版本集成了 RocketMQ 事务消息

2024/10/27 5:56:17 来源:https://blog.csdn.net/zjj2006/article/details/143194624  浏览:    关键词:Apache Seata 新版本集成了 RocketMQ 事务消息

大家好,我是君哥。

Apache Seata 是一款高性能、简单易用的分布式事务中间件,它包含 AT、TCC、SAGA 和 XA 四种模式。

在最近发布的新版本中,Apache Seata 引入了 RocketMQ 中间件,并且跟 RocketMQ 的事务消息配合使用。今天我们来聊一聊这个话题。

Seata 两阶段提交

Seata 的分布式事务解决方案使用两阶段提交,以最流行的 TCC 模式为例,Seata 中设计了 3 个主要的角色:

  • TM:管理全局事务,包括开启全局事务,提交/回滚全局事务;

  • RM:管理分支事务;

  • TC: 事务协调器,管理全局事务和分支事务的状态。

下图是一个电商场景的分布式事务场景,包括订单、库存和账户三个服务。TM 开启全局事务后,在第一阶段,订单服务、库存服务和账户服务分别作为一个 RM 向 TC 注册分支事务,并且向 TC 上报分支事务执行状态。如果三个分支事务都执行成功,TM 会给 TC 发送 commit 执行,否则发送 rollback 指令。在第二阶段,TC 会根据 TM 的指令,决定分支事务 commit 或者 rollback。

RocketMQ 事务消息

作为分布式事务的解决方案,RocketMQ 的事务消息也可以供我们选择。RocketMQ 的事务消息,主要是保证了本地方法执行和消息发送在一个分布式事务中,要不全部成功,要不全部失败。见下图:

这里给出一个账户服务和库存服务在一个分布式事务中的例子,本地方法执行扣减账户金额,RocketMQ 消费者消费账户服务发送的消息后执行扣减库存。RocketMQ 通过发送 half 消息来实现,下面详细说明一下:

  1. 账户服务向 Broker 发送一条 half 消息;

  2. half 消息发送成功后,账户服务执行本地事务;

  3. 如果账户服务执行本地事务成功,则向 Broker 发送 commit 请求,否则发送 rollback 请求;

  4. 如果 Broker 收到的是 rollback 请求,则删除保存的 half 消息;

  5. 如果 Broker 收到的是 commit 请求,则把 half 消息投递到真实的队列等待库存服务来拉取,然后删除保存的 half 消息;

  6. 如果 Broker 没有收到请求,则会发送请求到 Producer 查询本地事务状态,然后根据 Producer 返回的本地状态做 commit/rollback 相关处理。

Seata 的改进

Seata 的改进主要在 prepare 阶段。Seata 提供了一个 SeataMQProducer 类,把 RocketMQ 中 TransactionListener 的方法加入到全局事务。见下面代码:

SeataMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {super(namespace, producerGroup, rpcHook);this.transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String xid = msg.getProperty(PROPERTY_SEATA_XID);if (StringUtils.isBlank(xid)) {LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId());return LocalTransactionState.ROLLBACK_MESSAGE;}GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid);if (COMMIT_STATUSES.contains(globalStatus)) {return LocalTransactionState.COMMIT_MESSAGE;} else if (ROLLBACK_STATUSES.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) {return LocalTransactionState.ROLLBACK_MESSAGE;} else if (GlobalStatus.Finished.equals(globalStatus)) {LOGGER.error("global transaction finished, msg will be rollback, xid: {}", xid);return LocalTransactionState.ROLLBACK_MESSAGE;}return LocalTransactionState.UNKNOW;}};
}

在 prepare 阶段,SeataMQProducer 向 RocketMQ Broker 发送 half 消息,执行本地事务,如果执行成功,则强行把 LocalTransactionState 改回 UNKNOW,等待 TC 发送指令,决定是 commit 或 rollback。如果执行失败,返回 ROLLBACK_MESSAGE,TC 下发指令,回滚全局事务。

需要注意的是,Broker 收到 half 消息后,如果没有收到 commit/rollback 消息,则会回查 Producer 本地事务状态,也就是上面代码中的 checkLocalTransaction。这里不是检查分支事务状态,而是检查全局事务状态,使用 XID 去查询。

集成 RocketMQ 之后,Seata 的分布式事务调用流程如下图,这里以 订单服务、库存服务两个服务为例:

总结

Apache Seata 引入 RocketMQ 后,支持的分布式事务场景更加丰富,MQ 的异步特性也让事务模型整体性能提升。希望本文对你理解 Seata 中 RocketMQ 的使用有所帮助。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com