From bcd7d5e0eb9545d714383f23a97ee419a50db515 Mon Sep 17 00:00:00 2001 From: kellen Date: Wed, 10 Apr 2024 09:50:02 -0400 Subject: [PATCH] Magnolify API (#5286) --- build.sbt | 9 +- .../com/spotify/scio/avro/AvroTypedIO.scala | 45 ++++- .../com/spotify/scio/avro/ObjectFileIO.scala | 6 +- .../com/spotify/scio/avro/ProtobufIO.scala | 80 ++++++-- .../scio/avro/syntax/SCollectionSyntax.scala | 96 +++++++++- .../scio/avro/syntax/ScioContextSyntax.scala | 26 ++- .../scala/com/spotify/scio/avro/taps.scala | 4 +- .../spotify/scio/avro/types/AvroType.scala | 11 ++ .../avro/types/ConverterProviderTest.scala | 1 - .../examples/extra/MagnolifyAvroExample.scala | 19 +- .../extra/MagnolifyBigtableExample.scala | 32 ++-- .../extra/MagnolifyDatastoreExample.scala | 18 +- .../extra/MagnolifyTensorFlowExample.scala | 14 +- .../MagnolifyTypedBigQueryTornadoes.scala | 72 ++++++++ ...gnolifyTypedStorageBigQueryTornadoes.scala | 79 ++++++++ .../extra/MagnolifyAvroExampleTest.scala | 14 +- .../extra/MagnolifyBigtableExampleTest.scala | 58 ++++++ .../extra/MagnolifyDatastoreExampleTest.scala | 17 +- .../MagnolifyTensorFlowExampleTest.scala | 44 ++--- .../MagnolifyTypedBigQueryTornadoesTest.scala | 46 +++++ ...ifyTypedStorageBigQueryTornadoesTest.scala | 51 ++++++ .../spotify/scio/bigquery/BigQueryIO.scala | 130 ++++++++++++- .../dynamic/syntax/SCollectionSyntax.scala | 44 ++++- .../com/spotify/scio/bigquery/package.scala | 2 + .../bigquery/syntax/MagnolifySyntax.scala | 148 +++++++++++++++ .../com/spotify/scio/bigquery/taps.scala | 10 + .../scio/bigquery/types/BigQueryType.scala | 14 ++ .../spotify/scio/bigtable/BigTableIO.scala | 172 ++++++++++++++---- .../bigtable/syntax/SCollectionSyntax.scala | 54 +++++- .../bigtable/syntax/ScioContextSyntax.scala | 121 +++++++++--- .../spotify/scio/datastore/DatastoreIO.scala | 94 ++++++++-- .../datastore/syntax/SCollectionSyntax.scala | 25 ++- .../datastore/syntax/ScioContextSyntax.scala | 25 ++- .../scio/bigquery/BigQueryIOTest.scala | 54 ++++++ .../scio/bigtable/BigtableIOTest.scala | 24 +++ .../scio/datastore/DatastoreIOTest.scala | 42 ++++- .../spotify/scio/tensorflow/TFRecordIO.scala | 57 +++++- .../tensorflow/syntax/SCollectionSyntax.scala | 38 ++++ .../tensorflow/syntax/ScioContextSyntax.scala | 18 +- .../scio/tensorflow/TFExampleIOTest.scala | 9 +- .../scio/testing/SCollectionMatchers.scala | 2 +- .../com/spotify/scio/testing/ScioIOSpec.scala | 18 +- .../com/spotify/scio/avro/AvroIOTest.scala | 23 ++- 43 files changed, 1602 insertions(+), 264 deletions(-) create mode 100644 scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoes.scala create mode 100644 scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoes.scala create mode 100644 scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExampleTest.scala create mode 100644 scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoesTest.scala create mode 100644 scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoesTest.scala create mode 100644 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/MagnolifySyntax.scala diff --git a/build.sbt b/build.sbt index eb9890422f..c5246e65e2 100644 --- a/build.sbt +++ b/build.sbt @@ -750,6 +750,9 @@ lazy val `scio-avro` = project // compile "com.esotericsoftware" % "kryo-shaded" % kryoVersion, "com.google.protobuf" % "protobuf-java" % protobufVersion, + "com.spotify" %% 
"magnolify-avro" % magnolifyVersion, + "com.spotify" %% "magnolify-protobuf" % magnolifyVersion, + "com.spotify" %% "magnolify-shared" % magnolifyVersion, "com.twitter" %% "chill" % chillVersion, "com.twitter" % "chill-java" % chillVersion, "me.lyh" %% "protobuf-generic" % protobufGenericVersion, @@ -809,6 +812,10 @@ lazy val `scio-google-cloud-platform` = project "com.google.http-client" % "google-http-client" % googleHttpClientVersion, "com.google.http-client" % "google-http-client-gson" % googleHttpClientVersion, "com.google.protobuf" % "protobuf-java" % protobufVersion, + "com.spotify" %% "magnolify-bigquery" % magnolifyVersion, + "com.spotify" %% "magnolify-bigtable" % magnolifyVersion, + "com.spotify" %% "magnolify-datastore" % magnolifyVersion, + "com.spotify" %% "magnolify-shared" % magnolifyVersion, "com.twitter" %% "chill" % chillVersion, "com.twitter" % "chill-java" % chillVersion, "commons-io" % "commons-io" % commonsIoVersion, @@ -1123,6 +1130,7 @@ lazy val `scio-tensorflow` = project "org.apache.beam" % "beam-sdks-java-core" % beamVersion, "org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion, "org.apache.commons" % "commons-compress" % commonsCompressVersion, + "com.spotify" %% "magnolify-tensorflow" % magnolifyVersion, "org.slf4j" % "slf4j-api" % slf4jVersion, "org.tensorflow" % "ndarray" % ndArrayVersion, "org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion, @@ -1227,7 +1235,6 @@ lazy val `scio-examples` = project "com.spotify" %% "magnolify-avro" % magnolifyVersion, "com.spotify" %% "magnolify-bigtable" % magnolifyVersion, "com.spotify" %% "magnolify-datastore" % magnolifyVersion, - "com.spotify" %% "magnolify-shared" % magnolifyVersion, "com.spotify" %% "magnolify-tensorflow" % magnolifyVersion, "com.twitter" %% "algebird-core" % algebirdVersion, "joda-time" % "joda-time" % jodaTimeVersion, diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/AvroTypedIO.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/AvroTypedIO.scala index d82ca8df4c..bfaed96d9b 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/AvroTypedIO.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/AvroTypedIO.scala @@ -21,6 +21,7 @@ import com.spotify.scio.avro.types.AvroType.HasAvroAnnotation import com.spotify.scio.coders.Coder import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.values.SCollection +import magnolify.avro.{AvroType => AvroMagnolifyType} import org.apache.avro.generic.GenericRecord import scala.reflect.runtime.universe._ @@ -38,13 +39,18 @@ final case class AvroTypedIO[T <: HasAvroAnnotation: TypeTag: Coder](path: Strin private lazy val underlying: GenericRecordIO = GenericRecordIO(path, schema) override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.read(underlying)(params).map(avroT.fromGenericRecord) + sc.transform(_.read(underlying)(params).map(avroT.fromGenericRecord)) override protected def write(data: SCollection[T], params: WriteP): Tap[T] = { val datumFactory = Option(params.datumFactory).getOrElse(GenericRecordDatumFactory) implicit val coder: Coder[GenericRecord] = avroCoder(datumFactory, schema) - data.map(avroT.toGenericRecord).write(underlying)(params) - tap(AvroIO.ReadParam(params)) + underlying + .writeWithContext( + data.transform(_.map(avroT.toGenericRecord)), + params + ) + .underlying + .map(avroT.fromGenericRecord) } override def tap(read: ReadP): Tap[T] = @@ -61,6 +67,35 @@ object AvroTypedIO { @deprecated("Use AvroTypedIO instead", "0.14.0") object AvroTyped { 
type AvroIO[T <: HasAvroAnnotation] = AvroTypedIO[T] - def AvroIO[T <: HasAvroAnnotation: TypeTag: Coder](path: String): AvroIO[T] = - AvroTypedIO[T](path) + def AvroIO[T <: HasAvroAnnotation: TypeTag: Coder](path: String): AvroIO[T] = AvroTypedIO[T](path) +} + +final case class AvroMagnolifyTypedIO[T: AvroMagnolifyType: Coder](path: String) extends ScioIO[T] { + override type ReadP = AvroMagnolifyTypedIO.ReadParam + override type WriteP = AvroMagnolifyTypedIO.WriteParam + override val tapT: TapT.Aux[T, T] = TapOf[T] + + override def testId: String = s"AvroIO($path)" + + private lazy val avroT: AvroMagnolifyType[T] = implicitly + private lazy val schema = avroT.schema + private lazy val underlying: GenericRecordIO = GenericRecordIO(path, schema) + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.transform(_.read(underlying)(params).map(avroT.from)) + + override protected def write(data: SCollection[T], params: WriteP): Tap[T] = { + val datumFactory = Option(params.datumFactory).getOrElse(GenericRecordDatumFactory) + implicit val coder: Coder[GenericRecord] = avroCoder(datumFactory, schema) + underlying.writeWithContext(data.transform(_.map(avroT.to)), params).underlying.map(avroT.from) + } + + override def tap(read: ReadP): Tap[T] = underlying.tap(read).map(avroT.from) +} + +object AvroMagnolifyTypedIO { + type ReadParam = GenericRecordIO.ReadParam + val ReadParam = GenericRecordIO.ReadParam + type WriteParam = GenericRecordIO.WriteParam + val WriteParam = GenericRecordIO.WriteParam } diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/ObjectFileIO.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/ObjectFileIO.scala index 3671b207ac..6f285427e7 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/ObjectFileIO.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/ObjectFileIO.scala @@ -39,7 +39,11 @@ final case class ObjectFileIO[T: Coder](path: String) extends ScioIO[T] { */ override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { val objectCoder = CoderMaterializer.beamWithDefault(Coder[T]) - sc.read(underlying)(params).map(record => AvroBytesUtil.decode(objectCoder, record)) + sc.transform { self => + self + .read(underlying)(params) + .map(record => AvroBytesUtil.decode(objectCoder, record)) + } } /** diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/ProtobufIO.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/ProtobufIO.scala index e2dae68683..2853fcdf78 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/ProtobufIO.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/ProtobufIO.scala @@ -18,17 +18,35 @@ package com.spotify.scio.avro import com.google.protobuf.Message import com.spotify.scio.ScioContext -import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT, TestIO} import com.spotify.scio.protobuf.util.ProtobufUtil import com.spotify.scio.values.SCollection +import magnolify.protobuf.ProtobufType import scala.reflect.ClassTag -final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO[T] { - override type ReadP = ProtobufIO.ReadParam - override type WriteP = ProtobufIO.WriteParam - override val tapT: TapT.Aux[T, T] = TapOf[T] +sealed trait ProtobufIO[T] extends ScioIO[T] { + final override val tapT: TapT.Aux[T, T] = TapOf[T] +} + +object ProtobufIO { + final def apply[T](path: String): ProtobufIO[T] = + new ProtobufIO[T] with TestIO[T] { + override def testId: 
String = s"ProtobufIO($path)" + } +} + +object ProtobufObjectFileIO { + type ReadParam = GenericRecordIO.ReadParam + val ReadParam = GenericRecordIO.ReadParam + type WriteParam = GenericRecordIO.WriteParam + val WriteParam = GenericRecordIO.WriteParam +} +final case class ProtobufObjectFileIO[T <: Message: ClassTag](path: String) extends ProtobufIO[T] { + override type ReadP = ProtobufObjectFileIO.ReadParam + override type WriteP = ProtobufObjectFileIO.WriteParam override def testId: String = s"ProtobufIO($path)" private lazy val underlying: ObjectFileIO[T] = ObjectFileIO(path) @@ -53,13 +71,51 @@ final case class ProtobufIO[T <: Message: ClassTag](path: String) extends ScioIO data.write(underlying)(params.copy(metadata = metadata)).underlying } - override def tap(read: ReadP): Tap[T] = - ProtobufFileTap(path, read) + override def tap(read: ReadP): Tap[T] = ProtobufFileTap(path, read) } -object ProtobufIO { - type ReadParam = GenericRecordIO.ReadParam - val ReadParam = GenericRecordIO.ReadParam - type WriteParam = GenericRecordIO.WriteParam - val WriteParam = GenericRecordIO.WriteParam +final case class ProtobufTypedObjectFileIO[T: Coder, U <: Message: ClassTag]( + path: String +)(implicit pt: ProtobufType[T, U]) + extends ProtobufIO[T] { + override type ReadP = ProtobufTypedObjectFileIO.ReadParam + override type WriteP = ProtobufTypedObjectFileIO.WriteParam + override def testId: String = s"ProtobufIO($path)" + + private lazy val underlying: ObjectFileIO[U] = ObjectFileIO(path) + + /** + * Get an SCollection for a Protobuf file. + * + * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's + * block file format. + */ + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.transform(_.read(underlying)(params).map(pt.from)) + + /** + * Save this SCollection as a Protobuf file. + * + * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's + * block file format. 
+ */ + override protected def write(data: SCollection[T], params: WriteP): Tap[T] = { + val metadata = params.metadata ++ ProtobufUtil.schemaMetadataOf[U] + underlying + .writeWithContext( + data.transform(_.map(pt.to)), + params.copy(metadata = metadata) + ) + .underlying + .map(pt.from) + } + + override def tap(read: ReadP): Tap[T] = ProtobufFileTap[U](path, read).map(pt.from) +} + +object ProtobufTypedObjectFileIO { + type ReadParam = ProtobufObjectFileIO.ReadParam + val ReadParam = ProtobufObjectFileIO.ReadParam + type WriteParam = ProtobufObjectFileIO.WriteParam + val WriteParam = ProtobufObjectFileIO.WriteParam } diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala index 129613ef0f..a8ceb0d581 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/SCollectionSyntax.scala @@ -24,6 +24,8 @@ import com.spotify.scio.coders.Coder import com.spotify.scio.io.ClosedTap import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values._ +import magnolify.avro.{AvroType => AvroMagnolifyType} +import magnolify.protobuf.ProtobufType import org.apache.avro.Schema import org.apache.avro.file.CodecFactory import org.apache.avro.specific.SpecificRecord @@ -183,17 +185,17 @@ final class ProtobufSCollectionOps[T <: Message](private val self: SCollection[T */ def saveAsProtobufFile( path: String, - numShards: Int = ProtobufIO.WriteParam.DefaultNumShards, - suffix: String = ProtobufIO.WriteParam.DefaultSuffixProtobuf, - codec: CodecFactory = ProtobufIO.WriteParam.DefaultCodec, - metadata: Map[String, AnyRef] = ProtobufIO.WriteParam.DefaultMetadata, - shardNameTemplate: String = ProtobufIO.WriteParam.DefaultShardNameTemplate, - tempDirectory: String = ProtobufIO.WriteParam.DefaultTempDirectory, + numShards: Int = ProtobufObjectFileIO.WriteParam.DefaultNumShards, + suffix: String = ProtobufObjectFileIO.WriteParam.DefaultSuffixProtobuf, + codec: CodecFactory = ProtobufObjectFileIO.WriteParam.DefaultCodec, + metadata: Map[String, AnyRef] = ProtobufObjectFileIO.WriteParam.DefaultMetadata, + shardNameTemplate: String = ProtobufObjectFileIO.WriteParam.DefaultShardNameTemplate, + tempDirectory: String = ProtobufObjectFileIO.WriteParam.DefaultTempDirectory, filenamePolicySupplier: FilenamePolicySupplier = - ProtobufIO.WriteParam.DefaultFilenamePolicySupplier, - prefix: String = ProtobufIO.WriteParam.DefaultPrefix + ProtobufObjectFileIO.WriteParam.DefaultFilenamePolicySupplier, + prefix: String = ProtobufObjectFileIO.WriteParam.DefaultPrefix )(implicit ct: ClassTag[T]): ClosedTap[T] = { - val param = ProtobufIO.WriteParam[GenericRecord]( + val param = ProtobufObjectFileIO.WriteParam[GenericRecord]( numShards, suffix, codec, @@ -203,7 +205,73 @@ final class ProtobufSCollectionOps[T <: Message](private val self: SCollection[T shardNameTemplate, tempDirectory ) - self.write(ProtobufIO[T](path))(param) + self.write(ProtobufObjectFileIO[T](path))(param) + } +} + +final class TypedMagnolifyProtobufSCollectionOps[T](private val self: SCollection[T]) + extends AnyVal { + + /** + * Save this SCollection as a Protobuf file. + * + * Protobuf messages are serialized into `Array[Byte]` and stored in Avro files to leverage Avro's + * block file format. 
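+ *
+ * For example, with the hypothetical `WordCount`/`WordCountMsg` pair above and its implicit
+ * `ProtobufType` in scope, a typed write is a one-liner (a sketch):
+ * {{{
+ * wordCounts.saveAsProtobufFile[WordCountMsg]("gs://bucket/protobuf/")
+ * }}}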
+ */ + def saveAsProtobufFile[U <: Message: ClassTag]( + path: String, + numShards: Int = ProtobufTypedObjectFileIO.WriteParam.DefaultNumShards, + suffix: String = ProtobufTypedObjectFileIO.WriteParam.DefaultSuffixProtobuf, + codec: CodecFactory = ProtobufTypedObjectFileIO.WriteParam.DefaultCodec, + metadata: Map[String, AnyRef] = ProtobufTypedObjectFileIO.WriteParam.DefaultMetadata, + shardNameTemplate: String = ProtobufTypedObjectFileIO.WriteParam.DefaultShardNameTemplate, + tempDirectory: String = ProtobufTypedObjectFileIO.WriteParam.DefaultTempDirectory, + filenamePolicySupplier: FilenamePolicySupplier = + ProtobufTypedObjectFileIO.WriteParam.DefaultFilenamePolicySupplier, + prefix: String = ProtobufTypedObjectFileIO.WriteParam.DefaultPrefix + )(implicit pt: ProtobufType[T, U]): ClosedTap[T] = { + implicit val tCoder: Coder[T] = self.coder + val param = ProtobufTypedObjectFileIO.WriteParam[GenericRecord]( + numShards, + suffix, + codec, + metadata, + filenamePolicySupplier, + prefix, + shardNameTemplate, + tempDirectory + ) + self.write(ProtobufTypedObjectFileIO[T, U](path))(param) + } +} + +final class TypedMagnolifyAvroSCollectionOps[T](private val self: SCollection[T]) { + + def saveAsAvroFile( + path: String, + numShards: Int = AvroTypedIO.WriteParam.DefaultNumShards, + suffix: String = AvroTypedIO.WriteParam.DefaultSuffix, + codec: CodecFactory = AvroTypedIO.WriteParam.DefaultCodec, + metadata: Map[String, AnyRef] = AvroTypedIO.WriteParam.DefaultMetadata, + shardNameTemplate: String = AvroTypedIO.WriteParam.DefaultShardNameTemplate, + tempDirectory: String = AvroTypedIO.WriteParam.DefaultTempDirectory, + filenamePolicySupplier: FilenamePolicySupplier = + AvroTypedIO.WriteParam.DefaultFilenamePolicySupplier, + prefix: String = AvroTypedIO.WriteParam.DefaultPrefix, + datumFactory: AvroDatumFactory[GenericRecord] = AvroTypedIO.WriteParam.DefaultDatumFactory + )(implicit coder: Coder[T], at: AvroMagnolifyType[T]): ClosedTap[T] = { + val param = AvroMagnolifyTypedIO.WriteParam( + numShards, + suffix, + codec, + metadata, + filenamePolicySupplier, + prefix, + shardNameTemplate, + tempDirectory, + datumFactory + ) + self.write(AvroMagnolifyTypedIO[T](path))(param) } } @@ -228,4 +296,12 @@ trait SCollectionSyntax { implicit def avroProtobufSCollectionOps[T <: Message]( c: SCollection[T] ): ProtobufSCollectionOps[T] = new ProtobufSCollectionOps[T](c) + + implicit def typedAvroProtobufSCollectionOps[T]( + c: SCollection[T] + ): TypedMagnolifyProtobufSCollectionOps[T] = new TypedMagnolifyProtobufSCollectionOps[T](c) + + implicit def typedMagnolifyAvroSCollectionOps[T]( + c: SCollection[T] + ): TypedMagnolifyAvroSCollectionOps[T] = new TypedMagnolifyAvroSCollectionOps(c) } diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/ScioContextSyntax.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/ScioContextSyntax.scala index c79a068ddc..bacbfda8aa 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/ScioContextSyntax.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/syntax/ScioContextSyntax.scala @@ -24,6 +24,8 @@ import com.spotify.scio.avro._ import com.spotify.scio.avro.types.AvroType.HasAvroAnnotation import com.spotify.scio.coders.Coder import com.spotify.scio.values._ +import magnolify.protobuf.ProtobufType +import magnolify.avro.{AvroType => AvroMagnolifyType} import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.avro.specific.SpecificRecord @@ -167,6 +169,16 @@ final class ScioContextOps(private val 
self: ScioContext) extends AnyVal { ): SCollection[T] = self.read(AvroTypedIO[T](path))(AvroTypedIO.ReadParam(suffix)) + /** + * Read avro data from `path` as `GenericRecord` and convert to `T` via the implicitly-available + * `magnolify.avro.AvroType[T]` + */ + def typedAvroFileMagnolify[T: AvroMagnolifyType: Coder]( + path: String, + suffix: String = AvroMagnolifyTypedIO.ReadParam.DefaultSuffix + ): SCollection[T] = + self.read(AvroMagnolifyTypedIO[T](path))(AvroMagnolifyTypedIO.ReadParam(suffix)) + /** * Get an SCollection for a Protobuf file. * @@ -175,9 +187,19 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { */ def protobufFile[T <: Message: ClassTag]( path: String, - suffix: String = ProtobufIO.ReadParam.DefaultSuffix + suffix: String = ProtobufObjectFileIO.ReadParam.DefaultSuffix ): SCollection[T] = - self.read(ProtobufIO[T](path))(ProtobufIO.ReadParam(suffix)) + self.read(ProtobufObjectFileIO[T](path))(ProtobufObjectFileIO.ReadParam(suffix)) + + /** + * Read back protobuf messages serialized to `Array[Byte]` and stored in Avro files then map them + * automatically to type `T` via the implicit [[magnolify.protobuf.ProtobufType]] + */ + def typedProtobufFile[T: Coder, U <: Message: ClassTag]( + path: String, + suffix: String = ProtobufObjectFileIO.ReadParam.DefaultSuffix + )(implicit pt: ProtobufType[T, U]): SCollection[T] = + self.read(ProtobufTypedObjectFileIO[T, U](path))(ProtobufObjectFileIO.ReadParam(suffix)) } /** Enhanced with Avro methods. */ diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/taps.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/taps.scala index c8d80feed6..8a9714eb5d 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/taps.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/taps.scala @@ -79,7 +79,7 @@ object ObjectFileTap { } object ProtobufFileTap { - def apply[T <: Message: ClassTag](path: String, params: ProtobufIO.ReadParam): Tap[T] = + def apply[T <: Message: ClassTag](path: String, params: ProtobufObjectFileIO.ReadParam): Tap[T] = ObjectFileTap(path, params)(Coder.protoMessageCoder[T]) } @@ -99,7 +99,7 @@ final case class AvroTaps(self: Taps) { /** Get a `Future[Tap[T]]` of a Protobuf file. 
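 * A sketch, assuming a hypothetical compiled protobuf message class `MyMsg`:
 * {{{
 * val tap: Future[Tap[MyMsg]] = AvroTaps(Taps()).protobufFile[MyMsg]("gs://bucket/protobuf/")
 * }}}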
*/ def protobufFile[T <: Message: ClassTag]( path: String, - params: ProtobufIO.ReadParam = ProtobufIO.ReadParam() + params: ProtobufObjectFileIO.ReadParam = ProtobufObjectFileIO.ReadParam() ): Future[Tap[T]] = self.mkTap( s"Protobuf: $path", diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/types/AvroType.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/types/AvroType.scala index b05895f80a..2ce6b640a5 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/types/AvroType.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/types/AvroType.scala @@ -51,6 +51,7 @@ import scala.reflect.runtime.universe._ * @groupname Ungrouped * Other Members */ +@deprecated("Use magnolify API instead.", "0.15.0") object AvroType { /** @@ -87,6 +88,7 @@ object AvroType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class fromSchema(schema: String) extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.schemaImpl } @@ -131,6 +133,7 @@ object AvroType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class fromPath(folderGlob: String) extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.pathImpl } @@ -161,6 +164,7 @@ object AvroType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class fromSchemaFile(schemaFile: String) extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.schemaFileImpl } @@ -188,6 +192,7 @@ object AvroType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class toSchema extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.toSchemaImpl } @@ -218,9 +223,11 @@ object AvroType { * Trait for case classes with generated companion objects. * @group trait */ + @deprecated("Use magnolify API instead.", "0.15.0") trait HasAvroAnnotation /** Generate [[org.apache.avro.Schema Schema]] for a case class. */ + @deprecated("Use magnolify API instead.", "0.15.0") def schemaOf[T: TypeTag]: Schema = SchemaProvider.schemaOf[T] /** @@ -228,6 +235,7 @@ object AvroType { * the given case class `T`. * @group converters */ + @deprecated("Use magnolify API instead.", "0.15.0") def fromGenericRecord[T]: GenericRecord => T = macro ConverterProvider.fromGenericRecordImpl[T] @@ -236,10 +244,12 @@ object AvroType { * [[org.apache.avro.generic.GenericRecord GenericRecord]]. * @group converters */ + @deprecated("Use magnolify API instead.", "0.15.0") def toGenericRecord[T]: T => GenericRecord = macro ConverterProvider.toGenericRecordImpl[T] /** Create a new AvroType instance. */ + @deprecated("Use magnolify API instead.", "0.15.0") def apply[T: TypeTag]: AvroType[T] = new AvroType[T] } @@ -248,6 +258,7 @@ object AvroType { * * This decouples generated fields and methods from macro expansion to keep core macro free. 
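 *
 * Deprecated in favor of magnolify, where the equivalent conversions are derived without this
 * macro machinery; a sketch for a compatible case class `T`:
 * {{{
 * val avroType = magnolify.avro.AvroType[T]
 * val record: GenericRecord = avroType.to(t)
 * val roundTripped: T = avroType.from(record)
 * }}}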
*/ +@deprecated("Use magnolify API instead.", "0.15.0") class AvroType[T: TypeTag] extends Serializable { private val instance = runtimeMirror(getClass.getClassLoader) .reflectModule(typeOf[T].typeSymbol.companion.asModule) diff --git a/scio-avro/src/test/scala/com/spotify/scio/avro/types/ConverterProviderTest.scala b/scio-avro/src/test/scala/com/spotify/scio/avro/types/ConverterProviderTest.scala index e8c7b98778..cc7b81036a 100644 --- a/scio-avro/src/test/scala/com/spotify/scio/avro/types/ConverterProviderTest.scala +++ b/scio-avro/src/test/scala/com/spotify/scio/avro/types/ConverterProviderTest.scala @@ -18,7 +18,6 @@ package com.spotify.scio.avro.types import java.nio.file.Files - import com.spotify.scio._ import com.spotify.scio.avro._ import org.apache.commons.io.FileUtils diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyAvroExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyAvroExample.scala index f4cb43ee7a..b7120890a5 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyAvroExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyAvroExample.scala @@ -23,16 +23,9 @@ package com.spotify.scio.examples.extra import com.spotify.scio._ import com.spotify.scio.avro._ -import com.spotify.scio.coders.Coder import com.spotify.scio.examples.common.ExampleData -import com.spotify.scio.examples.extra.MagnolifyAvroExample.wordCountType -import org.apache.avro.generic.GenericRecord object MagnolifyAvroExample { - // limit import scope to avoid polluting namespace - import magnolify.avro._ - - val wordCountType: AvroType[WordCount] = AvroType[WordCount] case class WordCount(word: String, count: Long) } @@ -47,9 +40,6 @@ object MagnolifyAvroExample { // --output=gs://[BUCKET]/[PATH]/wordcount-avro"` object MagnolifyAvroWriteExample { - implicit val genericCoder: Coder[GenericRecord] = - avroGenericRecordCoder(wordCountType.schema) - def main(cmdlineArgs: Array[String]): Unit = { import MagnolifyAvroExample._ @@ -57,8 +47,9 @@ object MagnolifyAvroWriteExample { sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR)) .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .countByValue - .map(t => wordCountType(WordCount.tupled(t))) - .saveAsAvroFile(args("output"), schema = wordCountType.schema) + .map { case (word, count) => WordCount(word, count) } + // uses implicitly-derived magnolify.avro.AvroType[WordCount] to save to avro + .saveAsAvroFile(args("output")) sc.run() () } @@ -78,8 +69,8 @@ object MagnolifyAvroReadExample { import MagnolifyAvroExample._ val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.avroFile(args("input"), wordCountType.schema) - .map(e => wordCountType(e)) + // uses implicitly-derived magnolify.avro.AvroType[WordCount] to read from avro + sc.typedAvroFileMagnolify[WordCount](args("input")) .map(wc => wc.word + ": " + wc.count) .saveAsTextFile(args("output")) sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExample.scala index fd52360ea7..7e8e974183 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExample.scala @@ -34,11 +34,8 @@ import magnolify.bigtable._ import scala.collection.compat._ object MagnolifyBigtableExample { - // Define case class representation of 
TensorFlow `Example` case class WordCount(cnt: Long) - // `BigtableType` provides mapping between case classes and `Seq[Mutation]`/`Row` - // for writing/reading. - val WordCountType: BigtableType[WordCount] = BigtableType[WordCount] + val ColumnFamily = "counts" } // ## Magnolify Bigtable Write Example @@ -65,14 +62,10 @@ object MagnolifyBigtableWriteExample { sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR)) .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .countByValue - // Convert case class to `Seq[Mutation]` and lift it into a key-value pair - // for saving to Bigtable table. - .map { case (word, count) => - val mutations = - WordCountType(WordCount(count), columnFamily = "counts").iterator.to(Iterable) - ByteString.copyFromUtf8(word) -> mutations - } - .saveAsBigtable(btProjectId, btInstanceId, btTableId) + .mapValues(cnt => WordCount(cnt)) + // `keyFn` converts word to a ByteString, while the value is converted via an + // implicitly derived BigtableType[WordCount] + .saveAsBigtable(btProjectId, btInstanceId, btTableId, ColumnFamily, ByteString.copyFromUtf8 _) sc.run() () @@ -99,12 +92,15 @@ object MagnolifyBigtableReadExample { val btInstanceId = args("bigtableInstanceId") val btTableId = args("bigtableTableId") - sc.bigtable(btProjectId, btInstanceId, btTableId) - .map { row => - // Convert Bigtable `Row` to the case class and lift it into a key-value pair. - row.getKey.toStringUtf8 -> WordCountType(row, columnFamily = "counts").cnt - } - .saveAsTextFile(args("output")) + // Internally converts Bigtable `Row` to `(String, WordCount)` via implicit + // BigtableType[WordCount] and the provided `keyFn` + sc.typedBigtable[String, WordCount]( + btProjectId, + btInstanceId, + btTableId, + ColumnFamily, + (bs: ByteString) => bs.toStringUtf8 + ).saveAsTextFile(args("output")) sc.run() () diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExample.scala index 6e7037daf5..2564bb6156 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExample.scala @@ -22,19 +22,14 @@ // convert between case classes and Datastore `Entity` types. 
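// The typed read below returns `WordCount` values directly; magnolify derives the implicit
// `EntityType[WordCount]` at compile time. A sketch of the two typed entry points used here:
//
//   sc.typedDatastore[WordCount](projectId, Query.getDefaultInstance)
//   wordCounts.saveAsDatastore(projectId)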
package com.spotify.scio.examples.extra -import com.google.datastore.v1.client.DatastoreHelper.makeKey import com.google.datastore.v1.Query import com.spotify.scio._ import com.spotify.scio.datastore._ import com.spotify.scio.examples.common.ExampleData -import magnolify.datastore._ object MagnolifyDatastoreExample { - val kind = "magnolify" // Define case class representation of Datastore entities case class WordCount(word: String, count: Long) - // `DatastoreType` provides mapping between case classes and Datatore entities - val wordCountType: EntityType[WordCount] = EntityType[WordCount] } // ## Magnolify Datastore Write Example @@ -54,14 +49,7 @@ object MagnolifyDatastoreWriteExample { sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR)) .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .countByValue - .map { t => - // Convert case class to `Entity.Builder` - wordCountType - .to(WordCount.tupled(t)) - // Set entity key - .setKey(makeKey(kind, t._1)) - .build() - } + .map { case (word, count) => WordCount(word, count) } .saveAsDatastore(args("output")) sc.run() () @@ -82,9 +70,7 @@ object MagnolifyDatastoreReadExample { import MagnolifyDatastoreExample._ val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.datastore(args("input"), Query.getDefaultInstance) - // Convert `Entity` to case class - .map(e => wordCountType(e)) + sc.typedDatastore[WordCount](args("input"), Query.getDefaultInstance) .map(wc => wc.word + ": " + wc.count) .saveAsTextFile(args("output")) sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExample.scala index f52421c46e..e7ad1de029 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExample.scala @@ -35,8 +35,6 @@ object MagnolifyTensorFlowExample { // `Example` type doesn't support `String` natively, derive one from `ByteString` implicit val efString: ExampleField.Primitive[String] = ExampleField.from[ByteString](_.toStringUtf8)(ByteString.copyFromUtf8) - // `TensorFlowType` provides mapping between case classes and TensorFlow `Example` - val wordCountType: ExampleType[WordCount] = ExampleType[WordCount] } // ## Magnolify Tensorflow Write Example @@ -56,8 +54,8 @@ object MagnolifyTensorFlowWriteExample { sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR)) .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .countByValue - // Convert case class to `Example` and then serialize as `Array[Byte]` - .map(t => wordCountType(WordCount.tupled(t)).toByteArray) + .map { case (word, count) => WordCount(word, count) } + // converts WordCount to Example with the implicitly-derived ExampleType[WordCount] .saveAsTfRecordFile(args("output")) sc.run() () @@ -78,12 +76,8 @@ object MagnolifyTensorFlowReadExample { import MagnolifyTensorFlowExample._ val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.tfRecordFile(args("input")) - .map { b => - // Deserialize `Array[Byte]` as `Example` and then convert to case class - wordCountType(Example.parseFrom(b)) - } - .map(wc => wc.word + ": " + wc.count) + // reads TF Examples and converts to WordCount via the implicitly-derived ExampleType[WordCount] + sc.typedTfRecordFile[WordCount](args("input")) .saveAsTextFile(args("output")) sc.run() () diff --git 
a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoes.scala
new file mode 100644
index 0000000000..6649a6b75c
--- /dev/null
+++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoes.scala
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2019 Spotify AB.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Example: Read and write using the magnolify-typed BigQuery API with case classes
+// Usage:
+
+// `sbt "runMain com.spotify.scio.examples.extra.MagnolifyTypedBigQueryTornadoes
+// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
+// --output=[PROJECT]:[DATASET].[TABLE]"`
+package com.spotify.scio.examples.extra
+
+import com.spotify.scio.bigquery._
+import com.spotify.scio.{ContextAndArgs, ScioContext}
+
+object MagnolifyTypedBigQueryTornadoes {
+  val query: String = "SELECT tornado, month FROM [bigquery-public-data:samples.gsod]"
+  case class Row(tornado: Option[Boolean], month: Long)
+  case class Result(month: Long, tornado_count: Long)
+
+  def pipeline(cmdlineArgs: Array[String]): ScioContext = {
+    val (sc, args) = ContextAndArgs(cmdlineArgs)
+
+    val resultTap = sc
+      // Get input from BigQuery and convert elements from `TableRow` to `Row` with the
+      // implicitly-available `TableRowType[Row]`
+      .typedBigQuerySelect[Row](Query(query))
+      .flatMap(r => if (r.tornado.getOrElse(false)) Seq(r.month) else Nil)
+      .countByValue
+      .map(kv => Result(kv._1, kv._2))
+      // Save output to BigQuery, convert elements from `Result` to `TableRow` with the
+      // implicitly-available `TableRowType[Result]`
+      .saveAsBigQueryTable(
+        Table.Spec(args("output")),
+        writeDisposition = WRITE_TRUNCATE,
+        createDisposition = CREATE_IF_NEEDED
+      )
+
+    // Access the loaded tables
+    resultTap
+      .output(BigQueryIO.SuccessfulTableLoads)
+      .map(_.getTableSpec)
+      .debug(prefix = "Loaded table: ")
+
+    // Access the failed records
+    resultTap
+      .output(BigQueryIO.FailedInserts)
+      .count
+      .debug(prefix = "Failed inserts: ")
+
+    sc
+  }
+
+  def main(cmdlineArgs: Array[String]): Unit = {
+    val sc = pipeline(cmdlineArgs)
+    sc.run().waitUntilDone()
+    ()
+  }
+}
diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoes.scala
new file mode 100644
index 0000000000..16a435f004
--- /dev/null
+++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoes.scala
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2019 Spotify AB.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Example: Read using the magnolify-typed BigQuery Storage API with case classes
+// Usage:
+
+// `sbt "runMain com.spotify.scio.examples.extra.MagnolifyTypedStorageBigQueryTornadoes
+// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
+// --output=[PROJECT]:[DATASET].[TABLE]"`
+package com.spotify.scio.examples.extra
+
+import com.spotify.scio.bigquery._
+import com.spotify.scio.{ContextAndArgs, ScioContext}
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method
+
+object MagnolifyTypedStorageBigQueryTornadoes {
+  val table: String = "bigquery-public-data:samples.gsod"
+  case class Row(month: Long, tornado: Option[Boolean])
+  case class Result(month: Long, tornado_count: Long)
+
+  def pipeline(cmdlineArgs: Array[String]): ScioContext = {
+    val (sc, args) = ContextAndArgs(cmdlineArgs)
+
+    val resultTap = sc
+      // Get input from BigQuery and convert elements from `TableRow` to `Row` with the
+      // implicitly-available `TableRowType[Row]`
+      .typedBigQueryStorageMagnolify[Row](
+        Table.Spec(table),
+        selectedFields = List("tornado", "month"),
+        rowRestriction = "tornado = true"
+      )
+      .map(_.month)
+      .countByValue
+      .map(kv => Result(kv._1, kv._2))
+      // Save output to BigQuery, convert elements from `Result` to `TableRow` with the
+      // implicitly-available `TableRowType[Result]`
+      .saveAsBigQueryTable(
+        Table.Spec(args("output")),
+        method = Method.STORAGE_WRITE_API,
+        writeDisposition = WRITE_TRUNCATE,
+        createDisposition = CREATE_IF_NEEDED,
+        successfulInsertsPropagation = true
+      )
+
+    // Access the inserted records
+    resultTap
+      .output(BigQueryIO.SuccessfulStorageApiInserts)
+      .count
+      .debug(prefix = "Successful inserts: ")
+
+    // Access the failed records
+    resultTap
+      .output(BigQueryIO.FailedStorageApiInserts)
+      .count
+      .debug(prefix = "Failed inserts: ")
+
+    sc
+  }
+
+  def main(cmdlineArgs: Array[String]): Unit = {
+    val sc = pipeline(cmdlineArgs)
+    sc.run().waitUntilDone()
+    ()
+  }
+}
diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala
index 478d8bed46..7566495777 100644
--- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala
+++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala
@@ -20,35 +20,27 @@ package com.spotify.scio.examples.extra
 import com.spotify.scio.avro.AvroIO
 import com.spotify.scio.io._
 import com.spotify.scio.testing._
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

 class MagnolifyAvroExampleTest extends PipelineSpec {
   import MagnolifyAvroExample._

   val textIn: Seq[String] = Seq("a b c d e", "a b a b")
   val wordCount: Seq[(String, Long)] = Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L))
-  val records: Seq[GenericRecord] = wordCount.map { kv =>
-    new GenericRecordBuilder(wordCountType.schema)
-      .set("word", kv._1)
-      .set("count", kv._2)
-      .build()
-  }
+  val records: Seq[WordCount] = wordCount.map { case (word, count) => WordCount(word, count) }
   val textOut:
Seq[String] = wordCount.map(kv => kv._1 + ": " + kv._2) "MagnolifyAvroWriteExample" should "work" in { - import MagnolifyAvroWriteExample.genericCoder JobTest[com.spotify.scio.examples.extra.MagnolifyAvroWriteExample.type] .args("--input=in.txt", "--output=wc.avro") .input(TextIO("in.txt"), textIn) - .output(AvroIO[GenericRecord]("wc.avro"))(coll => coll should containInAnyOrder(records)) + .output(AvroIO[WordCount]("wc.avro"))(coll => coll should containInAnyOrder(records)) .run() } "MagnolifyAvroReadExample" should "work" in { - import MagnolifyAvroWriteExample.genericCoder JobTest[com.spotify.scio.examples.extra.MagnolifyAvroReadExample.type] .args("--input=wc.avro", "--output=out.txt") - .input(AvroIO[GenericRecord]("wc.avro"), records) + .input(AvroIO[WordCount]("wc.avro"), records) .output(TextIO("out.txt"))(coll => coll should containInAnyOrder(textOut)) .run() } diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExampleTest.scala new file mode 100644 index 0000000000..eb6c13b653 --- /dev/null +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyBigtableExampleTest.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.spotify.scio.examples.extra + +import com.spotify.scio.bigtable.BigtableIO +import com.spotify.scio.io._ +import com.spotify.scio.testing._ + +class MagnolifyBigtableExampleTest extends PipelineSpec { + import MagnolifyBigtableExample._ + + val project = "my-project" + val instance = "my-instance" + val table = "my-table" + val bigtableOptions: Seq[String] = Seq( + s"--bigtableProjectId=$project", + s"--bigtableInstanceId=$instance", + s"--bigtableTableId=$table" + ) + + val textIn: Seq[String] = Seq("a b c d e", "a b a b") + val wordCount: Seq[(String, Long)] = Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L)) + val expected: Seq[(String, WordCount)] = wordCount.map { case (k, v) => (k, WordCount(v)) } + val expectedText: Seq[String] = expected.map(_.toString) + + "MagnolifyBigtableWriteExample" should "work" in { + JobTest[MagnolifyBigtableWriteExample.type] + .args(bigtableOptions :+ "--input=in.txt": _*) + .input(TextIO("in.txt"), textIn) + .output(BigtableIO[(String, WordCount)](project, instance, table))(coll => + coll should containInAnyOrder(expected) + ) + .run() + } + + "MagnolifyBigtableReadExample" should "work" in { + JobTest[MagnolifyBigtableReadExample.type] + .args(bigtableOptions :+ "--output=out.txt": _*) + .input(BigtableIO[(String, WordCount)](project, instance, table), expected) + .output(TextIO("out.txt"))(coll => coll should containInAnyOrder(expectedText)) + .run() + } +} diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExampleTest.scala index ab8514b90b..50cdfc84d0 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExampleTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyDatastoreExampleTest.scala @@ -17,37 +17,30 @@ package com.spotify.scio.examples.extra -import com.google.datastore.v1.client.DatastoreHelper.{makeKey, makeValue} -import com.google.datastore.v1.Entity import com.spotify.scio.io._ import com.spotify.scio.datastore._ import com.spotify.scio.testing._ class MagnolifyDatastoreExampleTest extends PipelineSpec { + import MagnolifyDatastoreExample._ + val textIn: Seq[String] = Seq("a b c d e", "a b a b") val wordCount: Seq[(String, Long)] = Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L)) - val entities: Seq[Entity] = wordCount.map { kv => - Entity - .newBuilder() - .setKey(makeKey(MagnolifyDatastoreExample.kind, kv._1)) - .putProperties("word", makeValue(kv._1).build()) - .putProperties("count", makeValue(kv._2).build()) - .build() - } + val entities: Seq[WordCount] = wordCount.map { case (word, count) => WordCount(word, count) } val textOut: Seq[String] = wordCount.map(kv => kv._1 + ": " + kv._2) "MagnolifyDatastoreWriteExample" should "work" in { JobTest[com.spotify.scio.examples.extra.MagnolifyDatastoreWriteExample.type] .args("--input=in.txt", "--output=project") .input(TextIO("in.txt"), textIn) - .output(DatastoreIO("project"))(coll => coll should containInAnyOrder(entities)) + .output(DatastoreIO[WordCount]("project"))(_ should containInAnyOrder(entities)) .run() } "MagnolifyDatastoreReadExample" should "work" in { JobTest[com.spotify.scio.examples.extra.MagnolifyDatastoreReadExample.type] .args("--input=project", "--output=out.txt") - .input(DatastoreIO("project"), entities) + .input(DatastoreIO[WordCount]("project"), entities) .output(TextIO("out.txt"))(coll => coll should 
containInAnyOrder(textOut)) .run() } diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExampleTest.scala index 916d764463..94aed60d6b 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExampleTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTensorFlowExampleTest.scala @@ -17,54 +17,32 @@ package com.spotify.scio.examples.extra -import com.google.protobuf.ByteString import com.spotify.scio.io._ -import com.spotify.scio.tensorflow.TFRecordIO +import com.spotify.scio.tensorflow.TFExampleTypedIO import com.spotify.scio.testing._ -import org.tensorflow.proto.example._ class MagnolifyTensorFlowExampleTest extends PipelineSpec { + import MagnolifyTensorFlowExample._ + val textIn: Seq[String] = Seq("a b c d e", "a b a b") - val wordCount: Seq[(String, Long)] = Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L)) - val examples: Seq[Example] = wordCount.map { kv => - Example - .newBuilder() - .setFeatures( - Features - .newBuilder() - .putFeature( - "word", - Feature - .newBuilder() - .setBytesList(BytesList.newBuilder().addValue(ByteString.copyFromUtf8(kv._1))) - .build() - ) - .putFeature( - "count", - Feature - .newBuilder() - .setInt64List(Int64List.newBuilder().addValue(kv._2)) - .build() - ) - ) - .build() - } - val textOut: Seq[String] = wordCount.map(kv => kv._1 + ": " + kv._2) + val wordCount: Seq[WordCount] = Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L)) + .map { case (word, count) => WordCount(word, count) } + val textOut: Seq[String] = wordCount.map(_.toString()) "MagnolifyTensorFlowWriteExample" should "work" in { - JobTest[com.spotify.scio.examples.extra.MagnolifyTensorFlowWriteExample.type] + JobTest[MagnolifyTensorFlowWriteExample.type] .args("--input=in.txt", "--output=wc.tfrecords") .input(TextIO("in.txt"), textIn) - .output(TFRecordIO("wc.tfrecords")) { - _.map(Example.parseFrom) should containInAnyOrder(examples) + .output(TFExampleTypedIO[WordCount]("wc.tfrecords")) { + _ should containInAnyOrder(wordCount) } .run() } "MagnolifyTensorFlowReadExample" should "work" in { - JobTest[com.spotify.scio.examples.extra.MagnolifyTensorFlowReadExample.type] + JobTest[MagnolifyTensorFlowReadExample.type] .args("--input=wc.tfrecords", "--output=out.txt") - .input(TFRecordIO("wc.tfrecords"), examples.map(_.toByteArray)) + .input(TFExampleTypedIO[WordCount]("wc.tfrecords"), wordCount) .output(TextIO("out.txt"))(coll => coll should containInAnyOrder(textOut)) .run() } diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoesTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoesTest.scala new file mode 100644 index 0000000000..abb58bcb83 --- /dev/null +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedBigQueryTornadoesTest.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2019 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.examples.extra + +import com.spotify.scio.bigquery._ +import com.spotify.scio.testing._ + +class MagnolifyTypedBigQueryTornadoesTest extends PipelineSpec { + import MagnolifyTypedBigQueryTornadoes.{Result, Row} + + val inData: Seq[Row] = Seq( + Row(Some(true), 1), + Row(Some(false), 1), + Row(Some(false), 2), + Row(Some(true), 3), + Row(Some(true), 4), + Row(Some(true), 4) + ) + + val expected: Seq[Result] = Seq(Result(1, 1), Result(3, 1), Result(4, 2)) + + "MagnolifyTypedBigQueryTornadoes" should "work" in { + JobTest[com.spotify.scio.examples.extra.MagnolifyTypedBigQueryTornadoes.type] + .args("--output=dataset.table") + .input(BigQueryIO(MagnolifyTypedBigQueryTornadoes.query), inData) + .output(BigQueryIO[Result]("dataset.table")) { coll => + coll should containInAnyOrder(expected) + } + .run() + } +} diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoesTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoesTest.scala new file mode 100644 index 0000000000..9482c88215 --- /dev/null +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyTypedStorageBigQueryTornadoesTest.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2019 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package com.spotify.scio.examples.extra
+
+import com.spotify.scio.bigquery._
+import com.spotify.scio.testing._
+
+final class MagnolifyTypedStorageBigQueryTornadoesTest extends PipelineSpec {
+  import MagnolifyTypedStorageBigQueryTornadoes.{Result, Row}
+
+  val inData: Seq[Row] = Seq(
+    Row(1, Some(true)),
+    Row(3, Some(true)),
+    Row(4, Some(true)),
+    Row(4, Some(true))
+  )
+
+  val expected: Seq[Result] = Seq(Result(1, 1), Result(3, 1), Result(4, 2))
+
+  "MagnolifyTypedStorageBigQueryTornadoes" should "work" in {
+    JobTest[com.spotify.scio.examples.extra.MagnolifyTypedStorageBigQueryTornadoes.type]
+      .args("--output=dataset.table")
+      .input(
+        BigQueryIO(
+          MagnolifyTypedStorageBigQueryTornadoes.table,
+          List("tornado", "month"),
+          Some("tornado = true")
+        ),
+        inData
+      )
+      .output(BigQueryIO[Result]("dataset.table")) { coll =>
+        coll should containInAnyOrder(expected)
+      }
+      .run()
+  }
+}
diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
index 3cc3f6ab03..4ce3af2947 100644
--- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
+++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
@@ -26,6 +26,7 @@ import com.spotify.scio.io._
 import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
 import com.spotify.scio.values.{SCollection, SideOutput, SideOutputCollections}
 import com.twitter.chill.ClosureCleaner
+import magnolify.bigquery.TableRowType
 import org.apache.avro.generic.GenericRecord
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
 import org.apache.beam.sdk.io.Compression
@@ -280,7 +281,7 @@ final case class BigQueryTypedSelect[T: Coder](
   }

   override protected def write(data: SCollection[T], params: WriteP): Tap[T] =
-    throw new UnsupportedOperationException("BigQuerySelect is read-only")
+    throw new UnsupportedOperationException("BigQueryTypedSelect is read-only")

   override def tap(params: ReadP): Tap[T] = {
     val tableReference = BigQuery
@@ -402,7 +403,7 @@ object BigQueryTypedTable {
  * Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]].
  *
  * NOTE: LogicalType support when using `Format.GenericRecord` has some caveats: Reading: Bigquery
- * types DATE, TIME, DATIME will be read as STRING. Writing: Supports LogicalTypes only for DATE
+ * types DATE, TIME, DATETIME will be read as STRING. Writing: Supports LogicalTypes only for DATE
  * and TIME. DATETIME is not yet supported.
https://issuetracker.google.com/issues/140681683 */ def apply[F: Coder](table: Table, format: Format[F]): BigQueryTypedTable[F] = @@ -567,7 +568,7 @@ final case class BigQueryStorageSelect(sqlQuery: Query) extends BigQueryIO[Table sc.read(underlying)(BigQueryTypedSelect.ReadParam()) override protected def write(data: SCollection[TableRow], params: WriteP): Tap[TableRow] = - throw new UnsupportedOperationException("BigQuerySelect is read-only") + throw new UnsupportedOperationException("BigQueryStorageSelect is read-only") override def tap(params: ReadP): Tap[TableRow] = underlying.tap(BigQueryTypedSelect.ReadParam()) } @@ -916,3 +917,126 @@ object BigQueryTyped { } } } + +// SELECT + +object BigQueryMagnolifyTypedSelectIO { + type ReadParam = BigQueryTypedSelect.ReadParam + val ReadParam = BigQueryTypedSelect.ReadParam +} + +final case class BigQueryMagnolifyTypedSelectIO[T: TableRowType: Coder]( + query: Query +) extends BigQueryIO[T] { + override type ReadP = BigQuerySelect.ReadParam + override type WriteP = Nothing // ReadOnly + + private lazy val tableRowType: TableRowType[T] = implicitly + private[this] lazy val underlying = + BigQueryTypedSelect(beam.BigQueryIO.readTableRows(), query, identity)(coders.tableRowCoder) + + override def testId: String = s"BigQueryIO(${query.underlying})" + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.transform(_.read(underlying)(params).map(row => tableRowType(row))) + + override protected def write(data: SCollection[T], params: WriteP): Tap[T] = + throw new UnsupportedOperationException("MagnolifyBigQuerySelect is read-only") + + override def tap(params: ReadP): Tap[T] = underlying.tap(params).map(row => tableRowType(row)) +} + +// TABLE + +final case class BigQueryMagnolifyTypedTable[T: TableRowType: Coder]( + table: Table +) extends BigQueryIO[T] + with WriteResultIO[T] { + override type ReadP = Unit + override type WriteP = BigQueryTypedTable.WriteParam[T] + + override def testId: String = s"BigQueryIO(${table.spec})" + + private val tableRowType: TableRowType[T] = implicitly + private val readFn = Functions.serializableFn[SchemaAndRecord, T](sar => + tableRowType(BigQueryUtils.convertGenericRecordToTableRow(sar.getRecord, sar.getTableSchema)) + ) + private val writeFn = Functions.serializableFn[T, TableRow](t => tableRowType(t)) + + private lazy val underlying = { + BigQueryTypedTable( + beam.BigQueryIO.read(readFn), + beam.BigQueryIO.write().withFormatFunction(writeFn), + table, + (gr, ts) => tableRowType(BigQueryUtils.convertGenericRecordToTableRow(gr, ts)) + ) + } + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.read(underlying) + + override protected def writeWithResult( + data: SCollection[T], + params: WriteP + ): (Tap[T], SideOutputCollections) = { + val outputs = data + .write(underlying)(params) + .outputs + .get + + (tap(()), outputs) + } + + override def tap(read: ReadP): Tap[T] = + BigQueryTableRowTypedTap[T](table, tableRowType.apply) +} + +// STORAGE + +final case class BigQueryMagnolifyTypedStorage[T: TableRowType: Coder]( + table: Table, + selectedFields: List[String], + rowRestriction: Option[String] +) extends BigQueryIO[T] { + override type ReadP = Unit + override type WriteP = Nothing // ReadOnly + + override def testId: String = + s"BigQueryIO(${table.spec}, List(${selectedFields.mkString(",")}), $rowRestriction)" + + private lazy val tableRowType: TableRowType[T] = implicitly + private lazy val underlying = BigQueryStorage(table, selectedFields, 
rowRestriction) + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.transform(_.read(underlying).map(tr => tableRowType(tr))) + + override protected def write(data: SCollection[T], params: WriteP): Tap[T] = + throw new UnsupportedOperationException("MagnolifyBigQueryStorage is read-only") + + override def tap(read: ReadP): Tap[T] = + underlying.tap(read).map(tr => tableRowType(tr)) +} + +object BigQueryMagnolifyTypedStorage { + val ReadParam = BigQueryStorage.ReadParam +} + +final case class BigQueryMagnolifyTypedStorageSelect[T: TableRowType: Coder](sqlQuery: Query) + extends BigQueryIO[T] { + override type ReadP = Unit + override type WriteP = Nothing // ReadOnly + + private[this] lazy val underlying = BigQueryStorageSelect(sqlQuery) + private lazy val tableRowType: TableRowType[T] = implicitly + + override def testId: String = s"BigQueryIO(${sqlQuery.underlying})" + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.transform(_.read(underlying).map(tr => tableRowType(tr))) + + override protected def write(data: SCollection[T], params: WriteP): Tap[T] = + throw new UnsupportedOperationException("MagnolifyBigQueryStorageSelect is read-only") + + override def tap(params: ReadP): Tap[T] = + underlying.tap(params).map(tr => tableRowType(tr)) +} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala index b17542f267..de32a6191a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala @@ -25,6 +25,7 @@ import com.spotify.scio.bigquery.{TableRow, Writes} import com.spotify.scio.io.{ClosedTap, EmptyTap} import com.spotify.scio.util.Functions import com.spotify.scio.values.SCollection +import magnolify.bigquery.TableRowType import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ CreateDisposition, Method, @@ -37,12 +38,37 @@ import org.apache.beam.sdk.values.ValueInSingleWindow import scala.reflect.runtime.universe._ import scala.util.chaining._ +object DynamicWriteParam extends Writes.WriteParamDefaults + /** * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with dynamic destinations * methods. 
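 *
 * For instance, with the Magnolify-based overload added below, an implicit `TableRowType` supplies
 * both the table schema and the `TableRow` conversion. A sketch (the record type, the per-country
 * table naming scheme and the `events` collection are invented for illustration):
 * {{{
 * import com.spotify.scio.bigquery.dynamic._
 * import magnolify.bigquery.TableRowType
 * import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
 *
 * case class UserEvent(userId: String, country: String)
 * implicit val rowType: TableRowType[UserEvent] = TableRowType[UserEvent]
 *
 * // route each element to a per-country table
 * events.saveAsBigQuery() { v =>
 *   new TableDestination("project:dataset.events_" + v.getValue.country, null)
 * }
 * }}}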
*/ final class DynamicBigQueryOps[T](private val self: SCollection[T]) extends AnyVal { + /** + * Save this SCollection to dynamic BigQuery tables specified by `tableFn`, converting elements of + * type `T` to `TableRow` via the implicitly-available `TableRowType[T]` + */ + def saveAsBigQuery( + writeDisposition: WriteDisposition = DynamicWriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = DynamicWriteParam.DefaultCreateDisposition, + extendedErrorInfo: Boolean = DynamicWriteParam.DefaultExtendedErrorInfo + )( + tableFn: ValueInSingleWindow[T] => TableDestination + )(implicit tableRowType: TableRowType[T]): ClosedTap[Nothing] = { + val destinations = DynamicDestinationsUtil.tableFn(tableFn, tableRowType.schema) + + new DynamicBigQueryOps(self).saveAsBigQuery( + destinations, + tableRowType.to, + writeDisposition, + createDisposition, + false, + extendedErrorInfo + ) + } + /** * Save this SCollection to dynamic BigQuery tables using the table and schema specified by the * [[org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations DynamicDestinations]]. @@ -52,8 +78,8 @@ final class DynamicBigQueryOps[T](private val self: SCollection[T]) extends AnyV formatFn: T => TableRow, writeDisposition: WriteDisposition, createDisposition: CreateDisposition, - successfulInsertsPropagation: Boolean = false, - extendedErrorInfo: Boolean = false + successfulInsertsPropagation: Boolean, + extendedErrorInfo: Boolean ): ClosedTap[Nothing] = { if (self.context.isTest) { throw new NotImplementedError( @@ -91,15 +117,16 @@ final class DynamicTableRowBigQueryOps[T <: TableRow](private val self: SCollect */ def saveAsBigQuery( schema: TableSchema, - writeDisposition: WriteDisposition = null, - createDisposition: CreateDisposition = null, - extendedErrorInfo: Boolean = false + writeDisposition: WriteDisposition = DynamicWriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = DynamicWriteParam.DefaultCreateDisposition, + extendedErrorInfo: Boolean = DynamicWriteParam.DefaultExtendedErrorInfo )(tableFn: ValueInSingleWindow[T] => TableDestination): ClosedTap[Nothing] = new DynamicBigQueryOps(self).saveAsBigQuery( DynamicDestinationsUtil.tableFn(tableFn, schema), identity, writeDisposition, createDisposition, + false, extendedErrorInfo ) } @@ -117,9 +144,9 @@ final class DynamicTypedBigQueryOps[T <: HasAnnotation](private val self: SColle * [[com.spotify.scio.bigquery.types.BigQueryType BigQueryType]]. 
*/ def saveAsTypedBigQuery( - writeDisposition: WriteDisposition = null, - createDisposition: CreateDisposition = null, - extendedErrorInfo: Boolean = false + writeDisposition: WriteDisposition = DynamicWriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = DynamicWriteParam.DefaultCreateDisposition, + extendedErrorInfo: Boolean = DynamicWriteParam.DefaultExtendedErrorInfo )( tableFn: ValueInSingleWindow[T] => TableDestination )(implicit tt: TypeTag[T]): ClosedTap[Nothing] = { @@ -131,6 +158,7 @@ final class DynamicTypedBigQueryOps[T <: HasAnnotation](private val self: SColle bqt.toTableRow, writeDisposition, createDisposition, + false, extendedErrorInfo ) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/package.scala index 357b2f7b51..62a377f5b1 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/package.scala @@ -21,6 +21,7 @@ import com.google.api.services.bigquery.model.{TableRow => GTableRow} import com.spotify.scio.bigquery.instances.CoderInstances import com.spotify.scio.bigquery.syntax.{ FileStorageSyntax, + MagnolifySyntax, SCollectionSyntax, ScioContextSyntax, TableReferenceSyntax, @@ -51,6 +52,7 @@ package object bigquery with TableRowSyntax with TableReferenceSyntax with FileStorageSyntax + with MagnolifySyntax with CoderInstances { /** Alias for BigQuery `CreateDisposition`. */ diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/MagnolifySyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/MagnolifySyntax.scala new file mode 100644 index 0000000000..ef6cca096d --- /dev/null +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/MagnolifySyntax.scala @@ -0,0 +1,148 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.bigquery.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.bigquery.{ + BigQueryMagnolifyTypedSelectIO, + BigQueryMagnolifyTypedStorage, + BigQueryMagnolifyTypedStorageSelect, + BigQueryMagnolifyTypedTable, + BigQueryTypedTable, + Clustering, + Query, + Sharding, + Table, + TimePartitioning +} +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.values.SCollection +import magnolify.bigquery.TableRowType +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ + CreateDisposition, + Method, + WriteDisposition +} +import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy +import org.joda.time.Duration + +final class MagnolifyBigQueryScioContextOps(private val self: ScioContext) extends AnyVal { + + /** + * Get an SCollection for a BigQuery SELECT query. 
Both
+   * [[https://cloud.google.com/bigquery/docs/reference/legacy-sql Legacy SQL]] and
+   * [[https://cloud.google.com/bigquery/docs/reference/standard-sql/ Standard SQL]] dialects are
+   * supported. By default the query dialect will be automatically detected. To override this
+   * behavior, start the query string with `#legacysql` or `#standardsql`.
+   */
+  def typedBigQuerySelect[T: TableRowType: Coder](
+    sqlQuery: Query,
+    flattenResults: Boolean = BigQueryMagnolifyTypedSelectIO.ReadParam.DefaultFlattenResults
+  ): SCollection[T] =
+    self.read(BigQueryMagnolifyTypedSelectIO(sqlQuery))(
+      BigQueryMagnolifyTypedSelectIO.ReadParam(flattenResults)
+    )
+
+  /** Get an SCollection for a BigQuery table. */
+  def typedBigQueryTable[T: TableRowType: Coder](table: Table): SCollection[T] =
+    self.read(BigQueryMagnolifyTypedTable(table))
+
+  /**
+   * Get an SCollection for a BigQuery table using the storage API.
+   *
+   * @param selectedFields
+   *   names of the fields in the table that should be read. If empty, all fields will be read. If
+   *   the specified field is a nested field, all the sub-fields in the field will be selected.
+   *   Fields will always appear in the generated class in the same order as they appear in the
+   *   table, regardless of the order specified in selectedFields.
+   * @param rowRestriction
+   *   SQL text filtering statement, similar to a WHERE clause in a query. Currently, only
+   *   combinations of predicates that compare a column with a constant value are supported.
+   *   Aggregates are not supported. For example:
+   *
+   *   {{{
+   *   "a > DATE '2014-09-27' AND (b > 5 AND c LIKE 'date')"
+   *   }}}
+   */
+  def typedBigQueryStorageMagnolify[T: TableRowType: Coder](
+    table: Table,
+    selectedFields: List[String] = BigQueryMagnolifyTypedStorage.ReadParam.DefaultSelectFields,
+    rowRestriction: String = null
+  ): SCollection[T] =
+    self.read(BigQueryMagnolifyTypedStorage(table, selectedFields, Option(rowRestriction)))
+
+  /**
+   * Get an SCollection for a BigQuery SELECT query using the storage API.
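+   *
+   * For example (a sketch; the case class and query are invented for illustration, and
+   * `TableRowType[Wx]` / `Coder[Wx]` are derived for the case class):
+   * {{{
+   * case class Wx(station: String, temp: Option[Double])
+   * val rows: SCollection[Wx] =
+   *   sc.typedBigQueryStorageMagnolify[Wx](Query("SELECT station, temp FROM `dataset.weather`"))
+   * }}}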
+ * + * @param query + * SQL query + */ + def typedBigQueryStorageMagnolify[T: TableRowType: Coder](query: Query): SCollection[T] = + self.read(BigQueryMagnolifyTypedStorageSelect(query)) + +} + +final class MagnolifyBigQuerySCollectionOps[T](private val self: SCollection[T]) { + + def saveAsBigQueryTable( + table: Table, + timePartitioning: TimePartitioning = BigQueryTypedTable.WriteParam.DefaultTimePartitioning, + writeDisposition: WriteDisposition = BigQueryTypedTable.WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = BigQueryTypedTable.WriteParam.DefaultCreateDisposition, + clustering: Clustering = BigQueryTypedTable.WriteParam.DefaultClustering, + method: Method = BigQueryTypedTable.WriteParam.DefaultMethod, + triggeringFrequency: Duration = BigQueryTypedTable.WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = BigQueryTypedTable.WriteParam.DefaultSharding, + failedInsertRetryPolicy: InsertRetryPolicy = + BigQueryTypedTable.WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = + BigQueryTypedTable.WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = BigQueryTypedTable.WriteParam.DefaultExtendedErrorInfo, + configOverride: BigQueryTypedTable.WriteParam.ConfigOverride[T] = + BigQueryTypedTable.WriteParam.DefaultConfigOverride + )(implicit coder: Coder[T], tableRowType: TableRowType[T]): ClosedTap[T] = { + val param = BigQueryTypedTable.WriteParam[T]( + method, + tableRowType.schema, + writeDisposition, + createDisposition, + tableRowType.description, + timePartitioning, + clustering, + triggeringFrequency, + sharding, + failedInsertRetryPolicy, + successfulInsertsPropagation, + extendedErrorInfo, + configOverride + ) + self.write(BigQueryMagnolifyTypedTable[T](table))(param) + } + +} + +trait MagnolifySyntax { + implicit def magnolifyBigQueryScioContextOps(sc: ScioContext): MagnolifyBigQueryScioContextOps = + new MagnolifyBigQueryScioContextOps(sc) + + implicit def magnolifyBigQuerySCollectionOps[T]( + scoll: SCollection[T] + ): MagnolifyBigQuerySCollectionOps[T] = + new MagnolifyBigQuerySCollectionOps(scoll) +} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala index 8c83dd24dd..1100fca56a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala @@ -42,6 +42,16 @@ final case class TableRowJsonTap(path: String, params: TableRowJsonIO.ReadParam) sc.read(TableRowJsonIO(path))(params) } +final case class BigQueryTableRowTypedTap[T: Coder](table: Table, fn: TableRow => T) + extends Tap[T] { + lazy val client: BigQuery = BigQuery.defaultInstance() + + override def value: Iterator[T] = client.tables.rows(table).map(fn) + + override def open(sc: ScioContext): SCollection[T] = + sc.read(BigQueryTypedTable(table, Format.TableRow)(tableRowCoder)).map(fn) +} + final case class BigQueryTypedTap[T: Coder](table: Table, fn: (GenericRecord, TableSchema) => T) extends Tap[T] { lazy val client: BigQuery = BigQuery.defaultInstance() diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/BigQueryType.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/BigQueryType.scala index 0992e70b47..e82b10842c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/BigQueryType.scala +++ 
b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/BigQueryType.scala @@ -53,6 +53,7 @@ import scala.util.Try * @groupname Ungrouped * Other Members */ +@deprecated("Use magnolify API instead.", "0.15.0") object BigQueryType { /** @@ -194,6 +195,7 @@ object BigQueryType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class fromTable(tableSpec: String, args: String*) extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.tableImpl } @@ -225,6 +227,7 @@ object BigQueryType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class fromSchema(schema: String) extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.schemaImpl } @@ -275,6 +278,7 @@ object BigQueryType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class fromStorage( tableSpec: String, args: List[Any] = Nil, @@ -322,6 +326,7 @@ object BigQueryType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class fromQuery(query: String, args: Any*) extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.queryImpl } @@ -342,34 +347,40 @@ object BigQueryType { @compileTimeOnly( "enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations" ) + @deprecated("Use magnolify API instead.", "0.15.0") class toTable extends StaticAnnotation { def macroTransform(annottees: Any*): Any = macro TypeProvider.toTableImpl } /** Generate [[org.apache.avro.Schema Schema]] for a case class. */ + @deprecated("Use magnolify API instead.", "0.15.0") def avroSchemaOf[T: TypeTag]: Schema = SchemaProvider.avroSchemaOf[T] /** * Generate [[com.google.api.services.bigquery.model.TableSchema TableSchema]] for a case class. */ + @deprecated("Use magnolify API instead.", "0.15.0") def schemaOf[T: TypeTag]: TableSchema = SchemaProvider.schemaOf[T] /** * Generate a converter function from Avro [[GenericRecord]] to the given case class `T`. * @group converters */ + @deprecated("Use magnolify API instead.", "0.15.0") def fromAvro[T]: GenericRecord => T = macro ConverterProvider.fromAvroImpl[T] /** * Generate a converter function from the given case class `T` to [[GenericRecord]]. * @group converters */ + @deprecated("Use magnolify API instead.", "0.15.0") def toAvro[T]: T => GenericRecord = macro ConverterProvider.toAvroImpl[T] /** * Generate a converter function from [[TableRow]] to the given case class `T`. * @group converters */ + @deprecated("Use magnolify API instead.", "0.15.0") def fromTableRow[T]: TableRow => T = macro ConverterProvider.fromTableRowImpl[T] @@ -377,9 +388,11 @@ object BigQueryType { * Generate a converter function from the given case class `T` to [[TableRow]]. * @group converters */ + @deprecated("Use magnolify API instead.", "0.15.0") def toTableRow[T]: T => TableRow = macro ConverterProvider.toTableRowImpl[T] /** Create a new BigQueryType instance. 
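For code migrating off the deprecated macros above, a sketch of the Magnolify replacement (names
   * are illustrative; `data` is an `SCollection` of `Row`):
   * {{{
   * case class Row(month: Long, tornado: Option[Boolean])
   * implicit val rowType: magnolify.bigquery.TableRowType[Row] =
   *   magnolify.bigquery.TableRowType[Row]
   * data.saveAsBigQueryTable(Table.Spec("project:dataset.table")) // schema comes from rowType
   * }}}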
*/ + @deprecated("Use magnolify API instead.", "0.15.0") @inline final def apply[T: TypeTag]: BigQueryType[T] = new BigQueryType[T] } @@ -388,6 +401,7 @@ object BigQueryType { * * This decouples generated fields and methods from macro expansion to keep core macro free. */ +@deprecated("Use magnolify API instead.", "0.15.0") class BigQueryType[T: TypeTag] { private[this] val bases = typeOf[T].companion.baseClasses diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala index 1cabe73a7f..fcbb6c1236 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala @@ -25,6 +25,7 @@ import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT, TestIO} import com.spotify.scio.util.Functions import com.spotify.scio.values.SCollection +import magnolify.bigtable.BigtableType import org.apache.beam.sdk.io.gcp.{bigtable => beam} import org.apache.beam.sdk.io.range.ByteKeyRange import org.apache.beam.sdk.values.KV @@ -56,17 +57,13 @@ final case class BigtableRead(bigtableOptions: BigtableOptions, tableId: String) override protected def read(sc: ScioContext, params: ReadP): SCollection[Row] = { val coder = CoderMaterializer.beam(sc, Coder.protoMessageCoder[Row]) - val opts = bigtableOptions // defeat closure - val read = beam.BigtableIO - .read() - .withProjectId(bigtableOptions.getProjectId) - .withInstanceId(bigtableOptions.getInstanceId) - .withTableId(tableId) - .withBigtableOptionsConfigurator(Functions.serializableFn(_ => opts.toBuilder)) - .withMaxBufferElementCount(params.maxBufferElementCount.map(Int.box).orNull) - .pipe(r => if (params.keyRanges.isEmpty) r else r.withKeyRanges(params.keyRanges.asJava)) - .pipe(r => Option(params.rowFilter).fold(r)(r.withRowFilter)): @nowarn("cat=deprecation") - + val read = BigtableRead.read( + bigtableOptions, + tableId, + params.maxBufferElementCount, + params.keyRanges, + params.rowFilter + ) sc.applyTransform(read).setCoder(coder) } @@ -97,14 +94,119 @@ object BigtableRead { maxBufferElementCount: Option[Int] = ReadParam.DefaultMaxBufferElementCount ) - final def apply(projectId: String, instanceId: String, tableId: String): BigtableRead = { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - BigtableRead(bigtableOptions, tableId) + private[scio] def read( + bigtableOptions: BigtableOptions, + tableId: String, + maxBufferElementCount: Option[Int], + keyRanges: Seq[ByteKeyRange], + rowFilter: RowFilter + ): beam.BigtableIO.Read = { + val opts = bigtableOptions // defeat closure + beam.BigtableIO + .read() + .withProjectId(bigtableOptions.getProjectId) + .withInstanceId(bigtableOptions.getInstanceId) + .withTableId(tableId) + .withBigtableOptionsConfigurator(Functions.serializableFn(_ => opts.toBuilder)) + .withMaxBufferElementCount(maxBufferElementCount.map(Int.box).orNull) + .pipe(r => if (keyRanges.isEmpty) r else r.withKeyRanges(keyRanges.asJava)) + .pipe(r => Option(rowFilter).fold(r)(r.withRowFilter)): @nowarn("cat=deprecation") + } +} + +final case class BigtableTypedIO[K: Coder, T: BigtableType: Coder]( + bigtableOptions: BigtableOptions, + tableId: String +) extends BigtableIO[(K, T)] { + override type ReadP = BigtableTypedIO.ReadParam[K] + override type WriteP = 
BigtableTypedIO.WriteParam[K] + + override def testId: String = + s"BigtableIO(${bigtableOptions.getProjectId}\t${bigtableOptions.getInstanceId}\t$tableId)" + + override protected def read( + sc: ScioContext, + params: ReadP + ): SCollection[(K, T)] = { + val coder = CoderMaterializer.beam(sc, Coder.protoMessageCoder[Row]) + val read = BigtableRead.read( + bigtableOptions, + tableId, + params.maxBufferElementCount, + params.keyRanges, + params.rowFilter + ) + + val bigtableType: BigtableType[T] = implicitly + val cf = params.columnFamily + val keyFn = params.keyFn + sc.transform( + _.applyTransform(read) + .setCoder(coder) + .map(row => keyFn(row.getKey) -> bigtableType(row, cf)) + ) } + + override protected def write( + data: SCollection[(K, T)], + params: WriteP + ): Tap[Nothing] = { + val bigtableType: BigtableType[T] = implicitly + val btParams = params.numOfShards match { + case None => BigtableWrite.Default + case Some(numShards) => + BigtableWrite.Bulk( + numShards, + Option(params.flushInterval).getOrElse(BigtableWrite.Bulk.DefaultFlushInterval) + ) + } + val cf = params.columnFamily + val ts = params.timestamp + val keyFn = params.keyFn + data.transform_("Bigtable write") { coll => + coll + .map { case (key, t) => + val mutations = Iterable(bigtableType.apply(t, cf, ts)).asJava + .asInstanceOf[java.lang.Iterable[Mutation]] + KV.of(keyFn(key), mutations) + } + .applyInternal(BigtableWrite.sink(tableId, bigtableOptions, btParams)) + } + EmptyTap + } + + override def tap(params: ReadP): Tap[Nothing] = + throw new NotImplementedError("Bigtable tap not implemented") +} + +object BigtableTypedIO { + object ReadParam { + val DefaultKeyRanges: Seq[ByteKeyRange] = Seq.empty[ByteKeyRange] + val DefaultRowFilter: RowFilter = null + val DefaultMaxBufferElementCount: Option[Int] = None + } + + final case class ReadParam[K] private ( + columnFamily: String, + keyFn: ByteString => K, + keyRanges: Seq[ByteKeyRange] = ReadParam.DefaultKeyRanges, + rowFilter: RowFilter = ReadParam.DefaultRowFilter, + maxBufferElementCount: Option[Int] = ReadParam.DefaultMaxBufferElementCount + ) + + object WriteParam { + val DefaultTimestamp: Long = 0L + val DefaultNumOfShards: Option[Int] = None + val DefaultFlushInterval: Duration = null + } + + final case class WriteParam[K] private ( + columnFamily: String, + keyFn: K => ByteString, + timestamp: Long = WriteParam.DefaultTimestamp, + numOfShards: Option[Int] = WriteParam.DefaultNumOfShards, + flushInterval: Duration = WriteParam.DefaultFlushInterval + ) } final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, tableId: String) @@ -127,27 +229,12 @@ final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, data: SCollection[(ByteString, Iterable[T])], params: WriteP ): Tap[Nothing] = { - val sink = - params match { - case BigtableWrite.Default => - val opts = bigtableOptions // defeat closure - beam.BigtableIO - .write() - .withProjectId(bigtableOptions.getProjectId) - .withInstanceId(bigtableOptions.getInstanceId) - .withTableId(tableId) - .withBigtableOptionsConfigurator( - Functions.serializableFn(_ => opts.toBuilder) - ): @nowarn("cat=deprecation") - case BigtableWrite.Bulk(numOfShards, flushInterval) => - new BigtableBulkWriter(tableId, bigtableOptions, numOfShards, flushInterval) - } data.transform_("Bigtable write") { coll => coll .map { case (key, value) => KV.of(key, value.asJava.asInstanceOf[java.lang.Iterable[Mutation]]) } - .applyInternal(sink) + .applyInternal(BigtableWrite.sink(tableId, bigtableOptions, 
params)) } EmptyTap } @@ -181,4 +268,21 @@ object BigtableWrite { .build BigtableWrite[T](bigtableOptions, tableId) } + + private[scio] def sink(tableId: String, bigtableOptions: BigtableOptions, params: WriteParam) = { + params match { + case BigtableWrite.Default => + val opts = bigtableOptions // defeat closure + beam.BigtableIO + .write() + .withProjectId(bigtableOptions.getProjectId) + .withInstanceId(bigtableOptions.getInstanceId) + .withTableId(tableId) + .withBigtableOptionsConfigurator( + Functions.serializableFn(_ => opts.toBuilder) + ): @nowarn("cat=deprecation") + case BigtableWrite.Bulk(numOfShards, flushInterval) => + new BigtableBulkWriter(tableId, bigtableOptions, numOfShards, flushInterval) + } + } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala index f918df27bb..c5d395ab67 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala @@ -23,8 +23,9 @@ import com.google.protobuf.ByteString import com.spotify.scio.io.ClosedTap import com.spotify.scio.values.SCollection import org.joda.time.Duration - -import com.spotify.scio.bigtable.BigtableWrite +import com.spotify.scio.bigtable.{BigtableTypedIO, BigtableWrite} +import com.spotify.scio.coders.Coder +import magnolify.bigtable.BigtableType /** * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Bigtable methods. @@ -56,8 +57,57 @@ final class SCollectionMutationOps[T <: Mutation]( ) } +final class BigtableTypedOps[K: Coder, T: BigtableType: Coder]( + private val self: SCollection[(K, T)] +) { + private def btOpts(projectId: String, instanceId: String): BigtableOptions = + BigtableOptions.builder().setProjectId(projectId).setInstanceId(instanceId).build + + def saveAsBigtable( + projectId: String, + instanceId: String, + tableId: String, + columnFamily: String, + keyFn: K => ByteString + ): ClosedTap[Nothing] = { + val params = BigtableTypedIO.WriteParam[K](columnFamily, keyFn) + self.write(BigtableTypedIO[K, T](btOpts(projectId, instanceId), tableId))(params) + } + + def saveAsBigtable( + projectId: String, + instanceId: String, + tableId: String, + columnFamily: String, + keyFn: K => ByteString, + timestamp: Long + ): ClosedTap[Nothing] = { + val params = BigtableTypedIO.WriteParam[K](columnFamily, keyFn, timestamp) + self.write(BigtableTypedIO[K, T](btOpts(projectId, instanceId), tableId))(params) + } + + def saveAsBigtable( + bigtableOptions: BigtableOptions, + tableId: String, + columnFamily: String, + keyFn: K => ByteString, + timestamp: Long = BigtableTypedIO.WriteParam.DefaultTimestamp, + numOfShards: Int, + flushInterval: Duration = BigtableTypedIO.WriteParam.DefaultFlushInterval + ): ClosedTap[Nothing] = { + val params = + BigtableTypedIO + .WriteParam[K](columnFamily, keyFn, timestamp, Some(numOfShards), flushInterval) + self.write(BigtableTypedIO[K, T](bigtableOptions, tableId))(params) + } +} + trait SCollectionSyntax { implicit def bigtableMutationOps[T <: Mutation]( sc: SCollection[(ByteString, Iterable[T])] ): SCollectionMutationOps[T] = new SCollectionMutationOps[T](sc) + + implicit def bigtableTypedOps[K: Coder, T: BigtableType: Coder]( + sc: SCollection[(K, T)] + ): BigtableTypedOps[K, T] = new BigtableTypedOps[K, T](sc) } diff --git 
a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala index cd21ee3f90..c6517bfec2 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala @@ -20,11 +20,12 @@ package com.spotify.scio.bigtable.syntax import com.google.bigtable.admin.v2.GcRule import com.google.bigtable.v2._ import com.google.cloud.bigtable.config.BigtableOptions +import com.google.protobuf.ByteString import com.spotify.scio.ScioContext -import com.spotify.scio.bigtable.BigtableRead -import com.spotify.scio.bigtable.BigtableUtil -import com.spotify.scio.bigtable.TableAdmin +import com.spotify.scio.bigtable.{BigtableRead, BigtableTypedIO, BigtableUtil, TableAdmin} +import com.spotify.scio.coders.Coder import com.spotify.scio.values.SCollection +import magnolify.bigtable.BigtableType import org.apache.beam.sdk.io.range.ByteKeyRange import org.joda.time.Duration @@ -39,6 +40,73 @@ object ScioContextOps { final class ScioContextOps(private val self: ScioContext) extends AnyVal { import ScioContextOps._ + private def btOpts(projectId: String, instanceId: String): BigtableOptions = + BigtableOptions.builder().setProjectId(projectId).setInstanceId(instanceId).build + + def typedBigtable[K: Coder, T: BigtableType: Coder]( + projectId: String, + instanceId: String, + tableId: String, + columnFamily: String, + keyFn: ByteString => K + ): SCollection[(K, T)] = + typedBigtable(btOpts(projectId, instanceId), tableId, columnFamily, keyFn) + + def typedBigtable[K: Coder, T: BigtableType: Coder]( + projectId: String, + instanceId: String, + tableId: String, + columnFamily: String, + keyFn: ByteString => K, + keyRanges: Seq[ByteKeyRange] + ): SCollection[(K, T)] = + typedBigtable(btOpts(projectId, instanceId), tableId, columnFamily, keyFn, keyRanges) + + def typedBigtable[K: Coder, T: BigtableType: Coder]( + projectId: String, + instanceId: String, + tableId: String, + columnFamily: String, + keyFn: ByteString => K, + keyRanges: Seq[ByteKeyRange], + rowFilter: RowFilter + ): SCollection[(K, T)] = + typedBigtable(btOpts(projectId, instanceId), tableId, columnFamily, keyFn, keyRanges, rowFilter) + + def typedBigtable[K: Coder, T: BigtableType: Coder]( + projectId: String, + instanceId: String, + tableId: String, + columnFamily: String, + keyFn: ByteString => K, + keyRanges: Seq[ByteKeyRange], + rowFilter: RowFilter, + maxBufferElementCount: Option[Int] + ): SCollection[(K, T)] = + typedBigtable( + btOpts(projectId, instanceId), + tableId, + columnFamily, + keyFn, + keyRanges, + rowFilter, + maxBufferElementCount + ) + + def typedBigtable[K: Coder, T: BigtableType: Coder]( + bigtableOptions: BigtableOptions, + tableId: String, + columnFamily: String, + keyFn: ByteString => K, + keyRanges: Seq[ByteKeyRange] = BigtableRead.ReadParam.DefaultKeyRanges, + rowFilter: RowFilter = BigtableRead.ReadParam.DefaultRowFilter, + maxBufferElementCount: Option[Int] = BigtableRead.ReadParam.DefaultMaxBufferElementCount + ): SCollection[(K, T)] = { + val params = + BigtableTypedIO.ReadParam(columnFamily, keyFn, keyRanges, rowFilter, maxBufferElementCount) + self.read(BigtableTypedIO[K, T](bigtableOptions, tableId))(params) + } + /** Get an SCollection for a Bigtable table. 
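The typed Bigtable methods added above are used along the same lines (a sketch: ids and the column
   * family are placeholders, and `BigtableType[User]` is derived by Magnolify):
   * {{{
   * case class User(name: String, age: Long)
   * val users: SCollection[(String, User)] =
   *   sc.typedBigtable[String, User]("project", "instance", "table", "cf",
   *     (bs: ByteString) => bs.toStringUtf8)
   * users.saveAsBigtable("project", "instance", "table", "cf", ByteString.copyFromUtf8 _)
   * }}}
   *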
*/ def bigtable( projectId: String, @@ -47,7 +115,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { keyRange: ByteKeyRange, rowFilter: RowFilter ): SCollection[Row] = - bigtable(projectId, instanceId, tableId, Seq(keyRange), rowFilter) + bigtable(btOpts(projectId, instanceId), tableId, Seq(keyRange), rowFilter) /** Get an SCollection for a Bigtable table. */ def bigtable( @@ -58,20 +126,32 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { rowFilter: RowFilter, maxBufferElementCount: Option[Int] ): SCollection[Row] = - bigtable(projectId, instanceId, tableId, Seq(keyRange), rowFilter, maxBufferElementCount) + bigtable( + btOpts(projectId, instanceId), + tableId, + Seq(keyRange), + rowFilter, + maxBufferElementCount + ) + + /** Get an SCollection for a Bigtable table. */ + def bigtable( + projectId: String, + instanceId: String, + tableId: String + ): SCollection[Row] = + bigtable(btOpts(projectId, instanceId), tableId) /** Get an SCollection for a Bigtable table. */ def bigtable( projectId: String, instanceId: String, tableId: String, - keyRanges: Seq[ByteKeyRange] = BigtableRead.ReadParam.DefaultKeyRanges, - rowFilter: RowFilter = BigtableRead.ReadParam.DefaultRowFilter, - maxBufferElementCount: Option[Int] = BigtableRead.ReadParam.DefaultMaxBufferElementCount - ): SCollection[Row] = { - val parameters = BigtableRead.ReadParam(keyRanges, rowFilter, maxBufferElementCount) - self.read(BigtableRead(projectId, instanceId, tableId))(parameters) - } + keyRanges: Seq[ByteKeyRange], + rowFilter: RowFilter, + maxBufferElementCount: Option[Int] + ): SCollection[Row] = + bigtable(btOpts(projectId, instanceId), tableId, keyRanges, rowFilter, maxBufferElementCount) /** Get an SCollection for a Bigtable table. */ def bigtable( @@ -96,20 +176,9 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { def bigtable( bigtableOptions: BigtableOptions, tableId: String, - keyRanges: Seq[ByteKeyRange], - rowFilter: RowFilter - ): SCollection[Row] = { - val parameters = BigtableRead.ReadParam(keyRanges, rowFilter) - self.read(BigtableRead(bigtableOptions, tableId))(parameters) - } - - /** Get an SCollection for a Bigtable table. 
*/ - def bigtable( - bigtableOptions: BigtableOptions, - tableId: String, - keyRanges: Seq[ByteKeyRange], - rowFilter: RowFilter, - maxBufferElementCount: Option[Int] + keyRanges: Seq[ByteKeyRange] = BigtableRead.ReadParam.DefaultKeyRanges, + rowFilter: RowFilter = BigtableRead.ReadParam.DefaultRowFilter, + maxBufferElementCount: Option[Int] = BigtableRead.ReadParam.DefaultMaxBufferElementCount ): SCollection[Row] = { val parameters = BigtableRead.ReadParam(keyRanges, rowFilter, maxBufferElementCount) self.read(BigtableRead(bigtableOptions, tableId))(parameters) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/DatastoreIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/DatastoreIO.scala index 2977d464d8..a37b7da1d6 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/DatastoreIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/DatastoreIO.scala @@ -19,30 +19,69 @@ package com.spotify.scio.datastore import com.spotify.scio.ScioContext import com.spotify.scio.values.SCollection -import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT, TestIO} import com.google.datastore.v1.{Entity, Query} import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.datastore.DatastoreTypedIO.{ReadParam, WriteParam} +import magnolify.datastore.EntityType import org.apache.beam.sdk.io.gcp.datastore.{DatastoreIO => BDatastoreIO, DatastoreV1 => BDatastore} -final case class DatastoreIO(projectId: String) extends ScioIO[Entity] { - override type ReadP = DatastoreIO.ReadParam - override type WriteP = DatastoreIO.WriteParam +sealed trait DatastoreIO[T] extends ScioIO[T] { + final override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] +} - override val tapT: TapT.Aux[Entity, Nothing] = EmptyTapOf[Entity] +object DatastoreIO { + final def apply[T](projectId: String): DatastoreIO[T] = + new DatastoreIO[T] with TestIO[T] { + override def testId: String = s"DatastoreIO($projectId)" + } +} - override protected def read(sc: ScioContext, params: ReadP): SCollection[Entity] = { - val coder = CoderMaterializer.beam(sc, Coder.protoMessageCoder[Entity]) - val read = BDatastoreIO - .v1() - .read() - .withProjectId(projectId) - .withNamespace(params.namespace) - .withQuery(params.query) - sc.applyTransform( - Option(params.configOverride).map(_(read)).getOrElse(read) - ).setCoder(coder) +final case class DatastoreTypedIO[T: EntityType: Coder](projectId: String) extends DatastoreIO[T] { + override type ReadP = DatastoreTypedIO.ReadParam + override type WriteP = DatastoreTypedIO.WriteParam + override def testId: String = s"DatastoreIO($projectId)" + + override protected def read(sc: ScioContext, params: ReadParam): SCollection[T] = { + val entityType: EntityType[T] = implicitly + sc.transform { ctx => + DatastoreEntityIO + .read(ctx, projectId, params.namespace, params.query, params.configOverride) + .map(e => entityType(e)) + } + } + + override protected def write(data: SCollection[T], params: WriteParam): Tap[Nothing] = { + val entityType: EntityType[T] = implicitly + val write = BDatastoreIO.v1.write.withProjectId(projectId) + data.transform_ { scoll => + scoll + .map(t => entityType(t)) + .applyInternal( + Option(params.configOverride).map(_(write)).getOrElse(write) + ) + } + EmptyTap } + override def tap(read: ReadParam): Tap[Nothing] = EmptyTap +} + +object DatastoreTypedIO { + type ReadParam = 
DatastoreEntityIO.ReadParam + val ReadParam = DatastoreEntityIO.ReadParam + type WriteParam = DatastoreEntityIO.WriteParam + val WriteParam = DatastoreEntityIO.WriteParam +} + +final case class DatastoreEntityIO(projectId: String) extends DatastoreIO[Entity] { + override type ReadP = DatastoreEntityIO.ReadParam + override type WriteP = DatastoreEntityIO.WriteParam + override def testId: String = s"DatastoreIO($projectId)" + + override protected def read(sc: ScioContext, params: ReadP): SCollection[Entity] = + DatastoreEntityIO.read(sc, projectId, params.namespace, params.query, params.configOverride) + override protected def write(data: SCollection[Entity], params: WriteP): Tap[Nothing] = { val write = BDatastoreIO.v1.write.withProjectId(projectId) data.applyInternal( @@ -51,10 +90,10 @@ final case class DatastoreIO(projectId: String) extends ScioIO[Entity] { EmptyTap } - override def tap(read: DatastoreIO.ReadParam): Tap[Nothing] = EmptyTap + override def tap(read: DatastoreEntityIO.ReadParam): Tap[Nothing] = EmptyTap } -object DatastoreIO { +object DatastoreEntityIO { object ReadParam { val DefaultNamespace: String = null @@ -74,4 +113,23 @@ object DatastoreIO { final case class WriteParam private ( configOverride: BDatastore.Write => BDatastore.Write = WriteParam.DefaultConfigOverride ) + + private[scio] def read( + sc: ScioContext, + projectId: String, + namespace: String, + query: Query, + configOverride: BDatastore.Read => BDatastore.Read + ): SCollection[Entity] = { + val coder = CoderMaterializer.beam(sc, Coder.protoMessageCoder[Entity]) + val read = BDatastoreIO + .v1() + .read() + .withProjectId(projectId) + .withNamespace(namespace) + .withQuery(query) + sc.applyTransform( + Option(configOverride).map(_(read)).getOrElse(read) + ).setCoder(coder) + } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/SCollectionSyntax.scala index 8ed685b3bf..bfc420cba9 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/SCollectionSyntax.scala @@ -19,10 +19,11 @@ import com.google.datastore.v1.Entity package com.spotify.scio.datastore.syntax import com.spotify.scio.values.SCollection -import com.spotify.scio.datastore.DatastoreIO +import com.spotify.scio.datastore.{DatastoreEntityIO, DatastoreTypedIO} import com.spotify.scio.io.ClosedTap import com.google.datastore.v1.Entity -import com.spotify.scio.datastore.DatastoreIO.WriteParam +import com.spotify.scio.coders.Coder +import magnolify.datastore.EntityType import org.apache.beam.sdk.io.gcp.datastore.{DatastoreV1 => BDatastore} final class SCollectionEntityOps[T <: Entity](private val coll: SCollection[T]) extends AnyVal { @@ -33,13 +34,29 @@ final class SCollectionEntityOps[T <: Entity](private val coll: SCollection[T]) */ def saveAsDatastore( projectId: String, - configOverride: BDatastore.Write => BDatastore.Write = WriteParam.DefaultConfigOverride + configOverride: BDatastore.Write => BDatastore.Write = + DatastoreEntityIO.WriteParam.DefaultConfigOverride ): ClosedTap[Nothing] = - coll.covary_[Entity].write(DatastoreIO(projectId))(WriteParam(configOverride)) + coll + .covary_[Entity] + .write(DatastoreEntityIO(projectId))(DatastoreEntityIO.WriteParam(configOverride)) +} + +final class TypedEntitySCollectionOps[T: EntityType: Coder](private val coll: 
SCollection[T]) { + def saveAsDatastore( + projectId: String, + configOverride: BDatastore.Write => BDatastore.Write = + DatastoreTypedIO.WriteParam.DefaultConfigOverride + ): ClosedTap[Nothing] = + coll.write(DatastoreTypedIO(projectId))(DatastoreTypedIO.WriteParam(configOverride)) } trait SCollectionSyntax { implicit def datastoreEntitySCollectionOps[T <: Entity]( coll: SCollection[T] ): SCollectionEntityOps[T] = new SCollectionEntityOps(coll) + + implicit def typedDatastoreEntitySCollectionOps[T: EntityType: Coder]( + coll: SCollection[T] + ): TypedEntitySCollectionOps[T] = new TypedEntitySCollectionOps(coll) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/ScioContextSyntax.scala index bb50a920a1..83c82c7165 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/datastore/syntax/ScioContextSyntax.scala @@ -19,9 +19,10 @@ package com.spotify.scio.datastore.syntax import com.spotify.scio.ScioContext import com.spotify.scio.values.SCollection -import com.spotify.scio.datastore.DatastoreIO +import com.spotify.scio.datastore.{DatastoreEntityIO, DatastoreTypedIO} import com.google.datastore.v1.{Entity, Query} -import com.spotify.scio.datastore.DatastoreIO.ReadParam +import com.spotify.scio.coders.Coder +import magnolify.datastore.EntityType import org.apache.beam.sdk.io.gcp.datastore.{DatastoreV1 => BDatastore} final class ScioContextOps(private val sc: ScioContext) extends AnyVal { @@ -33,10 +34,24 @@ final class ScioContextOps(private val sc: ScioContext) extends AnyVal { def datastore( projectId: String, query: Query, - namespace: String = ReadParam.DefaultNamespace, - configOverride: BDatastore.Read => BDatastore.Read = ReadParam.DefaultConfigOverride + namespace: String = DatastoreEntityIO.ReadParam.DefaultNamespace, + configOverride: BDatastore.Read => BDatastore.Read = + DatastoreEntityIO.ReadParam.DefaultConfigOverride ): SCollection[Entity] = - sc.read(DatastoreIO(projectId))(ReadParam(query, namespace, configOverride)) + sc.read(DatastoreEntityIO(projectId))( + DatastoreEntityIO.ReadParam(query, namespace, configOverride) + ) + + def typedDatastore[T: EntityType: Coder]( + projectId: String, + query: Query, + namespace: String = DatastoreTypedIO.ReadParam.DefaultNamespace, + configOverride: BDatastore.Read => BDatastore.Read = + DatastoreTypedIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = + sc.read(DatastoreTypedIO(projectId))( + DatastoreTypedIO.ReadParam(query, namespace, configOverride) + ) } trait ScioContextSyntax { diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala index 45fbe21966..30a1575509 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala @@ -38,6 +38,8 @@ object BigQueryIOTest { @BigQueryType.toTable case class BQRecord(i: Int, s: String, r: List[String]) + case class MagnolifyRecord(i: Int, s: String) + // BQ Write transform display id data for tableDescription private val TableDescriptionId = DisplayData.Identifier.of( DisplayData.Path.root(), @@ -191,6 +193,58 @@ final class BigQueryIOTest extends ScioIOSpec { 
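A note on the Magnolify tests added in this hunk: magnolify-bigquery only derives `TableRowType`
fields for types with an exact BigQuery mapping, and `Int` would narrow BigQuery's 64-bit `INT64`,
so its instance has to be opted into explicitly. A minimal sketch of the pattern the tests rely on:

{{{
import magnolify.bigquery.unsafe._ // opt-in instances for narrowing types such as Int
case class MagnolifyRecord(i: Int, s: String)
val trt = magnolify.bigquery.TableRowType[MagnolifyRecord] // derivable only with the import above
}}}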
testJobTest(xs)(TableRowJsonIO(_))(_.tableRowJsonFile(_))(_.saveAsTableRowJsonFile(_)) } + "MagnolifyBigQuerySelect" should "work" in { + // unsafe implicits must be explicitly imported for TableRowType[MagnolifyRecord] to be derived + import magnolify.bigquery.unsafe._ + val xs = (1 to 100).map(x => MagnolifyRecord(x, x.toString)) + testJobTest(xs, in = "select * from x", out = "project:dataset.out_table") { + BigQueryIO(_) + } { (coll, s) => + coll.typedBigQuerySelect[MagnolifyRecord](Query(s)) + } { (coll, s) => + coll.saveAsBigQueryTable(Table.Spec(s)) + } + } + + "MagnolifyBigQueryTable" should "work" in { + // unsafe implicits must be explicitly imported for TableRowType[MagnolifyRecord] to be derived + import magnolify.bigquery.unsafe._ + val xs = (1 to 100).map(x => MagnolifyRecord(x, x.toString)) + testJobTest(xs, in = "project:dataset.in_table", out = "project:dataset.out_table") { + BigQueryIO(_) + } { (coll, s) => + coll.typedBigQueryTable[MagnolifyRecord](Table.Spec(s)) + } { (coll, s) => + coll.saveAsBigQueryTable(Table.Spec(s)) + } + } + + "MagnolifyBigQueryStorage" should "work with Table" in { + // unsafe implicits must be explicitly imported for TableRowType[MagnolifyRecord] to be derived + import magnolify.bigquery.unsafe._ + val xs = (1 to 100).map(x => MagnolifyRecord(x, x.toString)) + testJobTest(xs, in = "project:dataset.in_table", out = "project:dataset.out_table")( + BigQueryIO(_, List(), None), + Some(BigQueryIO(_)) + ) { (coll, s) => + coll.typedBigQueryStorageMagnolify[MagnolifyRecord](Table.Spec(s)) + } { (coll, s) => + coll.saveAsBigQueryTable(Table.Spec(s)) + } + } + + it should "work with Query" in { + // unsafe implicits must be explicitly imported for TableRowType[MagnolifyRecord] to be derived + import magnolify.bigquery.unsafe._ + val xs = (1 to 100).map(x => MagnolifyRecord(x, x.toString)) + testJobTest(xs, in = "select x, y from z", out = "project:dataset.out_table") { + BigQueryIO(_) + } { (coll, s) => + coll.typedBigQueryStorageMagnolify[MagnolifyRecord](Query(s)) + } { (coll, s) => + coll.saveAsBigQueryTable(Table.Spec(s)) + } + } } object JobWithDuplicateInput { diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableIOTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableIOTest.scala index eb47617917..95564b34eb 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableIOTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableIOTest.scala @@ -22,9 +22,13 @@ import com.google.bigtable.v2.{Mutation, Row} import com.google.protobuf.ByteString import com.spotify.scio.testing._ +// must be defined outside the test class or test job will hang +case class Foo(i: Int, s: String) + class BigtableIOTest extends ScioIOSpec { val projectId = "project" val instanceId = "instance" + val columnFamily = "columnFamily" "BigtableIO" should "work with input" in { val xs = (1 to 100).map { x => @@ -46,4 +50,24 @@ class BigtableIOTest extends ScioIOSpec { _.saveAsBigtable(projectId, instanceId, _) ) } + + it should "work with typed input" in { + val xs = (1 to 100).map(x => x.toString -> Foo(x, x.toString)) + testJobTestInput(xs)(BigtableIO[(String, Foo)](projectId, instanceId, _))( + _.typedBigtable[String, Foo]( + projectId, + instanceId, + _, + columnFamily, + (bs: ByteString) => bs.toStringUtf8 + ) + ) + } + + it should "work with typed output" in { + val xs = (1 to 100).map(x => (x.toString, Foo(x, x.toString))) + 
testJobTestOutput(xs)(BigtableIO(projectId, instanceId, _))(
+      _.saveAsBigtable(projectId, instanceId, _, columnFamily, ByteString.copyFromUtf8 _)
+    )
+  }
 }
diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/datastore/DatastoreIOTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/datastore/DatastoreIOTest.scala
index 9461a9c793..785035d2ec 100644
--- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/datastore/DatastoreIOTest.scala
+++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/datastore/DatastoreIOTest.scala
@@ -34,7 +34,20 @@ object DatastoreJob {
   }
 }
 
+object TypedDatastoreJob {
+  case class MyEntity(int: Long)
+
+  def main(cmdlineArgs: Array[String]): Unit = {
+    val (sc, args) = ContextAndArgs(cmdlineArgs)
+    sc.typedDatastore[MyEntity](args("input"), null)
+      .saveAsDatastore(args("output"))
+    sc.run()
+    ()
+  }
+}
+
 class DatastoreIOTest extends PipelineSpec with ScioIOSpec {
+  import TypedDatastoreJob.MyEntity
 
   "DatastoreIO" should "work" in {
     val xs = (1L to 100L).map { x =>
@@ -57,7 +70,7 @@ class DatastoreIOTest extends PipelineSpec with ScioIOSpec {
     JobTest[DatastoreJob.type]
       .args("--input=store.in", "--output=store.out")
       .input(DatastoreIO("store.in"), (1L to 3L).map(newEntity))
-      .output(DatastoreIO("store.out"))(coll => coll should containInAnyOrder(xs))
+      .output(DatastoreIO[Entity]("store.out"))(coll => coll should containInAnyOrder(xs))
       .run()
 
   it should "pass correct DatastoreJob" in {
@@ -73,4 +86,31 @@ class DatastoreIOTest extends PipelineSpec with ScioIOSpec {
     }
   }
 
+  it should "work with typed data" in {
+    val xs = (1L to 100L).map(x => MyEntity(x))
+    testJobTest(xs)(DatastoreIO(_))(_.typedDatastore[MyEntity](_, null))(_.saveAsDatastore(_))
+  }
+
+  def testTypedDatastore(xs: Seq[MyEntity]): Unit = {
+    val in = (1L to 3L).map(MyEntity)
+    JobTest[TypedDatastoreJob.type]
+      .args("--input=store.in", "--output=store.out")
+      .input(DatastoreIO[MyEntity]("store.in"), in)
+      .output(DatastoreIO[MyEntity]("store.out"))(coll => coll should containInAnyOrder(xs))
+      .run()
+  }
+
+  it should "pass correct TypedDatastoreJob" in {
+    testTypedDatastore((1L to 3L).map(MyEntity))
+  }
+
+  it should "fail incorrect TypedDatastoreJob" in {
+    an[AssertionError] should be thrownBy {
+      testTypedDatastore((1L to 2L).map(MyEntity))
+    }
+    an[AssertionError] should be thrownBy {
+      testTypedDatastore((1L to 4L).map(MyEntity))
+    }
+  }
+
 }
diff --git a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/TFRecordIO.scala b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/TFRecordIO.scala
index 7d157c756e..da1e313052 100644
--- a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/TFRecordIO.scala
+++ b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/TFRecordIO.scala
@@ -31,7 +31,9 @@ import org.apache.beam.sdk.io.{
 import org.apache.beam.sdk.{io => beam}
 import org.tensorflow.proto.example.{Example, SequenceExample}
 import com.spotify.scio.io.TapT
+import com.spotify.scio.tensorflow.TFExampleIO.ReadParam
 import com.spotify.scio.util.FilenamePolicySupplier
+import magnolify.tensorflow.ExampleType
 import org.apache.beam.sdk.io.fs.ResourceId
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
 import org.apache.beam.sdk.transforms.SerializableFunctions
@@ -94,6 +96,59 @@ object TFRecordIO {
   )
 }
 
+object TFExampleTypedIO {
+  type ReadParam = TFRecordIO.ReadParam
+  val ReadParam = TFRecordIO.ReadParam
+  type WriteParam = TFRecordIO.WriteParam
+  val WriteParam = TFRecordIO.WriteParam
+}
+
+final case
class TFExampleTypedIO[T: ExampleType: Coder](path: String) extends ScioIO[T] { + override type WriteP = TFExampleTypedIO.WriteParam + override type ReadP = TFExampleTypedIO.ReadParam + override val tapT: TapT.Aux[T, T] = TapOf[T] + + override def testId: String = s"TFExampleTypedIO($path)" + + override protected def write(data: SCollection[T], params: WriteP): Tap[T] = { + val exampleType: ExampleType[T] = implicitly + data.transform_ { scoll => + scoll + .map(t => exampleType(t).toByteArray) + .applyInternal( + TFRecordMethods.tfWrite( + path, + params.suffix, + params.numShards, + params.compression, + params.filenamePolicySupplier, + params.prefix, + params.shardNameTemplate, + ScioUtil.isWindowed(data), + ScioUtil.tempDirOrDefault(params.tempDirectory, data.context) + ) + ) + } + tap(TFExampleIO.ReadParam(params)) + } + + override def tap(params: ReadP): Tap[T] = { + val exampleType: ExampleType[T] = implicitly + TFRecordMethods + .tap(path, params) + .map(bytes => exampleType(Example.parseFrom(bytes))) + } + + override protected def read(sc: ScioContext, params: ReadParam): SCollection[T] = { + val exampleType: ExampleType[T] = implicitly + sc.transform { ctx => + TFRecordMethods + .read(ctx, path, params) + .map(bytes => exampleType(Example.parseFrom(bytes))) + } + } +} + final case class TFExampleIO(path: String) extends ScioIO[Example] { override type ReadP = TFExampleIO.ReadParam override type WriteP = TFExampleIO.WriteParam @@ -162,7 +217,7 @@ private object TFRecordMethods { ) } - private def tfWrite( + private[scio] def tfWrite( path: String, suffix: String, numShards: Int, diff --git a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/SCollectionSyntax.scala b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/SCollectionSyntax.scala index 91d4bcdcfc..9d5a565db8 100644 --- a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/SCollectionSyntax.scala +++ b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/SCollectionSyntax.scala @@ -25,12 +25,14 @@ import com.spotify.scio.io.ClosedTap import com.spotify.scio.tensorflow.{ SavedBundlePredictDoFn, TFExampleIO, + TFExampleTypedIO, TFRecordIO, TFSequenceExampleIO } import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.SCollection import com.spotify.zoltar.tf.TensorFlowModel +import magnolify.tensorflow.ExampleType /** * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with TensorFlow methods. @@ -143,6 +145,38 @@ object PredictSCollectionOps { val DefaultFetchOps: Option[Seq[String]] = None } +final class TypedExampleSCollectionOps[T](private val self: SCollection[T]) { + + /** + * Converts this collection of `T` into Tensorflow [[org.tensorflow.proto.example.Example]]s with + * the provided [[magnolify.tensorflow.ExampleType]], then saves these as a TensorFlow TFRecord + * file. 
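A round-trip sketch (record shape and path are invented; the same `ExampleType[T]` also powers
+   * the read side added below in `ScioContextOps.typedTfRecordFile`):
+   * {{{
+   * case class Features(label: Long, weight: Float)
+   * implicit val et: ExampleType[Features] = ExampleType[Features]
+   * features.saveAsTfRecordFile("gs://bucket/out")
+   * val back = sc.typedTfRecordFile[Features]("gs://bucket/out/part-*")
+   * }}}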
+ */ + def saveAsTfRecordFile( + path: String, + suffix: String = TFExampleIO.WriteParam.DefaultSuffix, + compression: Compression = TFExampleIO.WriteParam.DefaultCompression, + numShards: Int = TFExampleIO.WriteParam.DefaultNumShards, + shardNameTemplate: String = TFExampleIO.WriteParam.DefaultShardNameTemplate, + tempDirectory: String = TFExampleIO.WriteParam.DefaultTempDirectory, + filenamePolicySupplier: FilenamePolicySupplier = + TFExampleIO.WriteParam.DefaultFilenamePolicySupplier, + prefix: String = TFExampleIO.WriteParam.DefaultPrefix + )(implicit exampleType: ExampleType[T]): ClosedTap[T] = { + implicit val tCoder: Coder[T] = self.coder + val param = TFExampleTypedIO.WriteParam( + suffix, + compression, + numShards, + filenamePolicySupplier, + prefix, + shardNameTemplate, + tempDirectory + ) + self.write(TFExampleTypedIO(path))(param) + } +} + final class ExampleSCollectionOps[T <: Example](private val self: SCollection[T]) extends AnyVal { /** @@ -320,4 +354,8 @@ trait SCollectionSyntax { implicit def tensorFlowSequenceExampleSCollectionOps[T <: SequenceExample]( s: SCollection[T] ): SequenceExampleSCollectionOps[T] = new SequenceExampleSCollectionOps(s) + + implicit def tensorFlowTypedExampleSCollectionOps[T]( + s: SCollection[T] + ): TypedExampleSCollectionOps[T] = new TypedExampleSCollectionOps(s) } diff --git a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala index 7ecedb47ce..b997440cd8 100644 --- a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala +++ b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala @@ -18,16 +18,30 @@ package com.spotify.scio.tensorflow.syntax import java.nio.file.Files - import com.spotify.scio.ScioContext -import com.spotify.scio.tensorflow.{TFExampleIO, TFRecordIO, TFSequenceExampleIO} +import com.spotify.scio.coders.Coder +import com.spotify.scio.tensorflow.{TFExampleIO, TFExampleTypedIO, TFRecordIO, TFSequenceExampleIO} import com.spotify.scio.values.{DistCache, SCollection} +import magnolify.tensorflow.ExampleType import org.apache.beam.sdk.io.Compression import org.tensorflow.proto.example.{Example, SequenceExample} import org.tensorflow.metadata.v0._ final class ScioContextOps(private val self: ScioContext) extends AnyVal { + /** + * Get an SCollection for a TensorFlow TFRecord file. Input must be Records are read back as + * Tensorflow [[org.tensorflow.proto.example.Example]]s then mapped to the user type `T` with the + * implicit [[magnolify.tensorflow.ExampleType]] + * + * @group input + */ + def typedTfRecordFile[T: ExampleType: Coder]( + path: String, + compression: Compression = Compression.AUTO + ): SCollection[T] = + self.read(TFExampleTypedIO(path))(TFExampleTypedIO.ReadParam(compression)) + /** * Get an SCollection for a TensorFlow TFRecord file. Note that TFRecord files are not splittable. 
diff --git a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala
index 7ecedb47ce..b997440cd8 100644
--- a/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala
+++ b/scio-tensorflow/src/main/scala/com/spotify/scio/tensorflow/syntax/ScioContextSyntax.scala
@@ -18,16 +18,30 @@ package com.spotify.scio.tensorflow.syntax
 
 import java.nio.file.Files
-
 import com.spotify.scio.ScioContext
-import com.spotify.scio.tensorflow.{TFExampleIO, TFRecordIO, TFSequenceExampleIO}
+import com.spotify.scio.coders.Coder
+import com.spotify.scio.tensorflow.{TFExampleIO, TFExampleTypedIO, TFRecordIO, TFSequenceExampleIO}
 import com.spotify.scio.values.{DistCache, SCollection}
+import magnolify.tensorflow.ExampleType
 import org.apache.beam.sdk.io.Compression
 import org.tensorflow.proto.example.{Example, SequenceExample}
 import org.tensorflow.metadata.v0._
 
 final class ScioContextOps(private val self: ScioContext) extends AnyVal {
 
+  /**
+   * Get an SCollection for a TensorFlow TFRecord file. Records are read back as TensorFlow
+   * [[org.tensorflow.proto.example.Example]]s, then mapped to the user type `T` with the implicit
+   * [[magnolify.tensorflow.ExampleType]].
+   *
+   * @group input
+   */
+  def typedTfRecordFile[T: ExampleType: Coder](
+    path: String,
+    compression: Compression = Compression.AUTO
+  ): SCollection[T] =
+    self.read(TFExampleTypedIO(path))(TFExampleTypedIO.ReadParam(compression))
+
   /**
    * Get an SCollection for a TensorFlow TFRecord file. Note that TFRecord files are not splittable.
    * The recommended record encoding is [[org.tensorflow.proto.example.Example]] protocol buffers
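The read side mirrors the write side. A sketch reusing the hypothetical `Point` record, implicit `ExampleType`, and `sc: ScioContext` from the example above:

```scala
import com.spotify.scio.tensorflow._ // typed TFRecord syntax
import com.spotify.scio.values.SCollection

// Reads raw TFRecord bytes, parses each record as an Example, then converts it
// to Point via the implicit ExampleType[Point]; compression is auto-detected by default.
val points: SCollection[Point] =
  sc.typedTfRecordFile[Point]("gs://my-bucket/points/*.tfrecords") // hypothetical path
```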
diff --git a/scio-tensorflow/src/test/scala/com/spotify/scio/tensorflow/TFExampleIOTest.scala b/scio-tensorflow/src/test/scala/com/spotify/scio/tensorflow/TFExampleIOTest.scala
index 743972c5a8..f04cab6d3a 100644
--- a/scio-tensorflow/src/test/scala/com/spotify/scio/tensorflow/TFExampleIOTest.scala
+++ b/scio-tensorflow/src/test/scala/com/spotify/scio/tensorflow/TFExampleIOTest.scala
@@ -39,6 +39,13 @@ class TFExampleIOTest extends ScioIOSpec {
     testTap(xs)(_.saveAsTfRecordFile(_))(".tfrecords")
     testJobTest(xs)(TFExampleIO(_))(_.tfRecordExampleFile(_))(_.saveAsTfRecordFile(_))
   }
+
+  it should "work with typed records" in {
+    val xs = (1 to 100).map(x => Record(x, x.toString))
+    implicit val exampleType: ExampleType[Record] = recordT
+    testTap(xs)(_.saveAsTfRecordFile(_))(".tfrecords")
+    testJobTest(xs)(TFExampleTypedIO(_))(_.typedTfRecordFile(_))(_.saveAsTfRecordFile(_))
+  }
 }
 
 class TFExampleIOFileNamePolicyTest extends FileNamePolicySpec[Example] {
@@ -65,7 +72,7 @@ class TFExampleIOFileNamePolicyTest extends FileNamePolicySpec[Example] {
     _.map(x => recordT(Record(x, x.toString))).saveAsTfRecordFile(
       "nonsense",
       shardNameTemplate = "SSS-of-NNN",
-      filenamePolicySupplier = testFilenamePolicySupplier
+      filenamePolicySupplier = testFilenamePolicySupplier(_, _)
     )
   )
 }
diff --git a/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala b/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala
index f7d2c91a22..a492bcb3f5 100644
--- a/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala
+++ b/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala
@@ -330,7 +330,7 @@ trait SCollectionMatchers extends EqInstances {
   ): Matcher[T] =
     matcher.matcher(_.inEarlyGlobalWindowPanes)
 
-  /** Assert that the SCollection in question contains the provided elements. */
+  /** Assert that the SCollection in question contains exactly the provided elements. */
   def containInAnyOrder[T: Coder: Eq](
     value: Iterable[T]
   ): IterableMatcher[SCollection[T], T] =
diff --git a/scio-test/src/main/scala/com/spotify/scio/testing/ScioIOSpec.scala b/scio-test/src/main/scala/com/spotify/scio/testing/ScioIOSpec.scala
index 183c8fd0d4..657cc884d3 100644
--- a/scio-test/src/main/scala/com/spotify/scio/testing/ScioIOSpec.scala
+++ b/scio-test/src/main/scala/com/spotify/scio/testing/ScioIOSpec.scala
@@ -149,30 +149,34 @@ trait ScioIOSpec extends PipelineSpec {
   }
 
   def testJobTest[T: Coder](xs: Seq[T], in: String = "in", out: String = "out")(
-    ioFn: String => ScioIO[T]
+    ioFn: String => ScioIO[T],
+    optOutIOFn: Option[String => ScioIO[T]] = None
   )(
     readFn: (ScioContext, String) => SCollection[T]
   )(
     writeFn: (SCollection[T], String) => ClosedTap[_]
   ): Unit = {
+    val inIO = ioFn(in)
+    val outIO = optOutIOFn.map(outIoFn => outIoFn(out)).getOrElse(ioFn(out))
+
     val testJob = (sc: ScioContext) => writeFn(readFn(sc, in), out)
     JobTest(testJob)
-      .input(ioFn(in), xs)
-      .output(ioFn(out))(_ should containInAnyOrder(xs))
+      .input(inIO, xs)
+      .output(outIO)(_ should containInAnyOrder(xs))
       .run()
 
     the[IllegalArgumentException] thrownBy {
       JobTest(testJob)
         .input(CustomIO[T](in), xs)
-        .output(ioFn(out))(_ should containInAnyOrder(xs))
+        .output(outIO)(_ should containInAnyOrder(xs))
         .run()
-    } should have message s"requirement failed: Missing test input: ${ioFn(in).testId}, available: [CustomIO($in)]"
+    } should have message s"requirement failed: Missing test input: ${inIO.testId}, available: [CustomIO($in)]"
 
     the[IllegalArgumentException] thrownBy {
       JobTest(testJob)
-        .input(ioFn(in), xs)
+        .input(inIO, xs)
         .output(CustomIO[T](out))(_ should containInAnyOrder(xs))
         .run()
-    } should have message s"requirement failed: Missing test output: ${ioFn(out).testId}, available: [CustomIO($out)]"
+    } should have message s"requirement failed: Missing test output: ${outIO.testId}, available: [CustomIO($out)]"
   }
 }
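The optional second IO factory exists for jobs whose read and write sides go through different `ScioIO` implementations, so `JobTest` can register distinct input and output IOs. A hypothetical call shape (`ReadSideIO`, `WriteSideIO`, and the read/write lambdas are illustrative placeholders, not real scio APIs):

```scala
// When optOutIOFn is None (the default), both sides use ioFn as before.
testJobTest(records)(ReadSideIO(_), optOutIOFn = Some(WriteSideIO(_)))(
  (sc, in) => readWithReadSide(sc, in)       // hypothetical read helper
)((data, out) => writeWithWriteSide(data, out)) // hypothetical write helper
```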
diff --git a/scio-test/src/test/scala/com/spotify/scio/avro/AvroIOTest.scala b/scio-test/src/test/scala/com/spotify/scio/avro/AvroIOTest.scala
index dc7e535152..57efe11582 100644
--- a/scio-test/src/test/scala/com/spotify/scio/avro/AvroIOTest.scala
+++ b/scio-test/src/test/scala/com/spotify/scio/avro/AvroIOTest.scala
@@ -34,6 +34,9 @@ import java.io.File
 object AvroIOTest {
   @AvroType.toSchema
   case class AvroRecord(i: Int, s: String, r: List[String])
+  case class Track(trackId: String)
+
+  case class Record(i: Int, s: String, r: List[String])
 }
 
 class AvroIOFileNamePolicyTest extends FileNamePolicySpec[TestRecord] {
@@ -59,7 +62,7 @@ class AvroIOFileNamePolicyTest extends FileNamePolicySpec[TestRecord] {
     _.map(AvroUtils.newSpecificRecord).saveAsAvroFile(
       "nonsense",
       shardNameTemplate = "SSS-of-NNN",
-      filenamePolicySupplier = testFilenamePolicySupplier
+      filenamePolicySupplier = testFilenamePolicySupplier(_, _)
     )
   )
 }
@@ -89,7 +92,7 @@ class ObjectIOFileNamePolicyTest extends FileNamePolicySpec[AvroIOTest.AvroRecor
     _.map(x => AvroRecord(x, x.toString, (1 to x).map(_.toString).toList)).saveAsObjectFile(
       "nonsense",
       shardNameTemplate = "SSS-of-NNN",
-      filenamePolicySupplier = testFilenamePolicySupplier
+      filenamePolicySupplier = testFilenamePolicySupplier(_, _)
     )
   )
 }
@@ -117,7 +120,7 @@ class ProtobufIOFileNamePolicyTest extends FileNamePolicySpec[TrackPB] {
     _.map(x => TrackPB.newBuilder().setTrackId(x.toString).build()).saveAsProtobufFile(
       "nonsense",
       shardNameTemplate = "SSS-of-NNN",
-      filenamePolicySupplier = testFilenamePolicySupplier
+      filenamePolicySupplier = testFilenamePolicySupplier(_, _)
     )
   )
 }
@@ -193,6 +196,12 @@ class AvroIOTest extends ScioIOSpec {
     testJobTest(xs)(io)(_.typedAvroFile[AvroRecord](_))(_.saveAsTypedAvroFile(_))
   }
 
+  it should "work with typed Avro using a magnolify AvroType" in {
+    val xs = (1 to 100).map(x => Record(x, x.toString, (1 to x).map(_.toString).toList))
+    testTap(xs)(_.saveAsAvroFile(_))(".avro")
+    testJobTest(xs)(AvroIO[Record])(_.typedAvroFileMagnolify[Record](_))(_.saveAsAvroFile(_))
+  }
+
   "ObjectFileIO" should "work" in {
     val xs = (1 to 100).map(x => AvroRecord(x, x.toString, (1 to x).map(_.toString).toList))
     testTap(xs)(_.saveAsObjectFile(_))(".obj.avro")
@@ -209,4 +218,12 @@ class AvroIOTest extends ScioIOSpec {
     testJobTest(xs)(ProtobufIO(_))(_.protobufFile[TrackPB](_))(_.saveAsProtobufFile(_))
   }
 
+  "TypedProtobufIO" should "work" in {
+    val xs = (1 to 100).map(x => Track(x.toString))
+    val suffix = ".protobuf.avro"
+    testTap(xs)(_.saveAsProtobufFile[TrackPB](_))(suffix)
+    testJobTest(xs)(ProtobufIO(_))(_.typedProtobufFile[Track, TrackPB](_))(
+      _.saveAsProtobufFile[TrackPB](_)
+    )
+  }
 }
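For reference, the `Track`/`TrackPB` pairing exercised by `typedProtobufFile` and `saveAsProtobufFile` goes through magnolify's protobuf converter. A minimal sketch of the per-element mapping, assuming `TrackPB` is the existing scio test proto message (everything else here is illustrative):

```scala
import magnolify.protobuf.ProtobufType

case class Track(trackId: String)

// Derivation is checked at compile time: it fails if the case class fields
// do not line up with the TrackPB message fields.
val trackType: ProtobufType[Track, TrackPB] = ProtobufType[Track, TrackPB]
val pb: TrackPB = trackType.to(Track("id123")) // case class -> protobuf
val track: Track = trackType.from(pb)          // protobuf -> case class
```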