前面我们已经学习了解过了数据库常用的分库分表方案,本节以水平分表为例来实战下.
最近项目中的几张表数据行超过了1000万行,所以需要对这些表进行水平分表,提高数据查询的性能。可选的方案有sharding-jdbc中间件还有就是Mybatis拦截器。由于使用的是Pg数据库,并且Pg数据库支持很多函数,以及复杂的sql查询语句,使用sharding-jdbc可能会有意想不到的坑,所以决定采用Mybatis拦截器的方式。
这里我需要分表的表名为process_log,这里不是根据正常的id字段去分表(因为这张表连id字段都没有…),而是选择这张表的唯一字段form_data_code来作为分表字段,将form_data_code字段值进行hashCode()然后进行取模。目前这张表的数据足足5000多万,考虑之后还会增加,需要将表数据控制在百万级内,所以决定分表20张。
// 1.准备20张表
结构同表process_log的20张表,process_log_0 ->process_log_19
// 2.分表健formDataCode,使用java hash算法
Math.abs(formDataCode.hashCode() % 20);
// 3.迁移数据
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SegmentTable {
/**
* 表名
*/
String[] tableName();
/**
* 算法策略
*/
Class[] strategyClazz();
}
// 需要分表的table的Mapper接口
@SegmentTable(tableName = {"process_log"}, strategyClazz ={ProcessLogStrategy.class})
public interface ProcessLogMapper {// todo}
// 策略模式
public interface ShardTableStrategy {
/**
* 分表算法
*
* @param statementHandler
* @return
*/
String shardAlgorithm(StatementHandler statementHandler);
}
public class ShardTableContext {
private ShardTableStrategy tableStrategy;
public ShardTableContext(ShardTableStrategy tableStrategy) {
this.tableStrategy = tableStrategy;
}
public String doShardAlgorithm(StatementHandler statementHandler){
return tableStrategy.shardAlgorithm(statementHandler);
}
}
process_log策略实现
/**
* process_log 分表算法
*
*/
public class ProcessLogStrategy implements ShardTableStrategy {
private static final Logger logger = LoggerFactory.getLogger(ProcessLogStrategy.class);
/**
* 原始表名
*/
private final static String PROCESS_LOG_ORIGIN_TABLE_NAME = "process_log";
private final static String TABLE_LINE = "_";
/**
* 分表20张
*/
public final static Integer PROCESS_LOG_TABLE_NUM = 20;
/**
* 特殊处理字段
*/
private final static String PROCESS_LOG_TABLE_CONFIRM_INDEX = "subTableConfirmIndex";
/**
* 分表字段
*/
private final static String PROCESS_LOG_TABLE_SUB_FIELD = "formDataCode";
@Override
public String shardAlgorithm(StatementHandler statementHandler) throws RuntimeException {
AppEnv appEnv = ApplicationContextUtil.getApplicationContext().getBean(AppEnv.class);
if(!appEnv.isShardingTableProcessLog()){
return PROCESS_LOG_ORIGIN_TABLE_NAME;
}
BoundSql boundSql = statementHandler.getBoundSql();
Object parameterObject = boundSql.getParameterObject();
// 参数值
Map param2ValeMap = JSONObject.parseObject(JSON.toJSONString(parameterObject), Map.class);
logger.info("ProcessLogStrategy test!!! param2ValeMap={}", JSON.toJSONString(param2ValeMap));
// 特殊处理foreach循环语句
Object confirmIndexValue = param2ValeMap.get(PROCESS_LOG_TABLE_CONFIRM_INDEX);
if (confirmIndexValue != null) {
logger.info("handle success, sql exist param confirmIndexValue={}", confirmIndexValue);
return PROCESS_LOG_ORIGIN_TABLE_NAME + TABLE_LINE + confirmIndexValue;
}
Object subFieldValue = param2ValeMap.get(PROCESS_LOG_TABLE_SUB_FIELD);
if (MapUtils.isEmpty(param2ValeMap) || subFieldValue == null) {
throw new MybatisInterceptorException("process_log is subTable so must have subFiledValue!");
}
return PROCESS_LOG_ORIGIN_TABLE_NAME + TABLE_LINE + Math.abs(subFieldValue.hashCode() % PROCESS_LOG_TABLE_NUM);
}
}
mybatis拦截器必须实现Interceptor接口
public interface Interceptor {
// 拦截器执行的逻辑方法
Object intercept(Invocation invocation) throws Throwable;
// 用来封装目标对象。可以返回目标对象本身也可以根据实际需要,创建一个代理对象
Object plugin(Object target);
// 在Mybatis进行配置插件的时候可以配置自定义相关属性
void setProperties(Properties properties);
}
@Override
public Object intercept(Invocation invocation) throws Throwable {
StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
// 全局操作读对象
MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY, SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());
// @SegmentTable -- 只拦截有注解的Mapper
SegmentTable segmentTable = getSegmentTable(metaObject);
if (segmentTable == null) {
return invocation.proceed();
}
// 1.对value进行算法 -> 确定表名
Class strategyClazz = segmentTable.strategyClazz();
ShardTableStrategy strategy = (ShardTableStrategy) strategyClazz.newInstance();
String index = new ShardTableContext(strategy).doShardAlgorithm(statementHandler);
logger.info("ShardTableInterceptor segmentTable={},index={}", JSON.toJSONString(segmentTable), index);
// 2.替换表名
// 获取原始sql
String tableName = segmentTable.tableName();
String sql = (String) metaObject.getValue(BOUND_SQL_NAME);
metaObject.setValue(BOUND_SQL_NAME, sql.replaceFirst(tableName, tableName + index));
return invocation.proceed();
}
1.Mapper文件问题
老系统比较混乱,存在多表关联查询,有些Mapper.xml对象对应的sql不是唯一表。这里需要注意,因为注解是针对整个Mapper文件的,只要是上面的sql都会拦截。但是需要分表的sql都必须要有分表字段。需要避免不分表的sql走策略算法。 最好将需要分表的表拆成单表,逻辑在代码里处理。
2.分页插件pagehelper导致自定义插件无效
在sqlSessionFactory对象中放入拦截器对象,如果系统中有使用Mybatis对分页插件,要注意与自定义拦截器对顺序,拦截器底层采用责任链对方式,通常都会返回invocation.proceed()传递,但是分页插件没有返回。所以需要调整注册顺序.
3.针对sql中对foreach循环
sql查询中用到了分表key的集合,这种情况,在应用层提前使用hash算法,找到所在的表。
4.迁移数据
数据库的hash算法和Java的hash算法是不一致的,所以要确定两边的算法一直,大多数数据库是提供自定义算法的,本次是Pg数据.
// pg自定义java的hashCode算法
DROP FUNCTION IF EXISTS hash_code(text);
CREATE FUNCTION hash_code(text) RETURNS integer
LANGUAGE plpgsql
AS
$$
DECLARE
i integer := 0;
DECLARE
h bigint := 0;
BEGIN
FOR i IN 1..length($1)
LOOP
h = (h * 31 + ascii(substring($1, i, 1))) & 4294967295;
END LOOP;
RETURN cast(cast(h AS bit(32)) AS int4);
END;
$$;
以上便是水平分表的一次实践,就是提供一种分表的思路吧,加深下分库分表的理解.