模板方法模式详解:定义程序骨架与框架设计
1. 代码概述
本节概述了整体设计思路和核心实现模式
本代码实现了一个 数据迁移 方案,采用 模板方法模式(Template Method Pattern) 组织迁移逻辑。核心类 AbstractMigrationService
作为抽象基类,定义了通用的迁移流程,而具体的迁移逻辑由 MigrationService
等子类实现。
2. 主要代码结构
这部分详细介绍了代码的分层结构和主要组件
2.1 AbstractMigrationService<T, D>
(抽象基类)
抽象基类定义了通用迁移流程,是模板方法模式的核心
该类是所有迁移服务的基类,定义了通用的数据迁移流程。
关键方法:
public abstract class AbstractMigrationService<T,D> {protected abstract List<D> fetchData(Long lastProcessedId); // 读取源数据protected abstract T transformData(D dto); // 数据转换protected abstract void saveData(List<T> dataList); // 存储数据public final void migrate() {Long lastProcessedId = (Long) redisTemplate.opsForValue().get("migration:lastProcessedId");List<D> dtoList = fetchData(lastProcessedId); // 读取数据List<T> entityList = dtoList.stream().map(this::transformData).collect(Collectors.toList()); // 转换数据saveData(entityList); // 存储数据// 更新 Redis 中的最大 ID,支持断点续传if (!dtoList.isEmpty()) {Long newLastProcessedId = dtoList.get(dtoList.size() - 1).getId();redisTemplate.opsForValue().set("migration:lastProcessedId", newLastProcessedId);}}
}
2.2 MigrationService
(具体实现类)
具体实现类负责具体业务逻辑,遵循基类定义的流程
该类继承 AbstractMigrationService
,实现具体的迁移逻辑。
关键实现:
@Service
public class MigrationService extends AbstractMigrationService<SourceData, TargetDataDTO> {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate MigrationLogService migrationLogService;@Autowiredprivate ExecutorService executorService;@Overrideprotected List<TargetDataDTO> fetchData(Long lastProcessedId) {return jdbcTemplate.query("SELECT * FROM table_name WHERE id > ? ORDER BY id LIMIT 1000", new Object[]{lastProcessedId}, new TdTjMhkrstDTOMapper());}@Overrideprotected SourceData transformData(TargetDataDTO dto) {return new SourceData(dto.getId(), dto.getName());}@Overrideprotected void saveData(List<SourceData> dataList) {executorService.submit(() -> {mongoTemplate.insert(dataList, "target_collection");migrationLogService.logMigration("Batch migration successful", dataList.size());});}
}
3. 运行机制解析
本节解析了整个迁移流程的工作原理和关键特性
- 模板方法执行流程:
migrate()
作为 模板方法,按照固定的步骤执行:fetchData(lastProcessedId)
:从数据库读取数据,支持断点续传。transformData()
:将数据转换为目标格式。saveData()
:多线程存储数据,提高性能。- 日志记录:每次批量迁移后,记录日志。
- Redis 断点续传:存储已迁移的最大 ID,避免重复迁移。
- 扩展性:具体子类实现 不同的数据源和存储方式,但遵循统一的流程。
4. 实战示例
基于真实业务场景的完整实现示例
博主前段时间进行数据迁移的一个案例,任务是一个MongoDB数据迁移到Oracle数据库,由于源数据都在Mongo一个文档内,需要将这部份数据按业务迁移到多个Oracle表中,所以使用了模板方法模式,来定义一个规范的流程,简单说是读数据,清洗数据,存储数据。用了数据分片多线程处理来提升效率。
4.1 迁移服务抽象基类
增强版迁移服务基类,支持多线程处理和完善的日志记录
@Slf4j
@Component
public abstract class AbstractMigrationService<T,D> {@Resourceprivate RedisTemplate redisTemplate;@Resource@Qualifier("oracleTransactionTemplate")private TransactionTemplate transactionTemplate;private final static String prefixRedis = "migration:";@Resourceprivate MigrationLogService migrationLogService;@Resourceprivate ExecutorService executorService;// 模板方法,定义迁移的步骤public final void startMigration(int batchSize, boolean restart, String tableName) {while (true) {String lastMigratedId = getLastMigratedId(tableName);List<T> documents = readFromSource(lastMigratedId, batchSize);if (CollectionUtils.isEmpty(documents)) {logMigrationBatch(tableName, null, lastMigratedId, null, 0, tableName+" Migration SUCCESS", null);log.info("数据为空,退出程序!");break;}//开始迁移数据{try {//多线程来做 每一千条切为一个线程List<List<T>> batches = splitIntoBatches(documents, 1000);CompletableFuture<Void> allFutures = CompletableFuture.allOf(batches.stream().map(batch -> CompletableFuture.runAsync(() -> {List<D> transformedData = transformData(batch);writeToTarget(transformedData);}, executorService).exceptionally(ex -> {log.error("线程执行异常", ex);throw new CompletionException(ex); // 抛出包装后的异常})).toArray(CompletableFuture[]::new));allFutures.join();// 提交迁移状态String endId = updateMigrationStatus(tableName, documents);// 记录成功日志logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, endId,documents.size(), "SUCCESS", null);} catch (Exception e) {// 记录失败日志logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, null,documents.size(), "FAILED", e.getMessage());log.error("迁移过程中出现错误,", e);//把错误往外抛出throw new RuntimeException(e);}};}}// 具体步骤由子类实现//读取数据 lastMigratedId:最大id batchSize:每次读取多少条数据protected abstract List<T> readFromSource(String lastMigratedId, int batchSize);//转换数据 此处使用泛型D,转换后的数据类型protected abstract List<D> transformData(List<T> documents); // 使用泛型T//写入数据 此处使用泛型D,写入的目标数据类型protected abstract void writeToTarget(List<D> transformedData);// 读取最后一次迁移的ID,用于续传private String getLastMigratedId(String tableName) {String o = (String) redisTemplate.opsForValue().get(prefixRedis+tableName);log.info("缓存值:"+tableName+"-"+o);return o;}// 更新迁移状态,并返回最新的IDprivate String updateMigrationStatus(String tableName, List<T> documents) {if (!documents.isEmpty()) {// 找出ID最大的文档T lastDocument = documents.stream().max(Comparator.comparing(this::getLastId)).orElse(null);String lastId = getLastId(lastDocument);redisTemplate.opsForValue().set(prefixRedis+tableName, lastId);return lastId;}return null;}// 获取文档的ID,用于存在redis中记录上次迁移的ID,以便下次续传protected abstract String getLastId(T document);// 将数据分片以便多线程处理private List<List<T>> splitIntoBatches(List<T> documents, int batchSize) {List<List<T>> batches = new ArrayList<>();for (int i = 0; i < documents.size(); i += batchSize) {batches.add(documents.subList(i, Math.min(i + batchSize, documents.size())));}return batches;}// 日志记录方法,用于单独记录迁移日志private void logMigrationBatch(String tableName, String batchId, String startId,String endId, int recordCount, String status, String errorMessage) {MigrationLog migrationLog = MigrationLog.builder().tableName(tableName).batchId(batchId).startId(startId).endId(endId).recordCount(recordCount).status(status).errorMessage(errorMessage).createTime(LocalDateTime.now()).build();migrationLogService.log(migrationLog);}
}
4.2 具体实现子类
特定业务场景的实现,包含了MongoDB到Oracle的数据转换逻辑
@Service
@Slf4j
public class MigrationService extends AbstractMigrationService<SourceData,TargetDataDTO>{private final MongoTemplate mongoTemplate;private final JdbcTemplate oracleJdbcTemplate;@Autowiredpublic MigrationService(MongoTemplate mongoTemplate,@Qualifier("oracleJdbcTemplate") JdbcTemplate oracleJdbcTemplate) {this.mongoTemplate = mongoTemplate;this.oracleJdbcTemplate = oracleJdbcTemplate;}@Overrideprotected List<SourceData> readFromSource(String lastMigratedId, int batchSize) {// 构造查询条件Query query = BaseQueryToMongodb.getBaseQueryByMigration(lastMigratedId,batchSize);// 执行查询,返回结果return mongoTemplate.find(query, SourceData.class, Constants.TJDECRP);}@Overrideprotected List<TargetDataDTO> transformData(List<SourceData> documents) {return documents.stream().map(sourceData ->TargetDataDTO targetDataDTO = new TargetDataDTO();//此处省略转换逻辑,仅为示意return targetDataDTO;).collect(Collectors.toList());}@Overrideprotected void writeToTarget(List<TargetDataDTO> transformedData) {if (CollectionUtils.isEmpty(transformedData)) {log.warn("传入的数据列表为空,跳过写入");return;}try {//sql插入预处理语句 也可以使用其他方法去写插入String sql = "INSERT INTO xxx (ID, CREATE_DATE)" +"VALUES (?, ?)";List<Object[]> batchArgs = transformedData.stream().map(dto -> new Object[]{dto.getRid(),new Timestamp(dto.getCREATE_DATE())}).collect(Collectors.toList());int[] updateCounts = oracleJdbcTemplate.batchUpdate(sql, batchArgs);log.info("成功写入 {} 条数据到Oracle数据库;插入语句:{}", updateCounts.length,JSON.toJSONString(batchArgs));} catch (DataAccessException e) {throw new RuntimeException("数据库写入失败", e);}}@Overrideprotected String getLastId(SourceData document) {return document.getId();}
}
4.3 通过API启动迁移
提供RESTful接口启动迁移任务,支持参数化配置
@RestController
public class MigrationController {@Resourceprivate MigrationService migrationService;@PostMapping("/migrate/start")public ResponseEntity<String> startMigration(@RequestParam(defaultValue = "4000") int batchSize,@RequestParam(defaultValue = "false") boolean restart) {CompletableFuture.runAsync(() -> {migrationService.startMigration(batchSize, restart, "TableName");});return ResponseEntity.ok("Migration started");}
}
5. 实现说明与优化建议
关于实现细节的补充说明和潜在的改进方向
5.1 事务处理说明
关于事务,在插入时是按分片每个1000,差分成多个线程去执行,事务是没法保证的,因为每个线程都是一个独立的连接。如果出现问题导致程序中断,那就从Redis中查最后成功的那个ID,手动去回滚大于这个ID的数据,然后修改异常后重新运行,程序会从最后成功的那个批次的最大ID开始迁移(断点续传)。
5.2 潜在改进方向
- 事务管理:考虑在每个线程内部使用事务模板,确保单批次的原子性
- 错误恢复:增加自动回滚机制,减少人工干预
- 性能监控:添加迁移性能指标收集,便于调优
- 数据校验:增加源数据和目标数据的一致性校验机制
有好的想法或者改进的思路还请和博主分享呀~~~