Skip to content

Commit

Permalink
Magnolify API (#5286)
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen authored Apr 10, 2024
1 parent e5a778e commit bcd7d5e
Show file tree
Hide file tree
Showing 43 changed files with 1,602 additions and 264 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,9 @@ lazy val `scio-avro` = project
// compile
"com.esotericsoftware" % "kryo-shaded" % kryoVersion,
"com.google.protobuf" % "protobuf-java" % protobufVersion,
"com.spotify" %% "magnolify-avro" % magnolifyVersion,
"com.spotify" %% "magnolify-protobuf" % magnolifyVersion,
"com.spotify" %% "magnolify-shared" % magnolifyVersion,
"com.twitter" %% "chill" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
"me.lyh" %% "protobuf-generic" % protobufGenericVersion,
Expand Down Expand Up @@ -809,6 +812,10 @@ lazy val `scio-google-cloud-platform` = project
"com.google.http-client" % "google-http-client" % googleHttpClientVersion,
"com.google.http-client" % "google-http-client-gson" % googleHttpClientVersion,
"com.google.protobuf" % "protobuf-java" % protobufVersion,
"com.spotify" %% "magnolify-bigquery" % magnolifyVersion,
"com.spotify" %% "magnolify-bigtable" % magnolifyVersion,
"com.spotify" %% "magnolify-datastore" % magnolifyVersion,
"com.spotify" %% "magnolify-shared" % magnolifyVersion,
"com.twitter" %% "chill" % chillVersion,
"com.twitter" % "chill-java" % chillVersion,
"commons-io" % "commons-io" % commonsIoVersion,
Expand Down Expand Up @@ -1123,6 +1130,7 @@ lazy val `scio-tensorflow` = project
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion,
"org.apache.commons" % "commons-compress" % commonsCompressVersion,
"com.spotify" %% "magnolify-tensorflow" % magnolifyVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.tensorflow" % "ndarray" % ndArrayVersion,
"org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion,
Expand Down Expand Up @@ -1227,7 +1235,6 @@ lazy val `scio-examples` = project
"com.spotify" %% "magnolify-avro" % magnolifyVersion,
"com.spotify" %% "magnolify-bigtable" % magnolifyVersion,
"com.spotify" %% "magnolify-datastore" % magnolifyVersion,
"com.spotify" %% "magnolify-shared" % magnolifyVersion,
"com.spotify" %% "magnolify-tensorflow" % magnolifyVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"joda-time" % "joda-time" % jodaTimeVersion,
Expand Down
45 changes: 40 additions & 5 deletions scio-avro/src/main/scala/com/spotify/scio/avro/AvroTypedIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.spotify.scio.avro.types.AvroType.HasAvroAnnotation
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.values.SCollection
import magnolify.avro.{AvroType => AvroMagnolifyType}
import org.apache.avro.generic.GenericRecord

import scala.reflect.runtime.universe._
Expand All @@ -38,13 +39,18 @@ final case class AvroTypedIO[T <: HasAvroAnnotation: TypeTag: Coder](path: Strin
private lazy val underlying: GenericRecordIO = GenericRecordIO(path, schema)

override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
sc.read(underlying)(params).map(avroT.fromGenericRecord)
sc.transform(_.read(underlying)(params).map(avroT.fromGenericRecord))

override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
val datumFactory = Option(params.datumFactory).getOrElse(GenericRecordDatumFactory)
implicit val coder: Coder[GenericRecord] = avroCoder(datumFactory, schema)
data.map(avroT.toGenericRecord).write(underlying)(params)
tap(AvroIO.ReadParam(params))
underlying
.writeWithContext(
data.transform(_.map(avroT.toGenericRecord)),
params
)
.underlying
.map(avroT.fromGenericRecord)
}

override def tap(read: ReadP): Tap[T] =
Expand All @@ -61,6 +67,35 @@ object AvroTypedIO {
@deprecated("Use AvroTypedIO instead", "0.14.0")
object AvroTyped {
type AvroIO[T <: HasAvroAnnotation] = AvroTypedIO[T]
def AvroIO[T <: HasAvroAnnotation: TypeTag: Coder](path: String): AvroIO[T] =
AvroTypedIO[T](path)
def AvroIO[T <: HasAvroAnnotation: TypeTag: Coder](path: String): AvroIO[T] = AvroTypedIO[T](path)
}

/**
 * Avro file IO for element types `T` that have an implicit `magnolify.avro.AvroType[T]`.
 *
 * Data is read and written as `GenericRecord` through a [[GenericRecordIO]] created from the
 * schema of the `AvroType`; records are converted with `AvroType.from` and `AvroType.to`.
 *
 * @param path
 *   path handed to the underlying [[GenericRecordIO]]
 */
final case class AvroMagnolifyTypedIO[T: AvroMagnolifyType: Coder](path: String) extends ScioIO[T] {
  override type ReadP = AvroMagnolifyTypedIO.ReadParam
  override type WriteP = AvroMagnolifyTypedIO.WriteParam
  override val tapT: TapT.Aux[T, T] = TapOf[T]

  // Identified as a plain "AvroIO" on the given path.
  override def testId: String = s"AvroIO($path)"

  private lazy val avroT: AvroMagnolifyType[T] = implicitly[AvroMagnolifyType[T]]
  private lazy val schema = avroT.schema
  private lazy val underlying: GenericRecordIO = GenericRecordIO(path, schema)

  /** Reads generic records with the underlying IO and converts each of them to `T`. */
  override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
    sc.transform { ctx =>
      val records = ctx.read(underlying)(params)
      records.map(avroT.from)
    }

  /**
   * Converts every element to a `GenericRecord`, writes the records with the underlying IO and
   * returns a tap that converts the written records back to `T`.
   */
  override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
    // A null datum factory in the parameters falls back to GenericRecordDatumFactory.
    val factory = Option(params.datumFactory).getOrElse(GenericRecordDatumFactory)
    implicit val recordCoder: Coder[GenericRecord] = avroCoder(factory, schema)
    val records: SCollection[GenericRecord] = data.transform(_.map(avroT.to))
    underlying.writeWithContext(records, params).underlying.map(avroT.from)
  }

  /** Tap over the generic records at `path`, converted to `T`. */
  override def tap(read: ReadP): Tap[T] = {
    val recordTap = underlying.tap(read)
    recordTap.map(avroT.from)
  }
}

/** Read and write parameters of [[AvroMagnolifyTypedIO]]; all of them alias [[GenericRecordIO]]. */
object AvroMagnolifyTypedIO {
  // Parameter types.
  type ReadParam = GenericRecordIO.ReadParam
  type WriteParam = GenericRecordIO.WriteParam

  // Matching companions, so that `ReadParam(...)` / `WriteParam(...)` and their members resolve.
  val ReadParam = GenericRecordIO.ReadParam
  val WriteParam = GenericRecordIO.WriteParam
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] {
*/
override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val objectCoder = CoderMaterializer.beamWithDefault(Coder[T])
sc.read(underlying)(params).map(record => AvroBytesUtil.decode(objectCoder, record))
sc.transform { self =>
self
.read(underlying)(params)
.map(record => AvroBytesUtil.decode(objectCoder, record))
}
}

/**
Expand Down
80 changes: 68 additions & 12 deletions scio-avro/src/main/scala/com/spotify/scio/avro/ProtobufIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,35 @@ package com.spotify.scio.avro

import com.google.protobuf.Message
import com.spotify.scio.ScioContext
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT, TestIO}
import com.spotify.scio.protobuf.util.ProtobufUtil
import com.spotify.scio.values.SCollection
import magnolify.protobuf.ProtobufType

import scala.reflect.ClassTag

final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO[T] {
override type ReadP = ProtobufIO.ReadParam
override type WriteP = ProtobufIO.WriteParam
override val tapT: TapT.Aux[T, T] = TapOf[T]
/** Common parent of the protobuf IOs; it fixes the tap type to the element type `T`. */
sealed trait ProtobufIO[T] extends ScioIO[T] {
  final override val tapT: TapT.Aux[T, T] = TapOf[T]
}

object ProtobufIO {

  /**
   * Builds a [[ProtobufIO]] mixed with [[TestIO]] whose test id is `ProtobufIO(path)`.
   *
   * NOTE(review): presumably meant for referring to a protobuf IO in tests - confirm against
   * `TestIO`.
   */
  final def apply[T](path: String): ProtobufIO[T] = {
    val id = s"ProtobufIO($path)"
    new ProtobufIO[T] with TestIO[T] {
      override def testId: String = id
    }
  }
}

/** Read and write parameters of `ProtobufObjectFileIO`; all of them alias [[GenericRecordIO]]. */
object ProtobufObjectFileIO {
  // Parameter types.
  type ReadParam = GenericRecordIO.ReadParam
  type WriteParam = GenericRecordIO.WriteParam

  // Matching companions, so that `ReadParam(...)` / `WriteParam(...)` and their members resolve.
  val ReadParam = GenericRecordIO.ReadParam
  val WriteParam = GenericRecordIO.WriteParam
}

final case class ProtobufObjectFileIO[T <: Message: ClassTag](path: String) extends ProtobufIO[T] {
override type ReadP = ProtobufObjectFileIO.ReadParam
override type WriteP = ProtobufObjectFileIO.WriteParam
override def testId: String = s"ProtobufIO($path)"

private lazy val underlying: ObjectFileIO[T] = ObjectFileIO(path)
Expand All @@ -53,13 +71,51 @@ final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO
data.write(underlying)(params.copy(metadata = metadata)).underlying
}

override def tap(read: ReadP): Tap[T] =
ProtobufFileTap(path, read)
override def tap(read: ReadP): Tap[T] = ProtobufFileTap(path, read)
}

