From ede6dcc2b038eb45a35f4e3e0445ab43eb292a3b Mon Sep 17 00:00:00 2001 From: Piotr Grabowski Date: Thu, 6 Jul 2023 15:36:10 +0200 Subject: [PATCH 1/3] Add RackAwareRoundRobinPolicy RackAwareRoundRobinPolicy is a modified version of DCAwareRoundRobinPolicy. The new policy shares much of the code with the original policy, but changes the logic of newQueryPlan - it will return hosts in local rack first, followed by remote racks in local DC, followed by remote DC. In contrast to the approach in #199, this policy does not change the distance() method - it will return LOCAL for both local rack in local DC and remote racks in local DC. This is a deliberate choice, as many other places in the driver look at the distance and introducing a new distance type (e.g. LOCAL_REMOTERACK) would necessitate many changes. One example of that is PoolingOptions, where the user can specify how many connections should be made to LOCAL/REMOTE nodes - adding another type of distance would force the user to remember to change yet another configuration parameter. Integration tests in RackAwareRoundRobinPolicyTest are based on tests in DCAwareRoundRobinPolicyTest, as much of the behavior should be the same between the policies. A couple new tests were introduced to specifically test the rack-aware feature in local DC. --- .../policies/RackAwareRoundRobinPolicy.java | 510 +++++++++ .../driver/core/ScassandraCluster.java | 8 + .../RackAwareRoundRobinPolicyTest.java | 974 ++++++++++++++++++ manual/load_balancing/README.md | 42 + 4 files changed, 1534 insertions(+) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java create mode 100644 driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java new file mode 100644 index 00000000000..60761c65afa --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java @@ -0,0 +1,510 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * Copyright (C) 2023 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.core.policies; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.Statement; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.AbstractIterator; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A rack aware Round-robin load balancing policy. + * + *

This policy provides round-robin queries over the node of the local rack. After that, the + * query plan includes round-robin list of nodes in the local DC (but not in local rack). It also + * includes in the query plans returned a configurable number of hosts in the remote data centers, + * but those are always tried after the local nodes. In other words, this policy guarantees that no + * host in a remote data center will be queried unless no host in the local data center can be + * reached. + */ +public class RackAwareRoundRobinPolicy implements LoadBalancingPolicy { + + private static final Logger logger = LoggerFactory.getLogger(RackAwareRoundRobinPolicy.class); + + /** + * Returns a builder to create a new instance. + * + * @return the builder. + */ + public static Builder builder() { + return new Builder(); + } + + private static final String UNSET = ""; + + private final ConcurrentMap> perDcLiveHosts = + new ConcurrentHashMap>(); + private final CopyOnWriteArrayList liveHostsLocalRackLocalDC = + new CopyOnWriteArrayList(); + private final CopyOnWriteArrayList liveHostsRemoteRacksLocalDC = + new CopyOnWriteArrayList(); + private final AtomicInteger index = new AtomicInteger(); + + @VisibleForTesting volatile String localDc; + @VisibleForTesting volatile String localRack; + + private final int usedHostsPerRemoteDc; + private final boolean dontHopForLocalCL; + + private volatile Configuration configuration; + + public RackAwareRoundRobinPolicy( + String localDc, + String localRack, + int usedHostsPerRemoteDc, + boolean allowRemoteDCsForLocalConsistencyLevel, + boolean allowEmptyLocalDc, + boolean allowEmptyLocalRack) { + if (!allowEmptyLocalDc && Strings.isNullOrEmpty(localDc)) + throw new IllegalArgumentException( + "Null or empty data center specified for Rack-aware policy"); + if (!allowEmptyLocalRack && Strings.isNullOrEmpty(localRack)) + throw new IllegalArgumentException("Null or empty rack specified for Rack-aware policy"); + this.localDc = localDc == null ? UNSET : localDc; + this.localRack = localRack == null ? UNSET : localRack; + this.usedHostsPerRemoteDc = usedHostsPerRemoteDc; + this.dontHopForLocalCL = !allowRemoteDCsForLocalConsistencyLevel; + } + + @Override + public void init(Cluster cluster, Collection hosts) { + if (localDc != UNSET) + logger.info("Using provided data-center name '{}' for RackAwareRoundRobinPolicy", localDc); + if (localRack != UNSET) + logger.info("Using provided rack name '{}' for RackAwareRoundRobinPolicy", localRack); + + this.configuration = cluster.getConfiguration(); + + ArrayList notInLocalDC = new ArrayList(); + ArrayList notInLocalRack = new ArrayList(); + + for (Host host : hosts) { + String dc = dc(host); + String rack = rack(host); + + // If the localDC was in "auto-discover" mode and it's the first host for which we have a DC, + // use it. 
+ if (localDc == UNSET && dc != UNSET) { + logger.info( + "Using data-center name '{}' for RackAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with RackAwareRoundRobinPolicy constructor)", + dc); + localDc = dc; + } else if (!dc.equals(localDc)) + notInLocalDC.add(String.format("%s (%s)", host.toString(), dc)); + + if (localRack == UNSET && rack != UNSET) { + logger.info( + "Using rack name '{}' for RackAwareRoundRobinPolicy (if this is incorrect, please provide the correct rack name with RackAwareRoundRobinPolicy constructor)", + rack); + localRack = rack; + } else if (!rack.equals(localRack)) { + notInLocalRack.add(String.format("%s (dc=%s, rack=%s)", host.toString(), dc, rack)); + } + + CopyOnWriteArrayList prev = perDcLiveHosts.get(dc); + if (prev == null) + perDcLiveHosts.put(dc, new CopyOnWriteArrayList(Collections.singletonList(host))); + else prev.addIfAbsent(host); + + if (dc.equals(localDc)) { + if (rack.equals(localRack)) { + liveHostsLocalRackLocalDC.add(host); + } else { + liveHostsRemoteRacksLocalDC.add(host); + } + } + } + + if (notInLocalDC.size() > 0) { + String nonLocalHosts = Joiner.on(",").join(notInLocalDC); + logger.warn( + "Some contact points don't match local data center. Local DC = {}. Non-conforming contact points: {}", + localDc, + nonLocalHosts); + } + + if (notInLocalRack.size() > 0) { + String nonLocalHosts = Joiner.on(",").join(notInLocalRack); + logger.warn( + "Some contact points don't match local rack. Local rack = {}. Non-conforming contact points: {}", + localRack, + nonLocalHosts); + } + + this.index.set(new Random().nextInt(Math.max(hosts.size(), 1))); + } + + private String dc(Host host) { + String dc = host.getDatacenter(); + return dc == null ? localDc : dc; + } + + private String rack(Host host) { + String rack = host.getRack(); + return rack == null ? localRack : rack; + } + + @SuppressWarnings("unchecked") + private static CopyOnWriteArrayList cloneList(CopyOnWriteArrayList list) { + return (CopyOnWriteArrayList) list.clone(); + } + + /** + * Return the HostDistance for the provided host. + * + *

This policy considers nodes in the local datacenter as {@code LOCAL}. For each remote
+   * datacenter, it considers a configurable number of hosts as {@code REMOTE} and the rest is
+   * {@code IGNORED}.
+   *
+   *

To configure how many hosts in each remote datacenter should be considered, see {@link + * Builder#withUsedHostsPerRemoteDc(int)}. + * + * @param host the host of which to return the distance of. + * @return the HostDistance to {@code host}. + */ + @Override + public HostDistance distance(Host host) { + String dc = dc(host); + if (dc == UNSET || dc.equals(localDc)) return HostDistance.LOCAL; + + CopyOnWriteArrayList dcHosts = perDcLiveHosts.get(dc); + if (dcHosts == null || usedHostsPerRemoteDc == 0) return HostDistance.IGNORED; + + // We need to clone, otherwise our subList call is not thread safe + dcHosts = cloneList(dcHosts); + return dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc)).contains(host) + ? HostDistance.REMOTE + : HostDistance.IGNORED; + } + + /** + * Returns the hosts to use for a new query. + * + *

The returned plan will always try each known host in the local rack first, and then, if none + * of the host in local rack are reachable, will try hosts in remote racks of the local + * datacenter. Finally, it will try up to a configurable number of other host per remote + * datacenter. The order of the local node in the returned query plan will follow a Round-robin + * algorithm. + * + * @param loggedKeyspace the keyspace currently logged in on for this query. + * @param statement the query for which to build the plan. + * @return a new query plan, i.e. an iterator indicating which host to try first for querying, + * which one to use as failover, etc... + */ + @Override + public Iterator newQueryPlan(String loggedKeyspace, final Statement statement) { + + CopyOnWriteArrayList localLiveHosts = perDcLiveHosts.get(localDc); + // Clone for thread safety + final List copyLiveHostsLocalRackLocalDC = cloneList(liveHostsLocalRackLocalDC); + final List copyLiveHostsRemoteRacksLocalDC = cloneList(liveHostsRemoteRacksLocalDC); + final int startIdx = index.getAndIncrement(); + + return new AbstractIterator() { + + private int idx = startIdx; + private int remainingLiveHostsLocalRackLocalDC = copyLiveHostsLocalRackLocalDC.size(); + private int remainingLiveHostsRemoteRacksLocalDC = copyLiveHostsRemoteRacksLocalDC.size(); + + // For remote Dcs + private Iterator remoteDcs; + private List currentRemoteDcHosts; + private int currentRemoteDcRemaining; + + @Override + protected Host computeNext() { + while (true) { + if (remainingLiveHostsLocalRackLocalDC > 0) { + remainingLiveHostsLocalRackLocalDC--; + int c = idx++ % copyLiveHostsLocalRackLocalDC.size(); + if (c < 0) { + c += copyLiveHostsLocalRackLocalDC.size(); + } + return copyLiveHostsLocalRackLocalDC.get(c); + } + + if (remainingLiveHostsRemoteRacksLocalDC > 0) { + remainingLiveHostsRemoteRacksLocalDC--; + int c = idx++ % copyLiveHostsRemoteRacksLocalDC.size(); + if (c < 0) { + c += copyLiveHostsRemoteRacksLocalDC.size(); + } + return copyLiveHostsRemoteRacksLocalDC.get(c); + } + + if (currentRemoteDcHosts != null && currentRemoteDcRemaining > 0) { + currentRemoteDcRemaining--; + int c = idx++ % currentRemoteDcHosts.size(); + if (c < 0) { + c += currentRemoteDcHosts.size(); + } + return currentRemoteDcHosts.get(c); + } + + ConsistencyLevel cl = + statement.getConsistencyLevel() == null + ? configuration.getQueryOptions().getConsistencyLevel() + : statement.getConsistencyLevel(); + + if (dontHopForLocalCL && cl.isDCLocal()) return endOfData(); + + if (remoteDcs == null) { + Set copy = new HashSet(perDcLiveHosts.keySet()); + copy.remove(localDc); + remoteDcs = copy.iterator(); + } + + if (!remoteDcs.hasNext()) break; + + String nextRemoteDc = remoteDcs.next(); + CopyOnWriteArrayList nextDcHosts = perDcLiveHosts.get(nextRemoteDc); + if (nextDcHosts != null) { + // Clone for thread safety + List dcHosts = cloneList(nextDcHosts); + currentRemoteDcHosts = + dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc)); + currentRemoteDcRemaining = currentRemoteDcHosts.size(); + } + } + return endOfData(); + } + }; + } + + @Override + public void onUp(Host host) { + String dc = dc(host); + String rack = rack(host); + + // If the localDC was in "auto-discover" mode and it's the first host for which we have a DC, + // use it. 
+ if (localDc == UNSET && dc != UNSET) { + logger.info( + "Using data-center name '{}' for RackAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with RackAwareRoundRobinPolicy constructor)", + dc); + localDc = dc; + } + if (localRack == UNSET && rack != UNSET) { + logger.info( + "Using rack name '{}' for RackAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with RackAwareRoundRobinPolicy constructor)", + rack); + localRack = rack; + } + + CopyOnWriteArrayList dcHosts = perDcLiveHosts.get(dc); + if (dcHosts == null) { + CopyOnWriteArrayList newMap = + new CopyOnWriteArrayList(Collections.singletonList(host)); + dcHosts = perDcLiveHosts.putIfAbsent(dc, newMap); + // If we've successfully put our new host, we're good, otherwise we've been beaten so continue + if (dcHosts == null) return; + } + dcHosts.addIfAbsent(host); + + if (dc.equals(localDc)) { + if (rack.equals(localRack)) { + liveHostsLocalRackLocalDC.add(host); + } else { + liveHostsRemoteRacksLocalDC.add(host); + } + } + } + + @Override + public void onDown(Host host) { + String dc = dc(host); + String rack = rack(host); + + CopyOnWriteArrayList dcHosts = perDcLiveHosts.get(dc); + if (dcHosts != null) dcHosts.remove(host); + + if (dc.equals(localDc)) { + if (rack.equals(localRack)) { + liveHostsLocalRackLocalDC.remove(host); + } else { + liveHostsRemoteRacksLocalDC.remove(host); + } + } + } + + @Override + public void onAdd(Host host) { + onUp(host); + } + + @Override + public void onRemove(Host host) { + onDown(host); + } + + @Override + public void close() { + // nothing to do + } + + /** Helper class to build the policy. */ + public static class Builder { + private String localDc; + private String localRack; + private int usedHostsPerRemoteDc; + private boolean allowRemoteDCsForLocalConsistencyLevel; + + /** + * Sets the name of the datacenter that will be considered "local" by the policy. + * + *

This must be the name as known by Cassandra (in other words, the name that appears in
+     * {@code system.peers}, or in the output of admin tools like nodetool).
+     *
+     *

If this method isn't called, the policy will default to the datacenter of the first node
+     * connected to. This will always be ok if all the contact points used at {@code Cluster}
+     * creation are in the local data-center. Otherwise, you should provide the name yourself with
+     * this method.
+     *
+     * @param localDc the name of the datacenter. It should not be {@code null}.
+     * @return this builder.
+     */
+    public Builder withLocalDc(String localDc) {
+      Preconditions.checkArgument(
+          !Strings.isNullOrEmpty(localDc),
+          "localDc name can't be null or empty. If you want to let the policy autodetect the datacenter, don't call Builder.withLocalDC");
+      this.localDc = localDc;
+      return this;
+    }
+
+    /**
+     * Sets the name of the rack that will be considered "local" by the policy.
+     *
+     *

This must be the name as known by Cassandra (in other words, the name that appears in
+     * {@code system.peers}, or in the output of admin tools like nodetool).
+     *
+     *

If this method isn't called, the policy will default to the rack of the first node
+     * connected to. This will always be ok if all the contact points used at {@code Cluster}
+     * creation are in the local rack. Otherwise, you should provide the name yourself with this
+     * method.
+     *
+     * @param localRack the name of the rack. It should not be {@code null}.
+     * @return this builder.
+     */
+    public Builder withLocalRack(String localRack) {
+      Preconditions.checkArgument(
+          !Strings.isNullOrEmpty(localRack),
+          "localRack name can't be null or empty. If you want to let the policy autodetect the rack, don't call Builder.withLocalRack");
+      this.localRack = localRack;
+      return this;
+    }
+
+    /**
+     * Sets the number of hosts per remote datacenter that the policy should consider.
+     *
+     *

The policy's {@code distance()} method will return a {@code HostDistance.REMOTE} distance + * for only {@code usedHostsPerRemoteDc} hosts per remote datacenter. Other hosts of the remote + * datacenters will be ignored (and thus no connections to them will be maintained). + * + *

If {@code usedHostsPerRemoteDc > 0}, then if for a query no host in the local datacenter + * can be reached and if the consistency level of the query is not {@code LOCAL_ONE} or {@code + * LOCAL_QUORUM}, then up to {@code usedHostsPerRemoteDc} hosts per remote datacenter will be + * tried by the policy as a fallback. By default, no remote host will be used for {@code + * LOCAL_ONE} and {@code LOCAL_QUORUM}, since this would change the meaning of the consistency + * level, somewhat breaking the consistency contract (this can be overridden with {@link + * #allowRemoteDCsForLocalConsistencyLevel()}). + * + *

If this method isn't called, the policy will default to 0. + * + * @param usedHostsPerRemoteDc the number. + * @return this builder. + * @deprecated This functionality will be removed in the next major release of the driver. DC + * failover shouldn't be done in the driver, which does not have the necessary context to + * know what makes sense considering application semantics. + */ + @Deprecated + public Builder withUsedHostsPerRemoteDc(int usedHostsPerRemoteDc) { + Preconditions.checkArgument( + usedHostsPerRemoteDc >= 0, "usedHostsPerRemoteDc must be equal or greater than 0"); + this.usedHostsPerRemoteDc = usedHostsPerRemoteDc; + return this; + } + + /** + * Allows the policy to return remote hosts when building query plans for queries having + * consistency level {@code LOCAL_ONE} or {@code LOCAL_QUORUM}. + * + *

When used in conjunction with {@link #withUsedHostsPerRemoteDc(int) usedHostsPerRemoteDc} + * > 0, this overrides the policy of never using remote datacenter nodes for {@code LOCAL_ONE} + * and {@code LOCAL_QUORUM} queries. It is however inadvisable to do so in almost all cases, as + * this would potentially break consistency guarantees and if you are fine with that, it's + * probably better to use a weaker consistency like {@code ONE}, {@code TWO} or {@code THREE}. + * As such, this method should generally be avoided; use it only if you know and understand what + * you do. + * + * @return this builder. + * @deprecated This functionality will be removed in the next major release of the driver. DC + * failover shouldn't be done in the driver, which does not have the necessary context to + * know what makes sense considering application semantics. + */ + @Deprecated + public Builder allowRemoteDCsForLocalConsistencyLevel() { + this.allowRemoteDCsForLocalConsistencyLevel = true; + return this; + } + + /** + * Builds the policy configured by this builder. + * + * @return the policy. + */ + public RackAwareRoundRobinPolicy build() { + if (usedHostsPerRemoteDc == 0 && allowRemoteDCsForLocalConsistencyLevel) { + logger.warn( + "Setting allowRemoteDCsForLocalConsistencyLevel has no effect if usedHostsPerRemoteDc = 0. " + + "This setting will be ignored"); + } + return new RackAwareRoundRobinPolicy( + localDc, + localRack, + usedHostsPerRemoteDc, + allowRemoteDCsForLocalConsistencyLevel, + true, + true); + } + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java index 62bd2ece358..ce4ba050407 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java @@ -246,6 +246,10 @@ public static String datacenter(int dc) { return "DC" + dc; } + public static String rack(int rack) { + return "RACK" + rack; + } + public void init() { for (Map.Entry> dc : dcNodeMap.entrySet()) { for (Scassandra node : dc.getValue()) { @@ -835,6 +839,10 @@ public ScassandraClusterBuilder withNetworkTopologyKeyspace( return this; } + public ScassandraClusterBuilder withRack(int dc, int node, String rack) { + return forcePeerInfo(dc, node, "rack", rack); + } + public ScassandraClusterBuilder forcePeerInfo(int dc, int node, String name, Object value) { Map> forDc = forcedPeerInfos.get(dc); if (forDc == null) { diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java new file mode 100644 index 00000000000..404877d499d --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java @@ -0,0 +1,974 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.core.policies; + +import static com.datastax.driver.core.Assertions.assertThat; +import static com.datastax.driver.core.ScassandraCluster.datacenter; +import static com.datastax.driver.core.ScassandraCluster.rack; +import static com.datastax.driver.core.TestUtils.findHost; +import static com.datastax.driver.core.TestUtils.nonQuietClusterCloseOptions; +import static com.google.common.collect.Lists.newArrayList; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataProviders; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.MemoryAppender; +import com.datastax.driver.core.QueryTracker; +import com.datastax.driver.core.ScassandraCluster; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.collect.Lists; +import java.util.Collection; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mockito; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class RackAwareRoundRobinPolicyTest { + + private final Logger policyLogger = Logger.getLogger(RackAwareRoundRobinPolicy.class); + private Level originalLevel; + private MemoryAppender logs; + private QueryTracker queryTracker; + + @Captor private ArgumentCaptor> initHostsCaptor; + + @BeforeMethod(groups = "short") + public void setUp() { + initMocks(this); + originalLevel = policyLogger.getLevel(); + policyLogger.setLevel(Level.WARN); + logs = new MemoryAppender(); + policyLogger.addAppender(logs); + queryTracker = new QueryTracker(); + } + + @AfterMethod(groups = "short", alwaysRun = true) + public void tearDown() { + policyLogger.setLevel(originalLevel); + policyLogger.removeAppender(logs); + } + + private Cluster.Builder builder() { + return Cluster.builder() + // Close cluster immediately to speed up tests. + .withNettyOptions(nonQuietClusterCloseOptions); + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will round robin within hosts in the explicitly + * specified local DC via {@link RackAwareRoundRobinPolicy.Builder#withLocalDc(String)} and local + * rack via {@link RackAwareRoundRobinPolicy.Builder#withLocalRack(String)} + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_round_robin_within_local_rack() { + // given: a 10 node 2 DC cluster (3 nodes in RACK1, 2 nodes in RACK2). 
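+    // Rack layout per the withRack(...) calls below: in DC1, nodes 1, 3 and 5 are in RACK1 and
+    // nodes 2 and 4 are in RACK2; DC2 mirrors this layout with the rack assignment reversed.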
+ ScassandraCluster sCluster = + ScassandraCluster.builder() + .withNodes(5, 5) + .withRack(1, 1, rack(1)) + .withRack(1, 2, rack(2)) + .withRack(1, 3, rack(1)) + .withRack(1, 4, rack(2)) + .withRack(1, 5, rack(1)) + .withRack(2, 1, rack(2)) + .withRack(2, 2, rack(1)) + .withRack(2, 3, rack(2)) + .withRack(2, 4, rack(1)) + .withRack(2, 5, rack(2)) + .build(); + + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder() + .withLocalDc(datacenter(1)) + .withLocalRack(rack(1)) + .build()) + .build(); + try { + sCluster.init(); + + Session session = cluster.connect(); + // when: a query is executed 30 times. + queryTracker.query(session, 30); + + // then: each node in local DC and local rack should get an equal (10) number of requests. + queryTracker.assertQueried(sCluster, 1, 1, 10); + queryTracker.assertQueried(sCluster, 1, 3, 10); + queryTracker.assertQueried(sCluster, 1, 5, 10); + + // then: no node in the remote DC or remote rack in local DC should get a request. + for (int dc = 1; dc <= 2; dc++) { + for (int node = 1; node <= 5; node++) { + if (dc == 1 && (node == 1 || node == 3 || node == 5)) continue; + queryTracker.assertQueried(sCluster, dc, node, 0); + } + } + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} does not use remote hosts (remote rack or remote + * DC) if replicas in local rack are UP. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_not_use_remote_hosts_if_some_nodes_are_up_in_local_rack() { + // given: a 10 node 2 DC cluster (3 nodes in RACK1, 2 nodes in RACK2). + ScassandraCluster sCluster = + ScassandraCluster.builder() + .withNodes(5, 5) + .withRack(1, 1, rack(1)) + .withRack(1, 2, rack(2)) + .withRack(1, 3, rack(1)) + .withRack(1, 4, rack(2)) + .withRack(1, 5, rack(1)) + .withRack(2, 1, rack(2)) + .withRack(2, 2, rack(1)) + .withRack(2, 3, rack(2)) + .withRack(2, 4, rack(1)) + .withRack(2, 5, rack(2)) + .build(); + @SuppressWarnings("deprecation") + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder() + .withLocalDc(datacenter(1)) + .withLocalRack(rack(1)) + .withUsedHostsPerRemoteDc(2) + .build()) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + // when: a query is executed 50 times and some hosts are down in the local rack. + sCluster.stop(cluster, 1, 5); + assertThat(cluster).controlHost().isNotNull(); + queryTracker.query(session, 50); + + // then: all requests should be distributed to the remaining up nodes in local DC. + queryTracker.assertQueried(sCluster, 1, 1, 25); + queryTracker.assertQueried(sCluster, 1, 3, 25); + + // then: no nodes in the remote DC should have been queried. + for (int i = 1; i <= 5; i++) { + queryTracker.assertQueried(sCluster, 2, i, 0); + } + + // then: no nodes in the remote rack should have been queried. + queryTracker.assertQueried(sCluster, 1, 2, 0); + queryTracker.assertQueried(sCluster, 1, 4, 0); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} does not use remote DC if some replicas in local + * DC and remote rack are UP. 
+ * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_not_use_remote_hosts_if_some_nodes_are_up_in_remote_dc() { + // given: a 10 node 2 DC cluster (3 nodes in RACK1, 2 nodes in RACK2). + ScassandraCluster sCluster = + ScassandraCluster.builder() + .withNodes(5, 5) + .withRack(1, 1, rack(1)) + .withRack(1, 2, rack(2)) + .withRack(1, 3, rack(1)) + .withRack(1, 4, rack(2)) + .withRack(1, 5, rack(1)) + .withRack(2, 1, rack(2)) + .withRack(2, 2, rack(1)) + .withRack(2, 3, rack(2)) + .withRack(2, 4, rack(1)) + .withRack(2, 5, rack(2)) + .build(); + @SuppressWarnings("deprecation") + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder() + .withLocalDc(datacenter(1)) + .withLocalRack(rack(1)) + .withUsedHostsPerRemoteDc(2) + .build()) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + // when: a query is executed 50 times and all hosts are down in the local rack. + sCluster.stop(cluster, 1, 1); + sCluster.stop(cluster, 1, 3); + sCluster.stop(cluster, 1, 5); + assertThat(cluster).controlHost().isNotNull(); + queryTracker.query(session, 50); + + // then: all requests should be distributed to the remaining up nodes in local DC, remote + // rack. + queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 1, 4, 25); + + // then: no nodes in the remote DC should have been queried. + for (int i = 1; i <= 5; i++) { + queryTracker.assertQueried(sCluster, 2, i, 0); + } + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will determine it's local DC and local rack + * based on the data center of the contact point(s). + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_use_local_dc_and_rack_from_contact_points_when_not_explicitly_specified() { + // given: a 10 node 2 DC cluster (3 nodes in RACK1, 2 nodes in RACK2). + RackAwareRoundRobinPolicy policy = spy(RackAwareRoundRobinPolicy.builder().build()); + ScassandraCluster sCluster = + ScassandraCluster.builder() + .withNodes(5, 5) + .withRack(1, 1, rack(1)) + .withRack(1, 2, rack(2)) + .withRack(1, 3, rack(1)) + .withRack(1, 4, rack(2)) + .withRack(1, 5, rack(1)) + .withRack(2, 1, rack(2)) + .withRack(2, 2, rack(1)) + .withRack(2, 3, rack(2)) + .withRack(2, 4, rack(1)) + .withRack(2, 5, rack(2)) + .build(); + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(policy) + .build(); + + try { + sCluster.init(); + + Host host1 = findHost(cluster, 1); + + // when: the cluster is initialized. + cluster.init(); + + // then: should have been initialized with only the host given as the contact point. + Mockito.verify(policy).init(any(Cluster.class), initHostsCaptor.capture()); + assertThat(initHostsCaptor.getValue()).containsExactly(host1); + // then: the local dc should match the contact points' datacenter. + assertThat(policy.localDc).isEqualTo(host1.getDatacenter()); + // then: the local rack should match the contact points' rack. + assertThat(policy.localRack).isEqualTo(host1.getRack()); + // then: should not indicate that contact points don't match the local datacenter. 
+ assertThat(logs.get()).doesNotContain("Some contact points don't match local datacenter"); + // then: should not indicate that contact points don't match the local rack. + assertThat(logs.get()).doesNotContain("Some contact points don't match local rack"); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will determine it's local rack based on the + * contact point(s) and if contact points in different racks are detected that a log message is + * generated indicating some contact points don't match the local rack. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_warn_if_contact_points_have_different_racks_when_not_explicitly_specified() { + // given: a a 10 node 2 DC cluster (3 nodes in RACK1, 2 nodes in RACK2) with a Cluster instance + // with contact points in different racks. + RackAwareRoundRobinPolicy policy = spy(RackAwareRoundRobinPolicy.builder().build()); + ScassandraCluster sCluster = + ScassandraCluster.builder() + .withNodes(5, 5) + .withRack(1, 1, rack(1)) + .withRack(1, 2, rack(2)) + .withRack(1, 3, rack(1)) + .withRack(1, 4, rack(2)) + .withRack(1, 5, rack(1)) + .withRack(2, 1, rack(2)) + .withRack(2, 2, rack(1)) + .withRack(2, 3, rack(2)) + .withRack(2, 4, rack(1)) + .withRack(2, 5, rack(2)) + .build(); + Cluster cluster = + builder() + .addContactPoints( + sCluster.address(1, 1).getAddress(), + sCluster.address(1, 3).getAddress(), + sCluster.address(1, 2).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(policy) + .build(); + + try { + sCluster.init(); + + Host host1 = findHost(cluster, 1); + Host host2 = findHost(cluster, 2); + Host host3 = findHost(cluster, 3); + + // when: the cluster is initialized. + cluster.init(); + + // then: should have been initialized with only two hosts given as the contact point. + Mockito.verify(policy).init(any(Cluster.class), initHostsCaptor.capture()); + assertThat(initHostsCaptor.getValue()).containsOnly(host1, host2, host3); + // then: should indicate that some contact points don't match the local datacenter. + assertThat(logs.get()).contains("Some contact points don't match local rack"); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will not log a warning if all contact points + * match the data center and rack provided in {@link + * RackAwareRoundRobinPolicy.Builder#withLocalDc(String)}, {@link + * RackAwareRoundRobinPolicy.Builder#withLocalRack(String)} and that the explicitly provided local + * data center and local rack is used. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_use_provided_local_dc_rack_and_not_warn_if_contact_points_match() { + // given: a a 10 node 2 DC cluster (3 nodes in RACK1, 2 nodes in RACK2) with a Cluster instance + // with contact points in the same DCs and rack. 
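+    // The contact points used below are DC1 nodes 2 and 4, which both sit in RACK2 and therefore
+    // match the explicitly configured local DC and local rack, so no warning should be logged.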
+ ScassandraCluster sCluster = + ScassandraCluster.builder() + .withNodes(5, 5) + .withRack(1, 1, rack(1)) + .withRack(1, 2, rack(2)) + .withRack(1, 3, rack(1)) + .withRack(1, 4, rack(2)) + .withRack(1, 5, rack(1)) + .withRack(2, 1, rack(2)) + .withRack(2, 2, rack(1)) + .withRack(2, 3, rack(2)) + .withRack(2, 4, rack(1)) + .withRack(2, 5, rack(2)) + .build(); + RackAwareRoundRobinPolicy policy = + spy( + RackAwareRoundRobinPolicy.builder() + .withLocalDc(datacenter(1)) + .withLocalRack(rack(2)) + .build()); + Cluster cluster = + builder() + .addContactPoints( + sCluster.address(1, 2).getAddress(), sCluster.address(1, 4).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(policy) + .build(); + + try { + sCluster.init(); + + Host host2 = findHost(cluster, 2); + Host host4 = findHost(cluster, 4); + + // when: the cluster is initialized. + cluster.init(); + + // then: should have been initialized with only two hosts given as the contact point. + Mockito.verify(policy).init(any(Cluster.class), initHostsCaptor.capture()); + assertThat(initHostsCaptor.getValue()).containsOnly(host2, host4); + // then: the data center should appropriately be set to the one specified. + assertThat(policy.localDc).isEqualTo(host2.getDatacenter()); + // then: the rack should appropriately be set to the one specified. + assertThat(policy.localRack).isEqualTo(host2.getRack()); + // then: should not indicate that contact points don't match the local datacenter. + assertThat(logs.get()).doesNotContain("Some contact points don't match local data center"); + // then: should not indicate that contact points don't match the local datacenter. + assertThat(logs.get()).doesNotContain("Some contact points don't match local data rack"); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + // The following tests are copied from {@link DCAwareRoundRobinPolicy}. If all + // DCs are a single-rack DC, then the behavior of {@link RackAwareRoundRobinPolicy} + // should be exactly the same. + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will round robin within hosts in the explicitly + * specific local DC via {@link RackAwareRoundRobinPolicy.Builder#withLocalDc(String)} + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_round_robin_within_local_dc() { + // given: a 10 node 2 DC cluster. + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(5, 5).build(); + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder().withLocalDc(datacenter(1)).build()) + .build(); + try { + sCluster.init(); + + Session session = cluster.connect(); + // when: a query is executed 50 times. + queryTracker.query(session, 50); + + // then: each node in local DC should get an equal (10) number of requests. + // then: no node in the remote DC should get a request. + for (int i = 1; i <= 5; i++) { + queryTracker.assertQueried(sCluster, 1, i, 10); + queryTracker.assertQueried(sCluster, 2, i, 0); + } + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} does not use remote hosts if replicas in the + * local DC are UP. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_not_use_remote_hosts_if_some_nodes_are_up_in_local_dc() { + // given: a 10 node 2 DC cluster with DC policy with 2 remote hosts. 
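+    // No racks are configured in this and the following tests, so the nodes share whatever default
+    // rack ScassandraCluster reports and the policy is expected to behave like
+    // DCAwareRoundRobinPolicy.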
+ ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(5, 5).build(); + @SuppressWarnings("deprecation") + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder() + .withLocalDc(datacenter(1)) + .withUsedHostsPerRemoteDc(2) + .build()) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + // when: a query is executed 50 times and some hosts are down in the local DC. + sCluster.stop(cluster, 1, 5); + sCluster.stop(cluster, 1, 3); + sCluster.stop(cluster, 1, 1); + assertThat(cluster).controlHost().isNotNull(); + queryTracker.query(session, 50); + + // then: all requests should be distributed to the remaining up nodes in local DC. + queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 1, 4, 25); + + // then: no nodes in the remote DC should have been queried. + for (int i = 1; i <= 5; i++) { + queryTracker.assertQueried(sCluster, 2, i, 0); + } + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will round robin on remote hosts but only if no + * local replicas are available and only within the number of hosts configured by {@link + * RackAwareRoundRobinPolicy.Builder#withUsedHostsPerRemoteDc(int)} + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_round_robin_on_remote_hosts_when_no_up_nodes_in_local_dc() { + // given: a 10 node 2 DC cluster with DC policy with 2 remote hosts. + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(5, 5).build(); + @SuppressWarnings("deprecation") + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder().withUsedHostsPerRemoteDc(2).build()) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + sCluster.stopDC(cluster, 1); + + // Wait for control connection to be re-established, needed as + // control connection attempts increment LBP counter. + assertThat(cluster).controlHost().isNotNull(); + + // when: a query is executed 50 times and all hosts are down in local DC. + queryTracker.query(session, 50); + + // then: only usedHostsPerRemoteDc nodes in the remote DC should get requests. + Collection queryCounts = newArrayList(); + for (int i = 1; i <= 5; i++) { + queryCounts.add(queryTracker.queryCount(sCluster, 2, i)); + } + assertThat(queryCounts).containsOnly(0, 0, 0, 25, 25); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will by default only use remote hosts for non DC + * local Consistency Levels. In the case that a DC local Consistency Level is provided a {@link + * NoHostAvailableException} is raised. + * + * @test_category load_balancing:rack_aware + */ + @Test( + groups = "short", + dataProvider = "consistencyLevels", + dataProviderClass = DataProviders.class) + public void should_only_use_remote_hosts_when_using_non_dc_local_cl(ConsistencyLevel cl) { + // given: a 4 node 2 DC Cluster with a LB policy that specifies to not allow remote dcs for + // a local consistency level. 
+ ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2).build(); + @SuppressWarnings("deprecation") + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder().withUsedHostsPerRemoteDc(2).build()) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + sCluster.stopDC(cluster, 1); + + // Wait for control connection to be re-established, needed as + // control connection attempts increment LBP counter. + assertThat(cluster).controlHost().isNotNull(); + + // when: a query is executed 50 times and all hosts are down in local DC. + // then: expect a NHAE for a local CL since no local replicas available. + Class expectedException = + cl.isDCLocal() ? NoHostAvailableException.class : null; + queryTracker.query(session, 50, cl, expectedException); + + int expectedQueryCount = cl.isDCLocal() ? 0 : 25; + for (int i = 1; i <= 2; i++) { + queryTracker.assertQueried(sCluster, 1, i, 0); + // then: Remote hosts should only be queried for non local CLs. + queryTracker.assertQueried(sCluster, 2, i, expectedQueryCount); + } + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will use remote hosts for non DC local + * Consistency Levels if {@code + * RackAwareRoundRobinPolicy.Builder#allowRemoteDCsForLocalConsistencyLevel} is used. In the case + * that a DC local Consistency Level is provided a {@link NoHostAvailableException} is raised. + * + * @test_category load_balancing:rack_aware + */ + @Test( + groups = "short", + dataProvider = "consistencyLevels", + dataProviderClass = DataProviders.class) + public void should_use_remote_hosts_for_local_cl_when_allowed(ConsistencyLevel cl) { + // given: a 4 node 2 DC Cluster with a LB policy that specifies to allow remote dcs for + // a local consistency level. + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2).build(); + @SuppressWarnings("deprecation") + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder() + .allowRemoteDCsForLocalConsistencyLevel() + .withUsedHostsPerRemoteDc(2) + .build()) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + sCluster.stopDC(cluster, 1); + + // Wait for control connection to be re-established, needed as + // control connection attempts increment LBP counter. + assertThat(cluster).controlHost().isNotNull(); + + // when: a query is executed 50 times and all hosts are down in local DC. + queryTracker.query(session, 50, cl, null); + + for (int i = 1; i <= 2; i++) { + queryTracker.assertQueried(sCluster, 1, i, 0); + // then: Remote hosts should be queried. + queryTracker.assertQueried(sCluster, 2, i, 25); + } + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that when {@link RackAwareRoundRobinPolicy} is wrapped with a {@link HostFilterPolicy} + * that blacklists a data center that nodes in that datacenter are never queried. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_not_send_requests_to_blacklisted_dc_using_host_filter_policy() { + // given: a 6 node 3 DC cluster with a RackAwareRoundRobinPolicy that is filtering hosts in DC2. 
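+    // allowRemoteDCsForLocalConsistencyLevel() is intentionally not called, so the default of not
+    // using remote hosts for LOCAL_ONE/LOCAL_QUORUM applies; this is why a NoHostAvailableException
+    // is expected for DC-local consistency levels once the local DC is down.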
+ ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2, 2).build(); + @SuppressWarnings("deprecation") + LoadBalancingPolicy loadBalancingPolicy = + HostFilterPolicy.fromDCBlackList( + RackAwareRoundRobinPolicy.builder().withUsedHostsPerRemoteDc(2).build(), + Lists.newArrayList(datacenter(2))); + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(loadBalancingPolicy) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + // when: A query is made and nodes for the local dc are available. + queryTracker.query(session, 50); + + // then: only nodes in the local DC should have been queried. + queryTracker.assertQueried(sCluster, 1, 1, 25); + queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 2, 1, 0); + queryTracker.assertQueried(sCluster, 2, 2, 0); + queryTracker.assertQueried(sCluster, 3, 1, 0); + queryTracker.assertQueried(sCluster, 3, 1, 0); + + // when: A query is made and all nodes in the local dc are down. + sCluster.stopDC(cluster, 1); + assertThat(cluster).controlHost().isNotNull(); + queryTracker.reset(); + queryTracker.query(session, 50); + + // then: Only nodes in DC3 should have been queried, since DC2 is blacklisted and DC1 is down. + queryTracker.assertQueried(sCluster, 1, 1, 0); + queryTracker.assertQueried(sCluster, 1, 2, 0); + queryTracker.assertQueried(sCluster, 2, 1, 0); + queryTracker.assertQueried(sCluster, 2, 2, 0); + queryTracker.assertQueried(sCluster, 3, 1, 25); + queryTracker.assertQueried(sCluster, 3, 2, 25); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that when {@link RackAwareRoundRobinPolicy} is wrapped with a {@link HostFilterPolicy} + * that white lists data centers that only nodes in those data centers are queried. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_send_requests_to_whitelisted_dcs_using_host_filter_policy() { + // given: a 6 node 3 DC cluster with a RackAwareRoundRobinPolicy that is whitelisting hosts in + // DC1 + // and DC2. + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2, 2).build(); + @SuppressWarnings("deprecation") + LoadBalancingPolicy loadBalancingPolicy = + HostFilterPolicy.fromDCWhiteList( + RackAwareRoundRobinPolicy.builder().withUsedHostsPerRemoteDc(2).build(), + Lists.newArrayList(datacenter(1), datacenter(2))); + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(loadBalancingPolicy) + .build(); + + try { + sCluster.init(); + + Session session = cluster.connect(); + + // when: A query is made and nodes for the local dc are available. + queryTracker.query(session, 50); + + // then: only nodes in the local DC should have been queried. + queryTracker.assertQueried(sCluster, 1, 1, 25); + queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 2, 1, 0); + queryTracker.assertQueried(sCluster, 2, 2, 0); + queryTracker.assertQueried(sCluster, 3, 1, 0); + queryTracker.assertQueried(sCluster, 3, 1, 0); + + // when: A query is made and all nodes in the local dc are down. + sCluster.stopDC(cluster, 1); + assertThat(cluster).controlHost().isNotNull(); + queryTracker.reset(); + queryTracker.query(session, 50); + + // then: Only nodes in DC2 should have been queried, since DC3 is not in the whitelist and DC1 + // is down. 
+ queryTracker.assertQueried(sCluster, 1, 1, 0); + queryTracker.assertQueried(sCluster, 1, 2, 0); + queryTracker.assertQueried(sCluster, 2, 1, 25); + queryTracker.assertQueried(sCluster, 2, 2, 25); + queryTracker.assertQueried(sCluster, 3, 1, 0); + queryTracker.assertQueried(sCluster, 3, 1, 0); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will determine it's local DC based on the data + * center of the contact point(s). + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_use_local_dc_from_contact_points_when_not_explicitly_specified() { + // given: a 4 node 2 DC cluster without a local DC specified. + RackAwareRoundRobinPolicy policy = spy(RackAwareRoundRobinPolicy.builder().build()); + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2).build(); + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(policy) + .build(); + + try { + sCluster.init(); + + Host host1 = findHost(cluster, 1); + + // when: the cluster is initialized. + cluster.init(); + + // then: should have been initialized with only the host given as the contact point. + Mockito.verify(policy).init(any(Cluster.class), initHostsCaptor.capture()); + assertThat(initHostsCaptor.getValue()).containsExactly(host1); + // then: the local dc should match the contact points' datacenter. + assertThat(policy.localDc).isEqualTo(host1.getDatacenter()); + // then: should not indicate that contact points don't match the local datacenter. + assertThat(logs.get()).doesNotContain("Some contact points don't match local datacenter"); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will determine it's local DC based on the data + * center of the contact point(s) and if contact points in different DCs are detected that a log + * message is generated indicating some contact points don't match the local data center. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_warn_if_contact_points_have_different_dcs_when_not_explicitly_specified() { + // given: a 4 node 2 DC cluster with a Cluster instance with contact points in different DCs + // and no contact point specified. + RackAwareRoundRobinPolicy policy = spy(RackAwareRoundRobinPolicy.builder().build()); + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2).build(); + Cluster cluster = + builder() + .addContactPoints( + sCluster.address(1, 1).getAddress(), sCluster.address(2, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(policy) + .build(); + + try { + sCluster.init(); + + Host host1 = findHost(cluster, 1); + Host host3 = findHost(cluster, 3); + + // when: the cluster is initialized. + cluster.init(); + + // then: should have been initialized with only two hosts given as the contact point. + Mockito.verify(policy).init(any(Cluster.class), initHostsCaptor.capture()); + assertThat(initHostsCaptor.getValue()).containsOnly(host1, host3); + // then: should indicate that some contact points don't match the local datacenter. 
+ assertThat(logs.get()).contains("Some contact points don't match local data center"); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will not log a warning if all contact points + * match the data center provided in {@link RackAwareRoundRobinPolicy.Builder#withLocalDc(String)} + * and that the explicitly provided local data center is used. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_use_provided_local_dc_and_not_warn_if_contact_points_match() { + // given: a 4 node 2 DC cluster with a Cluster instance with contact points in different DCs + // and a local DC that doesn't match any contact points. + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2).build(); + RackAwareRoundRobinPolicy policy = + spy(RackAwareRoundRobinPolicy.builder().withLocalDc(datacenter(1)).build()); + Cluster cluster = + builder() + .addContactPoints(sCluster.address(1, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(policy) + .build(); + + try { + sCluster.init(); + + Host host1 = findHost(cluster, 1); + + // when: the cluster is initialized. + cluster.init(); + + // then: should have been initialized with only two hosts given as the contact point. + Mockito.verify(policy).init(any(Cluster.class), initHostsCaptor.capture()); + assertThat(initHostsCaptor.getValue()).containsOnly(host1); + // then: the data center should appropriately be set to the one specified. + assertThat(policy.localDc).isEqualTo(host1.getDatacenter()); + // then: should not indicate that contact points don't match the local datacenter. + assertThat(logs.get()).doesNotContain("Some contact points don't match local data center"); + } finally { + cluster.close(); + sCluster.stop(); + } + } + + /** + * Ensures that {@link RackAwareRoundRobinPolicy} will log a warning if some contact points don't + * match the data center provided in {@link RackAwareRoundRobinPolicy.Builder#withLocalDc(String)} + * and that the explicitly provided local data center is used. + * + * @test_category load_balancing:rack_aware + */ + @Test(groups = "short") + public void should_use_provided_local_dc_and_warn_if_contact_points_dont_match() { + // given: a 4 node 2 DC cluster with a Cluster instance with contact points in different DCs + // and a local DC that doesn't match any contact points. + ScassandraCluster sCluster = ScassandraCluster.builder().withNodes(2, 2).build(); + RackAwareRoundRobinPolicy policy = + spy(RackAwareRoundRobinPolicy.builder().withLocalDc(datacenter(3)).build()); + Cluster cluster = + builder() + .addContactPoints( + sCluster.address(1, 1).getAddress(), sCluster.address(2, 1).getAddress()) + .withPort(sCluster.getBinaryPort()) + .withLoadBalancingPolicy(policy) + .build(); + + try { + sCluster.init(); + + Host host1 = findHost(cluster, 1); + Host host3 = findHost(cluster, 3); + + // when: the cluster is initialized. + cluster.init(); + + // then: should have been initialized with only two hosts given as the contact point. + Mockito.verify(policy).init(any(Cluster.class), initHostsCaptor.capture()); + assertThat(initHostsCaptor.getValue()).containsOnly(host1, host3); + // then: the data center should appropriately be set to the one specified. + assertThat(policy.localDc).isEqualTo(datacenter(3)); + // then: should indicate that some contact points don't match the local datacenter. 
+ assertThat(logs.get()).contains("Some contact points don't match local data center"); + } finally { + cluster.close(); + sCluster.stop(); + } + } +} diff --git a/manual/load_balancing/README.md b/manual/load_balancing/README.md index 4568c8025b5..07d5b3dcbb2 100644 --- a/manual/load_balancing/README.md +++ b/manual/load_balancing/README.md @@ -115,6 +115,47 @@ local datacenter. In general, providing the datacenter name explicitly is a safe Hosts belonging to the local datacenter are at distance `LOCAL`, and appear first in query plans (in a round-robin fashion). +### [RackAwareRoundRobinPolicy] + +```java +Cluster cluster = Cluster.builder() + .addContactPoint("127.0.0.1") + .withLoadBalancingPolicy( + RackAwareRoundRobinPolicy.builder() + .withLocalDc("myLocalDC") + .withLocalRack("myLocalRack") + .build() + ).build(); +``` + +This policy queries nodes of the local rack in a round-robin fashion. + +Call `withLocalDc` to specify the name of your local datacenter and `withLocalRack` to specify the name of your local rack. +You can also leave it out, and the driver will use the datacenter and rack of the first contact point that was reached [at initialization](../#cluster-initialization). +However, remember that the driver shuffles the initial list of contact points, so this assumes that all contact points are in the +local datacenter and rack. In general, providing the datacenter and rack name explicitly is a safer option. + +Hosts belonging to the local datacenter are at distance `LOCAL`, and appear first in query plans (in a round-robin +fashion) with hosts in the local rack having precedence over nodes in remote racks in the local datacenter. + +For example, if there are any UP hosts in the local rack the policy will query those nodes in round-robin fashion: +* query 1: host1 *(local DC, local rack)*, host2 *(local DC, local rack)*, host3 *(local DC, local rack)* +* query 2: host2 *(local DC, local rack)*, host3 *(local DC, local rack)*, host1 *(local DC, local rack)* +* query 3: host3 *(local DC, local rack)*, host1 *(local DC, local rack)*, host2 *(local DC, local rack)* +* query 4: host1 *(local DC, local rack)*, host2 *(local DC, local rack)*, host3 *(local DC, local rack)* + +If all nodes in the local rack are DOWN, the policy will then move on to remote racks in local DC: +* query 1: host4 *(local DC, remote rack 1)*, host5 *(local DC, remote rack 1)*, host6 *(local DC, remote rack 2)* +* query 2: host5 *(local DC, remote rack 1)*, host6 *(local DC, remote rack 2)*, host4 *(local DC, remote rack 1)* +* query 3: host6 *(local DC, remote rack 2)*, host4 *(local DC, remote rack 1)*, host5 *(local DC, remote rack 1)* +* query 4: host4 *(local DC, remote rack 1)*, host5 *(local DC, remote rack 1)*, host6 *(local DC, remote rack 2)* + +If all nodes in the local datacenter are DOWN, the policy can query remote DCs (configurable by `withUsedHostsPerRemoteDc`): +* query 1: host7 *(remote DC, remote rack 1)*, host8 *(remote DC, remote rack 1)*, host9 *(remote DC, remote rack 2)* +* query 2: host8 *(remote DC, remote rack 1)*, host9 *(remote DC, remote rack 2)*, host7 *(remote DC, remote rack 1)* +* query 3: host9 *(remote DC, remote rack 2)*, host7 *(remote DC, remote rack 1)*, host8 *(remote DC, remote rack 1)* +* query 4: host7 *(remote DC, remote rack 1)*, host8 *(remote DC, remote rack 1)*, host9 *(remote DC, remote rack 2)* + ### [TokenAwarePolicy] ```java @@ -302,6 +343,7 @@ complex ones like `DCAwareRoundRobinPolicy`. 
 ### [TokenAwarePolicy]
 
 ```java
@@ -302,6 +343,7 @@ complex ones like `DCAwareRoundRobinPolicy`.
 [LoadBalancingPolicy]: https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/LoadBalancingPolicy.html
 [RoundRobinPolicy]: https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/RoundRobinPolicy.html
 [DCAwareRoundRobinPolicy]: https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.html
+[RackAwareRoundRobinPolicy]: https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.html
 [TokenAwarePolicy]: https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/TokenAwarePolicy.html
 [LatencyAwarePolicy]: https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/LatencyAwarePolicy.html
 [HostFilterPolicy]: https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/HostFilterPolicy.html

From 92de2974298777d21c679b9f4ba12b99c6ff11ab Mon Sep 17 00:00:00 2001
From: Piotr Grabowski
Date: Thu, 6 Jul 2023 17:52:21 +0200
Subject: [PATCH 2/3] Increase timeout in ScassandraCluster

On my local system some Scassandra nodes sometimes take a long time to
shut down and the previous 10 second value was insufficient.
---
 .../test/java/com/datastax/driver/core/ScassandraCluster.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
index ce4ba050407..83078670d7e 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
@@ -315,7 +315,7 @@ public void stop(Cluster cluster, int node) {
     } catch (Exception e) {
       logger.error("Could not stop node " + scassandra, e);
     }
-    assertThat(cluster).host(node).goesDownWithin(10, TimeUnit.SECONDS);
+    assertThat(cluster).host(node).goesDownWithin(60, TimeUnit.SECONDS);
   }
 
   /**

From 0b003c3e77c7c6002c17b32b35b4c3c10f8eaf44 Mon Sep 17 00:00:00 2001
From: Piotr Grabowski
Date: Fri, 7 Jul 2023 12:47:51 +0200
Subject: [PATCH 3/3] Reduce the number of queries in integration tests

Reduce the number of queries that are executed in
DCAwareRoundRobinPolicyTest and RackAwareRoundRobinPolicyTest, as we
observed that on GitHub Actions the test would be flaky, failing with:

Unfortunately this error could not be reproduced locally.
---
 .../policies/DCAwareRoundRobinPolicyTest.java | 58 ++++++------
 .../RackAwareRoundRobinPolicyTest.java        | 86 +++++++++----------
 2 files changed, 72 insertions(+), 72 deletions(-)

diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicyTest.java
index 38e8cc92eee..e856aea7448 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicyTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/policies/DCAwareRoundRobinPolicyTest.java
@@ -102,13 +102,13 @@ public void should_round_robin_within_local_dc() {
       sCluster.init();
       Session session = cluster.connect();
 
-      // when: a query is executed 50 times.
-      queryTracker.query(session, 50);
+      // when: a query is executed 25 times.
+      queryTracker.query(session, 25);
 
-      // then: each node in local DC should get an equal (10) number of requests.
+      // then: each node in local DC should get an equal (5) number of requests.
// then: no node in the remote DC should get a request. for (int i = 1; i <= 5; i++) { - queryTracker.assertQueried(sCluster, 1, i, 10); + queryTracker.assertQueried(sCluster, 1, i, 5); queryTracker.assertQueried(sCluster, 2, i, 0); } } finally { @@ -144,16 +144,16 @@ public void should_not_use_remote_hosts_if_some_nodes_are_up_in_local_dc() { Session session = cluster.connect(); - // when: a query is executed 50 times and some hosts are down in the local DC. + // when: a query is executed 20 times and some hosts are down in the local DC. sCluster.stop(cluster, 1, 5); sCluster.stop(cluster, 1, 3); sCluster.stop(cluster, 1, 1); assertThat(cluster).controlHost().isNotNull(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: all requests should be distributed to the remaining up nodes in local DC. - queryTracker.assertQueried(sCluster, 1, 2, 25); - queryTracker.assertQueried(sCluster, 1, 4, 25); + queryTracker.assertQueried(sCluster, 1, 2, 10); + queryTracker.assertQueried(sCluster, 1, 4, 10); // then: no nodes in the remote DC should have been queried. for (int i = 1; i <= 5; i++) { @@ -196,15 +196,15 @@ public void should_round_robin_on_remote_hosts_when_no_up_nodes_in_local_dc() { // control connection attempts increment LBP counter. assertThat(cluster).controlHost().isNotNull(); - // when: a query is executed 50 times and all hosts are down in local DC. - queryTracker.query(session, 50); + // when: a query is executed 20 times and all hosts are down in local DC. + queryTracker.query(session, 20); // then: only usedHostsPerRemoteDc nodes in the remote DC should get requests. Collection queryCounts = newArrayList(); for (int i = 1; i <= 5; i++) { queryCounts.add(queryTracker.queryCount(sCluster, 2, i)); } - assertThat(queryCounts).containsOnly(0, 0, 0, 25, 25); + assertThat(queryCounts).containsOnly(0, 0, 0, 10, 10); } finally { cluster.close(); sCluster.stop(); @@ -246,13 +246,13 @@ public void should_only_use_remote_hosts_when_using_non_dc_local_cl(ConsistencyL // control connection attempts increment LBP counter. assertThat(cluster).controlHost().isNotNull(); - // when: a query is executed 50 times and all hosts are down in local DC. + // when: a query is executed 20 times and all hosts are down in local DC. // then: expect a NHAE for a local CL since no local replicas available. Class expectedException = cl.isDCLocal() ? NoHostAvailableException.class : null; - queryTracker.query(session, 50, cl, expectedException); + queryTracker.query(session, 20, cl, expectedException); - int expectedQueryCount = cl.isDCLocal() ? 0 : 25; + int expectedQueryCount = cl.isDCLocal() ? 0 : 10; for (int i = 1; i <= 2; i++) { queryTracker.assertQueried(sCluster, 1, i, 0); // then: Remote hosts should only be queried for non local CLs. @@ -303,13 +303,13 @@ public void should_use_remote_hosts_for_local_cl_when_allowed(ConsistencyLevel c // control connection attempts increment LBP counter. assertThat(cluster).controlHost().isNotNull(); - // when: a query is executed 50 times and all hosts are down in local DC. - queryTracker.query(session, 50, cl, null); + // when: a query is executed 20 times and all hosts are down in local DC. + queryTracker.query(session, 20, cl, null); for (int i = 1; i <= 2; i++) { queryTracker.assertQueried(sCluster, 1, i, 0); // then: Remote hosts should be queried. 
- queryTracker.assertQueried(sCluster, 2, i, 25); + queryTracker.assertQueried(sCluster, 2, i, 10); } } finally { cluster.close(); @@ -345,11 +345,11 @@ public void should_not_send_requests_to_blacklisted_dc_using_host_filter_policy( Session session = cluster.connect(); // when: A query is made and nodes for the local dc are available. - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: only nodes in the local DC should have been queried. - queryTracker.assertQueried(sCluster, 1, 1, 25); - queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 1, 1, 10); + queryTracker.assertQueried(sCluster, 1, 2, 10); queryTracker.assertQueried(sCluster, 2, 1, 0); queryTracker.assertQueried(sCluster, 2, 2, 0); queryTracker.assertQueried(sCluster, 3, 1, 0); @@ -359,15 +359,15 @@ public void should_not_send_requests_to_blacklisted_dc_using_host_filter_policy( sCluster.stopDC(cluster, 1); assertThat(cluster).controlHost().isNotNull(); queryTracker.reset(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: Only nodes in DC3 should have been queried, since DC2 is blacklisted and DC1 is down. queryTracker.assertQueried(sCluster, 1, 1, 0); queryTracker.assertQueried(sCluster, 1, 2, 0); queryTracker.assertQueried(sCluster, 2, 1, 0); queryTracker.assertQueried(sCluster, 2, 2, 0); - queryTracker.assertQueried(sCluster, 3, 1, 25); - queryTracker.assertQueried(sCluster, 3, 2, 25); + queryTracker.assertQueried(sCluster, 3, 1, 10); + queryTracker.assertQueried(sCluster, 3, 2, 10); } finally { cluster.close(); sCluster.stop(); @@ -403,11 +403,11 @@ public void should_send_requests_to_whitelisted_dcs_using_host_filter_policy() { Session session = cluster.connect(); // when: A query is made and nodes for the local dc are available. - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: only nodes in the local DC should have been queried. - queryTracker.assertQueried(sCluster, 1, 1, 25); - queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 1, 1, 10); + queryTracker.assertQueried(sCluster, 1, 2, 10); queryTracker.assertQueried(sCluster, 2, 1, 0); queryTracker.assertQueried(sCluster, 2, 2, 0); queryTracker.assertQueried(sCluster, 3, 1, 0); @@ -417,14 +417,14 @@ public void should_send_requests_to_whitelisted_dcs_using_host_filter_policy() { sCluster.stopDC(cluster, 1); assertThat(cluster).controlHost().isNotNull(); queryTracker.reset(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: Only nodes in DC2 should have been queried, since DC3 is not in the whitelist and DC1 // is down. 
queryTracker.assertQueried(sCluster, 1, 1, 0); queryTracker.assertQueried(sCluster, 1, 2, 0); - queryTracker.assertQueried(sCluster, 2, 1, 25); - queryTracker.assertQueried(sCluster, 2, 2, 25); + queryTracker.assertQueried(sCluster, 2, 1, 10); + queryTracker.assertQueried(sCluster, 2, 2, 10); queryTracker.assertQueried(sCluster, 3, 1, 0); queryTracker.assertQueried(sCluster, 3, 1, 0); } finally { diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java index 404877d499d..3d4d42617c0 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java @@ -121,13 +121,13 @@ public void should_round_robin_within_local_rack() { sCluster.init(); Session session = cluster.connect(); - // when: a query is executed 30 times. - queryTracker.query(session, 30); + // when: a query is executed 15 times. + queryTracker.query(session, 15); - // then: each node in local DC and local rack should get an equal (10) number of requests. - queryTracker.assertQueried(sCluster, 1, 1, 10); - queryTracker.assertQueried(sCluster, 1, 3, 10); - queryTracker.assertQueried(sCluster, 1, 5, 10); + // then: each node in local DC and local rack should get an equal (5) number of requests. + queryTracker.assertQueried(sCluster, 1, 1, 5); + queryTracker.assertQueried(sCluster, 1, 3, 5); + queryTracker.assertQueried(sCluster, 1, 5, 5); // then: no node in the remote DC or remote rack in local DC should get a request. for (int dc = 1; dc <= 2; dc++) { @@ -183,14 +183,14 @@ public void should_not_use_remote_hosts_if_some_nodes_are_up_in_local_rack() { Session session = cluster.connect(); - // when: a query is executed 50 times and some hosts are down in the local rack. + // when: a query is executed 20 times and some hosts are down in the local rack. sCluster.stop(cluster, 1, 5); assertThat(cluster).controlHost().isNotNull(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: all requests should be distributed to the remaining up nodes in local DC. - queryTracker.assertQueried(sCluster, 1, 1, 25); - queryTracker.assertQueried(sCluster, 1, 3, 25); + queryTracker.assertQueried(sCluster, 1, 1, 10); + queryTracker.assertQueried(sCluster, 1, 3, 10); // then: no nodes in the remote DC should have been queried. for (int i = 1; i <= 5; i++) { @@ -247,17 +247,17 @@ public void should_not_use_remote_hosts_if_some_nodes_are_up_in_remote_dc() { Session session = cluster.connect(); - // when: a query is executed 50 times and all hosts are down in the local rack. + // when: a query is executed 20 times and all hosts are down in the local rack. sCluster.stop(cluster, 1, 1); sCluster.stop(cluster, 1, 3); sCluster.stop(cluster, 1, 5); assertThat(cluster).controlHost().isNotNull(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: all requests should be distributed to the remaining up nodes in local DC, remote // rack. - queryTracker.assertQueried(sCluster, 1, 2, 25); - queryTracker.assertQueried(sCluster, 1, 4, 25); + queryTracker.assertQueried(sCluster, 1, 2, 10); + queryTracker.assertQueried(sCluster, 1, 4, 10); // then: no nodes in the remote DC should have been queried. 
for (int i = 1; i <= 5; i++) { @@ -474,13 +474,13 @@ public void should_round_robin_within_local_dc() { sCluster.init(); Session session = cluster.connect(); - // when: a query is executed 50 times. - queryTracker.query(session, 50); + // when: a query is executed 25 times. + queryTracker.query(session, 25); - // then: each node in local DC should get an equal (10) number of requests. + // then: each node in local DC should get an equal (5) number of requests. // then: no node in the remote DC should get a request. for (int i = 1; i <= 5; i++) { - queryTracker.assertQueried(sCluster, 1, i, 10); + queryTracker.assertQueried(sCluster, 1, i, 5); queryTracker.assertQueried(sCluster, 2, i, 0); } } finally { @@ -516,16 +516,16 @@ public void should_not_use_remote_hosts_if_some_nodes_are_up_in_local_dc() { Session session = cluster.connect(); - // when: a query is executed 50 times and some hosts are down in the local DC. + // when: a query is executed 20 times and some hosts are down in the local DC. sCluster.stop(cluster, 1, 5); sCluster.stop(cluster, 1, 3); sCluster.stop(cluster, 1, 1); assertThat(cluster).controlHost().isNotNull(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: all requests should be distributed to the remaining up nodes in local DC. - queryTracker.assertQueried(sCluster, 1, 2, 25); - queryTracker.assertQueried(sCluster, 1, 4, 25); + queryTracker.assertQueried(sCluster, 1, 2, 10); + queryTracker.assertQueried(sCluster, 1, 4, 10); // then: no nodes in the remote DC should have been queried. for (int i = 1; i <= 5; i++) { @@ -568,15 +568,15 @@ public void should_round_robin_on_remote_hosts_when_no_up_nodes_in_local_dc() { // control connection attempts increment LBP counter. assertThat(cluster).controlHost().isNotNull(); - // when: a query is executed 50 times and all hosts are down in local DC. - queryTracker.query(session, 50); + // when: a query is executed 20 times and all hosts are down in local DC. + queryTracker.query(session, 20); // then: only usedHostsPerRemoteDc nodes in the remote DC should get requests. Collection queryCounts = newArrayList(); for (int i = 1; i <= 5; i++) { queryCounts.add(queryTracker.queryCount(sCluster, 2, i)); } - assertThat(queryCounts).containsOnly(0, 0, 0, 25, 25); + assertThat(queryCounts).containsOnly(0, 0, 0, 10, 10); } finally { cluster.close(); sCluster.stop(); @@ -618,13 +618,13 @@ public void should_only_use_remote_hosts_when_using_non_dc_local_cl(ConsistencyL // control connection attempts increment LBP counter. assertThat(cluster).controlHost().isNotNull(); - // when: a query is executed 50 times and all hosts are down in local DC. + // when: a query is executed 20 times and all hosts are down in local DC. // then: expect a NHAE for a local CL since no local replicas available. Class expectedException = cl.isDCLocal() ? NoHostAvailableException.class : null; - queryTracker.query(session, 50, cl, expectedException); + queryTracker.query(session, 20, cl, expectedException); - int expectedQueryCount = cl.isDCLocal() ? 0 : 25; + int expectedQueryCount = cl.isDCLocal() ? 0 : 10; for (int i = 1; i <= 2; i++) { queryTracker.assertQueried(sCluster, 1, i, 0); // then: Remote hosts should only be queried for non local CLs. @@ -675,13 +675,13 @@ public void should_use_remote_hosts_for_local_cl_when_allowed(ConsistencyLevel c // control connection attempts increment LBP counter. assertThat(cluster).controlHost().isNotNull(); - // when: a query is executed 50 times and all hosts are down in local DC. 
- queryTracker.query(session, 50, cl, null); + // when: a query is executed 20 times and all hosts are down in local DC. + queryTracker.query(session, 20, cl, null); for (int i = 1; i <= 2; i++) { queryTracker.assertQueried(sCluster, 1, i, 0); // then: Remote hosts should be queried. - queryTracker.assertQueried(sCluster, 2, i, 25); + queryTracker.assertQueried(sCluster, 2, i, 10); } } finally { cluster.close(); @@ -717,11 +717,11 @@ public void should_not_send_requests_to_blacklisted_dc_using_host_filter_policy( Session session = cluster.connect(); // when: A query is made and nodes for the local dc are available. - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: only nodes in the local DC should have been queried. - queryTracker.assertQueried(sCluster, 1, 1, 25); - queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 1, 1, 10); + queryTracker.assertQueried(sCluster, 1, 2, 10); queryTracker.assertQueried(sCluster, 2, 1, 0); queryTracker.assertQueried(sCluster, 2, 2, 0); queryTracker.assertQueried(sCluster, 3, 1, 0); @@ -731,15 +731,15 @@ public void should_not_send_requests_to_blacklisted_dc_using_host_filter_policy( sCluster.stopDC(cluster, 1); assertThat(cluster).controlHost().isNotNull(); queryTracker.reset(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: Only nodes in DC3 should have been queried, since DC2 is blacklisted and DC1 is down. queryTracker.assertQueried(sCluster, 1, 1, 0); queryTracker.assertQueried(sCluster, 1, 2, 0); queryTracker.assertQueried(sCluster, 2, 1, 0); queryTracker.assertQueried(sCluster, 2, 2, 0); - queryTracker.assertQueried(sCluster, 3, 1, 25); - queryTracker.assertQueried(sCluster, 3, 2, 25); + queryTracker.assertQueried(sCluster, 3, 1, 10); + queryTracker.assertQueried(sCluster, 3, 2, 10); } finally { cluster.close(); sCluster.stop(); @@ -776,11 +776,11 @@ public void should_send_requests_to_whitelisted_dcs_using_host_filter_policy() { Session session = cluster.connect(); // when: A query is made and nodes for the local dc are available. - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: only nodes in the local DC should have been queried. - queryTracker.assertQueried(sCluster, 1, 1, 25); - queryTracker.assertQueried(sCluster, 1, 2, 25); + queryTracker.assertQueried(sCluster, 1, 1, 10); + queryTracker.assertQueried(sCluster, 1, 2, 10); queryTracker.assertQueried(sCluster, 2, 1, 0); queryTracker.assertQueried(sCluster, 2, 2, 0); queryTracker.assertQueried(sCluster, 3, 1, 0); @@ -790,14 +790,14 @@ public void should_send_requests_to_whitelisted_dcs_using_host_filter_policy() { sCluster.stopDC(cluster, 1); assertThat(cluster).controlHost().isNotNull(); queryTracker.reset(); - queryTracker.query(session, 50); + queryTracker.query(session, 20); // then: Only nodes in DC2 should have been queried, since DC3 is not in the whitelist and DC1 // is down. queryTracker.assertQueried(sCluster, 1, 1, 0); queryTracker.assertQueried(sCluster, 1, 2, 0); - queryTracker.assertQueried(sCluster, 2, 1, 25); - queryTracker.assertQueried(sCluster, 2, 2, 25); + queryTracker.assertQueried(sCluster, 2, 1, 10); + queryTracker.assertQueried(sCluster, 2, 2, 10); queryTracker.assertQueried(sCluster, 3, 1, 0); queryTracker.assertQueried(sCluster, 3, 1, 0); } finally {