From cb5272c03987f34011227c5777435d3582d9bcb3 Mon Sep 17 00:00:00 2001 From: "lz.zukowski@gmail.com" Date: Tue, 5 Sep 2017 19:27:09 +0200 Subject: [PATCH 1/2] Add possibility to create LoadBalancing policy without DataCenter restrictions --- .../connector/cql/CassandraConnector.scala | 21 +++++++++++++++---- .../spark/connector/cql/DataCenterAware.scala | 9 ++++++++ .../LocalNodeFirstLoadBalancingPolicy.scala | 8 +++++-- 3 files changed, 32 insertions(+), 6 deletions(-) create mode 100644 spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/DataCenterAware.scala diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala index 133b39daf..fe4973449 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala @@ -11,6 +11,9 @@ import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf import com.datastax.spark.connector.util.SerialShutdownHooks import com.datastax.spark.connector.util.Logging +import scala.tools.nsc.interpreter +import scala.tools.nsc.interpreter.session + /** Provides and manages connections to Cassandra. 
* * A `CassandraConnector` instance is serializable and @@ -78,10 +81,8 @@ class CassandraConnector(val conf: CassandraConnectorConf) def openSession() = { val session = sessionCache.acquire(_config) try { - val allNodes = session.getCluster.getMetadata.getAllHosts.toSet - val dcToUse = _config.localDC.getOrElse(LocalNodeFirstLoadBalancingPolicy.determineDataCenter(_config.hosts, allNodes)) - val myNodes = allNodes.filter(_.getDatacenter == dcToUse).map(_.getAddress) - _config = _config.copy(hosts = myNodes) + val foundNodes = findNodes(session) + _config = _config.copy(hosts = foundNodes) val connectionsPerHost = _config.maxConnectionsPerExecutor.getOrElse(1) val poolingOptions = session.getCluster.getConfiguration.getPoolingOptions @@ -102,6 +103,18 @@ class CassandraConnector(val conf: CassandraConnectorConf) } } + private def findNodes(session: Session) = { + val allNodes: Set[Host] = session.getCluster.getMetadata.getAllHosts.toSet + + session.getCluster.getConfiguration.getPolicies.getLoadBalancingPolicy match { + case policy: DataCenterAware => { + val dcToUse = _config.localDC.getOrElse(policy.determineDataCenter(_config.hosts, allNodes)) + allNodes.filter(_.getDatacenter == dcToUse).map(_.getAddress) + } + case _ => allNodes.map(_.getAddress) + } + } + /** Allows to use Cassandra `Session` in a safe way without * risk of forgetting to close it. The `Session` object obtained through this method * is a proxy to a shared, single `Session` associated with the cluster. 
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/DataCenterAware.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/DataCenterAware.scala new file mode 100644 index 000000000..51259963c --- /dev/null +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/DataCenterAware.scala @@ -0,0 +1,9 @@ +package com.datastax.spark.connector.cql + +import java.net.InetAddress + +import com.datastax.driver.core.Host + +trait DataCenterAware { + def determineDataCenter(contactPoints: Set[InetAddress], allHosts: Set[Host]): String +} diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/LocalNodeFirstLoadBalancingPolicy.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/LocalNodeFirstLoadBalancingPolicy.scala index b02b408ca..5b1505735 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/LocalNodeFirstLoadBalancingPolicy.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/LocalNodeFirstLoadBalancingPolicy.scala @@ -15,7 +15,7 @@ import scala.util.Random * For writes, if a statement has a routing key set, this LBP is token aware - it prefers the nodes which * are replicas of the computed token to the other nodes. */ class LocalNodeFirstLoadBalancingPolicy(contactPoints: Set[InetAddress], localDC: Option[String] = None, - shuffleReplicas: Boolean = true) extends LoadBalancingPolicy with Logging { + shuffleReplicas: Boolean = true) extends LoadBalancingPolicy with Logging with DataCenterAware { import LocalNodeFirstLoadBalancingPolicy._ @@ -72,7 +72,7 @@ class LocalNodeFirstLoadBalancingPolicy(contactPoints: Set[InetAddress], localDC else tokenAwareQueryPlan(keyspace, statement) } - + override def onAdd(host: Host) { // The added host might be a "better" version of a host already in the set. // The nodes added in the init call don't have DC and rack set. 
@@ -86,6 +86,10 @@ class LocalNodeFirstLoadBalancingPolicy(contactPoints: Set[InetAddress], localDC logInfo(s"Removed host ${host.getAddress.getHostAddress} (${host.getDatacenter})") } + override def determineDataCenter(contactPoints: Set[InetAddress], allHosts: Set[Host]): String = { + LocalNodeFirstLoadBalancingPolicy.determineDataCenter(contactPoints, allHosts) + } + override def close() = { } override def onUp(host: Host) = { } override def onDown(host: Host) = { } From d510943024f292baf313bcc62a5cea177d7a3b30 Mon Sep 17 00:00:00 2001 From: "lz.zukowski@gmail.com" Date: Tue, 5 Sep 2017 19:33:19 +0200 Subject: [PATCH 2/2] Fix imports --- .../spark/connector/cql/CassandraConnector.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala index fe4973449..1294a0d5e 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala @@ -3,16 +3,13 @@ package com.datastax.spark.connector.cql import java.io.IOException import java.net.InetAddress -import scala.collection.JavaConversions._ -import scala.language.reflectiveCalls -import org.apache.spark.{SparkConf, SparkContext} import com.datastax.driver.core._ import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf -import com.datastax.spark.connector.util.SerialShutdownHooks -import com.datastax.spark.connector.util.Logging +import com.datastax.spark.connector.util.{Logging, SerialShutdownHooks} +import org.apache.spark.{SparkConf, SparkContext} -import scala.tools.nsc.interpreter -import scala.tools.nsc.interpreter.session +import scala.collection.JavaConversions._ +import scala.language.reflectiveCalls /** Provides and 
manages connections to Cassandra. *