目录

数据迁移基于模板方法模式

数据迁移(基于模板方法模式)

1. 代码概述

本节概述了整体设计思路和核心实现模式

本代码实现了一个 数据迁移 方案,采用 模板方法模式(Template Method Pattern) 组织迁移逻辑。核心类 AbstractMigrationService 作为抽象基类,定义了通用的迁移流程,而具体的迁移逻辑由 MigrationService 等子类实现。


2. 主要代码结构

这部分详细介绍了代码的分层结构和主要组件

2.1 AbstractMigrationService<T, D> (抽象基类)

抽象基类定义了通用迁移流程,是模板方法模式的核心

该类是所有迁移服务的基类,定义了通用的数据迁移流程。

关键方法:
public abstract class AbstractMigrationService<T,D> {
    protected abstract List<D> fetchData(Long lastProcessedId); // 读取源数据
    protected abstract T transformData(D dto); // 数据转换
    protected abstract void saveData(List<T> dataList); // 存储数据
    
    public final void migrate() {
        Long lastProcessedId = (Long) redisTemplate.opsForValue().get("migration:lastProcessedId");
        List<D> dtoList = fetchData(lastProcessedId); // 读取数据
        List<T> entityList = dtoList.stream().map(this::transformData).collect(Collectors.toList()); // 转换数据
        saveData(entityList); // 存储数据
        
        // 更新 Redis 中的最大 ID,支持断点续传
        if (!dtoList.isEmpty()) {
            Long newLastProcessedId = dtoList.get(dtoList.size() - 1).getId();
            redisTemplate.opsForValue().set("migration:lastProcessedId", newLastProcessedId);
        }
    }
}

2.2 MigrationService (具体实现类)

具体实现类负责具体业务逻辑,遵循基类定义的流程

该类继承 AbstractMigrationService ,实现具体的迁移逻辑。

关键实现:
@Service
public class MigrationService extends AbstractMigrationService<SourceData, TargetDataDTO> {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private MigrationLogService migrationLogService;
    @Autowired
    private ExecutorService executorService;
    
    @Override
    protected List<TargetDataDTO> fetchData(Long lastProcessedId) {
        return jdbcTemplate.query("SELECT * FROM table_name WHERE id > ? ORDER BY id LIMIT 1000", new Object[]{lastProcessedId}, new TdTjMhkrstDTOMapper());
    }
    
    @Override
    protected SourceData transformData(TargetDataDTO dto) {
        return new SourceData(dto.getId(), dto.getName());
    }
    
    @Override
    protected void saveData(List<SourceData> dataList) {
        executorService.submit(() -> {
            mongoTemplate.insert(dataList, "target_collection");
            migrationLogService.logMigration("Batch migration successful", dataList.size());
        });
    }
}

3. 运行机制解析

本节解析了整个迁移流程的工作原理和关键特性

  1. 模板方法执行流程migrate() 作为 模板方法 ,按照固定的步骤执行:
    • fetchData(lastProcessedId) :从数据库读取数据,支持断点续传。
    • transformData() :将数据转换为目标格式。
    • saveData() :多线程存储数据,提高性能。
    • 日志记录 :每次批量迁移后,记录日志。
    • Redis 断点续传 :存储已迁移的最大 ID,避免重复迁移。
  2. 扩展性 :具体子类实现 不同的数据源和存储方式 ,但遵循统一的流程。

4. 实战示例

基于真实业务场景的完整实现示例

博主前段时间进行数据迁移的一个案例,任务是一个MongoDB数据迁移到Oracle数据库,由于源数据都在Mongo一个文档内,需要将这部份数据按业务迁移到多个Oracle表中,所以使用了模板方法模式,来定义一个规范的流程,简单说是读数据,清洗数据,存储数据。用了数据分片多线程处理来提升效率。

4.1 迁移服务抽象基类

增强版迁移服务基类,支持多线程处理和完善的日志记录

