数据迁移基于模板方法模式
目录
数据迁移(基于模板方法模式)
1. 代码概述
本节概述了整体设计思路和核心实现模式
本代码实现了一个
数据迁移
方案,采用
模板方法模式(Template Method Pattern)
组织迁移逻辑。核心类
AbstractMigrationService
作为抽象基类,定义了通用的迁移流程,而具体的迁移逻辑由
MigrationService
等子类实现。
2. 主要代码结构
这部分详细介绍了代码的分层结构和主要组件
2.1 AbstractMigrationService<T, D>
(抽象基类)
抽象基类定义了通用迁移流程,是模板方法模式的核心
该类是所有迁移服务的基类,定义了通用的数据迁移流程。
关键方法:
public abstract class AbstractMigrationService<T,D> {
protected abstract List<D> fetchData(Long lastProcessedId); // 读取源数据
protected abstract T transformData(D dto); // 数据转换
protected abstract void saveData(List<T> dataList); // 存储数据
public final void migrate() {
Long lastProcessedId = (Long) redisTemplate.opsForValue().get("migration:lastProcessedId");
List<D> dtoList = fetchData(lastProcessedId); // 读取数据
List<T> entityList = dtoList.stream().map(this::transformData).collect(Collectors.toList()); // 转换数据
saveData(entityList); // 存储数据
// 更新 Redis 中的最大 ID,支持断点续传
if (!dtoList.isEmpty()) {
Long newLastProcessedId = dtoList.get(dtoList.size() - 1).getId();
redisTemplate.opsForValue().set("migration:lastProcessedId", newLastProcessedId);
}
}
}
2.2 MigrationService
(具体实现类)
具体实现类负责具体业务逻辑,遵循基类定义的流程
该类继承
AbstractMigrationService
,实现具体的迁移逻辑。
关键实现:
@Service
public class MigrationService extends AbstractMigrationService<SourceData, TargetDataDTO> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private MigrationLogService migrationLogService;
@Autowired
private ExecutorService executorService;
@Override
protected List<TargetDataDTO> fetchData(Long lastProcessedId) {
return jdbcTemplate.query("SELECT * FROM table_name WHERE id > ? ORDER BY id LIMIT 1000", new Object[]{lastProcessedId}, new TdTjMhkrstDTOMapper());
}
@Override
protected SourceData transformData(TargetDataDTO dto) {
return new SourceData(dto.getId(), dto.getName());
}
@Override
protected void saveData(List<SourceData> dataList) {
executorService.submit(() -> {
mongoTemplate.insert(dataList, "target_collection");
migrationLogService.logMigration("Batch migration successful", dataList.size());
});
}
}
3. 运行机制解析
本节解析了整个迁移流程的工作原理和关键特性
- 模板方法执行流程
:
migrate()
作为 模板方法 ,按照固定的步骤执行:fetchData(lastProcessedId)
:从数据库读取数据,支持断点续传。transformData()
:将数据转换为目标格式。saveData()
:多线程存储数据,提高性能。- 日志记录 :每次批量迁移后,记录日志。
- Redis 断点续传 :存储已迁移的最大 ID,避免重复迁移。
- 扩展性 :具体子类实现 不同的数据源和存储方式 ,但遵循统一的流程。
4. 实战示例
基于真实业务场景的完整实现示例
博主前段时间进行数据迁移的一个案例,任务是一个MongoDB数据迁移到Oracle数据库,由于源数据都在Mongo一个文档内,需要将这部份数据按业务迁移到多个Oracle表中,所以使用了模板方法模式,来定义一个规范的流程,简单说是读数据,清洗数据,存储数据。用了数据分片多线程处理来提升效率。
4.1 迁移服务抽象基类
增强版迁移服务基类,支持多线程处理和完善的日志记录
@Slf4j
@Component
public abstract class AbstractMigrationService<T,D> {
@Resource
private RedisTemplate redisTemplate;
@Resource
@Qualifier("oracleTransactionTemplate")
private TransactionTemplate transactionTemplate;
private final static String prefixRedis = "migration:";
@Resource
private MigrationLogService migrationLogService;
@Resource
private ExecutorService executorService;
// 模板方法,定义迁移的步骤
public final void startMigration(int batchSize, boolean restart, String tableName) {
while (true) {
String lastMigratedId = getLastMigratedId(tableName);
List<T> documents = readFromSource(lastMigratedId, batchSize);
if (CollectionUtils.isEmpty(documents)) {
logMigrationBatch(tableName, null, lastMigratedId, null, 0, tableName+" Migration SUCCESS", null);
log.info("数据为空,退出程序!");
break;
}
//开始迁移数据
{
try {
//多线程来做 每一千条切为一个线程
List<List<T>> batches = splitIntoBatches(documents, 1000);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(batches.stream().map(batch -> CompletableFuture.runAsync(() -> {
List<D> transformedData = transformData(batch);
writeToTarget(transformedData);
}, executorService).exceptionally(
ex -> {
log.error("线程执行异常", ex);
throw new CompletionException(ex); // 抛出包装后的异常
}
)).toArray(CompletableFuture[]::new));
allFutures.join();
// 提交迁移状态
String endId = updateMigrationStatus(tableName, documents);
// 记录成功日志
logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, endId,
documents.size(), "SUCCESS", null);
} catch (Exception e) {
// 记录失败日志
logMigrationBatch(tableName, UUID.randomUUID().toString(), lastMigratedId, null,
documents.size(), "FAILED", e.getMessage());
log.error("迁移过程中出现错误,", e);
//把错误往外抛出
throw new RuntimeException(e);
}
};
}
}
// 具体步骤由子类实现
//读取数据 lastMigratedId:最大id batchSize:每次读取多少条数据
protected abstract List<T> readFromSource(String lastMigratedId, int batchSize);
//转换数据 此处使用泛型D,转换后的数据类型
protected abstract List<D> transformData(List<T> documents); // 使用泛型T
//写入数据 此处使用泛型D,写入的目标数据类型
protected abstract void writeToTarget(List<D> transformedData);
// 读取最后一次迁移的ID,用于续传
private String getLastMigratedId(String tableName) {
String o = (String) redisTemplate.opsForValue().get(prefixRedis+tableName);
log.info("缓存值:"+tableName+"-"+o);
return o;
}
// 更新迁移状态,并返回最新的ID
private String updateMigrationStatus(String tableName, List<T> documents) {
if (!documents.isEmpty()) {
// 找出ID最大的文档
T lastDocument = documents.stream()
.max(Comparator.comparing(this::getLastId))
.orElse(null);
String lastId = getLastId(lastDocument);
redisTemplate.opsForValue().set(prefixRedis+tableName, lastId);
return lastId;
}
return null;
}
// 获取文档的ID,用于存在redis中记录上次迁移的ID,以便下次续传
protected abstract String getLastId(T document);
// 将数据分片以便多线程处理
private List<List<T>> splitIntoBatches(List<T> documents, int batchSize) {
List<List<T>> batches = new ArrayList<>();
for (int i = 0; i < documents.size(); i += batchSize) {
batches.add(documents.subList(i, Math.min(i + batchSize, documents.size())));
}
return batches;
}
// 日志记录方法,用于单独记录迁移日志
private void logMigrationBatch(String tableName, String batchId, String startId,
String endId, int recordCount, String status, String errorMessage) {
MigrationLog migrationLog = MigrationLog.builder()
.tableName(tableName)
.batchId(batchId)
.startId(startId)
.endId(endId)
.recordCount(recordCount)
.status(status)
.errorMessage(errorMessage)
.createTime(LocalDateTime.now())
.build();
migrationLogService.log(migrationLog);
}
}
4.2 具体实现子类
特定业务场景的实现,包含了MongoDB到Oracle的数据转换逻辑
@Service
@Slf4j
public class MigrationService extends AbstractMigrationService<SourceData,TargetDataDTO>{
private final MongoTemplate mongoTemplate;
private final JdbcTemplate oracleJdbcTemplate;
@Autowired
public MigrationService(
MongoTemplate mongoTemplate,
@Qualifier("oracleJdbcTemplate") JdbcTemplate oracleJdbcTemplate
) {
this.mongoTemplate = mongoTemplate;
this.oracleJdbcTemplate = oracleJdbcTemplate;
}
@Override
protected List<SourceData> readFromSource(String lastMigratedId, int batchSize) {
// 构造查询条件
Query query = BaseQueryToMongodb.getBaseQueryByMigration(lastMigratedId,batchSize);
// 执行查询,返回结果
return mongoTemplate.find(query, SourceData.class, Constants.TJDECRP);
}
@Override
protected List<TargetDataDTO> transformData(List<SourceData> documents) {
return documents.stream().
map(sourceData ->
TargetDataDTO targetDataDTO = new TargetDataDTO();
//此处省略转换逻辑,仅为示意
return targetDataDTO;
).collect(Collectors.toList());
}
@Override
protected void writeToTarget(List<TargetDataDTO> transformedData) {
if (CollectionUtils.isEmpty(transformedData)) {
log.warn("传入的数据列表为空,跳过写入");
return;
}
try {
//sql插入预处理语句 也可以使用其他方法去写插入
String sql = "INSERT INTO xxx (ID, CREATE_DATE)" +
"VALUES (?, ?)";
List<Object[]> batchArgs = transformedData.stream()
.map(dto -> new Object[]{
dto.getRid(),
new Timestamp(dto.getCREATE_DATE())
})
.collect(Collectors.toList());
int[] updateCounts = oracleJdbcTemplate.batchUpdate(sql, batchArgs);
log.info("成功写入 {} 条数据到Oracle数据库;插入语句:{}", updateCounts.length,JSON.toJSONString(batchArgs));
} catch (DataAccessException e) {
throw new RuntimeException("数据库写入失败", e);
}
}
@Override
protected String getLastId(SourceData document) {
return document.getId();
}
}
4.3 通过API启动迁移
提供RESTful接口启动迁移任务,支持参数化配置
@RestController
public class MigrationController {
@Resource
private MigrationService migrationService;
@PostMapping("/migrate/start")
public ResponseEntity<String> startMigration(
@RequestParam(defaultValue = "4000") int batchSize,
@RequestParam(defaultValue = "false") boolean restart) {
CompletableFuture.runAsync(() -> {
migrationService.startMigration(batchSize, restart, "TableName");
});
return ResponseEntity.ok("Migration started");
}
}
5. 实现说明与优化建议
关于实现细节的补充说明和潜在的改进方向
5.1 事务处理说明
关于事务,在插入时是按分片每个1000,差分成多个线程去执行,事务是没法保证的,因为每个线程都是一个独立的连接。如果出现问题导致程序中断,那就从Redis中查最后成功的那个ID,手动去回滚大于这个ID的数据,然后修改异常后重新运行,程序会从最后成功的那个批次的最大ID开始迁移(断点续传)。
5.2 潜在改进方向
- 事务管理 :考虑在每个线程内部使用事务模板,确保单批次的原子性
- 错误恢复 :增加自动回滚机制,减少人工干预
- 性能监控 :添加迁移性能指标收集,便于调优
- 数据校验 :增加源数据和目标数据的一致性校验机制
有好的想法或者改进的思路还请和博主分享呀~~~