Skip to content

Commit

Permalink
dal 2.0.2, support sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
c7ch23en committed Nov 21, 2019
1 parent 8295e54 commit c0ae170
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 35 deletions.
2 changes: 1 addition & 1 deletion dal-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.ctrip.platform</groupId>
<artifactId>dal-client</artifactId>
<version>2.0.2-SNAPSHOT</version>
<version>2.0.2</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,9 @@ private SingleDataSource asyncCreateDataSourceWithPool(String name, DataSourceCo
return ds;
}

protected void returnAllDataSources() {
for (SingleDataSource ds : targetDataSourceCache.values())
returnDataSource(ds);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public class DalBulkTaskRequest<K, T> implements DalRequest<K>{
public DalBulkTaskRequest(String logicDbName, String rawTableName, DalHints hints, List<T> rawPojos, BulkTask<K, T> task) {
this.logicDbName = logicDbName;
this.rawTableName = rawTableName;
this.hints = hints.clone();
this.hints = hints != null ? hints.clone() : new DalHints();
this.rawPojos = rawPojos;
this.task = task;
this.caller = LogContext.getRequestCaller();
prepareRequestContext();
}

@Override
Expand Down Expand Up @@ -64,11 +65,9 @@ public void validateAndPrepare() throws SQLException {

detectBulkTaskDistributedTransaction(logicDbName, hints, daoPojos);
taskContext = task.createTaskContext(hints, daoPojos, rawPojos);

prepareRequestContext(hints);
}

private void prepareRequestContext(DalHints hints) {
private void prepareRequestContext() {
hints.setRequestContext(null);
if (task instanceof TaskAdapter) {
RequestContext ctx = new DalRequestContext().setLogicTableName(((TaskAdapter) task).rawTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public class DalSingleTaskRequest<T> implements DalRequest<int[]> {
private DalSingleTaskRequest(String logicDbName, DalHints hints, SingleTask<T> task) {
this.logicDbName = logicDbName;
this.task = task;
this.hints = hints.clone();
this.hints = hints != null ? hints.clone() : new DalHints();
this.caller = LogContext.getRequestCaller();
prepareRequestContext();
}

public DalSingleTaskRequest(String logicDbName, DalHints hints, T rawPojo, SingleTask<T> task) {
Expand Down Expand Up @@ -84,11 +85,9 @@ public void validateAndPrepare() throws SQLException {

detectDistributedTransaction(logicDbName, hints, daoPojos);
dalTaskContext = task.createTaskContext();

prepareRequestContext(hints);
}

private void prepareRequestContext(DalHints hints) {
private void prepareRequestContext() {
hints.setRequestContext(null);
if (task instanceof TaskAdapter) {
RequestContext ctx = new DalRequestContext().setLogicTableName(((TaskAdapter) task).rawTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ public DalSqlTaskRequest(String logicDbName, SqlBuilder builder, DalHints hints,
this.logicDbName = logicDbName;
this.builder = builder;
this.parameters = builder.buildParameters();
this.hints = hints.clone();
this.hints = hints != null ? hints.clone() : new DalHints();
this.task = task;
this.merger = merger;
this.shards = getShards();
this.caller = LogContext.getRequestCaller();
prepareRequestContext();
this.shards = getShards();
}

@Override
Expand All @@ -68,11 +69,9 @@ public boolean isAsynExecution() {
public void validateAndPrepare() throws SQLException {
DalShardingHelper.detectDistributedTransaction(shards);
taskContext = task.createTaskContext();

prepareRequestContext(hints);
}

private void prepareRequestContext(DalHints hints) {
private void prepareRequestContext() {
hints.setRequestContext(null);
if (task instanceof TaskAdapter) {
RequestContext ctx = new DalRequestContext().setLogicTableName(((TaskAdapter) task).rawTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
@RunWith(Suite.class)
@SuiteClasses({
SlaveFreshnessScannerMysqlTest.class,
DataSourceConfigureLocatorTest.class
DataSourceConfigureLocatorTest.class,
ClusterConfigParserTest.class,
ClusterConfigValidatorTest.class
})
public class AllTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void testProviderThrowException() throws Exception {
MockSwitchListener listener = new MockSwitchListener();
dataSource.addListener(listener);

assertEquals("nullDataSource", dataSource.getSingleDataSource().getName());
assertTrue("nullDataSource".equalsIgnoreCase(dataSource.getSingleDataSource().getName()));
SwitchableDataSourceStatus status0 = dataSource.getStatus();
assertFalse(status0.isForceSwitched());
assertFalse(status0.isPoolCreated());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,19 @@ public void run() {


while (true) {
Connection connection = refreshableDataSource.getConnection();
String currentServer = DataSourceSwitchChecker.getDBServerName(connection, refreshableDataSource.getSingleDataSource().getDataSourceConfigure());
System.out.println(currentServer);
if ("DST56614".equalsIgnoreCase(currentServer)) {
break;
try {
Connection connection = refreshableDataSource.getConnection();
String currentServer = DataSourceSwitchChecker.getDBServerName(connection, refreshableDataSource.getSingleDataSource().getDataSourceConfigure());
System.out.println(currentServer);
if ("DST56614".equalsIgnoreCase(currentServer)) {
break;
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
Thread.sleep(10);
}
connection.close();
}
latch.await();
Assert.assertTrue(switched.get());
Expand Down Expand Up @@ -517,6 +523,8 @@ public void testReusedSingleDataSource() {

@Test
public void testDataSourceSwitch() throws Exception {
DataSourceCreator.getInstance().returnAllDataSources();

Properties p1 = new Properties();
p1.setProperty("userName", "root");
p1.setProperty("password", "!QAZ@WSX1qaz2wsx");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void testIsCrossShard() throws SQLException {
Set<String> shards = new HashSet<>();
shards.add("0");
shards.add("1");
hints.inAllShards();
hints.inShards(shards);
test = new DalSqlTaskRequest<>("dao_test_sqlsvr_dbShard", new TestSqlBuilder(), hints, null, null);
assertTrue(test.isCrossShard());
}
Expand Down
2 changes: 1 addition & 1 deletion dal-cluster-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.ctrip.framework.dal</groupId>
<artifactId>dal-cluster-client</artifactId>
<version>2.0.2-SNAPSHOT</version>
<version>2.0.2</version>
<!-- TODO: mvn package sources & javadoc -->
<properties>
<java.version>1.8</java.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,19 @@ public Set<String> getAppliedTables() {
}

private ShardStrategy getTableStrategy(String tableName) {
if (tableName == null)
throw new ClusterRuntimeException("table name is necessary");
ShardStrategy strategy = tableStrategies.get(tableName);
if (strategy == null)
strategy = defaultStrategy;
return strategy;
}

private ShardStrategy getAndCheckTableStrategy(String tableName) {
ShardStrategy strategy = tableStrategies.get(tableName);
if (strategy == null)
strategy = defaultStrategy;
checkShardStrategy(strategy, tableName);
return strategy;
}

private void checkShardStrategy(ShardStrategy strategy, String tableName) {
ShardStrategy strategy = getTableStrategy(tableName);
if (strategy == null)
throw new ClusterRuntimeException(String.format("shard strategy not found for table '%s'", tableName));
return strategy;
}

public void addStrategy(ShardStrategy strategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ public MappedShardData(Map<String, Object> data) {

@Override
public Object getValue(String name) {
return data.get(name);
for (String key : data.keySet()) {
if (key != null && key.equalsIgnoreCase(name))
return data.get(key);
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ public Set<String> getAllTableShards(String tableName) {
return allShards;
}

private int getModResult(int mod, Object shardValue) {
protected int getModResult(int mod, Object shardValue) {
long longValue = getLongValue(shardValue);
return (int) longValue % mod;
}

private Long getLongValue(Object value) {
protected Long getLongValue(Object value) {
if (value instanceof Long)
return (Long) value;
if (value instanceof Number)
Expand Down

0 comments on commit c0ae170

Please sign in to comment.