Commit 46593ec: Merge branch 'master' into fix-caching

dciborow committed Sep 7, 2024
2 parents: a77bda2 + f3953bc

Showing 145 changed files with 23,359 additions and 141 deletions.
24 changes: 12 additions & 12 deletions README.md
@@ -11,10 +11,10 @@ SynapseML requires Scala 2.12, Spark 3.4+, and Python 3.8+.
| Topics | Links |
| :------ | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| Build | [![Build Status](https://msdata.visualstudio.com/A365/_apis/build/status/microsoft.SynapseML?branchName=master)](https://msdata.visualstudio.com/A365/_build/latest?definitionId=17563&branchName=master) [![codecov](https://codecov.io/gh/Microsoft/SynapseML/branch/master/graph/badge.svg)](https://codecov.io/gh/Microsoft/SynapseML) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) |
- | Version | [![Version](https://img.shields.io/badge/version-1.0.4-blue)](https://github.com/Microsoft/SynapseML/releases) [![Release Notes](https://img.shields.io/badge/release-notes-blue)](https://github.com/Microsoft/SynapseML/releases) [![Snapshot Version](https://mmlspark.blob.core.windows.net/icons/badges/master_version3.svg)](#sbt) |
- | Docs | [![Website](https://img.shields.io/badge/SynapseML-Website-blue)](https://aka.ms/spark) [![Scala Docs](https://img.shields.io/static/v1?label=api%20docs&message=scala&color=blue&logo=scala)](https://mmlspark.blob.core.windows.net/docs/1.0.4/scala/index.html#package) [![PySpark Docs](https://img.shields.io/static/v1?label=api%20docs&message=python&color=blue&logo=python)](https://mmlspark.blob.core.windows.net/docs/1.0.4/pyspark/index.html) [![Academic Paper](https://img.shields.io/badge/academic-paper-7fdcf7)](https://arxiv.org/abs/1810.08744) |
+ | Version | [![Version](https://img.shields.io/badge/version-1.0.5-blue)](https://github.com/Microsoft/SynapseML/releases) [![Release Notes](https://img.shields.io/badge/release-notes-blue)](https://github.com/Microsoft/SynapseML/releases) [![Snapshot Version](https://mmlspark.blob.core.windows.net/icons/badges/master_version3.svg)](#sbt) |
+ | Docs | [![Website](https://img.shields.io/badge/SynapseML-Website-blue)](https://aka.ms/spark) [![Scala Docs](https://img.shields.io/static/v1?label=api%20docs&message=scala&color=blue&logo=scala)](https://mmlspark.blob.core.windows.net/docs/1.0.5/scala/index.html#package) [![PySpark Docs](https://img.shields.io/static/v1?label=api%20docs&message=python&color=blue&logo=python)](https://mmlspark.blob.core.windows.net/docs/1.0.5/pyspark/index.html) [![Academic Paper](https://img.shields.io/badge/academic-paper-7fdcf7)](https://arxiv.org/abs/1810.08744) |
| Support | [![Gitter](https://badges.gitter.im/Microsoft/MMLSpark.svg)](https://gitter.im/Microsoft/MMLSpark?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge) [![Mail](https://img.shields.io/badge/mail-synapseml--support-brightgreen)](mailto:[email protected]) |
- | Binder | [![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/microsoft/SynapseML/v1.0.4?labpath=notebooks%2Ffeatures) |
+ | Binder | [![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/microsoft/SynapseML/v1.0.5?labpath=notebooks%2Ffeatures) |
| Usage | [![Downloads](https://static.pepy.tech/badge/synapseml)](https://pepy.tech/project/synapseml) |
<!-- markdownlint-disable MD033 -->
<details open>
@@ -119,7 +119,7 @@ In Azure Synapse notebooks please place the following in the first cell of your
{
"name": "synapseml",
"conf": {
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.4",
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.5",
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
"spark.yarn.user.classpath.first": "true",
@@ -155,15 +155,15 @@ cloud](http://community.cloud.databricks.com), create a new [library from Maven
coordinates](https://docs.databricks.com/user-guide/libraries.html#libraries-from-maven-pypi-or-spark-packages)
in your workspace.

- For the coordinates use: `com.microsoft.azure:synapseml_2.12:1.0.4`
+ For the coordinates use: `com.microsoft.azure:synapseml_2.12:1.0.5`
with the resolver: `https://mmlspark.azureedge.net/maven`. Ensure this library is
attached to your target cluster(s).

Finally, ensure that your Spark cluster has at least Spark 3.2 and Scala 2.12. If you encounter Netty dependency issues, please use DBR 10.1.

You can use SynapseML in both your Scala and PySpark notebooks. To get started with our example notebooks, import the following Databricks archive:

- `https://mmlspark.blob.core.windows.net/dbcs/SynapseMLExamplesv1.0.4.dbc`
+ `https://mmlspark.blob.core.windows.net/dbcs/SynapseMLExamplesv1.0.5.dbc`

### Python Standalone

@@ -174,7 +174,7 @@ the above example, or from python:
```python
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
- .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.4") \
+ .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.5") \
.getOrCreate()
import synapse.ml
```
@@ -185,9 +185,9 @@ SynapseML can be conveniently installed on existing Spark clusters via the
`--packages` option, examples:

```bash
- spark-shell --packages com.microsoft.azure:synapseml_2.12:1.0.4
- pyspark --packages com.microsoft.azure:synapseml_2.12:1.0.4
- spark-submit --packages com.microsoft.azure:synapseml_2.12:1.0.4 MyApp.jar
+ spark-shell --packages com.microsoft.azure:synapseml_2.12:1.0.5
+ pyspark --packages com.microsoft.azure:synapseml_2.12:1.0.5
+ spark-submit --packages com.microsoft.azure:synapseml_2.12:1.0.5 MyApp.jar
```

### SBT
@@ -196,7 +196,7 @@ If you are building a Spark application in Scala, add the following lines to
your `build.sbt`:

```scala
- libraryDependencies += "com.microsoft.azure" % "synapseml_2.12" % "1.0.4"
+ libraryDependencies += "com.microsoft.azure" % "synapseml_2.12" % "1.0.5"
```

### Apache Livy and HDInsight
@@ -210,7 +210,7 @@ Excluding certain packages from the library may be necessary due to current issu
{
"name": "synapseml",
"conf": {
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.4",
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.5",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind"
}
}
@@ -160,7 +160,7 @@ class DetectLastAnomaly(override val uid: String) extends AnomalyDetectorBase(ui

def setSeriesCol(v: String): this.type = setVectorParam(series, v)

- def urlPath: String = "/anomalydetector/v1.1-preview.1/timeseries/last/detect"
+ def urlPath: String = "/anomalydetector/v1.1/timeseries/last/detect"

override def responseDataType: DataType = ADLastResponse.schema

Expand All @@ -181,7 +181,7 @@ class DetectAnomalies(override val uid: String) extends AnomalyDetectorBase(uid)

def setSeriesCol(v: String): this.type = setVectorParam(series, v)

- def urlPath: String = "/anomalydetector/v1.1-preview.1/timeseries/entire/detect"
+ def urlPath: String = "/anomalydetector/v1.1/timeseries/entire/detect"

override def responseDataType: DataType = ADEntireResponse.schema

@@ -281,7 +281,7 @@ class SimpleDetectAnomalies(override val uid: String) extends AnomalyDetectorBas

}

- def urlPath: String = "/anomalydetector/v1.1-preview.1/timeseries/entire/detect"
+ def urlPath: String = "/anomalydetector/v1.1/timeseries/entire/detect"

override def responseDataType: DataType = ADEntireResponse.schema
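This commit moves the Anomaly Detector endpoints from the `v1.1-preview.1` preview path to the GA `v1.1` path. As a hedged sketch of how these transformers are exercised from PySpark (the import path and setter names follow the published SynapseML docs and may vary by version; the key and region are placeholders, not part of this commit):

```python
# Hedged sketch: calling the (now GA, v1.1) anomaly-detector endpoint through
# SimpleDetectAnomalies. Import path varies across SynapseML versions; the
# subscription key and region below are placeholders.
from pyspark.sql import SparkSession
from synapse.ml.services.anomaly import SimpleDetectAnomalies

spark = SparkSession.builder.getOrCreate()

# Twelve daily points with one obvious spike.
df = spark.createDataFrame(
    [("2024-01-%02dT00:00:00Z" % i, v, "series-a")
     for i, v in enumerate(
         [1.0, 1.1, 0.9, 1.0, 1.2, 9.9, 1.0, 0.8, 1.1, 1.0, 0.9, 1.0],
         start=1)],
    ["timestamp", "value", "group"],
)

sda = (SimpleDetectAnomalies()
       .setSubscriptionKey("<anomaly-detector-key>")  # placeholder
       .setLocation("westus2")                        # placeholder region
       .setTimestampCol("timestamp")
       .setValueCol("value")
       .setGroupbyCol("group")
       .setGranularity("daily")
       .setOutputCol("anomalies"))

sda.transform(df).select("timestamp", "anomalies.isAnomaly").show()
```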

@@ -13,6 +13,8 @@ import org.apache.http.entity.AbstractHttpEntity
import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
import org.apache.spark.sql.Row.unapplySeq
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, functions => F, types => T}
@@ -78,7 +80,7 @@ class OpenAIPrompt(override val uid: String) extends Transformer

def setSystemPrompt(value: String): this.type = set(systemPrompt, value)

- private val defaultSystemPrompt = "You are an AI chatbot who wants to answer user's questions and complete tasks. " +
+ private val defaultSystemPrompt = "You are an AI chatbot who wants to answer user's questions and complete tasks. " +
"Follow their instructions carefully and be brief if they don't say otherwise."

setDefault(
@@ -100,6 +102,27 @@ class OpenAIPrompt(override val uid: String) extends Transformer
"promptTemplate", "outputCol", "postProcessing", "postProcessingOptions", "dropPrompt", "dropMessages",
"systemPrompt")

  private def addRAIErrors(df: DataFrame, errorCol: String, outputCol: String): DataFrame = {
    val openAIResultFromRow = ChatCompletionResponse.makeFromRowConverter
    df.map({ row =>
      // Deserialize the output column back into a ChatCompletionResponse and
      // take its first choice, when the service returned anything at all.
      val originalOutput = Option(row.getAs[Row](outputCol))
        .map({ row => openAIResultFromRow(row).choices.head })
      // A null message content means the RAI content filter suppressed the
      // completion.
      val isFiltered = originalOutput
        .map(output => Option(output.message.content).isEmpty)
        .getOrElse(false)

      if (isFiltered) {
        // Surface the finish reason (e.g. "content_filter") in the error
        // column so filtered rows are distinguishable from successes.
        val updatedRowSeq = row.toSeq.updated(
          row.fieldIndex(errorCol),
          Row(originalOutput.get.finish_reason, null) //scalastyle:ignore null
        )
        Row.fromSeq(updatedRowSeq)
      } else {
        row
      }
    })(RowEncoder(df.schema))
  }

override def transform(dataset: Dataset[_]): DataFrame = {
import com.microsoft.azure.synapse.ml.core.schema.DatasetExtensions._

@@ -120,8 +143,10 @@ class OpenAIPrompt(override val uid: String) extends Transformer
val dfTemplated = df.withColumn(messageColName, createMessagesUDF(promptCol))
val completionNamed = chatCompletion.setMessagesCol(messageColName)

- val results = completionNamed
-   .transform(dfTemplated)
+ val transformed = addRAIErrors(
+   completionNamed.transform(dfTemplated), chatCompletion.getErrorCol, chatCompletion.getOutputCol)
+
+ val results = transformed
.withColumn(getOutputCol,
getParser.parse(F.element_at(F.col(completionNamed.getOutputCol).getField("choices"), 1)
.getField("message").getField("content")))
@@ -155,19 +180,19 @@ class OpenAIPrompt(override val uid: String) extends Transformer
}, dataset.columns.length)
}

- private val legacyModels = Set("ada","babbage", "curie", "davinci",
+ private val legacyModels = Set("ada", "babbage", "curie", "davinci",
"text-ada-001", "text-babbage-001", "text-curie-001", "text-davinci-002", "text-davinci-003",
"code-cushman-001", "code-davinci-002")

private def openAICompletion: OpenAIServicesBase = {

val completion: OpenAIServicesBase =
- if (legacyModels.contains(getDeploymentName)) {
-   new OpenAICompletion()
- }
- else {
-   new OpenAIChatCompletion()
- }
+   if (legacyModels.contains(getDeploymentName)) {
+     new OpenAICompletion()
+   }
+   else {
+     new OpenAIChatCompletion()
+   }
// apply all parameters
extractParamMap().toSeq
.filter(p => !localParamNames.contains(p.param.name))
@@ -41,11 +41,15 @@ case class OpenAIChatChoice(message: OpenAIMessage,
index: Long,
finish_reason: String)

+ case class OpenAIUsage(completion_tokens: Long, prompt_tokens: Long, total_tokens: Long)

case class ChatCompletionResponse(id: String,
`object`: String,
created: String,
model: String,
- choices: Seq[OpenAIChatChoice])
+ choices: Seq[OpenAIChatChoice],
+ system_fingerprint: Option[String],
+ usage: Option[OpenAIUsage])

object ChatCompletionResponse extends SparkBindings[ChatCompletionResponse]
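`ChatCompletionResponse` now carries optional `system_fingerprint` and token `usage` fields, so token counts surface in the chat-completion output struct. A minimal sketch, assuming a DataFrame `completions` produced by an `OpenAIChatCompletion` transform whose output column is named `out` (both assumptions for illustration, not part of this commit):

```python
# Hedged sketch: reading the new optional `usage` fields off the output
# struct column. `completions` and the column name "out" are assumed.
from pyspark.sql import functions as F

(completions.select(
    F.col("out.usage.prompt_tokens").alias("prompt_tokens"),
    F.col("out.usage.completion_tokens").alias("completion_tokens"),
    F.col("out.usage.total_tokens").alias("total_tokens"))
 .show())
```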

@@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.Secrets.getAccessToken
import com.microsoft.azure.synapse.ml.core.test.base.Flaky
import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, TransformerFuzzing}
import org.apache.spark.ml.util.MLReadable
- import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col
import org.scalactic.Equality

@@ -35,6 +35,16 @@ class OpenAIPromptSuite extends TransformerFuzzing[OpenAIPrompt] with OpenAIAPIK
(null, "none") //scalastyle:ignore null
).toDF("text", "category")

test("RAI Usage") {
val result = prompt
.setDeploymentName(deploymentNameGpt4)
.setPromptTemplate("Tell me about a graphically disgusting movie in detail")
.transform(df)
.select(prompt.getErrorCol)
.collect().head.getAs[Row](0)
assert(Option(result).nonEmpty)
}
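The test above exercises the new RAI path end to end: a blocked completion now leaves a non-null struct in the error column, carrying the finish reason (for example `content_filter`) in its first field. A hedged PySpark counterpart, where the import path and setters follow the SynapseML docs and the key, resource, and deployment names are placeholders:

```python
# Hedged sketch: after this change, a content-filtered completion leaves a
# non-null entry in the error column. Import path varies across SynapseML
# versions; key, service, and deployment names are placeholders.
from pyspark.sql import SparkSession, functions as F
from synapse.ml.services.openai import OpenAIPrompt

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a graphically disgusting movie",)], ["topic"])

prompt = (OpenAIPrompt()
          .setSubscriptionKey("<azure-openai-key>")        # placeholder
          .setCustomServiceName("<azure-openai-resource>") # placeholder
          .setDeploymentName("gpt-4")                      # placeholder
          .setPromptTemplate("Tell me about {topic} in detail")
          .setOutputCol("out")
          .setErrorCol("error"))

(prompt.transform(df)
       .where(F.col("error").isNotNull())
       .select("error")
       .show(truncate=False))
```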

test("Basic Usage") {
val nonNullCount = prompt
.setPromptTemplate("here is a comma separated list of 5 {category}: {text}, ")
@@ -293,7 +293,7 @@ def log_fit():

@classmethod
def get_column_number(cls, args, kwargs):
- if kwargs and kwargs["df"] and isinstance(kwargs["df"], DataFrame):
+ if kwargs and kwargs.get("df") and isinstance(kwargs["df"], DataFrame):
return len(kwargs["df"].columns)
elif args and len(args) > 0 and isinstance(args[0], DataFrame):
return len(args[0].columns)
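The fix above matters because `kwargs["df"]` raises `KeyError` whenever the caller never passed `df` at all, while `kwargs.get("df")` returns `None` and lets control fall through to the positional-argument branch. A minimal illustration:

```python
# Minimal illustration of the fix: subscripting a missing key raises,
# while .get() returns None and short-circuits the `and` safely.
kwargs = {"other": 1}

try:
    kwargs["df"] and isinstance(kwargs["df"], str)
except KeyError as err:
    print("old form raises:", err)   # old form raises: 'df'

print(kwargs.get("df"))              # None -> falls through safely
```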
@@ -19,7 +19,7 @@ object PackageUtils {
val PackageName = s"synapseml_$ScalaVersionSuffix"
val PackageMavenCoordinate = s"$PackageGroup:$PackageName:${BuildInfo.version}"
// Use a fixed version for local testing
- // val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.4"
+ // val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.5"

private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1"
val PackageRepository: String = SparkMLRepository
@@ -30,6 +30,7 @@ trait HasErrorCol extends Params {
}

object ErrorUtils extends Serializable {

val ErrorSchema: StructType = new StructType()
.add("response", StringType, nullable = true)
.add("status", StatusLineData.schema, nullable = true)
@@ -561,6 +561,7 @@ trait GetterSetterFuzzing[S <: PipelineStage with Params] extends TestBase with
val pipelineStage = getterSetterTestObject().stage.copy(new ParamMap()).asInstanceOf[S]
val methods = pipelineStage.getClass.getMethods
pipelineStage.params.foreach { p =>
println(s"Testing parameter ${p.name}")
val getters = methods.filter(_.getName == s"get${p.name.capitalize}").toSeq
val setters = methods.filter(_.getName == s"set${p.name.capitalize}").toSeq
val defaultValue = getterSetterParamExample(pipelineStage, p)