Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LocalNodeFirstLoadBalancingPolicy not only one DC for connections #1140

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package com.datastax.spark.connector.cql
import java.io.IOException
import java.net.InetAddress

import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.driver.core._
import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf
import com.datastax.spark.connector.util.SerialShutdownHooks
import com.datastax.spark.connector.util.Logging
import com.datastax.spark.connector.util.{Logging, SerialShutdownHooks}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._
import scala.language.reflectiveCalls

/** Provides and manages connections to Cassandra.
*
Expand Down Expand Up @@ -78,10 +78,8 @@ class CassandraConnector(val conf: CassandraConnectorConf)
def openSession() = {
val session = sessionCache.acquire(_config)
try {
val allNodes = session.getCluster.getMetadata.getAllHosts.toSet
val dcToUse = _config.localDC.getOrElse(LocalNodeFirstLoadBalancingPolicy.determineDataCenter(_config.hosts, allNodes))
val myNodes = allNodes.filter(_.getDatacenter == dcToUse).map(_.getAddress)
_config = _config.copy(hosts = myNodes)
val foundNodes = findNodes(session)
_config = _config.copy(hosts = foundNodes)

val connectionsPerHost = _config.maxConnectionsPerExecutor.getOrElse(1)
val poolingOptions = session.getCluster.getConfiguration.getPoolingOptions
Expand All @@ -102,6 +100,18 @@ class CassandraConnector(val conf: CassandraConnectorConf)
}
}

private def findNodes(session: Session) = {
val allNodes: Set[Host] = session.getCluster.getMetadata.getAllHosts.toSet

session.getCluster.getConfiguration.getPolicies.getLoadBalancingPolicy match {
case policy: DataCenterAware => {
val dcToUse = _config.localDC.getOrElse(policy.determineDataCenter(_config.hosts, allNodes))
allNodes.filter(_.getDatacenter == dcToUse).map(_.getAddress)
}
case _ => allNodes.map(_.getAddress)
}
}

/** Allows to use Cassandra `Session` in a safe way without
* risk of forgetting to close it. The `Session` object obtained through this method
* is a proxy to a shared, single `Session` associated with the cluster.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.datastax.spark.connector.cql

import java.net.InetAddress

import com.datastax.driver.core.Host

trait DataCenterAware {
def determineDataCenter(contactPoints: Set[InetAddress], allHosts: Set[Host]):String
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import scala.util.Random
* For writes, if a statement has a routing key set, this LBP is token aware - it prefers the nodes which
* are replicas of the computed token to the other nodes. */
class LocalNodeFirstLoadBalancingPolicy(contactPoints: Set[InetAddress], localDC: Option[String] = None,
shuffleReplicas: Boolean = true) extends LoadBalancingPolicy with Logging {
shuffleReplicas: Boolean = true) extends LoadBalancingPolicy with Logging with DataCenterAware {

import LocalNodeFirstLoadBalancingPolicy._

Expand Down Expand Up @@ -72,7 +72,7 @@ class LocalNodeFirstLoadBalancingPolicy(contactPoints: Set[InetAddress], localDC
else
tokenAwareQueryPlan(keyspace, statement)
}

override def onAdd(host: Host) {
// The added host might be a "better" version of a host already in the set.
// The nodes added in the init call don't have DC and rack set.
Expand All @@ -86,6 +86,10 @@ class LocalNodeFirstLoadBalancingPolicy(contactPoints: Set[InetAddress], localDC
logInfo(s"Removed host ${host.getAddress.getHostAddress} (${host.getDatacenter})")
}

override def determineDataCenter(contactPoints: Set[InetAddress], allHosts: Set[Host]): String = {
LocalNodeFirstLoadBalancingPolicy.determineDataCenter(contactPoints,allHosts)
}

override def close() = { }
override def onUp(host: Host) = { }
override def onDown(host: Host) = { }
Expand Down