基于JdbcTemplate实现分库分表路由
前言
我们做服务端开发的都知道,当数据量到一定程度后单库单表会暴露出各种问题,比如单库数据量过大,查询rt过高等一系列问题。有句俗话描述的特别贴切,不要把鸡蛋放到一个篮子里边,一旦这个篮子漏了或者坏了,那么所有的鸡蛋都在劫难逃,对于我们的业务数据也一样,在条件允许的情况下,尽量不要把所有的数据放到一个数据库或者一张表中,如果数据库宕机或者表由于指定DDL操作长时间锁表,会导致上层服务全线崩溃,如果把数据放到多张表或者多个数据库里边,即使数据库崩溃或者锁表也只会影响固定范围内的服务,而不至于服务全部不可用。
背景
某个领域业务中,单表数据已经超过500万,对于某个查询条件筛选数据出现超时,经过排查后发现字段没有加索引,该字段可以筛选出30多万数据,粗略估计下索引区分度5%左右,平衡了一下加索引的成本和带来的收益,加索引的查询性能提升还是比较可观的,但是加完索引之后出现了另外一个问题,rpc查询5秒仍旧是超时的,也就是加索引虽然带来一定的性能提升,但是由于筛选后的数据量仍旧很大,查询的rt仍旧比较长,那么对于这种场景的解决方案是业务妥协,只允许查3个月之内的数据,很显然这不知一种很完美的解决方案。本篇本章我们会重点讲一下基于spring原生JdbcTemplate实现分库分表的路由。
一
分库分表概念
1:分表
由于单表数据量过大导致查询rt时间长,而把数据分别放到多张表中的一种方案。
将一张表中的数据拆分到多张表中,多个表的数据结构不变。
将一张表的字段拆分到多张表中,主表只留一些关键通用信息,将其他字段放到垂直表或者关联表中。
分表策略也就是将一张表的数据拆分到多张表中的规则,以及查询更新的路由策略。
按照主键取模将数据分表路由到不同的表中。
将业务主键取hash值,然后取hash后的几位作为表名的一部分,并且将数据路由到相应的表中。
按照时间分表也是一种常见的做法,一般有按照月份分表和按照季度分表,取业务数据的创建时间与分表规则对比,如果在分表的时间范围内,就路由到该表中。
所谓按照区间分表,就是每张表存储的数据对应的主键(一般是id),有一定的有序范围,比如0~9999,10000~20000等。
2:分库
当然一般情况下,分表基本解决了数据量不是很大,业务也并发量不是很大的绝大多数场景的问题,如果并发量足够大和数据量足够大,单库分表就无能为力了,因为单个数据库实例的并发连接数有限,内存和磁盘空间也有限,我们无法无限制的增加数据库的连接数和建表,这个时候就要考分库,分库一般是解决单库的并发访问和存储问题以及高可用性。
主要是根据数据属性(比如用户),将数据分成区域存储,拆分成多个物理库。
根据业务维度和数据访问量等因素,进行数据分离剥离为多个数据库,一般是按照领域拆分微服务并有独立的数据库。
二
单库分表路由实现
目前业内有很多分库分表中间件,比如mycat,sharding-jdbc和tddl等,这些都对分库分表的路由提供了不错的实现,具体孰优孰劣此篇不做深究,我个人比较喜欢翻弄源码,spring的jdbc模块对官方的jdbc也提供了不错的封装抽象,对于很多中小型应用,我认为没必要引用什么hibernate和mybatis等持久层框架,直接基于spring原生的JdbcTemplate做数据交互就足够了,只要你足够仔细和认真,基于JdbcTemplate能够玩出很多花样,接下来我们就基于JdbcTemplate来分别实现单库分表和分库分表的路由实现。
1:类图
自定义一个JdbcTemplate,继承原生JdbcTemplate复用其现有能力,实现ApplicationContextAware使用spring上下文中的bean,实现自定义JdbcOperations来实现分表路由查询能力。
2:时序图
时序图很简单,DAO层发送查询请求给自定义JdbcTemplate,然后调用路由规则并返回表名,自定义JdbcTemplate根据表明重新组装查询sql,并调用原生JdbcTemplate查询并返回。
3:编码实现
a
创建表结构并初始化数据
CREATE TABLE `User_0` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名', `createTime` datetime NOT NULL COMMENT '创建时间', `sex` tinyint(4) DEFAULT '0' COMMENT '性别(0男,1女)', `age` int(10) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8;CREATE TABLE `User_1` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名', `createTime` datetime NOT NULL COMMENT '创建时间', `sex` tinyint(4) DEFAULT '0' COMMENT '性别(0男,1女)', `age` int(10) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8;CREATE TABLE `User_2` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名', `createTime` datetime NOT NULL COMMENT '创建时间', `sex` tinyint(4) DEFAULT '0' COMMENT '性别(0男,1女)', `age` int(10) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8;CREATE TABLE `User_3` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名', `createTime` datetime NOT NULL COMMENT '创建时间', `sex` tinyint(4) DEFAULT '0' COMMENT '性别(0男,1女)', `age` int(10) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8;
初始化数据:
insert INTO User_0 values(0,"user_0",now(),1,25); insert INTO User_1 values(1,"user_1",now(),0,21); insert INTO User_2 values(2,"user_2",now(),0,23); insert INTO User_3 values(3,"user_3",now(),1,31); insert INTO User_0 values(4,"user_4",now(),1,8); insert INTO User_1 values(5,"user_5",now(),0,11); insert INTO User_2 values(6,"user_6",now(),0,13); insert INTO User_3 values(7,"user_7",now(),1,20);
b
编码
路由规则注解:
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface TableSeg { /** * 表名 * @return */ String tableName(); /** * 表名对应参数值的名称 * @return */ String shardByField(); int tableNum(); /** * 分表策略 * * @return */ Class<? extends IShard> shardStrategy(); }
数据实体:
@Data @NoArgsConstructor @AllArgsConstructor @Builder @TableSeg(tableName = "User" ,shardByField = "id" ,tableNum = 4 ,shardStrategy = SimpleHashModShard.class) public class UserDO extends BaseDO { private Long id; // 主键ID private String name; // 姓名 private Date createTime; private Integer sex; private Integer age; }
路由规则定义:
public interface IShard { String shard(IShardParam param); default void checkParameter(IShardParam param) { ParameterChecker.notNull(param,"param"); ParameterChecker.assertPositive(param.getTableNum(),"tableNum"); } }
简单取模路由规则实现:
@Slf4j @Component public class SimpleHashModShard implements IShard { @Override public String shard(IShardParam param) { IShard.super.checkParameter(param); ParameterChecker.notNull(param.getFieldValue(),"fieldValue"); String fieldValue = param.getFieldValue(); int tableNum = param.getTableNum(); int length = String.valueOf(tableNum).length(); int x = Math.abs(fieldValue.hashCode()%tableNum); int tmpLength = String.valueOf(x).length(); String result; if (tmpLength < length){ result=StringUtil.repeatLetter("0",length - tmpLength)+x; }else{ result = String.valueOf(x); } log.info("SimpleHashModShard.shard param={},result={}",param,result); return result; } }
自定义JdbcTemplate:
@Slf4j public class CustomJdbcTemplate extends JdbcTemplate implements CustomJdbcOperations, ApplicationContextAware { private static final Map<String,IShard> sharMap = new HashMap<>(); public CustomJdbcTemplate() { } public CustomJdbcTemplate(DataSource dataSource) { super.setDataSource(dataSource); } @Override public <T> List<T> query(String sql, Map<String,Object> params, Class<T> clazz) throws DataAccessException { TableSeg tableSeg = clazz.getAnnotation(TableSeg.class); List<Object> args = params.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toList()); if(null == tableSeg) { return super.query(sql,args.toArray(), BeanPropertyRowMapper.newInstance(clazz)); } String tableName = tableSeg.tableName(); String shardField = tableSeg.shardByField(); Class shardClazz = tableSeg.shardStrategy(); if(!params.containsKey(shardField)) { throw new DataAccessException("query params not contains shard key"){}; } Object value = params.get(shardField); String key = StringUtil.capitalLowerCase(shardClazz.getSimpleName()); IShard shard = sharMap.get(key); if(null == shard) { log.warn("CustomJdbcTemplate.query shardStratege is null;shard={}",key); return super.query(sql,args.toArray(), BeanPropertyRowMapper.newInstance(clazz)); } String tableSuffix = shard.shard(SimpleShardParam.builder() .fieldValue(String.valueOf(value)) .tableNum(tableSeg.tableNum()) .build()); log.info("oldSql= {}",sql); sql = sql.replaceAll("\\b" + tableName + "\\b", tableName + "_" + tableSuffix); log.info("newSql = {}",sql); return super.query(sql,args.toArray(), BeanPropertyRowMapper.newInstance(clazz)); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String,IShard> beans = applicationContext.getBeansOfType(IShard.class); beans.forEach((key,bean) -> sharMap.put(key,bean)); } }
通过实现ApplicationContextAware把路由规则注入进来,继承JdbcTemplate复用其原生数据交互能力,实现CustomJdbcOperations接口实现自定义查询能力,这个类也是最核心的,首先检查数据实体是否加分表注解,没有的话走非分表查询规则,然后检查入参是否包含分表字段,如果没有直接报错,然后根据数据实体上的分表注解从容器中获取分表规则,如果没有找到分表规则走默认的非分表查询,接着调用分表规则计算出分表的后缀,把原来的sql中的表名替换成新的分表表名,最后调用原生的JdbcTemplate查询能力并返回结果。
声明自定义JdbcTemplate:
@Bean public CustomJdbcTemplate customJdbcTemplate(DataSource dataSource) { return new CustomJdbcTemplate(dataSource); }
定义DAO查询:
@Slf4j @Repository public class UserDao extends BaseDao { private static final String TABLE_NAME = "User"; @Autowired private CustomJdbcTemplate customJdbcTemplate; @Override protected String getTableName() { return TABLE_NAME; } public UserDO queryByPK(Long id) { if(null == id || id <= 0) { log.error("UserDao.queryByPK param illegal;id={}",id); throw new IllegalArgumentException(String.format("UserDao.queryByPK param illegal;id=%d",id)); } Map<String,Object> params = new HashMap<>(); params.put("id",id); List<UserDO> list = this.customJdbcTemplate.query("select * from User where id = ? limit 1",params,UserDO.class); if(CollectionUtil.isEmpty(list)) { return null; } return list.get(0); } }
请求接收层:
@RestController @RequestMapping("user") @Slf4j public class UserController { @Autowired private UserManager userManager; @GetMapping("/{id}") public IResp<UserVO> queryUser(@PathVariable("id")Long id) { if(null == id || id <= 0L) { log.warn("UserController.queryUser param illegal;id={}",id); return IResp.getFailureResult(EntityError.ILLEGAL_ARGUMENT); } try { UserVO userVO = this.userManager.queryById(id); return IResp.getSuccessResult(userVO); } catch (Exception e) { log.error("UserController.queryUser occur error;id={}",id,e); return IResp.getFailureResult(EntityError.SYSTEM_ERROR); } } }
其他Manager和service层代码不在这里赘述。
4:测试验证
启动应用没有问题:
命令行发送请求:
curl http://localhost:8080/user/5
拿到结果响应:
查看sql替换前后的日志:
回顾一下我们UserDO中的分表规则注解,我们定义了4张表,表名是User,分表字段是id,分表策略是取模,对比一下请求响应结果,我们已经实现了单库分表的路由能力。
三
分库分表路由实现
1:类图
自定义一个JdbcTemplate,实现ApplicationContextAware使用spring上下文中的bean,实现自定义JdbcOperations来实现分表路由查询能力。
2:时序图
DAO层发送查询请求给自定义JdbcTemplate,然后调用路由规则并返回数据源key和表名,然后从容器中获取数据源对应的JdbcTemplate,自定义JdbcTemplate根据表明重新组装查询sql,并调用对应JdbcTemplate查询并返回。
3:编码实现
a
创建数据库和表结构并初始化数据
创建两个数据库分别是test_0和test1,然后在两个库中创建四张表,脚本不在粘贴,效果如图:
b
编码
路由规则注解增加数据库数量:
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface TableSeg { /** * 表名 * @return */ String tableName(); /** * 表名对应参数值的名称 * @return */ String shardByField(); /** * * table num * @return */ int tableNum(); /** * db num * * @return */ int dbNum(); /** * 分表策略 * * @return */ Class<? extends IShard> shardStrategy(); }
取模路由规则实现调整:
@Slf4j @Component public class SimpleHashModShard implements IShard { @Override public Pair<String,String> shard(IShardParam param) { IShard.super.checkParameter(param); ParameterChecker.notNull(param.getFieldValue(),"fieldValue"); String fieldValue = param.getFieldValue(); int tableNum = param.getTableNum(); int dbNum = param.getDbNum(); int hashCode = Math.abs(fieldValue.hashCode()); int dbIdx = hashCode % dbNum; String dbSuffix = getSuffix(dbNum,dbIdx); int tableIdx = (hashCode / dbNum) %tableNum; String tableSuffix = getSuffix(tableNum, tableIdx); Pair pair = new Pair(jdbcTemplatePrefix() + "_" + dbSuffix,"_" + tableSuffix); log.info("SimpleHashModShard.shard param={},result={}",param,pair); return pair; } /** * 生成表或者数据库后缀 * * @param num * @param idx * @return */ private String getSuffix(int num, int idx) { int length = String.valueOf(num).length(); int tmpLength = String.valueOf(idx).length(); String suffix; if (tmpLength < length){ suffix= StringUtil.repeatLetter("0",length - tmpLength)+idx; }else{ suffix = String.valueOf(idx); } return suffix; } }
数据源定义:
@Configuration @PropertySource(value = "classpath:druid.properties") public class DruidConfiguration { /** * 数据源 * * @return */ @Bean(destroyMethod = "close", initMethod = "init",name = "dataSource_0") @ConfigurationProperties(prefix = "spring.datasource0") public DataSource druidDataSource0() { DruidDataSource dataSource = new DruidDataSource(); return dataSource; } @Bean(destroyMethod = "close", initMethod = "init",name = "dataSource_1") @ConfigurationProperties(prefix = "spring.datasource1") public DataSource druidDataSource1() { DruidDataSource dataSource = new DruidDataSource(); return dataSource; } @Bean(name="jdbcTemplate_0") @ConditionalOnBean(DataSource.class) @Primary public JdbcTemplate jdbcTemplate0(@Qualifier("dataSource_0") DataSource dataSource_0) { return new JdbcTemplate(dataSource_0); } @Bean(name="jdbcTemplate_1") @ConditionalOnBean(DataSource.class) public JdbcTemplate jdbcTemplate1(@Qualifier("dataSource_1") DataSource dataSource_1) { return new JdbcTemplate(dataSource_1); } @Bean @ConditionalOnBean(JdbcTemplate.class) public CustomJdbcTemplate customJdbcTemplate(@Qualifier("jdbcTemplate_0") JdbcTemplate jdbcTemplate) { return new CustomJdbcTemplate(jdbcTemplate); } }
自定义JdbcTemplate实现:
@Slf4j public class CustomJdbcTemplate implements CustomJdbcOperations, ApplicationContextAware { private static final Map<String,IShard> sharMap = new HashMap<>(); private static final Map<String,JdbcTemplate> jdbcMap = new HashMap<>(); private JdbcTemplate defaultJdbcTemplate; public CustomJdbcTemplate() { } public CustomJdbcTemplate(JdbcTemplate defaultJdbcTemplate) { this.defaultJdbcTemplate = defaultJdbcTemplate; } @Override public <T> List<T> query(String sql, Map<String,Object> params, Class<T> clazz) throws DataAccessException { TableSeg tableSeg = clazz.getAnnotation(TableSeg.class); List<Object> args = params.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toList()); if(null == tableSeg) { return this.defaultJdbcTemplate.query(sql,args.toArray(), BeanPropertyRowMapper.newInstance(clazz)); } String tableName = tableSeg.tableName(); String shardField = tableSeg.shardByField(); Class shardClazz = tableSeg.shardStrategy(); if(!params.containsKey(shardField)) { log.error("CustomJdbcTemplate.query shardStrategy CustomJdbcTemplate.query shardStrategy query params not contains shard key;sql = {},params={}",sql, MapUtil.toPlaintString(params)); throw new DataAccessException("CustomJdbcTemplate.query shardStrategy query params not contains shard key"){}; } Object value = params.get(shardField); String key = StringUtil.capitalLowerCase(shardClazz.getSimpleName()); IShard shard = sharMap.get(key); if(null == shard) { log.warn("CustomJdbcTemplate.query shardStrategy is null;shard={}",key); return this.defaultJdbcTemplate.query(sql,args.toArray(), BeanPropertyRowMapper.newInstance(clazz)); } Pair<String,String> pair = shard.shard(SimpleShardParam.builder() .fieldValue(String.valueOf(value)) .tableNum(tableSeg.tableNum()) .dbNum(tableSeg.dbNum()) .build()); log.info("CustomJdbcTemplate.query shard result={}},",pair); sql = sql.replaceAll("\\b" + tableName + "\\b", tableName + pair.getValue()); log.info("CustomJdbcTemplate.query newSql = {},",sql); JdbcTemplate jdbcTemplate = jdbcMap.get(pair.getKey()); if(null == jdbcTemplate) { log.warn("CustomJdbcTemplate.query shardStrategy route jdbcTemplate is null;pair={}",pair); this.defaultJdbcTemplate.query(sql,args.toArray(), BeanPropertyRowMapper.newInstance(clazz)); } return jdbcTemplate.query(sql,args.toArray(), BeanPropertyRowMapper.newInstance(clazz)); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //shard strategy Map<String,IShard> beans = applicationContext.getBeansOfType(IShard.class); beans.forEach((key,bean) -> sharMap.put(key,bean)); //multi jdbc Map<String,JdbcTemplate> jdbcBeans = applicationContext.getBeansOfType(JdbcTemplate.class); jdbcBeans.forEach((key,jdbc) -> { jdbcMap.put(key,jdbc); }); } }
通过实现ApplicationContextAware把路由规则和多数据库JdbcTemplate注入进来,实现CustomJdbcOperations接口实现自定义查询能力,首先检查数据实体是否加分表注解,没有的话调用默认JdbcTemplate执行查询,然后检查入参是否包含分表字段,如果没有直接报错,然后根据数据实体上的分表注解从容器中获取分表规则,如果没有找到分表规则走默认的非分表查询,接着调用分表规则计算出数据库和分表的后缀,把原来的sql中的表名替换成新的分表表名,最后从容器中获取对应数据库的JdbcTemplate查询并返回结果。
其他代码与上一小节没有变化可以直接复用。
4:测试验证
启动应用没有问题:
发送请求:
curl http://localhost:8080/user/5
响应结果:
查看路由日志:
可以看出id=5的请求路由到jdbcTemplate_1对应的test_1数据库,以及路由到User_2表。
重新发送请求:
curl http://localhost:8080/user/2
响应结果:
查看路由日志:
可以看出id=2的请求路由到jdbcTemplate_0对应的test_0数据库,以及路由到User_1表。
通过这两条请求对比响应结果以及业务日志,可以看出我们已经实现了分库分表的路由查询。
总结
通过上述的分析和编码以及测试验证,我们已经通过扩展原生JdbcTemplate实现了单库分表和分库分表的路由查询,我们可以总结出里边的一些关键点供大家参考:
我们基于JdbcTemplate可以做很多事情,比如分页查询、读写分离等等,有时间的话可以好好研究一下,本篇文章主要介绍了分库分表的一些概念,并通过编码特使验证实现了分库分表路由和查询,希望对大家理解分库分表以及分库分表的路由实现原理有所帮助,后续会对分库分表带来的查询和其他问题做介绍。
本文分享自 PersistentCoder 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!