由于业务的扩展或者其他原因,常常会有迁移系统数据库的场景,对于有大量用户7*24小时不间断使用的系统,如何不宕机实现数据库迁移,这是个很有挑战的话题。
之前看过一篇公众号文章(100亿数据平滑数据迁移,不影响服务)介绍如何实现不宕机数据库迁移,个人觉得写的很好。但这篇文章更多的是理论层面的分享,本文会从实践层面来讨论如何在应用层实现数据迁移,基于的理论基础是这篇文章的双写法。双写法的基本原理是:首先,在应用程序中对原数据库的所有写操作(创建,更新,删除)之后,同时也对新数据库做同样的操作;其次,利用一个工具把老数据迁移到新数据库,完成之后做数据完整性校验;最后,完全切换到新数据库。
由于每个项目用的开发技术不一样,所以在具体实现上也会有差异,那随之而来也会碰到不一样的问题。接下来给大家分享在Java、JPA、Spring技术栈下如何实现不宕机数据库迁移。
用JPA实现双写
JPA(Java Persistence API)作为一个Java标准,带来的最显著的便利是,使开发人员可以通过以直接操作Java对象的方式操作数据库,无需在代码中直接写sql语句操作数据库,因此也无需考虑各个数据库平台的差异带来的影响。现在比较流行的实现有eclipselink、hibernate。
据我所知,现在很多项目都有在使用JPA。那么如果你的项目采用的是JPA,如何用JPA实现同时往两个数据库写呢?
首先,需要构建两个EntityManagerFactory,一个操作老数据库,一个操作新数据库。
在Spring环境下,如何构建EntityManagerFactory Bean,参考如下代码:
@Bean
public LocalContainerEntityManagerFactoryBean hanaDatabaseEntityManagerFactory() {
final LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setDataSource(hanaDataSource); //数据库不一样,数据源不一样
em.setPackagesToScan("com.example.model"); //如果操作新数据库的entity结构不一样,需要指定不同的entity包的路径
em.setPersistenceUnitName("hana");
EclipseLinkJpaVendorAdapter jpaVendorAdapter = new EclipseLinkJpaVendorAdapter();
em.setJpaVendorAdapter(jpaVendorAdapter);
final HashMap<String, Object> properties = new HashMap<String, Object>();
properties.put(PersistenceUnitProperties.WEAVING,
InstrumentationLoadTimeWeaver.isInstrumentationAvailable() ? "true" : "static");
properties.put(PersistenceUnitProperties.TARGET_DATABASE, TargetDatabase.HANA);
em.setJpaPropertyMap(properties);
return em;
}
第4行设置数据源,指向数据库;
第5行指定EntityManagers需要扫描Entity的包的路径;
第6行设置persistence unit名字;
再后面,设置一些JPA的属性。
然后,利用JPA的@EntityListeners,实现对新数据做同样的操作。在每个Listener中,可以利用Annotation @PostPersist、@PostUpdate、@PostRemove分别实现创建、更新、删除三类事件的监听方法。
在创建、更新监听方法中,克隆一份当前对象,然后保存到新数据库。参考如下代码:
@PostPersist
@PostUpdate
public void upsert2NewDB(Product productSaved) {
com.example.model.Product productCopy = clone(productSaved); //克隆对象
newDBProductRepository.save(productCopy);
log.info("Upsert to new db done");
}
需要注意一点:如果你的Entity有乐观锁检查,在克隆对象的时候,需要把新对象的version做复原操作,否则往新数据写的时候会发生乐观锁异常。
在删除监听方法中,只需要拿到当前对象id,然后在新数据库同样删除即可。参考如下代码:
@PostRemove
public void deleteInNewDB(Product productRemoved)
newDBProductRepository.deleteById(productRemoved.getPkId());
log.info("Delete to new db done");
}
注意:往新数据库操作的Repository需要关联到新数据库的EntityManagerFactory,可以用这个Annotation @EnableJpaRepositories进行设置。
事务Transaction
上面介绍了如何用JPA实现双写,但是没考虑数据一致性问题,对于一个请求,必须保证对数据的操作在两个数据库同时成功或者同时失败。当只有一个数据库的时候,单机事务(ACID)可以保证数据一致性,但是在双写这种场景下,同时有两个数据库存在,单机事务就无能为力了。
那么在这种场景下,有哪些办法可以保证数据一致性呢?其实,这个问题本质上是一个分布式事务问题,关于这类问题的讨论有很多,方案也有很多(比如两阶段提交、三阶段提交、Best Effort 1PC等等),有兴趣的童鞋可以网上搜索。这里分享一篇关于这个话题的文章:https://www.javaworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html,这篇文章写很好,很详细,国内有很多这篇文章的翻译版本。
这里推荐的方案是:Best Effort 1PC,具体实现可以用ChainedTransactionManager或者TransactionSychronization。在这种方案下,基本可以100%保证数据一致性(除非在一些极限基础组件宕机情况,比如老数据库已经commit,等新数据库commit时,新数据库突然宕机),并且相比于两阶段提交Cost较小,实现容易。
关于ChainedTransactionManager如何使用,具体可以参考:
https://stackoverflow.com/questions/48954763/spring-transactional-with-a-transaction-across-multiple-data-sources
数据迁移工具
除了应用层面需要实现双写之外,还需要开发一个数据迁移工具,把老数据迁移到新数据库。如果数据库是运行在自己运维的服务器上,那么也许可以利用数据库自带的导入导出工具完成。如果数据库是买的某个云服务平台的service,那么可能不能利用这些数据库自带的工具,你需要自己开发一些工具来完成迁移。
这里推荐一个开源Library - Spring Batch。Spring Batch(https://spring.io/projects/spring-batch)是一个轻量级的Library,常常用于大量数据的批处理,用在这个场景下,非常合适。
数据完整性校验
在把数据迁移到新数据库之后,常常需要对两边数据做一个完整性校验,以确保两边数据库数据是一致的,因为在数据迁移或者应用层面的双写过程都可能发生数据丢失或者数据损毁导致数据不一致,特别是如果新老数据库是不同类型的数据库,更可能会出现一些因字段类型兼容性导致的问题。
那么如何实现数据的完整性校验呢?
首先,可能直接想到的就是比较两边记录的条数。
当然,这是非常直接的,因为条数不相等,说明两边的数据肯定不一致。那么如果两边条数相等,能不能说明数据一致呢?不一定,因为在数据迁移或者应用层面的双写过程还可能发生数据损毁,导致数据发生变化。
那么如何对数据内容做进一步的校验呢?
如果把数据从数据库抓出来,一条记录一条记录的比较,完全没问题,但是如果数据量很大,这样检查Performance会是一个很大的问题。这里介绍另外一个办法:就是减少数据库到应用程序的数据量负载,在两边数据库中先对每条记录先做一个hash(很多数据库都支持常见的hash算法,如MD5,SHA256),然后在应用层面比较两边数据记录的hash值。
如下是一个md5转换的例子:
select
sum(('x' || substring(hash, 1, 8))::bit(32)::bigint),
sum(('x' || substring(hash, 9, 8))::bit(32)::bigint),
sum(('x' || substring(hash, 17, 8))::bit(32)::bigint),
sum(('x' || substring(hash, 25, 8))::bit(32)::bigint)
from (
select md5 (
coalesce(md5("integer column"::text), ' ') ||
coalesce(md5(floor(
extract(epoch from "datetime column")
)::text), ' ') ||
coalesce(md5("string column"::text), ' ') ||
coalesce(md5("boolean column"::integer::text), ' ')
) as "hash"
from "my_schema"."my_table";
) as t;
具体可以参考这篇文章:
https://www.periscopedata.com/blog/hashing-tables-to-ensure-consistency-in-postgres-redshift-and-mysql
最后,在真正切换到新数据之前,还需要思考如下方面: