Add Support for Spark 3.4 #2050 (Closed)
5 changes: 3 additions & 2 deletions build.sbt
@@ -8,10 +8,10 @@ import scala.xml.transform.{RewriteRule, RuleTransformer}
import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _}

val condaEnvName = "synapseml"
val sparkVersion = "3.2.3"
val sparkVersion = "3.4.1"
name := "synapseml"
ThisBuild / organization := "com.microsoft.azure"
ThisBuild / scalaVersion := "2.12.15"
ThisBuild / scalaVersion := "2.12.17"

val scalaMajorVersion = 2.12

@@ -27,6 +27,7 @@ val coreDependencies = Seq(
"org.apache.spark" %% "spark-tags" % sparkVersion % "test",
"org.scalatest" %% "scalatest" % "3.2.14" % "test")
val extraDependencies = Seq(
"commons-lang" % "commons-lang" % "2.6",
"org.scalactic" %% "scalactic" % "3.2.14",
"io.spray" %% "spray-json" % "1.3.5",
"com.jcraft" % "jsch" % "0.1.54",

@@ -18,7 +18,7 @@ object PackageUtils {

val PackageName = s"synapseml_$ScalaVersionSuffix"
val PackageMavenCoordinate = s"$PackageGroup:$PackageName:${BuildInfo.version}"
-private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.3.1"
+private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1"
val PackageRepository: String = SparkMLRepository

// If testing onnx package with snapshots repo, make sure to switch to using

@@ -3,7 +3,7 @@

package com.microsoft.azure.synapse.ml.exploratory

-import breeze.stats.distributions.ChiSquared
+import breeze.stats.distributions.{ChiSquared, RandBasis}
import com.microsoft.azure.synapse.ml.codegen.Wrappable
import com.microsoft.azure.synapse.ml.core.schema.DatasetExtensions
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
@@ -261,6 +261,7 @@ private[exploratory] case class DistributionMetrics(numFeatures: Int,

// Calculates left-tailed p-value from degrees of freedom and chi-squared test statistic
def chiSquaredPValue: Column = {
+implicit val rand: RandBasis = RandBasis.mt0
val degOfFreedom = numFeatures - 1
val scoreCol = chiSquaredTestStatistic
val chiSqPValueUdf = udf(
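Why the added implicit: the newer Breeze pulled in by the Spark 3.4 upgrade no longer supplies a default RandBasis for distribution constructors such as ChiSquared, so the p-value code has to put one in scope. A minimal sketch of the pattern, not the PR's code (the object and method names below are illustrative):

import breeze.stats.distributions.{ChiSquared, RandBasis}

object ChiSquaredPValueSketch {
  // Left-tailed p-value of a chi-squared test statistic, mirroring the change above.
  def leftTailedPValue(testStatistic: Double, degreesOfFreedom: Double): Double = {
    implicit val rand: RandBasis = RandBasis.mt0 // deterministic Mersenne Twister basis, as in the diff
    ChiSquared(degreesOfFreedom).cdf(testStatistic)
  }
}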

@@ -194,7 +194,7 @@ private[ml] class HadoopFileReader(file: PartitionedFile,

private val iterator = {
val fileSplit = new FileSplit(
-new Path(new URI(file.filePath)),
+new Path(new URI(file.filePath.toString())),
file.start,
file.length,
Array.empty)
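The added .toString() reflects an API change rather than a behavioral one: in Spark 3.4, PartitionedFile.filePath is a SparkPath instead of a plain String, so String-based constructors need an explicit conversion (the same pattern appears in PatchedImageFileFormat below). A hedged sketch, assuming Spark 3.4's org.apache.spark.paths.SparkPath; the helper names are illustrative:

import java.net.URI
import org.apache.hadoop.fs.Path
import org.apache.spark.paths.SparkPath

object SparkPathConversionSketch {
  // What the diff does: go through the string form, then build a Hadoop Path.
  def viaString(filePath: SparkPath): Path = new Path(new URI(filePath.toString))

  // SparkPath also carries direct converters, which avoid the round trip.
  def direct(filePath: SparkPath): Path = filePath.toPath
}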

@@ -3,7 +3,6 @@

package com.microsoft.azure.synapse.ml.nn

-import breeze.linalg.functions.euclideanDistance
import breeze.linalg.{DenseVector, norm, _}
import com.microsoft.azure.synapse.ml.core.env.StreamUtilities.using


@@ -199,17 +199,20 @@ object SparkHelpers {

def flatten(ratings: Dataset[_], num: Int, dstOutputColumn: String, srcOutputColumn: String): DataFrame = {
import ratings.sparkSession.implicits._

-val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
-val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
-  .toDF("id", "recommendations")
+import org.apache.spark.sql.functions.{collect_top_k, struct}

val arrayType = ArrayType(
new StructType()
.add(dstOutputColumn, IntegerType)
.add("rating", FloatType)
.add(Constants.RatingCol, FloatType)
)
recs.select(col("id").as(srcOutputColumn), col("recommendations").cast(arrayType))

+ratings.toDF(srcOutputColumn, dstOutputColumn, Constants.RatingCol).groupBy(srcOutputColumn)
+  .agg(collect_top_k(struct(Constants.RatingCol, dstOutputColumn), num, false))
+  .as[(Int, Seq[(Float, Int)])]
+  .map(t => (t._1, t._2.map(p => (p._2, p._1))))
+  .toDF(srcOutputColumn, Constants.Recommendations)
+  .withColumn(Constants.Recommendations, col(Constants.Recommendations).cast(arrayType))
}
}

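The flatten rewrite drops the TopByKeyAggregator-based implementation in favor of the collect_top_k aggregate available in Spark 3.4, which keeps the num highest-rated items per key. If collect_top_k is not reachable from your own code (it is not part of the stable public functions API in every build), the same top-k-per-key shape can be produced with public DataFrame APIs; a hedged sketch with illustrative column names ("user", "item", "rating"), not the PR's code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, row_number, struct}

object TopKPerKeySketch {
  // Keep the top `num` items per user by rating and collect them into an array column.
  // Note: collect_list does not guarantee array order; sort afterwards if ordering matters.
  def topKPerKey(ratings: DataFrame, num: Int): DataFrame = {
    val byRatingDesc = Window.partitionBy("user").orderBy(col("rating").desc)
    ratings
      .withColumn("rank", row_number().over(byRatingDesc))
      .where(col("rank") <= num)
      .groupBy("user")
      .agg(collect_list(struct(col("item"), col("rating"))).as("recommendations"))
  }
}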

@@ -98,7 +98,7 @@ class PatchedImageFileFormat extends ImageFileFormat with Serializable with Logg
Iterator(emptyUnsafeRow)
} else {
val origin = file.filePath
-val path = new Path(origin)
+val path = new Path(origin.toString())
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val stream = fs.open(path)
val bytes = try {
@@ -107,11 +107,12 @@
IOUtils.close(stream)
}

-val resultOpt = catchFlakiness(5)(ImageSchema.decode(origin, bytes)) //scalastyle:ignore magic.number
+val resultOpt = catchFlakiness(5)( //scalastyle:ignore magic.number
+  ImageSchema.decode(origin.toString(), bytes))
val filteredResult = if (imageSourceOptions.dropInvalid) {
resultOpt.toIterator
} else {
-Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
+Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin.toString())))
}

if (requiredSchema.isEmpty) {

@@ -101,7 +101,7 @@ object RTestGen {
| "spark.sql.shuffle.partitions=10",
| "spark.sql.crossJoin.enabled=true")
|
-|sc <- spark_connect(master = "local", version = "3.2.4", config = conf)
+|sc <- spark_connect(master = "local", version = "3.4.1", config = conf)
|
|""".stripMargin, StandardOpenOption.CREATE)


@@ -3,7 +3,7 @@

package com.microsoft.azure.synapse.ml.exploratory

-import breeze.stats.distributions.ChiSquared
+import breeze.stats.distributions.{ChiSquared, RandBasis}
import com.microsoft.azure.synapse.ml.core.test.base.TestBase
import org.apache.spark.sql.functions.{col, count, lit}
import org.apache.spark.sql.types.DoubleType
@@ -126,6 +126,7 @@ case class DistributionMetricsCalculator(refFeatureProbabilities: Array[Double],
val totalVariationDistance: Double = 0.5d * absDiffObsRef.sum
val wassersteinDistance: Double = absDiffObsRef.sum / absDiffObsRef.length
val chiSquaredTestStatistic: Double = (obsFeatureCounts, refFeatureCounts).zipped.map((a, b) => pow(a - b, 2) / b).sum
+implicit val rand: RandBasis = RandBasis.mt0
val chiSquaredPValue: Double = chiSquaredTestStatistic match {
// limit of CDF as x approaches +inf is 1 (https://en.wikipedia.org/wiki/Cumulative_distribution_function)
case Double.PositiveInfinity => 1

@@ -20,14 +20,14 @@ import org.apache.spark.sql.functions.{col, length}
import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization

import java.io.File
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
-import scala.util.parsing.json.JSONObject


trait HTTPTestUtils extends TestBase with WithFreeUrl {

@@ -81,7 +81,8 @@ trait HTTPTestUtils extends TestBase with WithFreeUrl {

def sendJsonRequest(map: Map[String, Any], url: String): String = {
val post = new HttpPost(url)
-val params = new StringEntity(JSONObject(map).toString())
+implicit val defaultFormats: DefaultFormats = DefaultFormats
+val params = new StringEntity(Serialization.write(map))
post.addHeader("content-type", "application/json")
post.setEntity(params)
val res = RESTHelpers.Client.execute(post)
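The JSON change drops the long-deprecated scala.util.parsing.json in favor of json4s, which Spark already ships, so the test helper now serializes the request map with json4s instead. A hedged sketch of the same pattern (the object name and payload values are illustrative):

import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

object JsonBodySketch {
  implicit val formats: DefaultFormats = DefaultFormats

  // Serialize a Map into a JSON request body, as sendJsonRequest now does.
  def toJsonBody(map: Map[String, Any]): String = Serialization.write(map)
}

// Example: JsonBodySketch.toJsonBody(Map("name" -> "synapseml", "sparkVersion" -> "3.4.1"))
// yields {"name":"synapseml","sparkVersion":"3.4.1"}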

@@ -11,11 +11,7 @@ import java.io.File
import scala.collection.mutable.ListBuffer

class DatabricksGPUTests extends DatabricksTestHelper {
-val horovodInstallationScript: File = FileUtilities.join(
-  BuildInfo.baseDirectory.getParent, "deep-learning",
-  "src", "main", "python", "horovod_installation.sh").getCanonicalFile
-uploadFileToDBFS(horovodInstallationScript, "/FileStore/horovod-fix-commit/horovod_installation.sh")
-val clusterId: String = createClusterInPool(GPUClusterName, AdbGpuRuntime, 2, GpuPoolId, GPUInitScripts)
+val clusterId: String = createClusterInPool(GPUClusterName, AdbGpuRuntime, 2, GpuPoolId, "[]")
val jobIdsToCancel: ListBuffer[Int] = databricksTestHelper(
clusterId, GPULibraries, GPUNotebooks)


@@ -29,10 +29,11 @@ object DatabricksUtilities {

// ADB Info
val Region = "eastus"
val PoolName = "synapseml-build-10.4"
val GpuPoolName = "synapseml-build-10.4-gpu"
val AdbRuntime = "10.4.x-scala2.12"
val AdbGpuRuntime = "10.4.x-gpu-ml-scala2.12"
val PoolName = "synapseml-build-11.2"
val GpuPoolName = "synapseml-build-11.2-gpu"
val AdbRuntime = "11.2.x-scala2.12"
// https://learn.microsoft.com/en-us/azure/databricks/release-notes/runtime/11.2
val AdbGpuRuntime = "11.2.x-gpu-ml-scala2.12"
val NumWorkers = 5
val AutoTerminationMinutes = 15

@@ -72,6 +73,8 @@ object DatabricksUtilities {
// TODO: install synapse.ml.dl wheel package here
val GPULibraries: String = List(
Map("maven" -> Map("coordinates" -> PackageMavenCoordinate, "repo" -> PackageRepository)),
Map("pypi" -> Map("package" -> "pytorch-lightning==1.5.0")),
Map("pypi" -> Map("package" -> "torchvision==0.12.0")),
Map("pypi" -> Map("package" -> "transformers==4.15.0")),
Map("pypi" -> Map("package" -> "petastorm==0.12.0"))
).toJson.compactPrint
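For context on the .toJson.compactPrint above: the library list is a plain Scala List of Maps serialized with spray-json's DefaultJsonProtocol into the JSON library spec that the Databricks REST API expects. A hedged, self-contained sketch (object name and package pins are illustrative):

import spray.json._
import spray.json.DefaultJsonProtocol._

object LibrarySpecSketch {
  // Nested Map[String, Map[String, String]] values serialize with the default protocol.
  val gpuLibraries: String = List(
    Map("pypi" -> Map("package" -> "pytorch-lightning==1.5.0")),
    Map("pypi" -> Map("package" -> "torchvision==0.12.0"))
  ).toJson.compactPrint
  // [{"pypi":{"package":"pytorch-lightning==1.5.0"}},{"pypi":{"package":"torchvision==0.12.0"}}]
}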

@@ -255,7 +255,7 @@ object SynapseUtilities {
| "nodeSizeFamily": "MemoryOptimized",
| "provisioningState": "Succeeded",
| "sessionLevelPackagesEnabled": "true",
| "sparkVersion": "3.2"
| "sparkVersion": "3.4"
| }
|}
|""".stripMargin

@@ -124,7 +124,7 @@
"featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
"lr_train_data = featurizer.transform(train_data)[\"target\", \"features\"]\n",
"lr_test_data = featurizer.transform(test_data)[\"target\", \"features\"]\n",
"display(lr_train_data.limit(10).toPandas())"
"display(lr_train_data.limit(10))"
]
},
{
@@ -139,7 +139,7 @@
"lr_model = lr.fit(lr_train_data)\n",
"lr_predictions = lr_model.transform(lr_test_data)\n",
"\n",
"display(lr_predictions.limit(10).toPandas())"
"display(lr_predictions.limit(10))"
]
},
{
@@ -193,7 +193,7 @@
"\n",
"vw_train_data = vw_featurizer.transform(train_data)[\"target\", \"features\"]\n",
"vw_test_data = vw_featurizer.transform(test_data)[\"target\", \"features\"]\n",
"display(vw_train_data.limit(10).toPandas())"
"display(vw_train_data.limit(10))"
]
},
{
@@ -219,7 +219,7 @@
"vw_model = vwr.fit(vw_train_data_2.repartition(1))\n",
"vw_predictions = vw_model.transform(vw_test_data)\n",
"\n",
"display(vw_predictions.limit(10).toPandas())"
"display(vw_predictions.limit(10))"
]
},
{

6 changes: 3 additions & 3 deletions environment.yml
@@ -3,16 +3,15 @@ channels:
- conda-forge
- default
dependencies:
-- python=3.8.8
+- python=3.10.0
- requests=2.26.0
- pip=21.3
- r-base=4.1.1
- r-sparklyr=1.8.1
- r-devtools=2.4.2
- pip:
-- pyarrow>=0.15.0
- numpy>=1.19.3
-- pyspark==3.2.3
+- pyspark==3.4.1
- pandas==1.2.5
- wheel
- sphinx==4.2.0
@@ -32,6 +31,7 @@ dependencies:
- twine
- jupyter
- mlflow
+- numpy==1.23.0
- torch==1.11.0
- torchvision==0.12.0
- horovod==0.25.0

4 changes: 2 additions & 2 deletions pipeline.yaml
@@ -511,8 +511,8 @@ jobs:
fi
sbt publishM2

-SPARK_VERSION=3.2.4
-HADOOP_VERSION=3.2
+SPARK_VERSION=3.4.1
+HADOOP_VERSION=3.3
wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
(timeout 20m sbt "project $(PACKAGE)" coverage testR) || (echo "retrying" && timeout 20m sbt "project $(PACKAGE)" coverage testR) || (echo "retrying" && timeout 20m sbt "project $(PACKAGE)" coverage testR)
- task: PublishTestResults@2

6 changes: 5 additions & 1 deletion project/plugins.sbt
@@ -4,6 +4,10 @@ addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.0.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.2")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.8")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")
addSbtPlugin("no.arktekk.sbt" % "aether-deploy" % "0.26.0")

+ThisBuild / libraryDependencySchemes ++= Seq(
+  "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always
+)

4 changes: 2 additions & 2 deletions start
@@ -1,8 +1,8 @@
#!/bin/bash

export OPENMPI_VERSION="3.1.2"
export SPARK_VERSION="3.2.3"
export HADOOP_VERSION="2.7"
export SPARK_VERSION="3.4.1"
export HADOOP_VERSION="3.3"
export SYNAPSEML_VERSION="0.11.2" # Binder compatibility version

echo "Beginning Spark Session..."

4 changes: 2 additions & 2 deletions tools/docker/demo/Dockerfile
@@ -3,8 +3,8 @@ FROM mcr.microsoft.com/oss/mirror/docker.io/library/ubuntu:20.04
ARG SYNAPSEML_VERSION=0.11.2
ARG DEBIAN_FRONTEND=noninteractive

-ENV SPARK_VERSION=3.2.3
-ENV HADOOP_VERSION=2.7
+ENV SPARK_VERSION=3.4.1
+ENV HADOOP_VERSION=3
ENV SYNAPSEML_VERSION=${SYNAPSEML_VERSION}
ENV JAVA_HOME /usr/lib/jvm/java-1.11.0-openjdk-amd64


4 changes: 2 additions & 2 deletions tools/docker/minimal/Dockerfile
@@ -3,8 +3,8 @@ FROM mcr.microsoft.com/oss/mirror/docker.io/library/ubuntu:20.04
ARG SYNAPSEML_VERSION=0.11.2
ARG DEBIAN_FRONTEND=noninteractive

-ENV SPARK_VERSION=3.2.3
-ENV HADOOP_VERSION=2.7
+ENV SPARK_VERSION=3.4.1
+ENV HADOOP_VERSION=3
ENV SYNAPSEML_VERSION=${SYNAPSEML_VERSION}
ENV JAVA_HOME /usr/lib/jvm/java-1.11.0-openjdk-amd64


8 changes: 4 additions & 4 deletions tools/dotnet/dotnetSetup.sh
@@ -20,11 +20,11 @@ echo "##vso[task.setvariable variable=DOTNET_WORKER_DIR]$DOTNET_WORKER_DIR"
# Install Sleet
dotnet tool install -g sleet

-# Install Apache Spark-3.2
-curl https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz -o spark-3.2.0-bin-hadoop3.2.tgz
+# Install Apache Spark-3.4.1
+curl https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz -o spark-3.4.1-bin-hadoop3.tgz
mkdir ~/bin
-tar -xzvf spark-3.2.0-bin-hadoop3.2.tgz -C ~/bin
-export SPARK_HOME=~/bin/spark-3.2.0-bin-hadoop3.2/
+tar -xzvf spark-3.4.1-bin-hadoop3.tgz -C ~/bin
+export SPARK_HOME=~/bin/spark-3.4.1-bin-hadoop3/
export PATH=$SPARK_HOME/bin:$PATH
echo "##vso[task.setvariable variable=SPARK_HOME]$SPARK_HOME"
echo "##vso[task.setvariable variable=PATH]$SPARK_HOME/bin:$PATH"

2 changes: 1 addition & 1 deletion tools/helm/livy/Dockerfile
@@ -3,7 +3,7 @@ LABEL maintainer="Dalitso Banda [email protected]"

# Get Spark from US Apache mirror.
ENV APACHE_SPARK_VERSION 2.4.5
-ENV HADOOP_VERSION 3.2.1
+ENV HADOOP_VERSION 3.3.4

RUN echo "$LOG_TAG Getting SPARK_HOME" && \
apt-get update && \

2 changes: 1 addition & 1 deletion tools/helm/spark/Dockerfile
@@ -3,7 +3,7 @@ LABEL maintainer="Dalitso Banda [email protected]"

# Get Spark from US Apache mirror.
ENV APACHE_SPARK_VERSION 2.4.5
-ENV HADOOP_VERSION 3.2.1
+ENV HADOOP_VERSION 3.3.4

RUN echo "$LOG_TAG Getting SPARK_HOME" && \
apt-get update && \

2 changes: 1 addition & 1 deletion tools/helm/spark/mini.Dockerfile
@@ -24,7 +24,7 @@ ARG k8s_tests=kubernetes/tests

# Get Spark from US Apache mirror.
ENV APACHE_SPARK_VERSION 2.4.3
-ENV HADOOP_VERSION 3.1.2
+ENV HADOOP_VERSION 3.3.4
ENV HADOOP_GIT_COMMIT="release-3.2.0-RC1"

ENV SPARK_HOME=/opt/spark

2 changes: 1 addition & 1 deletion tools/helm/zeppelin/Dockerfile
@@ -3,7 +3,7 @@ LABEL maintainer="Dalitso Banda [email protected]"

# Get Spark from US Apache mirror.
ENV APACHE_SPARK_VERSION 2.4.5
-ENV HADOOP_VERSION 3.2.1
+ENV HADOOP_VERSION 3.3.4
Collaborator review comment on this line: "leave helm charts alone for this PR please"
RUN echo "$LOG_TAG Getting SPARK_HOME" && \
apt-get update && \

2 changes: 1 addition & 1 deletion tools/tests/run_r_tests.R
@@ -3,7 +3,7 @@ if (!require("sparklyr")) {
library("sparklyr")
}

-spark_install_tar(paste(getwd(), "/../../../../../../spark-3.2.4-bin-hadoop3.2.tgz", sep = ""))
+spark_install_tar(paste(getwd(), "/../../../../../../spark-3.4.1-bin-hadoop3.3.tgz", sep = ""))

options("testthat.output_file" = "../../../../r-test-results.xml")
devtools::test(reporter = JunitReporter$new())