@Slf4j
@Component
public abstract class AbstractMigrationService<T,D> {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    @Qualifier("oracleTransactionTemplate")
    private TransactionTemplate transactionTemplate;

    private final static String prefixRedis = "migration:";

    @Resource
    private MigrationLogService migrationLogService;

    @Resource
    private ExecutorService executorService;


    // 模板方法,定义迁移的步骤
    public final void startMigration(int batchSize, boolean restart, String tableName) {
        while (true) {
            String lastMigratedId = getLastMigratedId(tableName);
            List<T> documents = readFromSource(lastMigratedId, batchSize);
            if (CollectionUtils.isEmpty(documents)) {
                logMigrationBatch(tableName, null, lastMigratedId, null, 0, tableName+" Migration SUCCESS", null);
                log.info("数据为空,退出程序!");
                break;
            }
            //开始迁移数据
           {
                try {
                    //多线程来做 每一千条切为一个线程
                    List<List<T>> batches = splitIntoBatches(documents, 1000);
                    CompletableFuture<Void> allFutures = CompletableFuture.allOf(batches.stream().map(batch -> CompletableFuture.runAsync(() -> {
                        List<D> transformedData = transformData(batch);
                        writeToTarget(transformedData);
                    }, executorService).exceptionally(
                            ex -> {
                                log.error("线程执行异常", ex);
                                throw new CompletionException(ex); // 抛出包装后的异常
                            }
                    )).toArray(CompletableFuture[]::new));
                    allFutures.join();

                    // 提交迁移状态
                   String endId = updateMigrationStatus(tableName, documents);

                    // 记录成功日志
                    logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, endId,
                            documents.size(), "SUCCESS", null);

                } catch (Exception e) {
                    // 记录失败日志
                    logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, null,
                            documents.size(), "FAILED", e.getMessage());
                    log.error("迁移过程中出现错误,", e);
                    //把错误往外抛出
                    throw new RuntimeException(e);

                }
            };
        }
    }

      // 具体步骤由子类实现
    //读取数据 lastMigratedId:最大id batchSize:每次读取多少条数据
    protected abstract List<T> readFromSource(String lastMigratedId, int batchSize);
    //转换数据 此处使用泛型D,转换后的数据类型
    protected abstract List<D> transformData(List<T> documents);  // 使用泛型T
    //写入数据 此处使用泛型D,写入的目标数据类型
    protected abstract void writeToTarget(List<D> transformedData);

    // 读取最后一次迁移的ID,用于续传
    private String getLastMigratedId(String tableName) {
        String o = (String) redisTemplate.opsForValue().get(prefixRedis+tableName);
         log.info("缓存值:"+tableName+"-"+o);
        return o;
    }
   

   // 更新迁移状态,并返回最新的ID
    private String updateMigrationStatus(String tableName, List<T> documents) {
        if (!documents.isEmpty()) {
            // 找出ID最大的文档
            T lastDocument = documents.stream()
                    .max(Comparator.comparing(this::getLastId))
                    .orElse(null);

            String lastId = getLastId(lastDocument);
            redisTemplate.opsForValue().set(prefixRedis+tableName, lastId);
            return lastId;
        }
        return null;
    }
    
    

    // 获取文档的ID,用于存在redis中记录上次迁移的ID,以便下次续传
    protected abstract String getLastId(T document);


    // 将数据分片以便多线程处理
    private List<List<T>> splitIntoBatches(List<T> documents, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < documents.size(); i += batchSize) {
            batches.add(documents.subList(i, Math.min(i + batchSize, documents.size())));
        }
        return batches;
    }

    // 日志记录方法,用于单独记录迁移日志
    private void logMigrationBatch(String tableName, String batchId, String startId,
                                   String endId, int recordCount, String status, String errorMessage) {
        MigrationLog migrationLog = MigrationLog.builder()
                .tableName(tableName)
                .batchId(batchId)
                .startId(startId)
                .endId(endId)
                .recordCount(recordCount)
                .status(status)
                .errorMessage(errorMessage)
                .createTime(LocalDateTime.now())
                .build();

        migrationLogService.log(migrationLog);
    }
}