object ProtobufIO {
type ReadParam = GenericRecordIO.ReadParam
val ReadParam = GenericRecordIO.ReadParam
type WriteParam = GenericRecordIO.WriteParam
val WriteParam = GenericRecordIO.WriteParam
/**
 * Protobuf file IO for element types `T` that are converted to and from the protobuf message type
 * `U` by an implicit `magnolify.protobuf.ProtobufType[T, U]`.
 *
 * Messages of type `U` are read and written through an [[ObjectFileIO]] on `path`.
 */
final case class ProtobufTypedObjectFileIO[T: Coder, U <: Message: ClassTag](
  path: String
)(implicit pt: ProtobufType[T, U])
    extends ProtobufIO[T] {
  override type ReadP = ProtobufTypedObjectFileIO.ReadParam
  override type WriteP = ProtobufTypedObjectFileIO.WriteParam
  override def testId: String = s"ProtobufIO($path)"

  private lazy val underlying: ObjectFileIO[U] = ObjectFileIO(path)

  /**
   * Get an SCollection of `T` for a Protobuf file.
   *
   * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's
   * block file format. Each message read is converted to `T` with `pt.from`.
   */
  override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
    sc.transform { ctx =>
      val messages = ctx.read(underlying)(params)
      messages.map(pt.from)
    }

  /**
   * Save this SCollection of `T` as a Protobuf file.
   *
   * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's
   * block file format. Each element is converted to `U` with `pt.to` before being written, and the
   * schema metadata of `U` is added to the file metadata.
   */
  override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
    val enrichedMetadata = params.metadata ++ ProtobufUtil.schemaMetadataOf[U]
    val messages: SCollection[U] = data.transform(_.map(pt.to))
    val enrichedParams = params.copy(metadata = enrichedMetadata)
    underlying.writeWithContext(messages, enrichedParams).underlying.map(pt.from)
  }

