欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > 数据迁移(基于模板方法模式)

数据迁移(基于模板方法模式)

2025/3/14 19:24:58 来源:https://blog.csdn.net/weixin_45970964/article/details/146167521  浏览:    关键词:数据迁移(基于模板方法模式)

模板方法模式详解:定义程序骨架与框架设计

1. 代码概述

本节概述了整体设计思路和核心实现模式

本代码实现了一个 数据迁移 方案,采用 模板方法模式(Template Method Pattern) 组织迁移逻辑。核心类 AbstractMigrationService 作为抽象基类,定义了通用的迁移流程,而具体的迁移逻辑由 MigrationService 等子类实现。


2. 主要代码结构

这部分详细介绍了代码的分层结构和主要组件

2.1 AbstractMigrationService<T, D>(抽象基类)

抽象基类定义了通用迁移流程,是模板方法模式的核心

该类是所有迁移服务的基类,定义了通用的数据迁移流程。

关键方法:
public abstract class AbstractMigrationService<T,D> {protected abstract List<D> fetchData(Long lastProcessedId); // 读取源数据protected abstract T transformData(D dto); // 数据转换protected abstract void saveData(List<T> dataList); // 存储数据public final void migrate() {Long lastProcessedId = (Long) redisTemplate.opsForValue().get("migration:lastProcessedId");List<D> dtoList = fetchData(lastProcessedId); // 读取数据List<T> entityList = dtoList.stream().map(this::transformData).collect(Collectors.toList()); // 转换数据saveData(entityList); // 存储数据// 更新 Redis 中的最大 ID,支持断点续传if (!dtoList.isEmpty()) {Long newLastProcessedId = dtoList.get(dtoList.size() - 1).getId();redisTemplate.opsForValue().set("migration:lastProcessedId", newLastProcessedId);}}
}

2.2 MigrationService(具体实现类)

具体实现类负责具体业务逻辑,遵循基类定义的流程

该类继承 AbstractMigrationService,实现具体的迁移逻辑。

关键实现:
@Service
public class MigrationService extends AbstractMigrationService<SourceData, TargetDataDTO> {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate MigrationLogService migrationLogService;@Autowiredprivate ExecutorService executorService;@Overrideprotected List<TargetDataDTO> fetchData(Long lastProcessedId) {return jdbcTemplate.query("SELECT * FROM table_name WHERE id > ? ORDER BY id LIMIT 1000", new Object[]{lastProcessedId}, new TdTjMhkrstDTOMapper());}@Overrideprotected SourceData transformData(TargetDataDTO dto) {return new SourceData(dto.getId(), dto.getName());}@Overrideprotected void saveData(List<SourceData> dataList) {executorService.submit(() -> {mongoTemplate.insert(dataList, "target_collection");migrationLogService.logMigration("Batch migration successful", dataList.size());});}
}

3. 运行机制解析

本节解析了整个迁移流程的工作原理和关键特性

  1. 模板方法执行流程migrate() 作为 模板方法,按照固定的步骤执行:
    • fetchData(lastProcessedId):从数据库读取数据,支持断点续传。
    • transformData():将数据转换为目标格式。
    • saveData():多线程存储数据,提高性能。
    • 日志记录:每次批量迁移后,记录日志。
    • Redis 断点续传:存储已迁移的最大 ID,避免重复迁移。
  2. 扩展性:具体子类实现 不同的数据源和存储方式,但遵循统一的流程。

4. 实战示例

基于真实业务场景的完整实现示例

博主前段时间进行数据迁移的一个案例,任务是一个MongoDB数据迁移到Oracle数据库,由于源数据都在Mongo一个文档内,需要将这部份数据按业务迁移到多个Oracle表中,所以使用了模板方法模式,来定义一个规范的流程,简单说是读数据,清洗数据,存储数据。用了数据分片多线程处理来提升效率。

4.1 迁移服务抽象基类

增强版迁移服务基类,支持多线程处理和完善的日志记录