4.2 具体实现子类

特定业务场景的实现,包含了MongoDB到Oracle的数据转换逻辑

@Service
@Slf4j
public class MigrationService extends AbstractMigrationService<SourceData,TargetDataDTO>{

    private final MongoTemplate mongoTemplate;
    private final JdbcTemplate oracleJdbcTemplate;

    @Autowired
    public MigrationService(
            MongoTemplate mongoTemplate,
            @Qualifier("oracleJdbcTemplate") JdbcTemplate oracleJdbcTemplate
    ) {
        this.mongoTemplate = mongoTemplate;
        this.oracleJdbcTemplate = oracleJdbcTemplate;
    }


    @Override
    protected List<SourceData> readFromSource(String lastMigratedId, int batchSize) {
        // 构造查询条件
        Query query = BaseQueryToMongodb.getBaseQueryByMigration(lastMigratedId,batchSize);

        // 执行查询,返回结果
        return mongoTemplate.find(query, SourceData.class, Constants.TJDECRP);
    }


    @Override
    protected List<TargetDataDTO> transformData(List<SourceData> documents) {
        return  documents.stream().
                map(sourceData ->
                       TargetDataDTO targetDataDTO  = new TargetDataDTO();
                       //此处省略转换逻辑,仅为示意
                       return targetDataDTO;
                       ).collect(Collectors.toList());
    }

    @Override
    protected void writeToTarget(List<TargetDataDTO> transformedData) {
        if (CollectionUtils.isEmpty(transformedData)) {
            log.warn("传入的数据列表为空,跳过写入");
            return;
        }
        try {
            //sql插入预处理语句 也可以使用其他方法去写插入
            String sql = "INSERT INTO xxx (ID, CREATE_DATE)" +
                    "VALUES (?, ?)";
            List<Object[]> batchArgs = transformedData.stream()
                    .map(dto -> new Object[]{
                            dto.getRid(),
                            new Timestamp(dto.getCREATE_DATE())
                    })
                    .collect(Collectors.toList());
            int[] updateCounts = oracleJdbcTemplate.batchUpdate(sql, batchArgs);
            log.info("成功写入 {} 条数据到Oracle数据库;插入语句:{}", updateCounts.length,JSON.toJSONString(batchArgs));
        } catch (DataAccessException e) {
            throw new RuntimeException("数据库写入失败", e);
        }
    }


    @Override
    protected String getLastId(SourceData document) {
        return document.getId();
    }
}

4.3 通过API启动迁移

提供RESTful接口启动迁移任务,支持参数化配置

@RestController
public class MigrationController {
    @Resource
    private MigrationService migrationService;
    
    @PostMapping("/migrate/start")
    public ResponseEntity<String> startMigration(
            @RequestParam(defaultValue = "4000") int batchSize,
            @RequestParam(defaultValue = "false") boolean restart) {
        CompletableFuture.runAsync(() -> {
            migrationService.startMigration(batchSize, restart, "TableName");
        });
        return ResponseEntity.ok("Migration started");
    }
}

5. 实现说明与优化建议

关于实现细节的补充说明和潜在的改进方向

5.1 事务处理说明

关于事务,在插入时是按分片每个1000,差分成多个线程去执行,事务是没法保证的,因为每个线程都是一个独立的连接。如果出现问题导致程序中断,那就从Redis中查最后成功的那个ID,手动去回滚大于这个ID的数据,然后修改异常后重新运行,程序会从最后成功的那个批次的最大ID开始迁移(断点续传)。

5.2 潜在改进方向

  • 事务管理 :考虑在每个线程内部使用事务模板,确保单批次的原子性
  • 错误恢复 :增加自动回滚机制,减少人工干预
  • 性能监控 :添加迁移性能指标收集,便于调优
  • 数据校验 :增加源数据和目标数据的一致性校验机制

有好的想法或者改进的思路还请和博主分享呀~~~