  /** Tap over the messages stored at `path`, converted to `T`. */
  override def tap(read: ReadP): Tap[T] = {
    val messageTap = ProtobufFileTap[U](path, read)
    messageTap.map(pt.from)
  }
}

/** Read and write parameters of [[ProtobufTypedObjectFileIO]], aliasing [[ProtobufObjectFileIO]]. */
object ProtobufTypedObjectFileIO {
  // Parameter types.
  type ReadParam = ProtobufObjectFileIO.ReadParam
  type WriteParam = ProtobufObjectFileIO.WriteParam

  // Matching companions, so that `ReadParam(...)` / `WriteParam(...)` and their members resolve.
  val ReadParam = ProtobufObjectFileIO.ReadParam
  val WriteParam = ProtobufObjectFileIO.WriteParam
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.spotify.scio.coders.Coder
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.values._
import magnolify.avro.{AvroType => AvroMagnolifyType}
import magnolify.protobuf.ProtobufType
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.specific.SpecificRecord
Expand Down Expand Up @@ -183,17 +185,17 @@ final class ProtobufSCollectionOps[T <: Message](private val self: SCollection[T
*/
def saveAsProtobufFile(
path: String,
numShards: Int = ProtobufIO.WriteParam.DefaultNumShards,
suffix: String = ProtobufIO.WriteParam.DefaultSuffixProtobuf,
codec: CodecFactory = ProtobufIO.WriteParam.DefaultCodec,
metadata: Map[String, AnyRef] = ProtobufIO.WriteParam.DefaultMetadata,
shardNameTemplate: String = ProtobufIO.WriteParam.DefaultShardNameTemplate,
tempDirectory: String = ProtobufIO.WriteParam.DefaultTempDirectory,
numShards: Int = ProtobufObjectFileIO.WriteParam.DefaultNumShards,
suffix: String = ProtobufObjectFileIO.WriteParam.DefaultSuffixProtobuf,
codec: CodecFactory = ProtobufObjectFileIO.WriteParam.DefaultCodec,
metadata: Map[String, AnyRef] = ProtobufObjectFileIO.WriteParam.DefaultMetadata,
shardNameTemplate: String = ProtobufObjectFileIO.WriteParam.DefaultShardNameTemplate,
tempDirectory: String = ProtobufObjectFileIO.WriteParam.DefaultTempDirectory,
filenamePolicySupplier: FilenamePolicySupplier =
ProtobufIO.WriteParam.DefaultFilenamePolicySupplier,
prefix: String = ProtobufIO.WriteParam.DefaultPrefix
ProtobufObjectFileIO.WriteParam.DefaultFilenamePolicySupplier,
prefix: String = ProtobufObjectFileIO.WriteParam.DefaultPrefix
)(implicit ct: ClassTag[T]): ClosedTap[T] = {
val param = ProtobufIO.WriteParam[GenericRecord](
val param = ProtobufObjectFileIO.WriteParam[GenericRecord](
numShards,
suffix,
codec,
Expand All @@ -203,7 +205,73 @@ final class ProtobufSCollectionOps[T <: Message](private val self: SCollection[T
shardNameTemplate,
tempDirectory
)
self.write(ProtobufIO[T](path))(param)
self.write(ProtobufObjectFileIO[T](path))(param)
}
}

/**
 * Enhanced version of [[SCollection]] with a Protobuf save method for element types that can be
 * converted to a protobuf message by a `magnolify.protobuf.ProtobufType`.
 */
final class TypedMagnolifyProtobufSCollectionOps[T](private val self: SCollection[T])
    extends AnyVal {

  /**
   * Save this SCollection as a Protobuf file, writing through [[ProtobufTypedObjectFileIO]] with
   * the implicit `ProtobufType[T, U]`.
   *
   * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's
   * block file format.
   *
   * @tparam U
   *   protobuf message type the elements are converted to
   */
  def saveAsProtobufFile[U <: Message: ClassTag](
    path: String,
    numShards: Int = ProtobufTypedObjectFileIO.WriteParam.DefaultNumShards,
    suffix: String = ProtobufTypedObjectFileIO.WriteParam.DefaultSuffixProtobuf,
    codec: CodecFactory = ProtobufTypedObjectFileIO.WriteParam.DefaultCodec,
    metadata: Map[String, AnyRef] = ProtobufTypedObjectFileIO.WriteParam.DefaultMetadata,
    shardNameTemplate: String = ProtobufTypedObjectFileIO.WriteParam.DefaultShardNameTemplate,
    tempDirectory: String = ProtobufTypedObjectFileIO.WriteParam.DefaultTempDirectory,
    filenamePolicySupplier: FilenamePolicySupplier =
      ProtobufTypedObjectFileIO.WriteParam.DefaultFilenamePolicySupplier,
    prefix: String = ProtobufTypedObjectFileIO.WriteParam.DefaultPrefix
  )(implicit pt: ProtobufType[T, U]): ClosedTap[T] = {
    // The IO needs a Coder[T]; reuse the one already attached to this collection.
    implicit val elementCoder: Coder[T] = self.coder
    val writeParam = ProtobufTypedObjectFileIO.WriteParam[GenericRecord](
      numShards,
      suffix,
      codec,
      metadata,
      filenamePolicySupplier,
      prefix,
      shardNameTemplate,
      tempDirectory
    )
    val io = ProtobufTypedObjectFileIO[T, U](path)
    self.write(io)(writeParam)
  }
}

/**
 * Enhanced version of [[SCollection]] with an Avro save method for element types described by a
 * `magnolify.avro.AvroType`.
 *
 * Declared as a value class, like the other ops classes of this file, so that the implicit
 * enrichment does not allocate a wrapper on every call.
 */
final class TypedMagnolifyAvroSCollectionOps[T](private val self: SCollection[T])
    extends AnyVal {

  /**
   * Save this SCollection as an Avro file, writing through [[AvroMagnolifyTypedIO]] with the
   * implicit `AvroType[T]`.
   *
   * All defaults are the ones of [[AvroMagnolifyTypedIO.WriteParam]], the parameter object that is
   * actually handed to the IO.
   *
   * @param path
   *   output path passed to the IO
   * @param datumFactory
   *   factory for the `GenericRecord` datum writer
   * @return
   *   a closed tap over the written elements
   */
  def saveAsAvroFile(
    path: String,
    numShards: Int = AvroMagnolifyTypedIO.WriteParam.DefaultNumShards,
    suffix: String = AvroMagnolifyTypedIO.WriteParam.DefaultSuffix,
    codec: CodecFactory = AvroMagnolifyTypedIO.WriteParam.DefaultCodec,
    metadata: Map[String, AnyRef] = AvroMagnolifyTypedIO.WriteParam.DefaultMetadata,
    shardNameTemplate: String = AvroMagnolifyTypedIO.WriteParam.DefaultShardNameTemplate,
    tempDirectory: String = AvroMagnolifyTypedIO.WriteParam.DefaultTempDirectory,
    filenamePolicySupplier: FilenamePolicySupplier =
      AvroMagnolifyTypedIO.WriteParam.DefaultFilenamePolicySupplier,
    prefix: String = AvroMagnolifyTypedIO.WriteParam.DefaultPrefix,
    datumFactory: AvroDatumFactory[GenericRecord] =
      AvroMagnolifyTypedIO.WriteParam.DefaultDatumFactory
  )(implicit coder: Coder[T], at: AvroMagnolifyType[T]): ClosedTap[T] = {
    val param = AvroMagnolifyTypedIO.WriteParam(
      numShards,
      suffix,
      codec,
      metadata,
      filenamePolicySupplier,
      prefix,
      shardNameTemplate,
      tempDirectory,
      datumFactory
    )
    self.write(AvroMagnolifyTypedIO[T](path))(param)
  }
}

Expand All @@ -228,4 +296,12 @@ trait SCollectionSyntax {
implicit def avroProtobufSCollectionOps[T <: Message](
c: SCollection[T]
): ProtobufSCollectionOps[T] = new ProtobufSCollectionOps[T](c)

/** Enriches any [[SCollection]] with the methods of [[TypedMagnolifyProtobufSCollectionOps]]. */
implicit def typedAvroProtobufSCollectionOps[T](
  c: SCollection[T]
): TypedMagnolifyProtobufSCollectionOps[T] =
  new TypedMagnolifyProtobufSCollectionOps(c)

/** Enriches any [[SCollection]] with the methods of [[TypedMagnolifyAvroSCollectionOps]]. */
implicit def typedMagnolifyAvroSCollectionOps[T](
  c: SCollection[T]
): TypedMagnolifyAvroSCollectionOps[T] =
  new TypedMagnolifyAvroSCollectionOps[T](c)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.spotify.scio.avro._
import com.spotify.scio.avro.types.AvroType.HasAvroAnnotation
import com.spotify.scio.coders.Coder
import com.spotify.scio.values._
import magnolify.protobuf.ProtobufType
import magnolify.avro.{AvroType => AvroMagnolifyType}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
Expand Down Expand Up @@ -167,6 +169,16 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
): SCollection[T] =
self.read(AvroTypedIO[T](path))(AvroTypedIO.ReadParam(suffix))

/**
 * Get an SCollection of `T` for the Avro data at `path`: records are read as `GenericRecord` and
 * converted to `T` by the implicitly-available `magnolify.avro.AvroType[T]`.
 *
 * @param suffix
 *   value passed to `AvroMagnolifyTypedIO.ReadParam`
 */
def typedAvroFileMagnolify[T: AvroMagnolifyType: Coder](
  path: String,
  suffix: String = AvroMagnolifyTypedIO.ReadParam.DefaultSuffix
): SCollection[T] = {
  val io = AvroMagnolifyTypedIO[T](path)
  val readParam = AvroMagnolifyTypedIO.ReadParam(suffix)
  self.read(io)(readParam)
}

/**
* Get an SCollection for a Protobuf file.
*
Expand All @@ -175,9 +187,19 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
*/
def protobufFile[T <: Message: ClassTag](
path: String,
suffix: String = ProtobufIO.ReadParam.DefaultSuffix
suffix: String = ProtobufObjectFileIO.ReadParam.DefaultSuffix
): SCollection[T] =
self.read(ProtobufIO[T](path))(ProtobufIO.ReadParam(suffix))
self.read(ProtobufObjectFileIO[T](path))(ProtobufObjectFileIO.ReadParam(suffix))

/**
 * Read back protobuf messages serialized to `Array[Byte]` and stored in Avro files then map them
 * automatically to type `T` via the implicit [[magnolify.protobuf.ProtobufType]]
 *
 * The default suffix and the read parameters are taken from
 * `ProtobufTypedObjectFileIO.ReadParam`, the parameter alias of the IO that performs the read.
 *
 * @tparam T
 *   element type of the resulting SCollection
 * @tparam U
 *   protobuf message type stored in the files
 */
def typedProtobufFile[T: Coder, U <: Message: ClassTag](
  path: String,
  suffix: String = ProtobufTypedObjectFileIO.ReadParam.DefaultSuffix
)(implicit pt: ProtobufType[T, U]): SCollection[T] =
  self.read(ProtobufTypedObjectFileIO[T, U](path))(ProtobufTypedObjectFileIO.ReadParam(suffix))
}

/** Enhanced with Avro methods. */
Expand Down
Loading

0 comments on commit bcd7d5e

Please sign in to comment.