ShardingJDBC分库分表
1.Maven 引用
< dependency> < groupId> org.apache.shardingsphere</ groupId> < artifactId> sharding-jdbc-spring-boot-starter</ artifactId> < version> 4.1.1</ version> </ dependency> < dependency> < groupId> org.springframework.boot</ groupId> < artifactId> spring-boot-starter-data-jpa</ artifactId> </ dependency> < dependency> < groupId> mysql</ groupId> < artifactId> mysql-connector-java</ artifactId> </ dependency>
2.数据库和表格
数据库
*****_ch
*****_hk
*****_us
*****_olap表格
kline
kline_D_0
kline_D_1
.......
kline_D_15kline
kline_M_0
kline_M_1
.......
kline_M_15kline_m1
kline_m1_250121
.......
kline_m1_2501221kline_M5_0
.......
kline_M5_15kline_M30_0
.......
kline_M30_15kline_M60_0
.......
kline_M60_15kline_W_0
.......
kline_W_15kline_Y_0
.......
kline_Y_15trade_record_240101trade_record_250213_0
........
trade_record_250221_249
CREATE DEFINER = ` admin` @`%` PROCEDURE ` CreateKlineTables` ( )
BEGIN DECLARE i INT DEFAULT 0 ; DECLARE j INT DEFAULT 0 ; DECLARE table_name VARCHAR ( 64 ) ; DECLARE date_parts TEXT ; DECLARE date_part VARCHAR ( 10 ) ; SET date_parts = 'M5,M30,M60,D,W,M,Y' ; WHILE j < LENGTH( date_parts) - LENGTH( REPLACE ( date_parts, ',' , '' ) ) + 1 DO SET date_part = SUBSTRING_INDEX( SUBSTRING_INDEX( date_parts, ',' , j + 1 ) , ',' , - 1 ) ; SET i = 0 ; WHILE i < 16 DO SET table_name = CONCAT( 'kline_' , date_part, '_' , i) ; SET @sql = CONCAT( 'CREATE TABLE IF NOT EXISTS ' , table_name, ' LIKE kline' ) ; PREPARE stmt FROM @sql ; EXECUTE stmt; DEALLOCATE PREPARE stmt; SET i = i + 1 ; END WHILE ; SET j = j + 1 ; END WHILE ;
END
CREATE DEFINER = ` admin` @`%` PROCEDURE ` CreateTradeRecordTables` ( IN date_part VARCHAR ( 10 ) )
BEGIN DECLARE i INT DEFAULT 0 ; DECLARE table_name VARCHAR ( 64 ) ; WHILE i < 250 DO SET table_name = CONCAT( 'trade_record_' , date_part, '_' , i) ; SET @sql = CONCAT( 'CREATE TABLE IF NOT EXISTS ' , table_name, ' like trade_record_240101' ) ; PREPARE stmt FROM @sql ; EXECUTE stmt; DEALLOCATE PREPARE stmt; SET i = i + 1 ; END WHILE ;
END
CREATE DEFINER = ` admin` @`%` PROCEDURE ` DropTradeRecordTables` ( IN date_part VARCHAR ( 10 ) )
BEGIN DECLARE i INT DEFAULT 0 ; DECLARE table_name VARCHAR ( 64 ) ; WHILE i < 250 DO SET table_name = CONCAT( 'trade_record_' , date_part, '_' , i) ; SET @sql = CONCAT( 'DROP TABLE IF EXISTS ' , table_name) ; PREPARE stmt FROM @sql ; EXECUTE stmt; DEALLOCATE PREPARE stmt; SET i = i + 1 ; END WHILE ;
END
2.application.yaml配置
spring : port : 8888 tomcat : uri-encoding : UTF- 8 max-http-post-size : 20MBmax-http-header-size : 20MBhttp : encoding : force : true charset : UTF- 8 enabled : true aop : auto : true main : allow-bean-definition-overriding : true jpa : database-platform : org.hibernate.dialect.MySQL5InnoDBDialectshow-sql : false hibernate : ddl-auto : nonedsx : olap : type : com.zaxxer.hikari.HikariDataSourcedriverClassName : com.mysql.cj.jdbc.DriverjdbcUrl : username : password : hikari : maximum-pool-size : 20 minimum-idle : 20 shardingsphere : datasource : names : center, ds0, ds1, ds2center : type : com.zaxxer.hikari.HikariDataSourcedriverClassName : com.mysql.cj.jdbc.DriverjdbcUrl : username : password : hikari : maximum-pool-size : 20 minimum-idle : 20 ds0 : type : com.zaxxer.hikari.HikariDataSourcedriverClassName : com.mysql.cj.jdbc.DriverjdbcUrl : username : password : hikari : maximum-pool-size : 20 minimum-idle : 20 ds1 : type : com.zaxxer.hikari.HikariDataSourcedriverClassName : com.mysql.cj.jdbc.DriverjdbcUrl : username : password : hikari : maximum-pool-size : 20 minimum-idle : 20 ds2 : type : com.zaxxer.hikari.HikariDataSourcedriverClassName : com.mysql.cj.jdbc.DriverjdbcUrl : username : password : hikari : maximum-pool-size : 20 minimum-idle : 20 props : sql : show : false sharding : default-data-source-name : centertables : trade_record : actual-data-nodes : ds$- > { 0..2} .trade_record_$- > { 0..10} database-strategy : standard : sharding-column : market_codeprecise-algorithm-class-name : com.zzc.sharding.DbShardingByMarketTypeAlgorithmtable-strategy : complex : sharding-columns : trade_date, symbol_idalgorithm-class-name : com.zzc.sharding.TableShardingByDateAndSymbolAlgorithmkline_m1 : actual-data-nodes : ds$- > { 0..2} .kline_m1database-strategy : standard : sharding-column : market_codeprecise-algorithm-class-name : com.zzc.sharding.DbShardingByMarketTypeAlgorithmtable-strategy : complex : sharding-columns : trade_datealgorithm-class-name : com.zzc.sharding.TableShardingByDateAlgkline : actual-data-nodes : ds$- > { 0..2} .kline_${ [ 'M5' , 'M30' , 'M60' , 'D' , 'W' , 'M' , 'Y' ] } _${ 0..15} database-strategy : standard : sharding-column : market_codeprecise-algorithm-class-name : com.zzc.sharding.DbShardingByMarketTypeAlgorithmtable-strategy : complex : sharding-columns : kline_type, symbol_idalgorithm-class-name : com.zzc.sharding.TableShardingByKlineTypeAndSymbolIdAlg
创建路由规则 DbShardingByMarketTypeAlgorithm
package com. zzc. sharding ; import java. util. Collection ; @Slf4j
public class DbShardingByMarketTypeAlgorithm implements PreciseShardingAlgorithm < String > { private DatabaseShardingConfig config; @Override public String doSharding ( Collection < String > collection, PreciseShardingValue < String > preciseShardingValue) { String marketType = preciseShardingValue. getValue ( ) ; if ( config == null ) { config = SpringContextUtil . getBean ( DatabaseShardingConfig . class ) ; } String dbName = config. getDbName ( marketType) ; if ( ! collection. contains ( dbName) ) { log. error ( "Database sharding error. column-value : [{}], DatabaseShardingConfig dbName : [{}], shardingsphere configs : [{}]" , marketType, dbName, collection) ; throw new IllegalArgumentException ( "Database sharding error." ) ; } return dbName; }
}
TableShardingByDateAndSymbolAlgorithm
package com. zzc. sharding ; @Slf4j
public class TableShardingByDateAndSymbolAlgorithm implements ComplexKeysShardingAlgorithm { private static final String FIELD_NAME_DATE = "trade_date" ; private static final String FIELD_NAME_SYMBOL = "symbol_id" ; private DatabaseShardingConfig config; @Override public Collection < String > doSharding ( Collection collection, ComplexKeysShardingValue complexKeysShardingValue) { if ( config == null ) { config = SpringContextUtil . getBean ( DatabaseShardingConfig . class ) ; } String date = ( ( List < String > ) complexKeysShardingValue. getColumnNameAndShardingValuesMap ( ) . get ( FIELD_NAME_DATE ) ) . get ( 0 ) ; Long symbolId = ( ( List < Long > ) complexKeysShardingValue. getColumnNameAndShardingValuesMap ( ) . get ( FIELD_NAME_SYMBOL ) ) . get ( 0 ) ; String logicTable = complexKeysShardingValue. getLogicTableName ( ) ; DatabaseShardingConfig. TableShardingConfig shardingConfig = config. getTableShardingConfig ( logicTable) ; return Collections . singletonList ( logicTable + "_" + date. substring ( 2 ) . replaceAll ( "-" , "" ) + "_" + symbolId % shardingConfig. getTableShardingNum ( ) ) ; }
}
package com. zzc. sharding ; @Slf4j
public class TableShardingByDateAlg implements ComplexKeysShardingAlgorithm { @Override public Collection < String > doSharding ( Collection collection, ComplexKeysShardingValue complexKeysShardingValue) { String date = ( ( List < String > ) complexKeysShardingValue. getColumnNameAndShardingValuesMap ( ) . get ( "trade_date" ) ) . get ( 0 ) ; String logicTable = complexKeysShardingValue. getLogicTableName ( ) ; return Collections . singletonList ( logicTable+ "_" + date. substring ( 2 ) . replaceAll ( "-" , "" ) ) ; }
}
TableShardingByKlineTypeAndSymbolIdAlg
package com. zzc. sharding ; @Slf4j
public class TableShardingByKlineTypeAndSymbolIdAlg implements ComplexKeysShardingAlgorithm { private DatabaseShardingConfig config; @Override public Collection < String > doSharding ( Collection collection, ComplexKeysShardingValue complexKeysShardingValue) { if ( config == null ) { config = SpringContextUtil . getBean ( DatabaseShardingConfig . class ) ; } String klineType = ( ( List < String > ) complexKeysShardingValue. getColumnNameAndShardingValuesMap ( ) . get ( "kline_type" ) ) . get ( 0 ) ; Long symbolId = ( ( List < Long > ) complexKeysShardingValue. getColumnNameAndShardingValuesMap ( ) . get ( "symbol_id" ) ) . get ( 0 ) ; String logicTable = complexKeysShardingValue. getLogicTableName ( ) ; DatabaseShardingConfig. TableShardingConfig shardingConfig = config. getTableShardingConfig ( logicTable) ; log. warn ( "symbolId:{}" , symbolId) ; log. warn ( "klineType:{}" , klineType) ; log. warn ( "shardingConfig:{}" , shardingConfig) ; return Collections . singletonList ( logicTable+ "_" + klineType + "_" + symbolId % shardingConfig. getTableShardingNum ( ) ) ; }
}
package com. zzc. service. schedule ; @Slf4j
@Component
@RequiredArgsConstructor
public class QuotationDataManagementJob { private final static int LOCK_WAIT_SECONDS = 10 ; private final static int LOCK_LEASE_SECONDS = 30 * 60 ; private final static String SHARDING_TABLE_CREATE_SQL = "CREATE TABLE IF NOT EXISTS %s LIKE %s;" ; private final static String SHARDING_TABLE_CLEAR_SQL = "DROP TABLE IF EXISTS %s;" ; private final static String DS_SHARDING = "shardingDataSource" ; private final static String DS_OLAP = "olapDataSource" ; private final DatabaseShardingConfig dbShardingConfig; private final RedissonClient redissonClient; private final DataSource shardingDataSource; private final DataSource olapDataSource; @Scheduled ( cron = "0 30 12 ? * FRI" ) public void createShardingTableJob ( ) { RLock lock = redissonClient. getLock ( LOCK_CREATE_SHARDING_TABLE ) ; RedisLockUtils . lockExecute ( lock, LOCK_WAIT_SECONDS , LOCK_LEASE_SECONDS , TimeUnit . SECONDS , ( ) -> { dbShardingConfig. getTables ( ) . forEach ( ( tableName, config) -> { if ( config. getRunCreateJob ( ) ) createShardingTable ( tableName, config) ; } ) ; return null ; } ) ; log. info ( "createShardingTable job done" ) ; } @Scheduled ( cron = "0 0 10 * * ?" ) public void clearShardingTableJob ( ) { RLock lock = redissonClient. getLock ( LOCK_CLEAR_SHARDING_TABLE ) ; RedisLockUtils . lockExecute ( lock, LOCK_WAIT_SECONDS , LOCK_LEASE_SECONDS , TimeUnit . SECONDS , ( ) -> { dbShardingConfig. getTables ( ) . forEach ( ( tableName, config) -> { clearShardingTable ( tableName, config) ; } ) ; return null ; } ) ; log. info ( "clearShardingTable job done" ) ; } private void createShardingTable ( String tableName, DatabaseShardingConfig. TableShardingConfig config) { if ( DS_OLAP . equals ( config. getDs ( ) ) ) { try { Connection connection = olapDataSource. getConnection ( ) ; List < String > nextWeekWorkDays = getNextWeekWorkDays ( ) ; nextWeekWorkDays. forEach ( day -> { createShardingTable ( "olap" , connection, tableName, day, config) ; } ) ; } catch ( Throwable t) { log. error ( "createShardingTable error. db : [olap] tableName : [{}]" , tableName, t) ; } } else { ( ( ShardingDataSource ) shardingDataSource) . getDataSourceMap ( ) . forEach ( ( dbName, myDataSource) -> { if ( dbName. equals ( dbShardingConfig. getCenterDs ( ) ) ) { return ; } try { Connection connection = myDataSource. getConnection ( ) ; List < String > nextWeekWorkDays = getNextWeekWorkDays ( ) ; nextWeekWorkDays. forEach ( day -> { createShardingTable ( dbName, connection, tableName, day, config) ; } ) ; } catch ( Throwable t) { log. error ( "createShardingTable error. db : [{}] tableName : [{}]" , dbName, tableName, t) ; } } ) ; } } private void createShardingTable ( String dbName, Connection connection, String tableName, String day, DatabaseShardingConfig. TableShardingConfig config) { DatabaseShardingConfig. TableShardingConfig tableShardingConfig = dbShardingConfig. getTableShardingConfig ( tableName) ; if ( config. getTableShardingNum ( ) > 1 ) { for ( int i = 0 ; i < tableShardingConfig. getTableShardingNum ( ) ; i++ ) { String realTableName = tableName + "_" + day. substring ( 2 ) + "_" + i; try { String sql = String . format ( SHARDING_TABLE_CREATE_SQL , realTableName, tableShardingConfig. getTemplateTable ( ) ) ; connection. createStatement ( ) . execute ( sql) ; log. info ( "createShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ; } catch ( Throwable t) { log. error ( "createShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ; } } } else { String realTableName = tableName + "_" + day. substring ( 2 ) ; try { String sql = String . format ( SHARDING_TABLE_CREATE_SQL , realTableName, tableShardingConfig. getTemplateTable ( ) ) ; connection. createStatement ( ) . execute ( sql) ; log. info ( "createShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ; } catch ( Throwable t) { log. error ( "createShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ; } } } private List < String > getNextWeekWorkDays ( ) { LocalDate today = LocalDate . now ( ) ; LocalDate nextMonday = today. with ( TemporalAdjusters . next ( DayOfWeek . MONDAY ) ) ; List < String > workDays = new ArrayList < > ( ) ; DateTimeFormatter formatter = DateTimeFormatter . ofPattern ( DateUtils . YYYYMMDD ) ; for ( int i = 0 ; i < 5 ; i++ ) { LocalDate date = nextMonday. plusDays ( i) ; workDays. add ( date. format ( formatter) ) ; } return workDays; } private void clearShardingTable ( String tableName, DatabaseShardingConfig. TableShardingConfig config) { if ( DS_OLAP . equals ( config. getDs ( ) ) ) { try { Connection connection = olapDataSource. getConnection ( ) ; List < String > nextWeekWorkDays = getToBeClearDays ( tableName) ; nextWeekWorkDays. forEach ( day -> { clearShardingTable ( "olap" , connection, tableName, day, config) ; } ) ; } catch ( Throwable t) { log. error ( "clearShardingTable error. db : [olap] tableName : [{}]" , tableName, t) ; } } else { ( ( ShardingDataSource ) shardingDataSource) . getDataSourceMap ( ) . forEach ( ( dbName, myDataSource) -> { if ( dbName. equals ( dbShardingConfig. getCenterDs ( ) ) ) { return ; } try { Connection connection = myDataSource. getConnection ( ) ; List < String > nextWeekWorkDays = getToBeClearDays ( tableName) ; nextWeekWorkDays. forEach ( day -> { clearShardingTable ( dbName, connection, tableName, day, config) ; } ) ; } catch ( Throwable t) { log. error ( "clearShardingTable error. db : [{}] tableName : [{}]" , dbName, tableName, t) ; } } ) ; } } private void clearShardingTable ( String dbName, Connection connection, String tableName, String day, DatabaseShardingConfig. TableShardingConfig config) { if ( config. getTableShardingNum ( ) > 1 ) { for ( int i = 0 ; i < config. getTableShardingNum ( ) ; i++ ) { String realTableName = tableName + "_" + day. substring ( 2 ) + "_" + i; try { String sql = String . format ( SHARDING_TABLE_CLEAR_SQL , realTableName) ; connection. createStatement ( ) . execute ( sql) ; log. info ( "clearShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ; } catch ( Throwable t) { log. error ( "clearShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ; } } } else { String realTableName = tableName + "_" + day. substring ( 2 ) ; try { String sql = String . format ( SHARDING_TABLE_CLEAR_SQL , realTableName) ; connection. createStatement ( ) . execute ( sql) ; log. info ( "clearShardingTable success. db : [{}] tableName : [{}], realTableName : [{}], sql : [{}]" , dbName, tableName, realTableName, sql) ; } catch ( Throwable t) { log. error ( "clearShardingTable error. db : [{}] tableName : [{}], realTableName : [{}]" , dbName, tableName, realTableName, t) ; } } } private List < String > getToBeClearDays ( String tableName) { List < String > days = new ArrayList < > ( ) ; DatabaseShardingConfig. TableShardingConfig tableShardingConfig = dbShardingConfig. getTableShardingConfig ( tableName) ; LocalDate today = LocalDate . now ( ) ; LocalDate startDay = today. minusDays ( tableShardingConfig. getClearOffset ( ) ) ; LocalDate endDay = today. minusDays ( tableShardingConfig. getKeepDays ( ) ) ; DateTimeFormatter formatter = DateTimeFormatter . ofPattern ( DateUtils . YYYYMMDD ) ; for ( LocalDate date = startDay; date. isBefore ( endDay) ; date = date. plusDays ( 1 ) ) { days. add ( date. format ( formatter) ) ; } return days; } }
package com. zzc. service. config ; @Data
@Slf4j
@RefreshScope
@Configuration
@ConfigurationProperties ( prefix = "refinitiv.api-service.db-sharding" )
@PropertySource ( value = "classpath:guda-refinitiv-api-db-sharding.yaml" , factory = YamlPropertySourceFactory . class )
public class DatabaseShardingConfig { private String centerDs; private Map < String , TableShardingConfig > tables; private Map < String , String > marketConfigs; @Setter ( AccessLevel . PRIVATE ) private Map < String , String > dbMap; @PostConstruct public void init ( ) { if ( marketConfigs == null || marketConfigs. isEmpty ( ) ) { throw new RuntimeException ( "DatabaseShardingConfig error. configs is empty" ) ; } Map < String , String > tmp = new HashMap < > ( ) ; marketConfigs. forEach ( ( dbName, markets) -> { for ( String market : markets. split ( "," ) ) { tmp. put ( market. trim ( ) , dbName) ; } } ) ; dbMap = tmp; log. info ( "DatabaseShardingConfig init success. config: [{}]" , this ) ; } public String getDbName ( String market) { return dbMap. get ( market) ; } public TableShardingConfig getTableShardingConfig ( String tableName) { return tables. get ( tableName) ; } @Data @NoArgsConstructor @AllArgsConstructor public static class TableShardingConfig { private String templateTable; private int tableShardingNum; private int keepDays; private int clearOffset; private String ds; private Boolean runCreateJob = true ; }
}
refinitiv.api-service : db-sharding : centerDs : 'center' tables : trade_record : templateTable : 'trade_record_240101' tableShardingNum : 250 keepDays : 7 clearOffset : 15 ds : 'shardingDataSource' olap_quotation_snapshot : templateTable : 'olap_quotation_snapshot_240101' tableShardingNum : 1 keepDays : 30 clearOffset : 40 ds : 'olapDataSource' kline_m1 : templateTable : 'kline_m1' tableShardingNum : 1 keepDays : 30 clearOffset : 40 ds : 'shardingDataSource' kline : templateTable : 'kline' tableShardingNum : 16 keepDays : 30 clearOffset : 40 ds : 'shardingDataSource' runCreateJob : false marketConfigs : ds1 : 'HK, HK_WRNT, HK_BONDA, HK_TRUST' ds0 : 'US, US_PINK, US_OPTION, SH, SZ, SZ_INDEX, SZ_FUND, SZ_GEM, US_ETF'