Skip to content

Commit

Permalink
single db adapted to cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
chen_cheng committed Apr 13, 2020
1 parent dd812e4 commit a8e5882
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
import java.util.Collection;

import com.ctrip.framework.dal.cluster.client.Cluster;
import com.ctrip.framework.dal.cluster.client.database.Database;
import com.ctrip.platform.dal.dao.configure.ClusterConfigProvider;
import com.ctrip.platform.dal.dao.configure.DalComponent;
import com.ctrip.platform.dal.dao.configure.DatabaseSet;
import com.ctrip.platform.dal.dao.datasource.ConnectionStringConfigureProvider;
import com.ctrip.platform.dal.dao.configure.IntegratedConfigProvider;
import com.ctrip.platform.dal.dao.datasource.DataSourceIdentity;

public interface DalConnectionLocator extends DalComponent {
Expand All @@ -19,7 +17,7 @@ public interface DalConnectionLocator extends DalComponent {

Connection getConnection(DataSourceIdentity id) throws Exception;

ClusterConfigProvider getClusterConfigProvider();
IntegratedConfigProvider getIntegratedConfigProvider();

void setupCluster(Cluster cluster);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.ctrip.platform.dal.dao.configure;

import com.ctrip.framework.dal.cluster.client.Cluster;
import com.ctrip.framework.dal.cluster.client.config.ClusterConfig;
import com.ctrip.framework.dal.cluster.client.database.DatabaseRole;
import com.ctrip.platform.dal.common.enums.DatabaseCategory;
import com.ctrip.platform.dal.dao.client.DalConnectionLocator;
import com.ctrip.platform.dal.dao.cluster.DynamicCluster;
import com.ctrip.platform.dal.dao.helper.DalElementFactory;
import com.ctrip.platform.dal.dao.log.ILogger;

import java.util.List;
import java.util.Map;

/**
* @author c7ch23en
*/
public class ClusterDatabaseSetAdapter implements DatabaseSetAdapter {

private static final ILogger LOGGER = DalElementFactory.DEFAULT.getILogger();

private ClusterInfoProvider clusterInfoProvider;
private ClusterConfigProvider clusterConfigProvider;
private DalConnectionLocator connectionLocator;

public ClusterDatabaseSetAdapter(DalConnectionLocator connectionLocator) {
this.connectionLocator = connectionLocator;
this.clusterInfoProvider = connectionLocator.getIntegratedConfigProvider();
this.clusterConfigProvider = connectionLocator.getIntegratedConfigProvider();
}

@Override
public DatabaseSet adapt(DatabaseSet original) {
if (original instanceof DefaultDatabaseSet) {
DefaultDatabaseSet defaultDatabaseSet = (DefaultDatabaseSet) original;
if (adaptable(defaultDatabaseSet)) {
ClusterDatabaseSet clusterDatabaseSet = tryAdapt(defaultDatabaseSet);
return clusterDatabaseSet != null ? clusterDatabaseSet : original;
}
}
return original;
}

private boolean adaptable(DefaultDatabaseSet defaultDatabaseSet) {
/*
* 1. mysql
* 2. no shard strategy
* 4. no idgen config
* 5. one master, no slaves
*/
boolean adaptable = true;
if (defaultDatabaseSet.getDatabaseCategory() != DatabaseCategory.MySql)
adaptable = false;
if (defaultDatabaseSet.getShardingStrategy() != null)
adaptable = false;
if (defaultDatabaseSet.getIdGenConfig() != null)
adaptable = false;
List<DataBase> masters = defaultDatabaseSet.getMasterDbs();
List<DataBase> slaves = defaultDatabaseSet.getSlaveDbs();
if (masters == null || masters.size() != 1)
adaptable = false;
if (slaves != null && slaves.size() > 0)
adaptable = false;
return adaptable;
}

private ClusterDatabaseSet tryAdapt(DefaultDatabaseSet defaultDatabaseSet) {
try {
List<DataBase> masters = defaultDatabaseSet.getMasterDbs();
if (masters != null && masters.size() == 1) {
String databaseKey = masters.iterator().next().getConnectionString();
Map<String, DalConnectionString> failedConnectionStrings = DataSourceConfigureLocatorManager.
getInstance().getFailedConnectionStrings();
if (failedConnectionStrings == null || !failedConnectionStrings.containsKey(databaseKey)) {
ClusterInfo clusterInfo = clusterInfoProvider.getClusterInfo(databaseKey);
if (clusterInfo != null && clusterInfo.getRole() == DatabaseRole.MASTER &&
!clusterInfo.dbSharding()) {
String clusterName = clusterInfo.getClusterName();
ClusterConfig clusterConfig = clusterConfigProvider.getClusterConfig(clusterName);
Cluster cluster = new DynamicCluster(clusterConfig);
return new ClusterDatabaseSet(defaultDatabaseSet.getName(), cluster, connectionLocator);
}
}
}
} catch (Throwable t) {
LOGGER.warn("Adapt DefaultDatabaseSet to ClusterDatabaseSet exception", t);
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ public class ClusterInfo {
private String clusterName;
private Integer shardIndex;
private DatabaseRole role;
private boolean dbSharding;

public ClusterInfo() {}

public ClusterInfo(String clusterName, Integer shardIndex, DatabaseRole role) {
public ClusterInfo(String clusterName, Integer shardIndex, DatabaseRole role, boolean dbSharding) {
this.clusterName = clusterName;
this.shardIndex = shardIndex;
this.role = role;
this.dbSharding = dbSharding;
}

public String getClusterName() {
Expand All @@ -31,6 +33,10 @@ public DatabaseRole getRole() {
return role;
}

public boolean dbSharding() {
return dbSharding;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.ctrip.platform.dal.dao.configure;

public interface ClusterInfoProvider {

ClusterInfo getClusterInfo(String databaseKey);

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import javax.persistence.Entity;
import javax.xml.parsers.DocumentBuilderFactory;
Expand Down Expand Up @@ -110,6 +107,9 @@ public DalConfigure getFromDocument(Document doc) throws Exception {

locator.setup(databaseSets.values());

DatabaseSetAdapter adapter = new ClusterDatabaseSetAdapter(locator);
tryAdaptToClusters(databaseSets, adapter);

DatabaseSelector selector =
readComponent(root, DATABASE_SELECTOR, new DefaultDatabaseSelector(), SELECTOR);

Expand Down Expand Up @@ -183,7 +183,7 @@ private Node getChildNode(Node node, String name) {
private Map<String, DatabaseSet> readDatabaseSets(Node databaseSetsNode, DalConnectionLocator locator) throws Exception {
Map<String, DatabaseSet> databaseSets = new HashMap<>();

ClusterConfigProvider provider = locator.getClusterConfigProvider();
ClusterConfigProvider provider = locator.getIntegratedConfigProvider();
List<Node> clusterList = getChildNodes(databaseSetsNode, CLUSTER);
for (int i = 0; i < clusterList.size(); i++) {
Node node = clusterList.get(i);
Expand Down Expand Up @@ -241,6 +241,11 @@ else if(hasAttribute(databaseSetNode, SHARDING_STRATEGY))
shardingStrategy, databases, idGenConfig);
}

private void tryAdaptToClusters(Map<String, DatabaseSet> databaseSets, DatabaseSetAdapter adapter) {
for (Map.Entry<String, DatabaseSet> entry : new HashMap<>(databaseSets).entrySet())
databaseSets.put(entry.getKey(), adapter.adapt(entry.getValue()));
}

private IIdGeneratorConfig getIdGenConfig(Node databaseSetNode) throws Exception {
Node idGeneratorNode = getChildNode(databaseSetNode, ID_GENERATOR);
if (null == idGeneratorNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private int getAdjustedServerWaitTimeout() {
LOGGER.info(String.format("minIdle=%d, serverWaitTimeout set to 0", getMinIdle()));
return 0;
}
if (getTimeBetweenEvictionRunsMillis() <= 0 || getMinEvictableIdleTimeMillis() <= 0) {
/*if (getTimeBetweenEvictionRunsMillis() <= 0 || getMinEvictableIdleTimeMillis() <= 0) {
LOGGER.info("idle cleaner disabled, serverWaitTimeout set to 0");
return 0;
}
Expand All @@ -50,7 +50,7 @@ private int getAdjustedServerWaitTimeout() {
if (maxIdleSeconds > adjustedServerWaitTimeout) {
LOGGER.info(String.format("serverWaitTimeout set to possible maxIdleSeconds: %d", maxIdleSeconds));
adjustedServerWaitTimeout = maxIdleSeconds;
}
}*/
}
return adjustedServerWaitTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,20 @@ public boolean dynamicPoolPropertiesEnabled() {
return Boolean.parseBoolean(value);
}

public Integer getServerWaitTimeout() {
if (properties != null) {
String value = properties.getProperty(SERVER_WAIT_TIMEOUT);
if (value != null) {
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
// ignore
}
}
}
return null;
}

public DatabaseCategory getDatabaseCategory() {
return DatabaseCategory.matchWithConnectionUrl(getConnectionUrl());
}
Expand Down Expand Up @@ -412,31 +426,32 @@ public void replaceURL(String ip, int port) {
public boolean equals(Object obj) {
if (obj instanceof DataSourceConfigure) {
DataSourceConfigure ref = (DataSourceConfigure) obj;
return (equals(ref.getConnectionUrl(), ref.getConnectionUrl()) &&
equals(ref.getUserName(), ref.getUserName()) &&
equals(ref.getPassword(), ref.getPassword()) &&
equals(ref.getDriverClass(), ref.getDriverClass()) &&
equals(ref.getTestOnBorrow(), ref.getTestOnBorrow()) &&
equals(ref.getTestOnReturn(), ref.getTestOnReturn()) &&
equals(ref.getTestWhileIdle(), ref.getTestWhileIdle()) &&
equals(ref.getValidationInterval(), ref.getValidationInterval()) &&
equals(ref.getValidationQuery(), ref.getValidationQuery()) &&
equals(ref.getValidationQueryTimeout(), ref.getValidationQueryTimeout()) &&
equals(ref.getValidatorClassName(), ref.getValidatorClassName()) &&
equals(ref.getMaxActive(), ref.getMaxActive()) &&
equals(ref.getMaxAge(), ref.getMaxAge()) &&
equals(ref.getMaxWait(), ref.getMaxWait()) &&
equals(ref.getMinIdle(), ref.getMinIdle()) &&
equals(ref.getTimeBetweenEvictionRunsMillis(), ref.getTimeBetweenEvictionRunsMillis()) &&
equals(ref.getMinEvictableIdleTimeMillis(), ref.getMinEvictableIdleTimeMillis()) &&
equals(ref.getInitialSize(), ref.getInitialSize()) &&
equals(ref.getInitSQL(), ref.getInitSQL()) &&
equals(ref.getLogAbandoned(), ref.getLogAbandoned()) &&
equals(ref.getRemoveAbandoned(), ref.getRemoveAbandoned()) &&
equals(ref.getRemoveAbandonedTimeout(), ref.getRemoveAbandonedTimeout()) &&
equals(ref.getJdbcInterceptors(), ref.getJdbcInterceptors()) &&
equals(ref.getConnectionProperties(), ref.getConnectionProperties()) &&
equals(ref.getJmxEnabled(), ref.getJmxEnabled()));
return (equals(getConnectionUrl(), ref.getConnectionUrl()) &&
equals(getUserName(), ref.getUserName()) &&
equals(getPassword(), ref.getPassword()) &&
equals(getDriverClass(), ref.getDriverClass()) &&
equals(getTestOnBorrow(), ref.getTestOnBorrow()) &&
equals(getTestOnReturn(), ref.getTestOnReturn()) &&
equals(getTestWhileIdle(), ref.getTestWhileIdle()) &&
equals(getValidationInterval(), ref.getValidationInterval()) &&
equals(getValidationQuery(), ref.getValidationQuery()) &&
equals(getValidationQueryTimeout(), ref.getValidationQueryTimeout()) &&
equals(getValidatorClassName(), ref.getValidatorClassName()) &&
equals(getMaxActive(), ref.getMaxActive()) &&
equals(getMaxAge(), ref.getMaxAge()) &&
equals(getMaxWait(), ref.getMaxWait()) &&
equals(getMinIdle(), ref.getMinIdle()) &&
equals(getTimeBetweenEvictionRunsMillis(), ref.getTimeBetweenEvictionRunsMillis()) &&
equals(getMinEvictableIdleTimeMillis(), ref.getMinEvictableIdleTimeMillis()) &&
equals(getInitialSize(), ref.getInitialSize()) &&
equals(getInitSQL(), ref.getInitSQL()) &&
equals(getLogAbandoned(), ref.getLogAbandoned()) &&
equals(getRemoveAbandoned(), ref.getRemoveAbandoned()) &&
equals(getRemoveAbandonedTimeout(), ref.getRemoveAbandonedTimeout()) &&
equals(getJdbcInterceptors(), ref.getJdbcInterceptors()) &&
equals(getConnectionProperties(), ref.getConnectionProperties()) &&
equals(getJmxEnabled(), ref.getJmxEnabled()) &&
equals(getServerWaitTimeout(), ref.getServerWaitTimeout()));
}
return false;
}
Expand Down Expand Up @@ -473,6 +488,7 @@ public int hashCode() {
append(getJdbcInterceptors()).
append(getConnectionProperties()).
append(getJmxEnabled()).
append(getServerWaitTimeout()).
generate();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.ctrip.platform.dal.dao.configure;

/**
* @author c7ch23en
*/
public interface DatabaseSetAdapter {

DatabaseSet adapt(DatabaseSet original);

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
public class DefaultDataSourceConfigureProvider implements IntegratedConfigProvider {

private ClusterConfigProvider clusterConfigProvider = new LocalClusterConfigProvider();
private ClusterInfoProvider clusterInfoProvider = new LocalClusterInfoProvider();

@Override
public void initialize(Map<String, String> settings) throws Exception {
Expand Down Expand Up @@ -41,6 +42,11 @@ public ClusterConfig getClusterConfig(String clusterName) {
return clusterConfigProvider.getClusterConfig(clusterName);
}

@Override
public ClusterInfo getClusterInfo(String databaseKey) {
return clusterInfoProvider.getClusterInfo(databaseKey);
}

@Override
public DataSourceConfigure getDataSourceConfigure(DataSourceIdentity id) {
DataSourceConfigure dataSourceConfigure =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void initStrategy(String shardStrategy) throws Exception {
}

private void initShards() throws Exception {
if (strategy == null || strategy.isShardingByDb() == false) {
if (strategy == null || !strategy.isShardingByDb()) {
// Init with no shard support
for (DataBase db : databases.values()) {
if (db.isMaster())
Expand Down Expand Up @@ -182,6 +182,10 @@ public DalShardingStrategy getStrategy() throws SQLException {
return strategy;
}

protected DalShardingStrategy getShardingStrategy() {
return strategy;
}

public List<DataBase> getMasterDbs() {
return masterDbs == null ? null : new ArrayList<>(masterDbs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
/**
* @author c7ch23en
*/
public interface IntegratedConfigProvider extends ClusterConfigProvider, DataSourceConfigureProvider {
public interface IntegratedConfigProvider extends DataSourceConfigureProvider,
ClusterConfigProvider, ClusterInfoProvider {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.ctrip.platform.dal.dao.configure;

public class LocalClusterInfoProvider implements ClusterInfoProvider {

@Override
public ClusterInfo getClusterInfo(String databaseKey) {
return new NullClusterInfo();
}

}
Loading

0 comments on commit a8e5882

Please sign in to comment.