欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > RabbitMQ和Seata冲突吗?Seata与Spring中的事务管理冲突吗

RabbitMQ和Seata冲突吗?Seata与Spring中的事务管理冲突吗

2025/4/26 9:50:07 来源:https://blog.csdn.net/m0_73889530/article/details/147373331  浏览:    关键词:RabbitMQ和Seata冲突吗?Seata与Spring中的事务管理冲突吗

1. @GlobalTransactional@Transactional 是否冲突?

答:不冲突,它们可以协同工作,但作用域不同。

  • @Transactional: 这是 Spring 提供的注解,用于管理单个数据源内的本地事务。在你当前的 register 方法中,它确保了 userRepository.save(user) 操作要么成功提交到 auth_service_new 的数据库 (mall_auth_new),要么在发生异常时回滚(比如数据库连接失败、约束冲突等)。它只关心本服务内的数据库操作原子性。
  • @GlobalTransactional: 这是 Seata 提供的注解,用于开启一个分布式全局事务。它的目的是协调跨多个服务、多个数据源的操作,保证这些操作要么全部成功,要么全部回滚。Seata 的 AT 模式(你目前可能使用的模式,因其最简单)通过代理数据源 (DataSourceProxy) 自动记录 SQL 执行前后的镜像,并在需要时生成反向 SQL 来实现回滚。

如何协同? Seata AT 模式下的分支事务实际上是基于本地事务的。当 @GlobalTransactional 存在时,Seata 会拦截 @Transactional 管理的本地事务的提交/回滚。

  • 本地事务提交时:Seata RM (Resource Manager) 会先向 TC (Transaction Coordinator) 注册分支,报告本地事务执行成功(Phase 1),但不会立即真正提交物理数据库连接。它会等待 TC 的统一指令。
  • 全局事务提交时:TC 通知所有 RM 提交分支事务(Phase 2),这时 RM 才提交本地事务对应的物理数据库连接。
  • 全局事务回滚时:TC 通知所有 RM 回滚分支事务(Phase 2),RM 会根据之前记录的 Undo Log 生成反向 SQL 来回滚本地数据库的更改。

结论:同时使用两者是常见且必要的。@Transactional 保证本地操作的原子性,而 @GlobalTransactional 则将这种原子性扩展到分布式环境下的多个参与者。

2. Seata (AT 模式) 和 RabbitMQ 是否冲突?

答:冲突!在期望跨服务数据库原子性的场景下,同步调用 Seata AT 模式和异步发送 RabbitMQ 消息是矛盾的。

  • Seata AT 模式的局限性: Seata AT 模式主要设计用于同步调用场景下的数据库操作。它无法管理消息队列(如 RabbitMQ)的操作。也就是说,Seata 不能

    • 保证消息发送成功后,如果后续全局事务需要回滚,能把消息“撤回”。
    • 保证消息被消费者成功处理后,如果全局事务需要回滚,能让消费者的操作也回滚。
  • 你当前代码的问题:

    1. AuthServiceImpl.register 方法在 @GlobalTransactional 内执行 userRepository.save(user)。这个操作被 Seata 纳入了全局事务分支。
    2. 紧接着,它调用 messageService.sendUserCreatedEvent(savedUser) 发送 RabbitMQ 消息。这个发送操作本身不受 Seata 全局事务的管理
    3. user_moudle 中的 UserEventListener异步地消费这个消息,并执行 userService.createUserFromEvent 来写入 user_moudle 的数据库 (mall_users)。这个数据库写入操作也不在 AuthServiceImpl.register 发起的那个 Seata 全局事务的范围内。
  • 后果:

    • 如果在发送消息之后AuthServiceImpl.register 方法内部(或其调用的其他同步下游服务)发生了需要全局回滚的异常,auth_service_new 数据库的 User 记录会被 Seata 回滚,但 RabbitMQ 消息已经发出去了user_moudle 仍然会收到消息并尝试创建用户,导致数据不一致(user_moudle 有用户,auth_service_new 没有)。
    • 如果在 user_moudle 消费消息并写入数据库时失败auth_service_new 的事务早已提交(因为消息是异步的),也无法回滚,同样导致数据不一致。

结论:如果你希望 auth_service_new 写入 auth_user 表 和 user_moudle 写入 ums_user 表这两个数据库操作具有原子性(要么都成功,要么都失败),那么在 @GlobalTransactional 方法内部使用 RabbitMQ 进行跨服务通信是错误的设计。

3. 应该怎么做?

为了实现 auth_service_newuser_moudle 在用户注册时的数据库写入原子性,最佳实践是使用同步调用,让 user_moudle 的数据库操作也成为 Seata 全局事务的一个分支。

