From c0ae1705265db3e5d46ac186c84f10113ce95909 Mon Sep 17 00:00:00 2001 From: c7ch23en Date: Thu, 21 Nov 2019 17:12:39 +0800 Subject: [PATCH] dal 2.0.2, support sharding --- dal-client/pom.xml | 2 +- .../dal/dao/datasource/DataSourceCreator.java | 5 +++++ .../dal/dao/task/DalBulkTaskRequest.java | 7 +++---- .../dal/dao/task/DalSingleTaskRequest.java | 7 +++---- .../dal/dao/task/DalSqlTaskRequest.java | 9 ++++----- .../platform/dal/dao/configure/AllTest.java | 4 +++- .../ForceSwitchableDataSourceTest.java | 2 +- .../datasource/RefreshableDataSourceTest.java | 20 +++++++++++++------ .../dal/dao/task/DalSqlTaskRequestTest.java | 2 +- dal-cluster-client/pom.xml | 2 +- .../client/cluster/ShardStrategyProxy.java | 12 ++++------- .../sharding/context/MappedShardData.java | 6 +++++- .../sharding/strategy/ModShardStrategy.java | 4 ++-- 13 files changed, 47 insertions(+), 35 deletions(-) diff --git a/dal-client/pom.xml b/dal-client/pom.xml index a7f7436a0..dd59ff503 100644 --- a/dal-client/pom.xml +++ b/dal-client/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.ctrip.platform dal-client - 2.0.2-SNAPSHOT + 2.0.2 1.8 ${java.version} diff --git a/dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/DataSourceCreator.java b/dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/DataSourceCreator.java index 8f17017d6..f25ceca71 100644 --- a/dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/DataSourceCreator.java +++ b/dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/DataSourceCreator.java @@ -143,4 +143,9 @@ private SingleDataSource asyncCreateDataSourceWithPool(String name, DataSourceCo return ds; } + protected void returnAllDataSources() { + for (SingleDataSource ds : targetDataSourceCache.values()) + returnDataSource(ds); + } + } diff --git a/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalBulkTaskRequest.java b/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalBulkTaskRequest.java index ba3e090fe..518d737f8 100644 --- a/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalBulkTaskRequest.java +++ b/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalBulkTaskRequest.java @@ -31,10 +31,11 @@ public class DalBulkTaskRequest implements DalRequest{ public DalBulkTaskRequest(String logicDbName, String rawTableName, DalHints hints, List rawPojos, BulkTask task) { this.logicDbName = logicDbName; this.rawTableName = rawTableName; - this.hints = hints.clone(); + this.hints = hints != null ? hints.clone() : new DalHints(); this.rawPojos = rawPojos; this.task = task; this.caller = LogContext.getRequestCaller(); + prepareRequestContext(); } @Override @@ -64,11 +65,9 @@ public void validateAndPrepare() throws SQLException { detectBulkTaskDistributedTransaction(logicDbName, hints, daoPojos); taskContext = task.createTaskContext(hints, daoPojos, rawPojos); - - prepareRequestContext(hints); } - private void prepareRequestContext(DalHints hints) { + private void prepareRequestContext() { hints.setRequestContext(null); if (task instanceof TaskAdapter) { RequestContext ctx = new DalRequestContext().setLogicTableName(((TaskAdapter) task).rawTableName); diff --git a/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSingleTaskRequest.java b/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSingleTaskRequest.java index dc02d68b0..07fe767b3 100644 --- a/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSingleTaskRequest.java +++ b/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSingleTaskRequest.java @@ -33,8 +33,9 @@ public class DalSingleTaskRequest implements DalRequest { private DalSingleTaskRequest(String logicDbName, DalHints hints, SingleTask task) { this.logicDbName = logicDbName; this.task = task; - this.hints = hints.clone(); + this.hints = hints != null ? hints.clone() : new DalHints(); this.caller = LogContext.getRequestCaller(); + prepareRequestContext(); } public DalSingleTaskRequest(String logicDbName, DalHints hints, T rawPojo, SingleTask task) { @@ -84,11 +85,9 @@ public void validateAndPrepare() throws SQLException { detectDistributedTransaction(logicDbName, hints, daoPojos); dalTaskContext = task.createTaskContext(); - - prepareRequestContext(hints); } - private void prepareRequestContext(DalHints hints) { + private void prepareRequestContext() { hints.setRequestContext(null); if (task instanceof TaskAdapter) { RequestContext ctx = new DalRequestContext().setLogicTableName(((TaskAdapter) task).rawTableName); diff --git a/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequest.java b/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequest.java index 7a7ce5ddf..9905e481c 100644 --- a/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequest.java +++ b/dal-client/src/main/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequest.java @@ -47,11 +47,12 @@ public DalSqlTaskRequest(String logicDbName, SqlBuilder builder, DalHints hints, this.logicDbName = logicDbName; this.builder = builder; this.parameters = builder.buildParameters(); - this.hints = hints.clone(); + this.hints = hints != null ? hints.clone() : new DalHints(); this.task = task; this.merger = merger; - this.shards = getShards(); this.caller = LogContext.getRequestCaller(); + prepareRequestContext(); + this.shards = getShards(); } @Override @@ -68,11 +69,9 @@ public boolean isAsynExecution() { public void validateAndPrepare() throws SQLException { DalShardingHelper.detectDistributedTransaction(shards); taskContext = task.createTaskContext(); - - prepareRequestContext(hints); } - private void prepareRequestContext(DalHints hints) { + private void prepareRequestContext() { hints.setRequestContext(null); if (task instanceof TaskAdapter) { RequestContext ctx = new DalRequestContext().setLogicTableName(((TaskAdapter) task).rawTableName); diff --git a/dal-client/src/test/java/com/ctrip/platform/dal/dao/configure/AllTest.java b/dal-client/src/test/java/com/ctrip/platform/dal/dao/configure/AllTest.java index 8b334d609..c04fbe795 100644 --- a/dal-client/src/test/java/com/ctrip/platform/dal/dao/configure/AllTest.java +++ b/dal-client/src/test/java/com/ctrip/platform/dal/dao/configure/AllTest.java @@ -7,7 +7,9 @@ @RunWith(Suite.class) @SuiteClasses({ SlaveFreshnessScannerMysqlTest.class, - DataSourceConfigureLocatorTest.class + DataSourceConfigureLocatorTest.class, + ClusterConfigParserTest.class, + ClusterConfigValidatorTest.class }) public class AllTest { diff --git a/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/ForceSwitchableDataSourceTest.java b/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/ForceSwitchableDataSourceTest.java index e8c29c769..ec1bb862b 100644 --- a/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/ForceSwitchableDataSourceTest.java +++ b/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/ForceSwitchableDataSourceTest.java @@ -205,7 +205,7 @@ public void testProviderThrowException() throws Exception { MockSwitchListener listener = new MockSwitchListener(); dataSource.addListener(listener); - assertEquals("nullDataSource", dataSource.getSingleDataSource().getName()); + assertTrue("nullDataSource".equalsIgnoreCase(dataSource.getSingleDataSource().getName())); SwitchableDataSourceStatus status0 = dataSource.getStatus(); assertFalse(status0.isForceSwitched()); assertFalse(status0.isPoolCreated()); diff --git a/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/RefreshableDataSourceTest.java b/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/RefreshableDataSourceTest.java index 33c35aa4f..83dc20672 100644 --- a/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/RefreshableDataSourceTest.java +++ b/dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/RefreshableDataSourceTest.java @@ -343,13 +343,19 @@ public void run() { while (true) { - Connection connection = refreshableDataSource.getConnection(); - String currentServer = DataSourceSwitchChecker.getDBServerName(connection, refreshableDataSource.getSingleDataSource().getDataSourceConfigure()); - System.out.println(currentServer); - if ("DST56614".equalsIgnoreCase(currentServer)) { - break; + try { + Connection connection = refreshableDataSource.getConnection(); + String currentServer = DataSourceSwitchChecker.getDBServerName(connection, refreshableDataSource.getSingleDataSource().getDataSourceConfigure()); + System.out.println(currentServer); + if ("DST56614".equalsIgnoreCase(currentServer)) { + break; + } + connection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } finally { + Thread.sleep(10); } - connection.close(); } latch.await(); Assert.assertTrue(switched.get()); @@ -517,6 +523,8 @@ public void testReusedSingleDataSource() { @Test public void testDataSourceSwitch() throws Exception { + DataSourceCreator.getInstance().returnAllDataSources(); + Properties p1 = new Properties(); p1.setProperty("userName", "root"); p1.setProperty("password", "!QAZ@WSX1qaz2wsx"); diff --git a/dal-client/src/test/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequestTest.java b/dal-client/src/test/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequestTest.java index 50ba446fc..0a664b4d1 100644 --- a/dal-client/src/test/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequestTest.java +++ b/dal-client/src/test/java/com/ctrip/platform/dal/dao/task/DalSqlTaskRequestTest.java @@ -163,7 +163,7 @@ public void testIsCrossShard() throws SQLException { Set shards = new HashSet<>(); shards.add("0"); shards.add("1"); - hints.inAllShards(); + hints.inShards(shards); test = new DalSqlTaskRequest<>("dao_test_sqlsvr_dbShard", new TestSqlBuilder(), hints, null, null); assertTrue(test.isCrossShard()); } diff --git a/dal-cluster-client/pom.xml b/dal-cluster-client/pom.xml index 7cbcf24e3..321667681 100644 --- a/dal-cluster-client/pom.xml +++ b/dal-cluster-client/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.ctrip.framework.dal dal-cluster-client - 2.0.2-SNAPSHOT + 2.0.2 1.8 diff --git a/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/cluster/ShardStrategyProxy.java b/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/cluster/ShardStrategyProxy.java index df42e5908..b2732cc5d 100644 --- a/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/cluster/ShardStrategyProxy.java +++ b/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/cluster/ShardStrategyProxy.java @@ -55,6 +55,8 @@ public Set getAppliedTables() { } private ShardStrategy getTableStrategy(String tableName) { + if (tableName == null) + throw new ClusterRuntimeException("table name is necessary"); ShardStrategy strategy = tableStrategies.get(tableName); if (strategy == null) strategy = defaultStrategy; @@ -62,16 +64,10 @@ private ShardStrategy getTableStrategy(String tableName) { } private ShardStrategy getAndCheckTableStrategy(String tableName) { - ShardStrategy strategy = tableStrategies.get(tableName); - if (strategy == null) - strategy = defaultStrategy; - checkShardStrategy(strategy, tableName); - return strategy; - } - - private void checkShardStrategy(ShardStrategy strategy, String tableName) { + ShardStrategy strategy = getTableStrategy(tableName); if (strategy == null) throw new ClusterRuntimeException(String.format("shard strategy not found for table '%s'", tableName)); + return strategy; } public void addStrategy(ShardStrategy strategy) { diff --git a/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/context/MappedShardData.java b/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/context/MappedShardData.java index 977e5ba54..a568a7812 100644 --- a/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/context/MappedShardData.java +++ b/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/context/MappedShardData.java @@ -17,7 +17,11 @@ public MappedShardData(Map data) { @Override public Object getValue(String name) { - return data.get(name); + for (String key : data.keySet()) { + if (key != null && key.equalsIgnoreCase(name)) + return data.get(key); + } + return null; } @Override diff --git a/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/strategy/ModShardStrategy.java b/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/strategy/ModShardStrategy.java index 8177f53f7..3b9d3b493 100644 --- a/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/strategy/ModShardStrategy.java +++ b/dal-cluster-client/src/main/java/com/ctrip/framework/dal/cluster/client/sharding/strategy/ModShardStrategy.java @@ -52,12 +52,12 @@ public Set getAllTableShards(String tableName) { return allShards; } - private int getModResult(int mod, Object shardValue) { + protected int getModResult(int mod, Object shardValue) { long longValue = getLongValue(shardValue); return (int) longValue % mod; } - private Long getLongValue(Object value) { + protected Long getLongValue(Object value) { if (value instanceof Long) return (Long) value; if (value instanceof Number)