@Slf4j
@Component
public abstract class AbstractMigrationService<T,D> {@Resourceprivate RedisTemplate redisTemplate;@Resource@Qualifier("oracleTransactionTemplate")private TransactionTemplate transactionTemplate;private final static String prefixRedis = "migration:";@Resourceprivate MigrationLogService migrationLogService;@Resourceprivate ExecutorService executorService;// 模板方法,定义迁移的步骤public final void startMigration(int batchSize, boolean restart, String tableName) {while (true) {String lastMigratedId = getLastMigratedId(tableName);List<T> documents = readFromSource(lastMigratedId, batchSize);if (CollectionUtils.isEmpty(documents)) {logMigrationBatch(tableName, null, lastMigratedId, null, 0, tableName+" Migration SUCCESS", null);log.info("数据为空,退出程序!");break;}//开始迁移数据{try {//多线程来做 每一千条切为一个线程List<List<T>> batches = splitIntoBatches(documents, 1000);CompletableFuture<Void> allFutures = CompletableFuture.allOf(batches.stream().map(batch -> CompletableFuture.runAsync(() -> {List<D> transformedData = transformData(batch);writeToTarget(transformedData);}, executorService).exceptionally(ex -> {log.error("线程执行异常", ex);throw new CompletionException(ex); // 抛出包装后的异常})).toArray(CompletableFuture[]::new));allFutures.join();// 提交迁移状态String endId = updateMigrationStatus(tableName, documents);// 记录成功日志logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, endId,documents.size(), "SUCCESS", null);} catch (Exception e) {// 记录失败日志logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, null,documents.size(), "FAILED", e.getMessage());log.error("迁移过程中出现错误,", e);//把错误往外抛出throw new RuntimeException(e);}};}}// 具体步骤由子类实现//读取数据 lastMigratedId:最大id batchSize:每次读取多少条数据protected abstract List<T> readFromSource(String lastMigratedId, int batchSize);//转换数据 此处使用泛型D,转换后的数据类型protected abstract List<D> transformData(List<T> documents);  // 使用泛型T//写入数据 此处使用泛型D,写入的目标数据类型protected abstract void writeToTarget(List<D> transformedData);// 读取最后一次迁移的ID,用于续传private String getLastMigratedId(String tableName) {String o = (String) redisTemplate.opsForValue().get(prefixRedis+tableName);log.info("缓存值:"+tableName+"-"+o);return o;}// 更新迁移状态,并返回最新的IDprivate String updateMigrationStatus(String tableName, List<T> documents) {if (!documents.isEmpty()) {// 找出ID最大的文档T lastDocument = documents.stream().max(Comparator.comparing(this::getLastId)).orElse(null);String lastId = getLastId(lastDocument);redisTemplate.opsForValue().set(prefixRedis+tableName, lastId);return lastId;}return null;}// 获取文档的ID,用于存在redis中记录上次迁移的ID,以便下次续传protected abstract String getLastId(T document);// 将数据分片以便多线程处理private List<List<T>> splitIntoBatches(List<T> documents, int batchSize) {List<List<T>> batches = new ArrayList<>();for (int i = 0; i < documents.size(); i += batchSize) {batches.add(documents.subList(i, Math.min(i + batchSize, documents.size())));}return batches;}// 日志记录方法,用于单独记录迁移日志private void logMigrationBatch(String tableName, String batchId, String startId,String endId, int recordCount, String status, String errorMessage) {MigrationLog migrationLog = MigrationLog.builder().tableName(tableName).batchId(batchId).startId(startId).endId(endId).recordCount(recordCount).status(status).errorMessage(errorMessage).createTime(LocalDateTime.now()).build();migrationLogService.log(migrationLog);}
}

4.2 具体实现子类

特定业务场景的实现,包含了MongoDB到Oracle的数据转换逻辑

@Service
@Slf4j
public class MigrationService extends AbstractMigrationService<SourceData,TargetDataDTO>{private final MongoTemplate mongoTemplate;private final JdbcTemplate oracleJdbcTemplate;@Autowiredpublic MigrationService(MongoTemplate mongoTemplate,@Qualifier("oracleJdbcTemplate") JdbcTemplate oracleJdbcTemplate) {this.mongoTemplate = mongoTemplate;this.oracleJdbcTemplate = oracleJdbcTemplate;}@Overrideprotected List<SourceData> readFromSource(String lastMigratedId, int batchSize) {// 构造查询条件Query query = BaseQueryToMongodb.getBaseQueryByMigration(lastMigratedId,batchSize);// 执行查询,返回结果return mongoTemplate.find(query, SourceData.class, Constants.TJDECRP);}@Overrideprotected List<TargetDataDTO> transformData(List<SourceData> documents) {return  documents.stream().map(sourceData ->TargetDataDTO targetDataDTO  = new TargetDataDTO();//此处省略转换逻辑,仅为示意return targetDataDTO;).collect(Collectors.toList());}@Overrideprotected void writeToTarget(List<TargetDataDTO> transformedData) {if (CollectionUtils.isEmpty(transformedData)) {log.warn("传入的数据列表为空,跳过写入");return;}try {//sql插入预处理语句 也可以使用其他方法去写插入String sql = "INSERT INTO xxx (ID, CREATE_DATE)" +"VALUES (?, ?)";List<Object[]> batchArgs = transformedData.stream().map(dto -> new Object[]{dto.getRid(),new Timestamp(dto.getCREATE_DATE())}).collect(Collectors.toList());int[] updateCounts = oracleJdbcTemplate.batchUpdate(sql, batchArgs);log.info("成功写入 {} 条数据到Oracle数据库;插入语句:{}", updateCounts.length,JSON.toJSONString(batchArgs));} catch (DataAccessException e) {throw new RuntimeException("数据库写入失败", e);}}@Overrideprotected String getLastId(SourceData document) {return document.getId();}
}

4.3 通过API启动迁移

提供RESTful接口启动迁移任务,支持参数化配置

@RestController
public class MigrationController {@Resourceprivate MigrationService migrationService;@PostMapping("/migrate/start")public ResponseEntity<String> startMigration(@RequestParam(defaultValue = "4000") int batchSize,@RequestParam(defaultValue = "false") boolean restart) {CompletableFuture.runAsync(() -> {migrationService.startMigration(batchSize, restart, "TableName");});return ResponseEntity.ok("Migration started");}
}

5. 实现说明与优化建议

关于实现细节的补充说明和潜在的改进方向

5.1 事务处理说明

关于事务,在插入时是按分片每个1000,差分成多个线程去执行,事务是没法保证的,因为每个线程都是一个独立的连接。如果出现问题导致程序中断,那就从Redis中查最后成功的那个ID,手动去回滚大于这个ID的数据,然后修改异常后重新运行,程序会从最后成功的那个批次的最大ID开始迁移(断点续传)。

5.2 潜在改进方向

  • 事务管理:考虑在每个线程内部使用事务模板,确保单批次的原子性
  • 错误恢复:增加自动回滚机制,减少人工干预
  • 性能监控:添加迁移性能指标收集,便于调优
  • 数据校验:增加源数据和目标数据的一致性校验机制

有好的想法或者改进的思路还请和博主分享呀~~~

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词