修改步骤:

  1. auth_service_new 中定义 Feign 客户端调用 user_moudle:

    • 创建一个接口,例如 UserModuleClient.java:
    package com.mall.auth.client;import com.mall.auth.dto.UserSyncDTO; // 需要创建一个简单的DTO传递必要信息
    import org.springframework.cloud.openfeign.FeignClient;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;// name 指向 user_moudle 的服务名 (spring.application.name)
    @FeignClient(name = "userservice", path = "/api/internal/users") // 使用内部API路径
    public interface UserModuleClient {@PostMapping("/sync-create")void syncCreateUser(@RequestBody UserSyncDTO userSyncDTO);
    }
    
    • 创建 UserSyncDTO.java (可以简化 RegisterRequestUserCreatedEvent 的字段):
    package com.mall.auth.dto;import lombok.Builder;
    import lombok.Data;@Data
    @Builder
    public class UserSyncDTO {private Long authUserId;private String username;private String email;private String phone;// 注意:不需要传递密码,user_moudle 只存占位符
    }
    
  2. 修改 AuthServiceImpl.register 方法:

    • 移除 messageService.sendUserCreatedEvent 调用。
    • 注入并使用 UserModuleClient 进行同步调用。
    // ... 其他注入 ...
    import com.mall.auth.client.UserModuleClient;
    import com.mall.auth.dto.UserSyncDTO;
    // ...@Service
    public class AuthServiceImpl implements AuthService {// ... 其他字段和构造函数 ...private final UserModuleClient userModuleClient;public AuthServiceImpl(// ... 其他参数 ...UserModuleClient userModuleClient, // 添加注入MessageService messageService) { // MessageService 仍然可以注入,但注册时不在此调用// ... 其他赋值 ...this.userModuleClient = userModuleClient;this.messageService = messageService; // 保留注入}@GlobalTransactional(name = "user-register-tx", rollbackFor = Exception.class)@Override@Transactional // 本地事务仍然需要public User register(RegisterRequest registerRequest) {log.info("开始用户注册流程 (同步事务): {}", registerRequest.getUsername());// ... (省略之前的检查逻辑) ...// 创建新用户User user = User.builder()// ... (省略属性设置) ....build();// 1. 保存到 auth_service_new 数据库 (参与 Seata 分支事务)User savedUser = userRepository.save(user);log.info("AuthService: 用户基础信息保存成功: {}", savedUser.getUsername());// 2. 同步调用 user_moudle 保存用户信息 (参与 Seata 分支事务)try {UserSyncDTO syncDTO = UserSyncDTO.builder().authUserId(savedUser.getId()).username(savedUser.getUsername()).email(savedUser.getEmail()).phone(savedUser.getPhone()).build();log.info("AuthService: 准备同步调用 UserModule 创建用户...");userModuleClient.syncCreateUser(syncDTO); // 通过 Feign 调用log.info("AuthService: UserModule 同步调用成功");} catch (Exception e) {log.error("AuthService: 同步调用 UserModule 失败: {}", e.getMessage(), e);// 抛出异常,触发 @GlobalTransactional 回滚// 注意:需要确保 Feign 客户端在调用失败时能正确抛出异常被 Seata 捕获// 可能需要配置 Feign 的 ErrorDecoderthrow new RuntimeException("同步用户模块失败,触发全局回滚", e);}// 发送消息的操作可以移到事务成功提交之后 (如果还需要的话)// 例如使用 Spring 的 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)// messageService.sendUserCreatedEvent(savedUser); // 从事务中移除return savedUser;}// ... 其他方法 ...
    }
    
  3. user_moudle 中添加对应的 Controller Endpoint:

    • 创建一个新的 Controller 或在 UserController 中添加一个内部接口(路径建议与 Feign Client 对应,如 /api/internal/users)。
    package com.user.controler;import com.user.dto.UserSyncDTO; // 引入对应的 DTO
    import com.user.service.UserService;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;@Slf4j
    @RestController
    @RequestMapping("/api/internal/users") // 内部调用路径
    @RequiredArgsConstructor
    public class UserInternalController {private final UserService userService;@PostMapping("/sync-create")public ResponseEntity<Void> syncCreateUser(@RequestBody UserSyncDTO userSyncDTO) {log.info("UserModule: 收到同步创建用户请求: authUserId={}", userSyncDTO.getAuthUserId());try {// 这里需要一个类似 createUserFromEvent 的方法,但参数是 UserSyncDTO// 或者直接调用现有的 createUserFromEvent,但需要适配 DTOuserService.createUserFromSync(userSyncDTO); // 假设有这个方法log.info("UserModule: 同步创建用户成功: authUserId={}", userSyncDTO.getAuthUserId());return ResponseEntity.ok().build();} catch (Exception e) {log.error("UserModule: 同步创建用户失败: {}", e.getMessage(), e);// 抛出异常,让 Seata 感知到错误,触发全局回滚// Spring MVC 默认会将未捕获的异常转换为 500 错误,// Feign 客户端默认会将 4xx/5xx 视为异常throw new RuntimeException("创建用户记录失败", e);}}
    }
    
    • 确保 user_moudleUserService 有一个处理 UserSyncDTO 的方法,并且这个方法的数据库操作会因为配置了 DataSourceProxy 而自动加入到 Seata 的全局事务中。

总结:

  • @GlobalTransactional@Transactional 不冲突,前者依赖后者。
  • 在期望数据库原子性的场景下,Seata AT 与异步消息(RabbitMQ)冲突
  • 最佳实践:将跨服务需要原子性的数据库操作改为同步 RPC 调用 (如 Feign),并在调用发起方的方法上使用 @GlobalTransactional。移除事务内的异步消息发送。

这样修改后,用户注册时对两个数据库的写入就能真正实现分布式事务的原子性了。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词