From d64e3820e0fa00c3dce2fe8eaef4b0d4512602ad Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 16 Aug 2024 11:31:37 +0200 Subject: [PATCH] Refactor BQ to expose all beam's configurations --- .../scio/bigquery/BigQueryClientIT.scala | 2 +- .../spotify/scio/bigquery/BigQueryIOIT.scala | 4 +- .../scio/bigquery/TypedBigQueryIT.scala | 35 +- .../bigquery/types/BigQueryStorageIT.scala | 8 +- .../scio/bigquery/types/BigQueryTypeIT.scala | 7 +- .../scala/com/spotify/scio/ScioContext.scala | 23 + .../coders/instances/BeamTypeCoders.scala | 4 + .../scio/examples/complete/AutoComplete.scala | 2 +- .../complete/StreamingWordExtract.scala | 2 +- .../complete/TrafficMaxLaneFlow.scala | 2 +- .../examples/complete/TrafficRoutes.scala | 2 +- .../examples/complete/game/GameStats.scala | 4 +- .../complete/game/HourlyTeamScore.scala | 2 +- .../examples/complete/game/LeaderBoard.scala | 4 +- .../examples/complete/game/UserScore.scala | 2 +- .../examples/cookbook/BigQueryTornadoes.scala | 4 +- .../cookbook/CombinePerKeyExamples.scala | 4 +- .../cookbook/DistinctByKeyExample.scala | 4 +- .../examples/cookbook/FilterExamples.scala | 4 +- .../scio/examples/cookbook/JoinExamples.scala | 20 +- .../examples/cookbook/MaxPerKeyExamples.scala | 4 +- .../cookbook/StorageBigQueryTornadoes.scala | 14 +- .../examples/cookbook/TriggerExample.scala | 2 +- .../extra/TypedBigQueryTornadoes.scala | 2 +- .../extra/TypedStorageBigQueryTornadoes.scala | 2 +- .../StorageBigQueryTornadoesTest.scala | 16 +- .../TypedStorageBigQueryTornadoesTest.scala | 13 +- .../spotify/scio/bigquery/BigQueryIO.scala | 1050 +++++------------ .../spotify/scio/bigquery/BigQueryTypes.scala | 120 +- .../spotify/scio/bigquery/MockBigQuery.scala | 4 +- .../scio/bigquery/client/BigQuery.scala | 4 +- .../scio/bigquery/client/QueryOps.scala | 2 +- .../dynamic/syntax/SCollectionSyntax.scala | 8 +- .../bigquery/syntax/SCollectionSyntax.scala | 119 +- .../bigquery/syntax/ScioContextSyntax.scala | 322 +++-- .../com/spotify/scio/bigquery/taps.scala | 107 +- .../scio/bigquery/BigQueryIOTest.scala | 37 +- .../scio/bigquery/BigQueryTypesTest.scala | 4 +- site/src/main/paradox/FAQ.md | 2 +- site/src/main/paradox/io/BigQuery.md | 2 +- 40 files changed, 832 insertions(+), 1141 deletions(-) diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala index 02d2914e66..295c671ea0 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala @@ -154,7 +154,7 @@ class BigQueryClientIT extends AnyFlatSpec with Matchers { "TableService.getRows" should "work" in { val rows = - bq.tables.rows(Table.Spec("bigquery-public-data:samples.shakespeare")).take(10).toList + bq.tables.rows(Table("bigquery-public-data:samples.shakespeare")).take(10).toList val columns = Set("word", "word_count", "corpus", "corpus_date") all(rows.map(_.keySet().asScala)) shouldBe columns } diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala index b29dc500dc..4e6c46f2b2 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala @@ -45,7 +45,7 @@ class BigQueryIOIT extends PipelineSpec { "Select" should "read typed values from a SQL query" in runWithRealContext(options) { sc => - val 
scoll = sc.read(BigQueryTyped[ShakespeareFromQuery]) + val scoll = sc.typedBigQueryStorage[ShakespeareFromQuery]() scoll should haveSize(10) scoll should satisfy[ShakespeareFromQuery] { _.forall(_.getClass == classOf[ShakespeareFromQuery]) @@ -54,7 +54,7 @@ class BigQueryIOIT extends PipelineSpec { "TableRef" should "read typed values from table" in runWithRealContext(options) { sc => - val scoll = sc.read(BigQueryTyped[ShakespeareFromTable]) + val scoll = sc.typedBigQueryStorage[ShakespeareFromTable]() scoll.take(10) should haveSize(10) scoll should satisfy[ShakespeareFromTable] { _.forall(_.getClass == classOf[ShakespeareFromTable]) diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index 74cba90165..78930009e4 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -20,7 +20,6 @@ package com.spotify.scio.bigquery import com.google.protobuf.ByteString import com.spotify.scio._ import com.spotify.scio.avro._ -import com.spotify.scio.bigquery.BigQueryTypedTable.Format import com.spotify.scio.bigquery.client.BigQuery import com.spotify.scio.testing._ import magnolify.scalacheck.auto._ @@ -69,7 +68,7 @@ object TypedBigQueryIT { val now = Instant.now().toString(TIME_FORMATTER) val spec = s"data-integration-test:bigquery_avro_it.$name${now}_${Random.nextInt(Int.MaxValue)}" - Table.Spec(spec) + Table(spec) } private val tableRowTable = table("records_tablerow") private val avroTable = table("records_avro") @@ -101,37 +100,25 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { BigQuery.defaultInstance().tables.delete(avroLogicalTypeTable.ref) } - "TypedBigQuery" should "read records" in { + "typedBigQuery" should "read records" in { val sc = ScioContext(options) sc.typedBigQuery[Record](tableRowTable) should containInAnyOrder(records) sc.run() } - it should "convert to avro format" in { + "bigQueryTableFormat" should "read TableRow records" in { val sc = ScioContext(options) - implicit val coder = avroGenericRecordCoder(Record.avroSchema) - sc.typedBigQuery[Record](tableRowTable) - .map(Record.toAvro) - .map(Record.fromAvro) should containInAnyOrder( - records - ) + val format = BigQueryIO.Format.Default(BigQueryType[Record]) + val data = sc.bigQueryTableFormat(tableRowTable, format) + data should containInAnyOrder(records) sc.run() } - "BigQueryTypedTable" should "read TableRow records" in { + it should "read GenericRecord records" in { val sc = ScioContext(options) - sc - .bigQueryTable(tableRowTable) - .map(Record.fromTableRow) should containInAnyOrder(records) - sc.run() - } - - it should "read GenericRecord recors" in { - val sc = ScioContext(options) - implicit val coder = avroGenericRecordCoder(Record.avroSchema) - sc - .bigQueryTable(tableRowTable, Format.GenericRecord) - .map(Record.fromAvro) should containInAnyOrder(records) + val format = BigQueryIO.Format.Avro(BigQueryType[Record]) + val data = sc.bigQueryTableFormat(tableRowTable, format) + data should containInAnyOrder(records) sc.run() } @@ -157,7 +144,7 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { |} """.stripMargin) val tap = sc - .bigQueryTable(tableRowTable, Format.GenericRecord) + .bigQueryTableFormat(tableRowTable, BigQueryIO.Format.Avro()) .saveAsBigQueryTable(avroTable, schema = schema, createDisposition = CREATE_IF_NEEDED) val result = 
sc.run().waitUntilDone() diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala index 0f12f40135..86602207e7 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala @@ -155,8 +155,10 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) + val bqt = BigQueryType[NestedWithRestriction] + val source = Table(bqt.table.get, "required.int < 3") val p = sc - .typedBigQueryStorage[NestedWithRestriction](rowRestriction = "required.int < 3") + .typedBigQueryStorage[NestedWithRestriction](source) .map { r => val (req, opt, rep) = (r.required, r.optional.get, r.repeated.head) (req.int, req.string, opt.int, opt.string, rep.int, rep.string) @@ -172,7 +174,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) val p = sc - .typedBigQuery[NestedWithAll](Table.Spec(NestedWithAll.table.format("nested"))) + .typedBigQuery[NestedWithAll](Table(NestedWithAll.table.format("nested"))) .map(r => (r.required.int, r.required.string, r.optional.get.int)) .internal PAssert.that(p).containsInAnyOrder(expected) @@ -243,7 +245,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) val p = sc - .typedBigQueryStorage[ToTableRequired](Table.Spec("data-integration-test:storage.required")) + .typedBigQueryStorage[ToTableRequired](Table("data-integration-test:storage.required")) .internal PAssert.that(p).containsInAnyOrder(expected) sc.run() diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala index c08ea44056..78b36c29d5 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala @@ -187,10 +187,9 @@ class BigQueryTypeIT extends AnyFlatSpec with Matchers { tableReference.setProjectId("data-integration-test") tableReference.setDatasetId("partition_a") tableReference.setTableId("table_$LATEST") - Table.Ref(tableReference).latest().ref.getTableId shouldBe "table_20170302" + Table(tableReference).latest().ref.getTableId shouldBe "table_20170302" - Table - .Spec("data-integration-test:partition_a.table_$LATEST") + Table("data-integration-test:partition_a.table_$LATEST") .latest() .ref .getTableId shouldBe "table_20170302" @@ -210,7 +209,7 @@ class BigQueryTypeIT extends AnyFlatSpec with Matchers { val bqt = BigQueryType[FromTableT] bqt.isQuery shouldBe false bqt.isTable shouldBe true - bqt.query shouldBe None + bqt.queryRaw shouldBe None bqt.table shouldBe Some("bigquery-public-data:samples.shakespeare") val fields = bqt.schema.getFields.asScala fields.size shouldBe 4 diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index ce3e169b6a..026f0616c7 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -51,6 +51,8 @@ import 
scala.reflect.ClassTag import scala.util.control.NoStackTrace import scala.util.{Failure, Success, Try} import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions +import org.apache.beam.sdk.transforms.errorhandling.BadRecord +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler /** Runner specific context. */ trait RunnerContext { @@ -851,6 +853,27 @@ class ScioContext private[scio] ( this.applyTransform(Create.timestamped(v.asJava).withCoder(coder)) } + // ======================================================================= + // Error handler + // ======================================================================= + def registerBadRecordErrorHandler[O <: POutput]( + sinkTransform: PTransform[PCollection[BadRecord], O] + ): BadRecordErrorHandler[O] = + pipeline.registerBadRecordErrorHandler(sinkTransform) + + def badRecordErrorHandler(): (BadRecordErrorHandler[PCollectionTuple], SCollection[BadRecord]) = { + val tag = new TupleTag[BadRecord]() + val sideOutput = PCollectionTuple.empty(pipeline) + val sinkTransform = new PTransform[PCollection[BadRecord], PCollectionTuple] { + override def expand(input: PCollection[BadRecord]): PCollectionTuple = + sideOutput.and(tag, input) + } + + val handler = pipeline.registerBadRecordErrorHandler(sinkTransform) + val errorOutput = wrap(sideOutput.get(tag)) + (handler, errorOutput) + } + // ======================================================================= // Metrics // ======================================================================= diff --git a/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala b/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala index df8cca640b..a5dcf33167 100644 --- a/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala +++ b/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala @@ -20,6 +20,7 @@ package com.spotify.scio.coders.instances import com.google.api.client.json.GenericJson import com.google.api.client.json.JsonObjectParser import com.google.api.client.json.gson.GsonFactory +import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderGrammar} import com.spotify.scio.util.ScioUtil @@ -29,6 +30,7 @@ import org.apache.beam.sdk.io.FileIO.ReadableFile import org.apache.beam.sdk.io.fs.{MatchResult, MetadataCoderV2, ResourceId, ResourceIdCoder} import org.apache.beam.sdk.io.ReadableFileCoder import org.apache.beam.sdk.schemas.{Schema => BSchema} +import org.apache.beam.sdk.transforms.errorhandling.BadRecord import org.apache.beam.sdk.transforms.windowing.{ BoundedWindow, GlobalWindow, @@ -66,6 +68,8 @@ trait BeamTypeCoders extends CoderGrammar { str => DefaultJsonObjectParser.parseAndClose(new StringReader(str), ScioUtil.classOf[T]), DefaultJsonObjectParser.getJsonFactory().toString(_) ) + + def badRecordCoder(sc: ScioContext): Coder[BadRecord] = beam(BadRecord.getCoder(sc.pipeline)) } private[coders] object BeamTypeCoders extends BeamTypeCoders { diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala index 62ffb8440a..a70839ccae 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala @@ -145,7 +145,7 @@ object AutoComplete { if (outputToBigqueryTable) { tags .map(kv => Record(kv._1, 
kv._2.map(p => Tag(p._1, p._2)).toList)) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) } if (outputToDatastore) { val kind = args.getOrElse("kind", "autocomplete-demo") diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala index e6d428cc04..0eafc26805 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala @@ -49,7 +49,7 @@ object StreamingWordExtract { .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .map(_.toUpperCase) .map(s => TableRow("string_field" -> s)) - .saveAsBigQueryTable(Table.Spec(args("output")), schema) + .saveAsBigQueryTable(Table(args("output")), schema) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala index 34fc0f9f5b..78cdecc7b4 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala @@ -126,7 +126,7 @@ object TrafficMaxLaneFlow { ts ) } - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala index c24b283b34..f9f4b3511d 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala @@ -111,7 +111,7 @@ object TrafficRoutes { .map { case (r, ts) => Record(r.route, r.avgSpeed, r.slowdownEvent, ts) } - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala index 1750c3c3ef..01d846c6fd 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala @@ -113,7 +113,7 @@ object GameStats { // Done using windowing information, convert back to regular `SCollection` .toSCollection // Save to the BigQuery table defined by "output" in the arguments passed in + "_team" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_team")) + .saveAsTypedBigQueryTable(Table(args("output") + "_team")) userEvents // Window over a variable length of time - sessions end after sessionGap minutes no activity @@ -141,7 +141,7 @@ object GameStats { AvgSessionLength(mean, fmt.print(w.start())) } // Save to the BigQuery table defined by "output" + "_sessions" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_sessions")) + .saveAsTypedBigQueryTable(Table(args("output") + "_sessions")) // Execute the pipeline val result = sc.run() diff --git 
a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala index 3baf8c8c0e..95fb9ebd29 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala @@ -91,7 +91,7 @@ object HourlyTeamScore { TeamScoreSums(team, score, start) } // Save to the BigQuery table defined by "output" in the arguments passed in - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala index a77e34219a..e7533ba9d9 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala @@ -96,7 +96,7 @@ object LeaderBoard { // Done with windowing information, convert back to regular `SCollection` .toSCollection // Save to the BigQuery table defined by "output" in the arguments passed in + "_team" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_team")) + .saveAsTypedBigQueryTable(Table(args("output") + "_team")) gameEvents // Use a global window for unbounded data, which updates calculation every 10 minutes, @@ -126,7 +126,7 @@ object LeaderBoard { // Map summed results from tuples into `UserScoreSums` case class, so we can save to BQ .map(kv => UserScoreSums(kv._1, kv._2, fmt.print(Instant.now()))) // Save to the BigQuery table defined by "output" in the arguments passed in + "_user" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_user")) + .saveAsTypedBigQueryTable(Table(args("output") + "_user")) // Execute the pipeline val result = sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala index 946ac76620..590314e2f3 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala @@ -62,7 +62,7 @@ object UserScore { // Map summed results from tuples into `UserScoreSums` case class, so we can save to BQ .map(UserScoreSums.tupled) // Save to the BigQuery table defined by "output" in the arguments passed in - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala index 13125695e4..12bcf84817 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala @@ -45,7 +45,7 @@ object BigQueryTornadoes { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) val resultTap = sc .bigQueryTable(table) // Extract months with 
tornadoes @@ -55,7 +55,7 @@ object BigQueryTornadoes { // Map `(Long, Long)` tuples into result `TableRow`s .map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Access the loaded tables resultTap diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala index cc032fe712..209861d479 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala @@ -47,7 +47,7 @@ object CombinePerKeyExamples { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) sc.bigQueryTable(table) // Extract words and corresponding play names .flatMap { row => @@ -64,7 +64,7 @@ object CombinePerKeyExamples { // Map `(String, String)` tuples into result `TableRow`s .map(kv => TableRow("word" -> kv._1, "all_plays" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala index ded1fd8551..ba145ba180 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala @@ -46,7 +46,7 @@ object DistinctByKeyExample { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) sc.bigQueryTable(table) // Extract words and corresponding play names .flatMap { row => @@ -59,7 +59,7 @@ object DistinctByKeyExample { // Map `(String, String)` tuples into result `TableRow`s .map(kv => TableRow("word" -> kv._1, "reference_play" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala index 31fe4b0541..0708fd1960 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala @@ -51,7 +51,7 @@ object FilterExamples { val monthFilter = args.int("monthFilter", 7) // Open BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) val pipe = sc 
.bigQueryTable(table) // Map `TableRow`s into `Record`s @@ -81,7 +81,7 @@ object FilterExamples { TableRow("year" -> r.year, "month" -> r.month, "day" -> r.day, "mean_temp" -> r.meanTemp) } // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala index 61bc7a1fd7..2126d853f9 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala @@ -63,9 +63,9 @@ object JoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) eventsInfo // Left outer join to produce `SCollection[(String, (String, Option[String]))] @@ -92,9 +92,9 @@ object SideInputJoinExamples { // Extract both sides as `SCollection[(String, String)]`s, and then convert right hand side as // a `SideInput` of `Map[String, String]` val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = sc - .bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)) + .bigQueryTable(Table(ExampleData.COUNTRY_TABLE)) .map(extractCountryInfo) .asMapSideInput @@ -127,9 +127,9 @@ object HashJoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) eventsInfo // Hash join uses side input under the hood and is a drop-in replacement for regular join @@ -155,9 +155,9 @@ object SkewedJoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)) .map(extractCountryInfo) eventsInfo @@ -219,9 +219,9 @@ object SparseJoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) eventsInfo // Sparse Join is useful when LHS is much larger than the RHS which cannot fit in memory, but diff --git 
a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala index 3ca121027e..5d5f61b7d7 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala @@ -44,7 +44,7 @@ object MaxPerKeyExamples { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) sc.bigQueryTable(table) // Extract month and mean temperature as `(Long, Double)` tuples .map(row => (row.getLong("month"), row.getDouble("mean_temp"))) @@ -54,7 +54,7 @@ object MaxPerKeyExamples { // Map `(Long, Double)` tuples into result `TableRow`s .map(kv => TableRow("month" -> kv._1, "max_mean_temp" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala index f020f0efb0..445186ae04 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala @@ -45,13 +45,13 @@ object StorageBigQueryTornadoes { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table( + args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE), + selectedFields = List("tornado", "month"), + rowRestriction = "tornado = true" + ) val resultTap = sc - .bigQueryStorage( - table, - selectedFields = List("tornado", "month"), - rowRestriction = "tornado = true" - ) + .bigQueryStorage(table) .map(_.getLong("month")) // Count occurrences of each unique month to get `(Long, Long)` .countByValue @@ -59,7 +59,7 @@ object StorageBigQueryTornadoes { .map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2)) // Save result as a BigQuery table .saveAsBigQueryTable( - table = Table.Spec(args("output")), + table = Table(args("output")), schema = schema, writeDisposition = WRITE_TRUNCATE, createDisposition = CREATE_IF_NEEDED, diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala index 5834d745ae..e233ade6ee 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala @@ -137,7 +137,7 @@ object TriggerExample { sequentialResults ) ) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala index ffc7493d2e..e6cbe51df8 100644 --- 
a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala @@ -52,7 +52,7 @@ object TypedBigQueryTornadoes { .map(kv => Result(kv._1, kv._2)) // Convert elements from Result to TableRow and save output to BigQuery. .saveAsTypedBigQueryTable( - Table.Spec(args("output")), + Table(args("output")), writeDisposition = WRITE_TRUNCATE, createDisposition = CREATE_IF_NEEDED ) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala index 2c2b1b345e..3cbfab3e28 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala @@ -57,7 +57,7 @@ object TypedStorageBigQueryTornadoes { .map(kv => Result(kv._1, kv._2)) // Convert elements from Result to TableRow and save output to BigQuery. .saveAsTypedBigQueryTable( - Table.Spec(args("output")), + Table(args("output")), method = Method.STORAGE_WRITE_API, writeDisposition = WRITE_TRUNCATE, createDisposition = CREATE_IF_NEEDED, diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala index d79b8282b9..ed4e12afc5 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala @@ -32,16 +32,14 @@ final class StorageBigQueryTornadoesTest extends PipelineSpec { .map(t => TableRow("month" -> t._1, "tornado_count" -> t._2)) "BigQueryTornadoes" should "work" in { + val table = Table( + "bigquery-public-data:samples.gsod", + List("tornado", "month"), + "tornado = true" + ) JobTest[com.spotify.scio.examples.cookbook.StorageBigQueryTornadoes.type] - .args("--input=bigquery-public-data:samples.gsod", "--output=dataset.table") - .input( - BigQueryIO( - "bigquery-public-data:samples.gsod", - List("tornado", "month"), - Some("tornado = true") - ), - inData - ) + .args(s"--input=${table.spec}", "--output=dataset.table") + .input(BigQueryIO(table), inData) .output(BigQueryIO[TableRow]("dataset.table")) { coll => coll should containInAnyOrder(expected) } diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala index 78288bd8df..fff2232d67 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala @@ -35,14 +35,11 @@ final class TypedStorageBigQueryTornadoesTest extends PipelineSpec { "StorageTypedBigQueryTornadoes" should "work" in { JobTest[com.spotify.scio.examples.extra.TypedStorageBigQueryTornadoes.type] .args("--output=dataset.table") - .input( - BigQueryIO( - TypedStorageBigQueryTornadoes.Row.table, - List("tornado", "month"), - Some("tornado = true") - ), - inData - ) + .input(BigQueryIO[TypedStorageBigQueryTornadoes.Row], inData) +// TypedStorageBigQueryTornadoes.Row.table, +// List("tornado", "month"), +// Some("tornado = 
true") +// ), .output(BigQueryIO[Result]("dataset.table")) { coll => coll should containInAnyOrder(expected) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 3cc3f6ab03..6e7bcd27ba 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -18,16 +18,16 @@ package com.spotify.scio.bigquery import com.google.api.services.bigquery.model.TableSchema +import com.google.cloud.bigquery.storage.v1.DataFormat import com.spotify.scio.ScioContext -import com.spotify.scio.bigquery.client.BigQuery +import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation import com.spotify.scio.coders._ import com.spotify.scio.io._ import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.{SCollection, SideOutput, SideOutputCollections} -import com.twitter.chill.ClosureCleaner +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions import org.apache.beam.sdk.io.Compression import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.{Method => ReadMethod} import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ @@ -37,85 +37,317 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ } import org.apache.beam.sdk.io.gcp.bigquery._ import org.apache.beam.sdk.io.gcp.{bigquery => beam} -import org.apache.beam.sdk.transforms.SerializableFunction +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} import org.apache.beam.sdk.values.{PCollection, PCollectionTuple} import org.joda.time.Duration -import java.util.concurrent.ConcurrentHashMap -import java.util.function +import scala.util.chaining._ import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe._ -import scala.util.chaining._ -private object Reads { +case class BigQueryIO[T: Coder](source: Source) extends ScioIO[T] with WriteResultIO[T] { + import BigQueryIO._ + + final override val tapT: TapT.Aux[T, T] = TapOf[T] - private[this] val cache = new ConcurrentHashMap[ScioContext, BigQuery]() + override type ReadP = ReadParam[T] + override type WriteP = WriteParam[T] - @inline private def client(sc: ScioContext): BigQuery = - cache.computeIfAbsent( - sc, - new function.Function[ScioContext, BigQuery] { - override def apply(context: ScioContext): BigQuery = { - val opts = context.optionsAs[GcpOptions] - BigQuery(opts.getProject, opts.getGcpCredential) - } - } + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { + val coder = CoderMaterializer.beam(sc, Coder[T]) + val t = beam.BigQueryIO + .read(Functions.serializableFn(params.format.parseFn)) + .withCoder(coder) + .pipe(r => withSource(r)(source)) + .withMethod(params.method) + .withFormat(params.format.dataFormat) + .pipe(r => withResultFlattening(r)(params)) + .pipe(r => Option(params.errorHandler).fold(r)(r.withErrorHandler)) + .pipe(r => Option(params.configOverride).fold(r)(_.apply(r))) + + sc.applyTransform(t) + } + + override protected def writeWithResult( + data: SCollection[T], + params: WriteP + ): (Tap[T], SideOutputCollections) = { + val method: WriteMethod = resolveMethod( + params.method, + data.context.optionsAs[BigQueryOptions], + data.internal.isBounded ) + 
val t = beam.BigQueryIO + .write[T]() + .withMethod(method) + .pipe(w => withSink(w)(source)) + .pipe(w => withFormatFunction(w)(params.format)) + .pipe(w => Option(params.schema).fold(w)(w.withSchema)) + .pipe(w => Option(params.createDisposition).fold(w)(w.withCreateDisposition)) + .pipe(w => Option(params.writeDisposition).fold(w)(w.withWriteDisposition)) + .pipe(w => Option(params.tableDescription).fold(w)(w.withTableDescription)) + .pipe(w => Option(params.timePartitioning).map(_.asJava).fold(w)(w.withTimePartitioning)) + .pipe(w => Option(params.clustering).map(_.asJava).fold(w)(w.withClustering)) + .pipe(w => Option(params.triggeringFrequency).fold(w)(w.withTriggeringFrequency)) + .pipe(w => Option(params.sharding).fold(w)(withSharding(method, w))) + .pipe(w => Option(params.failedInsertRetryPolicy).fold(w)(w.withFailedInsertRetryPolicy)) + .pipe(w => withSuccessfulInsertsPropagation(method, w)(params.successfulInsertsPropagation)) + .pipe(w => if (params.extendedErrorInfo) w.withExtendedErrorInfo() else w) + .pipe(w => Option(params.errorHandler).fold(w)(w.withErrorHandler)) + .pipe(w => Option(params.configOverride).fold(w)(_.apply(w))) - private[scio] def bqReadQuery[T](sc: ScioContext)( - typedRead: beam.BigQueryIO.TypedRead[T], - sqlQuery: String, - flattenResults: Boolean = false - ): SCollection[T] = { - val bigQueryClient = client(sc) - val labels = sc.labels - val read = bigQueryClient.query - .newQueryJob(sqlQuery, flattenResults, labels) - .map { job => - sc.onClose(_ => bigQueryClient.waitForJobs(job)) - typedRead.from(job.table).withoutValidation() - } + val wr = data.applyInternal(t) + val outputs = sideOutputs( + data, + method, + params.successfulInsertsPropagation, + params.extendedErrorInfo, + wr + ) - sc.applyTransform(read.get) + (tap(ReadParam(params)), outputs) } - // TODO: support labels Inheritance like in bqReadQuery - private[scio] def bqReadStorage[T](sc: ScioContext)( - typedRead: beam.BigQueryIO.TypedRead[T], - table: Table, - selectedFields: List[String] = BigQueryStorage.ReadParam.DefaultSelectFields, - rowRestriction: Option[String] = BigQueryStorage.ReadParam.DefaultRowRestriction - ): SCollection[T] = { - val read = typedRead - .from(table.spec) - .withMethod(ReadMethod.DIRECT_READ) - .withSelectedFields(selectedFields.asJava) - .pipe(r => rowRestriction.fold(r)(r.withRowRestriction)) - - sc.applyTransform(read) + override def tap(read: ReadP): Tap[T] = { + val table = ensureTable(source) + BigQueryTap(table, read) } } -private[bigquery] object Writes { - def resolveMethod( - method: WriteMethod, - options: BigQueryOptions, - isBounded: PCollection.IsBounded - ): WriteMethod = (method, isBounded) match { - case (WriteMethod.DEFAULT, _) - if options.getUseStorageWriteApi && options.getUseStorageWriteApiAtLeastOnce => - WriteMethod.STORAGE_API_AT_LEAST_ONCE - case (WriteMethod.DEFAULT, _) if options.getUseStorageWriteApi => - WriteMethod.STORAGE_WRITE_API - case (WriteMethod.DEFAULT, PCollection.IsBounded.BOUNDED) => - WriteMethod.FILE_LOADS - case (WriteMethod.DEFAULT, PCollection.IsBounded.UNBOUNDED) => - WriteMethod.STREAMING_INSERTS - case _ => - method +object BigQueryIO { + implicit lazy val coderTableDestination: Coder[TableDestination] = Coder.kryo + + lazy val SuccessfulTableLoads: SideOutput[TableDestination] = SideOutput() + lazy val SuccessfulInserts: SideOutput[TableRow] = SideOutput() + lazy val SuccessfulStorageApiInserts: SideOutput[TableRow] = SideOutput() + + implicit lazy val coderBigQueryInsertError: Coder[BigQueryInsertError] = 
Coder.kryo + implicit lazy val coderBigQueryStorageApiInsertError: Coder[BigQueryStorageApiInsertError] = + Coder.kryo + + lazy val FailedInserts: SideOutput[TableRow] = SideOutput() + lazy val FailedInsertsWithErr: SideOutput[BigQueryInsertError] = SideOutput() + lazy val FailedStorageApiInserts: SideOutput[BigQueryStorageApiInsertError] = SideOutput() + + @inline def apply[T](id: String): TestIO[T] = + new TestIO[T] { + final override val tapT: TapT.Aux[T, T] = TapOf[T] + override def testId: String = s"BigQueryIO($id)" + } + + def apply[T <: HasAnnotation: TypeTag: Coder]: BigQueryIO[T] = { + val bqt = BigQueryType[T] + val source = if (bqt.isQuery) { + Query(bqt.queryRaw.get) + } else if (bqt.isStorage) { + val selectedFields = bqt.selectedFields + val rowRestriction = bqt.rowRestriction + if (selectedFields.isEmpty && rowRestriction.isEmpty) { + Table(bqt.table.get) + } else { + val filter = Table.Filter(selectedFields.getOrElse(Nil), rowRestriction) + Table(bqt.table.get, filter) + } + } else { + Table(bqt.table.get) + } + BigQueryIO(source) + } + + /** Defines the format in which BigQuery can be read and written to. */ + sealed trait Format[T] extends Serializable { + type BqType + + def dataFormat: DataFormat + + protected def fromSchemaAndRecord(input: SchemaAndRecord): BqType + + def from(x: BqType): T + def to(x: T): BqType + + private[bigquery] def parseFn(input: SchemaAndRecord): T = + from(fromSchemaAndRecord(input)) + } + + object Format { + + class Default[T](_from: TableRow => T, _to: T => TableRow) extends Format[T] { + override type BqType = TableRow + + override def dataFormat: DataFormat = DataFormat.AVRO + + override def fromSchemaAndRecord(input: SchemaAndRecord): TableRow = + BigQueryAvroUtilsWrapper.convertGenericRecordToTableRow( + input.getRecord, + input.getTableSchema + ) + + override def from(x: TableRow): T = _from(x) + override def to(x: T): TableRow = _to(x) + } + + object Default { + def apply(): Default[TableRow] = new Default(identity, identity) + def apply[T](from: TableRow => T, to: T => TableRow): Default[T] = new Default(from, to) + def apply[T](bqt: BigQueryType[T]): Default[T] = new Default(bqt.fromTableRow, bqt.toTableRow) + } + + class Avro[T](_from: GenericRecord => T, _to: T => GenericRecord) extends Format[T] { + override type BqType = GenericRecord + override def dataFormat: DataFormat = DataFormat.AVRO + + override def fromSchemaAndRecord(input: SchemaAndRecord): GenericRecord = input.getRecord + override def from(x: GenericRecord): T = _from(x) + override def to(x: T): GenericRecord = _to(x) + + private[bigquery] def formatFunction(x: AvroWriteRequest[T]): GenericRecord = to(x.getElement) + private[bigquery] def avroSchemaFactory(tableSchema: TableSchema): Schema = + BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", tableSchema.getFields) + } + + object Avro { + def apply(): Avro[GenericRecord] = new Avro(identity, identity) + def apply[T](from: GenericRecord => T, to: T => GenericRecord): Avro[T] = new Avro(from, to) + def apply[T](bqt: BigQueryType[T]): Avro[T] = new Avro(bqt.fromAvro, bqt.toAvro) + } + } + + object ReadParam { + type ConfigOverride[T] = beam.BigQueryIO.TypedRead[T] => beam.BigQueryIO.TypedRead[T] + + val DefaultFlattenResults: Boolean = false + val DefaultErrorHandler: ErrorHandler[BadRecord, _] = null + val DefaultConfigOverride: Null = null + + private[scio] def apply[T](params: WriteParam[T]): ReadParam[T] = { + val format = params.format + // select read method matching with write method + val method = 
params.method match { + case WriteMethod.DEFAULT | WriteMethod.STREAMING_INSERTS => ReadMethod.DEFAULT + case WriteMethod.FILE_LOADS => ReadMethod.EXPORT + case WriteMethod.STORAGE_WRITE_API | WriteMethod.STORAGE_API_AT_LEAST_ONCE => + ReadMethod.DIRECT_READ + } + // after write, we'll always read the whole table + TableReadParam(format, method) + } + } + + sealed trait ReadParam[T] { + def format: Format[T] + def method: ReadMethod + + def errorHandler: ErrorHandler[BadRecord, _] = ReadParam.DefaultErrorHandler + def configOverride: ReadParam.ConfigOverride[T] = ReadParam.DefaultConfigOverride + } + + final case class QueryReadParam[T]( + override val format: Format[T], + override val method: ReadMethod, + flattenResults: Boolean = ReadParam.DefaultFlattenResults, + override val errorHandler: ErrorHandler[BadRecord, _] = ReadParam.DefaultErrorHandler, + override val configOverride: ReadParam.ConfigOverride[T] = ReadParam.DefaultConfigOverride + ) extends ReadParam[T] + + final case class TableReadParam[T]( + override val format: Format[T], + override val method: ReadMethod, + override val errorHandler: ErrorHandler[BadRecord, _] = ReadParam.DefaultErrorHandler, + override val configOverride: ReadParam.ConfigOverride[T] = ReadParam.DefaultConfigOverride + ) extends ReadParam[T] + + object WriteParam { + type ConfigOverride[T] = beam.BigQueryIO.Write[T] => beam.BigQueryIO.Write[T] + + val DefaultMethod: WriteMethod = WriteMethod.DEFAULT + val DefaultSchema: TableSchema = null + val DefaultWriteDisposition: WriteDisposition = null + val DefaultCreateDisposition: CreateDisposition = null + val DefaultTableDescription: String = null + val DefaultTimePartitioning: TimePartitioning = null + val DefaultClustering: Clustering = null + val DefaultTriggeringFrequency: Duration = null + val DefaultSharding: Sharding = null + val DefaultFailedInsertRetryPolicy: InsertRetryPolicy = null + val DefaultSuccessfulInsertsPropagation: Boolean = false + val DefaultExtendedErrorInfo: Boolean = false + val DefaultErrorHandler: ErrorHandler[BadRecord, _] = null + val DefaultConfigOverride: Null = null + } + + case class WriteParam[T] private ( + format: Format[T], + method: WriteMethod = WriteParam.DefaultMethod, + schema: TableSchema = WriteParam.DefaultSchema, + writeDisposition: WriteDisposition = WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = WriteParam.DefaultCreateDisposition, + tableDescription: String = WriteParam.DefaultTableDescription, + timePartitioning: TimePartitioning = WriteParam.DefaultTimePartitioning, + clustering: Clustering = WriteParam.DefaultClustering, + triggeringFrequency: Duration = WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = WriteParam.DefaultSharding, + failedInsertRetryPolicy: InsertRetryPolicy = WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = WriteParam.DefaultErrorHandler, + configOverride: WriteParam.ConfigOverride[T] = WriteParam.DefaultConfigOverride + ) + + private[bigquery] def withSource[T]( + r: beam.BigQueryIO.TypedRead[T] + )(source: Source): beam.BigQueryIO.TypedRead[T] = + source match { + case q: Query => + r.fromQuery(q.underlying) + .pipe { r => + // TODO dryRun ? 
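+ // Dialect is sniffed from the first line of the query: a leading `#legacysql`
+ // marker keeps Beam's legacy-SQL default, while `#standardsql` (or no marker
+ // at all) selects standard SQL.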
+ q.underlying.trim.split("\n")(0).trim.toLowerCase match { + case "#legacysql" => r + case "#standardsql" => r.usingStandardSql() + case _ => r.usingStandardSql() + } + } + case t: Table => + val selectedFields = t.filter.map(_.selectedFields).filter(_.nonEmpty).map(_.asJava) + val rowRestriction = t.filter.flatMap(_.rowRestriction) + r + .from(t.spec) + .pipe(r => selectedFields.fold(r)(r.withSelectedFields)) + .pipe(r => rowRestriction.fold(r)(r.withRowRestriction)) + } + + private[bigquery] def withResultFlattening[T]( + r: beam.BigQueryIO.TypedRead[T] + )(param: ReadParam[T]): beam.BigQueryIO.TypedRead[T] = { + param match { + case p: QueryReadParam[_] if !p.flattenResults => r.withoutResultFlattening() + case _ => r + } + } + + private def ensureTable(source: Source): Table = + source match { + case t: Table => t + case _: Query => throw new IllegalArgumentException("Cannot write with query") + } + + private[bigquery] def withSink[T](w: beam.BigQueryIO.Write[T])( + source: Source + ): beam.BigQueryIO.Write[T] = + w.to(ensureTable(source).spec) + + private[bigquery] def withFormatFunction[T]( + w: beam.BigQueryIO.Write[T] + )(format: Format[T]): beam.BigQueryIO.Write[T] = format match { + case f: Format.Default[T] => + w.withFormatFunction(Functions.serializableFn(f.to)) + case f: Format.Avro[T] => + w.useAvroLogicalTypes() + .withAvroFormatFunction(Functions.serializableFn(f.formatFunction)) + .withAvroSchemaFactory(Functions.serializableFn(f.avroSchemaFactory)) } - def withSharding[T](method: WriteMethod, w: beam.BigQueryIO.Write[T])( + private[bigquery] def withSharding[T](method: WriteMethod, w: beam.BigQueryIO.Write[T])( sharding: Sharding ): beam.BigQueryIO.Write[T] = { import WriteMethod._ @@ -131,7 +363,10 @@ private[bigquery] object Writes { } } - def withSuccessfulInsertsPropagation[T](method: WriteMethod, w: beam.BigQueryIO.Write[T])( + private[bigquery] def withSuccessfulInsertsPropagation[T]( + method: WriteMethod, + w: beam.BigQueryIO.Write[T] + )( successfulInsertsPropagation: Boolean ): beam.BigQueryIO.Write[T] = { import WriteMethod._ @@ -145,7 +380,7 @@ private[bigquery] object Writes { } } - def sideOutputs( + private[bigquery] def sideOutputs( data: SCollection[_], method: WriteMethod, successfulInsertsPropagation: Boolean, @@ -185,393 +420,25 @@ private[bigquery] object Writes { SideOutputCollections(tuple, sc) } - trait WriteParam[T] { - def configOverride: beam.BigQueryIO.Write[T] => beam.BigQueryIO.Write[T] - } - - trait WriteParamDefaults { - - type ConfigOverride[T] = beam.BigQueryIO.Write[T] => beam.BigQueryIO.Write[T] - - val DefaultMethod: WriteMethod = WriteMethod.DEFAULT - val DefaultSchema: TableSchema = null - val DefaultWriteDisposition: WriteDisposition = null - val DefaultCreateDisposition: CreateDisposition = null - val DefaultTableDescription: String = null - val DefaultTimePartitioning: TimePartitioning = null - val DefaultClustering: Clustering = null - val DefaultTriggeringFrequency: Duration = null - val DefaultSharding: Sharding = null - val DefaultFailedInsertRetryPolicy: InsertRetryPolicy = null - val DefaultSuccessfulInsertsPropagation: Boolean = false - val DefaultExtendedErrorInfo: Boolean = false - val DefaultConfigOverride: Null = null - } -} - -sealed trait BigQueryIO[T] extends ScioIO[T] { - final override val tapT: TapT.Aux[T, T] = TapOf[T] -} - -object BigQueryIO { - implicit lazy val coderTableDestination: Coder[TableDestination] = Coder.kryo - - lazy val SuccessfulTableLoads: SideOutput[TableDestination] = SideOutput() - lazy val 
SuccessfulInserts: SideOutput[TableRow] = SideOutput() - lazy val SuccessfulStorageApiInserts: SideOutput[TableRow] = SideOutput() - - implicit lazy val coderBigQueryInsertError: Coder[BigQueryInsertError] = Coder.kryo - implicit lazy val coderBigQueryStorageApiInsertError: Coder[BigQueryStorageApiInsertError] = - Coder.kryo - - lazy val FailedInserts: SideOutput[TableRow] = SideOutput() - lazy val FailedInsertsWithErr: SideOutput[BigQueryInsertError] = SideOutput() - lazy val FailedStorageApiInserts: SideOutput[BigQueryStorageApiInsertError] = SideOutput() - - private[bigquery] val storageWriteMethod = - Set(WriteMethod.STORAGE_WRITE_API, WriteMethod.STORAGE_API_AT_LEAST_ONCE) - - @inline final def apply[T](id: String): BigQueryIO[T] = - new BigQueryIO[T] with TestIO[T] { - override def testId: String = s"BigQueryIO($id)" - } - - @inline final def apply[T](source: Source): BigQueryIO[T] = - new BigQueryIO[T] with TestIO[T] { - override def testId: String = source match { - case t: Table => s"BigQueryIO(${t.spec})" - case q: Query => s"BigQueryIO(${q.underlying})" - } - } - - @inline final def apply[T]( - id: String, - selectedFields: List[String], - rowRestriction: Option[String] - ): BigQueryIO[T] = - new BigQueryIO[T] with TestIO[T] { - override def testId: String = - s"BigQueryIO($id, List(${selectedFields.mkString(",")}), $rowRestriction)" - } -} - -object BigQueryTypedSelect { - object ReadParam { - val DefaultFlattenResults: Boolean = false - } - - final case class ReadParam private (flattenResults: Boolean = ReadParam.DefaultFlattenResults) -} - -final case class BigQueryTypedSelect[T: Coder]( - reader: beam.BigQueryIO.TypedRead[T], - sqlQuery: Query, - fromTableRow: TableRow => T -) extends BigQueryIO[T] { - override type ReadP = BigQueryTypedSelect.ReadParam - override type WriteP = Nothing // ReadOnly - - override def testId: String = s"BigQueryIO(${sqlQuery.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { - val coder = CoderMaterializer.beam(sc, Coder[T]) - val rc = reader.withCoder(coder) - Reads.bqReadQuery(sc)(rc, sqlQuery.underlying, params.flattenResults) - } - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("BigQuerySelect is read-only") - - override def tap(params: ReadP): Tap[T] = { - val tableReference = BigQuery - .defaultInstance() - .query - .run(sqlQuery.underlying, flattenResults = params.flattenResults) - BigQueryTap(tableReference).map(fromTableRow) - } -} - -/** - * Get an SCollection for a BigQuery SELECT query. Both - * [[https://cloud.google.com/bigquery/docs/reference/legacy-sql Legacy SQL]] and - * [[https://cloud.google.com/bigquery/docs/reference/standard-sql/ Standard SQL]] dialects are - * supported. By default the query dialect will be automatically detected. To override this - * behavior, start the query string with `#legacysql` or `#standardsql`. 
- */ -final case class BigQuerySelect(sqlQuery: Query) extends BigQueryIO[TableRow] { - override type ReadP = BigQuerySelect.ReadParam - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = - BigQueryTypedSelect(beam.BigQueryIO.readTableRows(), sqlQuery, identity)(coders.tableRowCoder) - - override def testId: String = s"BigQueryIO(${sqlQuery.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[TableRow] = - sc.read(underlying)(params) - - override protected def write(data: SCollection[TableRow], params: WriteP): Tap[TableRow] = - throw new UnsupportedOperationException("BigQuerySelect is read-only") - - override def tap(params: ReadP): Tap[TableRow] = underlying.tap(params) -} - -object BigQuerySelect { - type ReadParam = BigQueryTypedSelect.ReadParam - val ReadParam = BigQueryTypedSelect.ReadParam - - @inline final def apply(sqlQuery: String): BigQuerySelect = new BigQuerySelect(Query(sqlQuery)) -} - -object BigQueryTypedTable { - - /** Defines the format in which BigQuery can be read and written to. */ - sealed abstract class Format[F] - object Format { - case object GenericRecord extends Format[GenericRecord] - case object TableRow extends Format[TableRow] - } - - final case class WriteParam[T] private ( + private def resolveMethod( method: WriteMethod, - schema: TableSchema, - writeDisposition: WriteDisposition, - createDisposition: CreateDisposition, - tableDescription: String, - timePartitioning: TimePartitioning, - clustering: Clustering, - triggeringFrequency: Duration, - sharding: Sharding, - failedInsertRetryPolicy: InsertRetryPolicy, - successfulInsertsPropagation: Boolean, - extendedErrorInfo: Boolean, - configOverride: WriteParam.ConfigOverride[T] - ) extends Writes.WriteParam[T] - - object WriteParam extends Writes.WriteParamDefaults { - @inline final def apply[T]( - method: WriteMethod = DefaultMethod, - schema: TableSchema = DefaultSchema, - writeDisposition: WriteDisposition = DefaultWriteDisposition, - createDisposition: CreateDisposition = DefaultCreateDisposition, - tableDescription: String = DefaultTableDescription, - timePartitioning: TimePartitioning = DefaultTimePartitioning, - clustering: Clustering = DefaultClustering, - triggeringFrequency: Duration = DefaultTriggeringFrequency, - sharding: Sharding = DefaultSharding, - failedInsertRetryPolicy: InsertRetryPolicy = DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = DefaultExtendedErrorInfo, - configOverride: ConfigOverride[T] = DefaultConfigOverride - ): WriteParam[T] = new WriteParam( - method, - schema, - writeDisposition, - createDisposition, - tableDescription, - timePartitioning, - clustering, - triggeringFrequency, - sharding, - failedInsertRetryPolicy, - successfulInsertsPropagation, - extendedErrorInfo, - configOverride - ) - } - - private[this] def tableRow(table: Table): BigQueryTypedTable[TableRow] = - BigQueryTypedTable( - beam.BigQueryIO.readTableRows(), - beam.BigQueryIO.writeTableRows(), - table, - BigQueryUtils.convertGenericRecordToTableRow(_, _) - )(coders.tableRowCoder) - - private[this] def genericRecord( - table: Table - )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] = - BigQueryTypedTable( - _.getRecord(), - identity[GenericRecord], - (genericRecord: GenericRecord, _: TableSchema) => genericRecord, - table - ) - - /** - * Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]]. 
- * - * NOTE: LogicalType support when using `Format.GenericRecord` has some caveats: Reading: Bigquery - * types DATE, TIME, DATIME will be read as STRING. Writing: Supports LogicalTypes only for DATE - * and TIME. DATETIME is not yet supported. https://issuetracker.google.com/issues/140681683 - */ - def apply[F: Coder](table: Table, format: Format[F]): BigQueryTypedTable[F] = - format match { - case Format.GenericRecord => genericRecord(table) - case Format.TableRow => tableRow(table) - } - - def apply[T: Coder]( - readerFn: SchemaAndRecord => T, - writerFn: T => TableRow, - tableRowFn: TableRow => T, - table: Table - ): BigQueryTypedTable[T] = { - val rFn = ClosureCleaner.clean(readerFn) - val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(Functions.serializableFn(rFn)) - val writer = beam.BigQueryIO - .write[T]() - .withFormatFunction(Functions.serializableFn(wFn)) - val fn: (GenericRecord, TableSchema) => T = (gr, ts) => - tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr, ts)) - - BigQueryTypedTable(reader, writer, table, fn) - } - - def apply[T: Coder]( - readerFn: SchemaAndRecord => T, - writerFn: T => GenericRecord, - fn: (GenericRecord, TableSchema) => T, - table: Table - ): BigQueryTypedTable[T] = { - val rFn = ClosureCleaner.clean(readerFn) - val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(rFn(_)) - val writer = beam.BigQueryIO - .write[T]() - .useAvroLogicalTypes() - .withAvroFormatFunction(input => wFn(input.getElement())) - .withAvroSchemaFactory { ts => - BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", ts.getFields()) - } - - BigQueryTypedTable(reader, writer, table, fn) - } -} - -final case class BigQueryTypedTable[T: Coder]( - reader: beam.BigQueryIO.TypedRead[T], - writer: beam.BigQueryIO.Write[T], - table: Table, - fn: (GenericRecord, TableSchema) => T -) extends BigQueryIO[T] - with WriteResultIO[T] { - override type ReadP = Unit - override type WriteP = BigQueryTypedTable.WriteParam[T] - - override def testId: String = s"BigQueryIO(${table.spec})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { - val coder = CoderMaterializer.beam(sc, Coder[T]) - val io = reader.from(table.ref).withCoder(coder) - sc.applyTransform(s"Read BQ table ${table.spec}", io) - } - - override protected def writeWithResult( - data: SCollection[T], - params: WriteP - ): (Tap[T], SideOutputCollections) = { - val method = Writes.resolveMethod( - params.method, - data.context.optionsAs[BigQueryOptions], - data.internal.isBounded - ) - - val transform = writer - .to(table.ref) - .withMethod(params.method) - .pipe(w => Option(params.schema).fold(w)(w.withSchema)) - .pipe(w => Option(params.createDisposition).fold(w)(w.withCreateDisposition)) - .pipe(w => Option(params.writeDisposition).fold(w)(w.withWriteDisposition)) - .pipe(w => Option(params.tableDescription).fold(w)(w.withTableDescription)) - .pipe(w => Option(params.timePartitioning).map(_.asJava).fold(w)(w.withTimePartitioning)) - .pipe(w => Option(params.clustering).map(_.asJava).fold(w)(w.withClustering)) - .pipe(w => Option(params.triggeringFrequency).fold(w)(w.withTriggeringFrequency)) - .pipe(w => Option(params.sharding).fold(w)(Writes.withSharding(method, w))) - .pipe(w => - Writes.withSuccessfulInsertsPropagation(method, w)(params.successfulInsertsPropagation) - ) - .pipe(w => if (params.extendedErrorInfo) w.withExtendedErrorInfo() else w) - .pipe(w => Option(params.failedInsertRetryPolicy).fold(w)(w.withFailedInsertRetryPolicy)) - 
.pipe(w => Option(params.configOverride).fold(w)(_(w))) - - val wr = data.applyInternal(transform) - val outputs = Writes.sideOutputs( - data, - method, - params.successfulInsertsPropagation, - params.extendedErrorInfo, - wr - ) - - (tap(()), outputs) - } - - override def tap(read: ReadP): Tap[T] = BigQueryTypedTap(table, fn) -} - -/** Get an IO for a BigQuery table using the storage API. */ -final case class BigQueryStorage( - table: Table, - selectedFields: List[String], - rowRestriction: Option[String] -) extends BigQueryIO[TableRow] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - override def testId: String = - s"BigQueryIO(${table.spec}, List(${selectedFields.mkString(",")}), $rowRestriction)" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[TableRow] = { - val coder = CoderMaterializer.beam(sc, coders.tableRowCoder) - val read = beam.BigQueryIO.readTableRows().withCoder(coder) - Reads.bqReadStorage(sc)( - read, - table, - selectedFields, - rowRestriction - ) - } - - override protected def write(data: SCollection[TableRow], params: WriteP): Tap[TableRow] = - throw new UnsupportedOperationException("BigQueryStorage is read-only") - - override def tap(read: ReadP): Tap[TableRow] = { - val readOptions = StorageUtil.tableReadOptions(selectedFields, rowRestriction) - BigQueryStorageTap(table, readOptions) - } -} - -object BigQueryStorage { - object ReadParam { - val DefaultSelectFields: List[String] = Nil - val DefaultRowRestriction: Option[String] = None + options: BigQueryOptions, + isBounded: PCollection.IsBounded + ): WriteMethod = (method, isBounded) match { + case (WriteMethod.DEFAULT, _) + if options.getUseStorageWriteApi && options.getUseStorageWriteApiAtLeastOnce => + WriteMethod.STORAGE_API_AT_LEAST_ONCE + case (WriteMethod.DEFAULT, _) if options.getUseStorageWriteApi => + WriteMethod.STORAGE_WRITE_API + case (WriteMethod.DEFAULT, PCollection.IsBounded.BOUNDED) => + WriteMethod.FILE_LOADS + case (WriteMethod.DEFAULT, PCollection.IsBounded.UNBOUNDED) => + WriteMethod.STREAMING_INSERTS + case _ => + method } } -final case class BigQueryStorageSelect(sqlQuery: Query) extends BigQueryIO[TableRow] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = - BigQueryTypedSelect( - beam.BigQueryIO.readTableRows().withMethod(ReadMethod.DIRECT_READ), - sqlQuery, - identity - )(coders.tableRowCoder) - - override def testId: String = s"BigQueryIO(${sqlQuery.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[TableRow] = - sc.read(underlying)(BigQueryTypedSelect.ReadParam()) - - override protected def write(data: SCollection[TableRow], params: WriteP): Tap[TableRow] = - throw new UnsupportedOperationException("BigQuerySelect is read-only") - - override def tap(params: ReadP): Tap[TableRow] = underlying.tap(BigQueryTypedSelect.ReadParam()) -} - /** Get an IO for a BigQuery TableRow JSON file. */ final case class TableRowJsonIO(path: String) extends ScioIO[TableRow] { override type ReadP = TableRowJsonIO.ReadParam @@ -630,289 +497,4 @@ object TableRowJsonIO { ) } } - -} - -object BigQueryTyped { - import com.spotify.scio.bigquery.{Table => STable} - - @annotation.implicitNotFound( - """ - Can't find annotation for type ${T}. - Make sure this class is annotated with BigQueryType.fromStorage, BigQueryType.fromTable or - BigQueryType.fromQuery. - Alternatively, use BigQueryTyped.Storage(""), BigQueryTyped.Table("
"), or - BigQueryTyped.Query("") to get a ScioIO instance. - """ - ) - sealed trait IO[T <: HasAnnotation] { - type F[_ <: HasAnnotation] <: ScioIO[_] - def impl: F[T] - } - - object IO { - type Aux[T <: HasAnnotation, F0[_ <: HasAnnotation] <: ScioIO[_]] = - IO[T] { type F[A <: HasAnnotation] = F0[A] } - - implicit def tableIO[T <: HasAnnotation: TypeTag: Coder](implicit - t: BigQueryType.Table[T] - ): Aux[T, Table] = - new IO[T] { - type F[A <: HasAnnotation] = Table[A] - def impl: Table[T] = Table(STable.Spec(t.table)) - } - - implicit def queryIO[T <: HasAnnotation: TypeTag: Coder](implicit - t: BigQueryType.Query[T] - ): Aux[T, Select] = - new IO[T] { - type F[A <: HasAnnotation] = Select[A] - def impl: Select[T] = Select(Query(t.queryRaw)) - } - - implicit def storageIO[T <: HasAnnotation: TypeTag: Coder](implicit - t: BigQueryType.StorageOptions[T] - ): Aux[T, Storage] = - new IO[T] { - type F[A <: HasAnnotation] = Storage[A] - def impl: Storage[T] = Storage(STable.Spec(t.table), Nil, None) - } - } - - /** - * Get a typed SCollection for a BigQuery table or a SELECT query. - * - * Note that `T` must be annotated with - * [[com.spotify.scio.bigquery.types.BigQueryType.fromTable BigQueryType.fromStorage]], - * [[com.spotify.scio.bigquery.types.BigQueryType.fromTable BigQueryType.fromTable]], or - * [[com.spotify.scio.bigquery.types.BigQueryType.fromQuery BigQueryType.fromQuery]] - * - * The source (table) specified in the annotation will be used - */ - @inline final def apply[T <: HasAnnotation](implicit t: IO[T]): t.F[T] = - t.impl - - /** - * Get a typed SCollection for a BigQuery SELECT query. - * - * Both [[https://cloud.google.com/bigquery/docs/reference/legacy-sql Legacy SQL]] and - * [[https://cloud.google.com/bigquery/docs/reference/standard-sql/ Standard SQL]] dialects are - * supported. By default the query dialect will be automatically detected. To override this - * behavior, start the query string with `#legacysql` or `#standardsql`. - */ - final case class Select[T <: HasAnnotation: TypeTag: Coder](query: Query) extends BigQueryIO[T] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = { - val fromAvro = BigQueryType[T].fromAvro - val fromTableRow = BigQueryType[T].fromTableRow - val reader = beam.BigQueryIO - .read(new SerializableFunction[SchemaAndRecord, T] { - override def apply(input: SchemaAndRecord): T = fromAvro(input.getRecord) - }) - BigQueryTypedSelect(reader, query, fromTableRow) - } - - override def testId: String = s"BigQueryIO(${query.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.read(underlying)(BigQueryTypedSelect.ReadParam()) - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("Select queries are read-only") - - override def tap(params: ReadP): Tap[T] = underlying.tap(BigQueryTypedSelect.ReadParam()) - } - - object Select { - @inline final def apply[T <: HasAnnotation: TypeTag: Coder]( - query: String - ): Select[T] = new Select[T](Query(query)) - } - - /** Get a typed SCollection for a BigQuery table. 
*/ - final case class Table[T <: HasAnnotation: TypeTag: Coder](table: STable) - extends BigQueryIO[T] - with WriteResultIO[T] { - override type ReadP = Unit - override type WriteP = Table.WriteParam[T] - - private val underlying = BigQueryTypedTable[T]( - (i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord), - BigQueryType[T].toTableRow, - BigQueryType[T].fromTableRow, - table - ) - - override def testId: String = s"BigQueryIO(${table.spec})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.read(underlying) - - override protected def writeWithResult( - data: SCollection[T], - params: WriteP - ): (Tap[T], SideOutputCollections) = { - val outputs = data - .withName(s"${data.tfName}$$Write") - .write(underlying)(params) - .outputs - .get - - (tap(()), outputs) - } - - override def tap(read: ReadP): Tap[T] = - BigQueryTypedTap[T](table, underlying.fn) - } - - object Table { - final case class WriteParam[T] private ( - method: WriteMethod, - writeDisposition: WriteDisposition, - createDisposition: CreateDisposition, - timePartitioning: TimePartitioning, - clustering: Clustering, - triggeringFrequency: Duration, - sharding: Sharding, - failedInsertRetryPolicy: InsertRetryPolicy, - successfulInsertsPropagation: Boolean, - extendedErrorInfo: Boolean, - configOverride: WriteParam.ConfigOverride[T] - ) extends Writes.WriteParam[T] - - object WriteParam extends Writes.WriteParamDefaults { - - @inline final def apply[T]( - method: WriteMethod = DefaultMethod, - writeDisposition: WriteDisposition = DefaultWriteDisposition, - createDisposition: CreateDisposition = DefaultCreateDisposition, - timePartitioning: TimePartitioning = DefaultTimePartitioning, - clustering: Clustering = DefaultClustering, - triggeringFrequency: Duration = DefaultTriggeringFrequency, - sharding: Sharding = DefaultSharding, - failedInsertRetryPolicy: InsertRetryPolicy = DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = DefaultExtendedErrorInfo, - configOverride: ConfigOverride[T] = DefaultConfigOverride - ): WriteParam[T] = new WriteParam( - method, - writeDisposition, - createDisposition, - timePartitioning, - clustering, - triggeringFrequency, - sharding, - failedInsertRetryPolicy, - successfulInsertsPropagation, - extendedErrorInfo, - configOverride - ) - - implicit private[Table] def typedTableWriteParam[T: TypeTag, Info]( - params: Table.WriteParam[T] - ): BigQueryTypedTable.WriteParam[T] = - BigQueryTypedTable.WriteParam( - params.method, - BigQueryType[T].schema, - params.writeDisposition, - params.createDisposition, - BigQueryType[T].tableDescription.orNull, - params.timePartitioning, - params.clustering, - params.triggeringFrequency, - params.sharding, - params.failedInsertRetryPolicy, - params.successfulInsertsPropagation, - params.extendedErrorInfo, - params.configOverride - ) - } - - } - - /** Get a typed SCollection for a BigQuery table using the storage API. 
*/ - final case class Storage[T <: HasAnnotation: TypeTag: Coder]( - table: STable, - selectedFields: List[String], - rowRestriction: Option[String] - ) extends BigQueryIO[T] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - override def testId: String = - s"BigQueryIO(${table.spec}, List(${selectedFields.mkString(",")}), $rowRestriction)" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { - val coder = CoderMaterializer.beam(sc, Coder[T]) - val fromAvro = BigQueryType[T].fromAvro - val reader = beam.BigQueryIO - .read(new SerializableFunction[SchemaAndRecord, T] { - override def apply(input: SchemaAndRecord): T = fromAvro(input.getRecord) - }) - .withCoder(coder) - Reads.bqReadStorage(sc)(reader, table, selectedFields, rowRestriction) - } - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("Storage API is read-only") - - override def tap(read: ReadP): Tap[T] = { - val fn = BigQueryType[T].fromTableRow - val readOptions = StorageUtil.tableReadOptions(selectedFields, rowRestriction) - BigQueryStorageTap(table, readOptions).map(fn) - } - } - - final case class StorageQuery[T <: HasAnnotation: TypeTag: Coder](sqlQuery: Query) - extends BigQueryIO[T] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = { - val fromAvro = BigQueryType[T].fromAvro - val fromTableRow = BigQueryType[T].fromTableRow - val reader = beam.BigQueryIO - .read(new SerializableFunction[SchemaAndRecord, T] { - override def apply(input: SchemaAndRecord): T = fromAvro(input.getRecord) - }) - .withMethod(ReadMethod.DIRECT_READ) - BigQueryTypedSelect(reader, sqlQuery, fromTableRow) - } - - override def testId: String = s"BigQueryIO($sqlQuery)" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.read(underlying)(BigQueryTypedSelect.ReadParam()) - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("Storage API is read-only") - - override def tap(read: ReadP): Tap[T] = underlying.tap(BigQueryTypedSelect.ReadParam()) - } - - private[scio] def dynamic[T <: HasAnnotation: TypeTag: Coder]( - newSource: Option[Source] - ): ScioIO.ReadOnly[T, Unit] = { - val bqt = BigQueryType[T] - newSource match { - // newSource is missing, T's companion object must have either table or query - // The case where newSource is null is only there - // for legacy support and should not exists once - // BigQueryScioContext.typedBigQuery is removed - case None if bqt.isTable => - val table = STable.Spec(bqt.table.get) - ScioIO.ro[T](Table[T](table)) - case None if bqt.isQuery => - val query = Query(bqt.queryRaw.get) - Select[T](query) - case Some(s: STable) => - ScioIO.ro(Table[T](s)) - case Some(s: Query) => - Select[T](s) - case _ => - throw new IllegalArgumentException(s"Missing table or query field in companion object") - } - } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 47818e3bba..ba47e5ebd5 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -34,10 +34,15 @@ import java.math.MathContext import java.nio.ByteBuffer import scala.jdk.CollectionConverters._ 
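The `Source` rework below collapses `Table.Spec` and `Table.Ref` into a single `Table` constructor family, attaches the Storage API projection and predicate to the table as an optional `Filter`, and hoists `$LATEST` resolution onto `Source`. A minimal usage sketch built only from the constructors introduced in this diff (project, dataset, and field names are placeholders):

```scala
import com.spotify.scio.bigquery.{Query, Table}

// One Table type for spec strings and TableReferences alike
val table = Table("bigquery-public-data:samples.shakespeare")

// Storage API projection and predicate now travel with the table
val filtered = Table(
  "bigquery-public-data:samples.shakespeare",
  List("word", "word_count"),
  "corpus = 'hamlet'"
)

// $LATEST resolution is declared once on Source, so it works for tables and queries
val latestTable = Table("myproject:mydataset.events_$LATEST").latest()
val latestQuery = Query("SELECT * FROM [myproject:mydataset.events_$LATEST]").latest()
```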
-sealed trait Source +sealed trait Source { + protected type Impl <: Source + def latest(bq: BigQuery): Impl + def latest(): Impl = latest(BigQuery.defaultInstance()) +} /** A wrapper type [[Query]] which wraps a SQL String. */ final case class Query(underlying: String) extends Source { + override protected type Impl = Query /** * A helper method to replace the "$LATEST" placeholder in query to the latest common partition. @@ -61,64 +66,105 @@ final case class Query(underlying: String) extends Source { * @return * [[Query]] with "$LATEST" replaced */ - def latest(bq: BigQuery): Query = - Query(BigQueryPartitionUtil.latestQuery(bq, underlying)) + override def latest(bq: BigQuery): Query = + copy(BigQueryPartitionUtil.latestQuery(bq, underlying)) - def latest(): Query = latest(BigQuery.defaultInstance()) + override def toString: String = underlying } /** - * [[Table]] abstracts the multiple ways of referencing Bigquery tables. Tables can be referenced by - * a table spec `String` or by a table reference [[GTableReference]]. - * - * Example: - * {{{ - * val table = Table.Spec("bigquery-public-data:samples.shakespeare") - * sc.bigQueryTable(table) - * .filter(r => "hamlet".equals(r.getString("corpus")) && "Polonius".equals(r.getString("word"))) - * .saveAsTextFile("./output.txt") - * sc.run() - * }}} + * Bigquery [[Table]]. Tables can be referenced by a table spec `String` or by a table reference + * [[GTableReference]]. * - * Or create a [[Table]] from a [[GTableReference]]: + * Example: Create a [[Table]] from a [[GTableReference]]: * {{{ * val tableReference = new TableReference * tableReference.setProjectId("bigquery-public-data") * tableReference.setDatasetId("samples") * tableReference.setTableId("shakespeare") - * val table = Table.Ref(tableReference) + * val table = Table(tableReference) + * }}} + * or with a spec string: + * {{{ + * val table = Table("bigquery-public-data:samples.shakespeare") * }}} * * A helper method is provided to replace the "$LATEST" placeholder in the table name to the latest * common partition. 
* {{{ - * val table = Table.Spec("some_project:some_data.some_table_$LATEST").latest() + * val table = Table("some_project:some_data.some_table_$LATEST").latest() * }}} */ -sealed trait Table extends Source { - def spec: String - - def ref: GTableReference - - def latest(bg: BigQuery): Table +case class Table private (ref: GTableReference, filter: Option[Table.Filter]) extends Source { + override protected type Impl = Table + lazy val spec: String = BigQueryHelpers.toTableSpec(ref) + def latest(bq: BigQuery): Table = { + val latestSpec = BigQueryPartitionUtil.latestTable(bq, spec) + val latestRef = BigQueryHelpers.parseTableSpec(latestSpec) + copy(latestRef) + } - def latest(): Table + override def toString: String = filter match { + case None => spec + case Some(Table.Filter(selectedFields, rowRestriction)) => + val sb = new StringBuilder("SELECT ") + selectedFields match { + case Nil => sb.append("*") + case _ => sb.append(selectedFields.mkString(",")) + } + sb.append(" FROM `") + .append(ref.getProjectId) + .append(".") + .append(ref.getDatasetId) + .append(".") + .append(ref.getTableId) + .append("`") + rowRestriction.foreach(r => sb.append(" WHERE ").append(r)) + sb.result() + } } object Table { - final case class Ref(ref: GTableReference) extends Table { - override lazy val spec: String = BigQueryHelpers.toTableSpec(ref) - def latest(bq: BigQuery): Ref = - Ref(Spec(spec).latest(bq).ref) - def latest(): Ref = latest(BigQuery.defaultInstance()) + final case class Filter( + selectedFields: List[String], + rowRestriction: Option[String] + ) - } - final case class Spec(spec: String) extends Table { - override val ref: GTableReference = BigQueryHelpers.parseTableSpec(spec) - def latest(bq: BigQuery): Spec = - Spec(BigQueryPartitionUtil.latestTable(bq, spec)) - def latest(): Spec = latest(BigQuery.defaultInstance()) - } + def apply(ref: GTableReference): Table = + new Table(ref, None) + + def apply(ref: GTableReference, selectedFields: List[String]): Table = + new Table(ref, Some(Filter(selectedFields, None))) + + def apply(ref: GTableReference, rowRestriction: String): Table = + new Table(ref, Some(Filter(List.empty, Some(rowRestriction)))) + + def apply(ref: GTableReference, selectedFields: List[String], rowRestriction: String): Table = + new Table(ref, Some(Filter(selectedFields, Some(rowRestriction)))) + + def apply(ref: GTableReference, filter: Table.Filter): Table = + new Table(ref, Some(filter)) + + def apply(spec: String): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), None) + + def apply(spec: String, selectedFields: List[String]): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), Some(Filter(selectedFields, None))) + + def apply(spec: String, rowRestriction: String): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), Some(Filter(List.empty, Some(rowRestriction)))) + + def apply(spec: String, selectedFields: List[String], rowRestriction: String): Table = + new Table( + BigQueryHelpers.parseTableSpec(spec), + Some(Filter(selectedFields, Some(rowRestriction))) + ) + + def apply(spec: String, filter: Table.Filter): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), Some(filter)) + + def apply(spec: String, filter: Option[Table.Filter]): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), filter) } /** diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala index d45ce08afb..1c7e6f3b79 100644 --- 
a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala @@ -181,7 +181,7 @@ class MockTable( */ def withSample(numRows: Int): Unit = { ensureUnique() - val rows = bq.tables.rows(Table.Ref(original)).take(numRows).toList + val rows = bq.tables.rows(Table(original)).take(numRows).toList require(rows.length == numRows, s"Sample size ${rows.length} != requested $numRows") writeRows(rows) () @@ -193,7 +193,7 @@ class MockTable( */ def withSample(minNumRows: Int, maxNumRows: Int): Unit = { ensureUnique() - val rows = bq.tables.rows(Table.Ref(original)).take(maxNumRows).toList + val rows = bq.tables.rows(Table(original)).take(maxNumRows).toList require( rows.length >= minNumRows && rows.length <= maxNumRows, s"Sample size ${rows.length} < requested minimal $minNumRows" diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala index bebbadfba6..d9cab5a134 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala @@ -89,7 +89,7 @@ final class BigQuery private (val client: Client) { val rows = if (newSource == null) { // newSource is missing, T's companion object must have either table or query if (bqt.isTable) { - tables.rows(STable.Spec(bqt.table.get)) + tables.rows(STable(bqt.table.get)) } else if (bqt.isQuery) { query.rows(bqt.queryRaw.get) } else { @@ -98,7 +98,7 @@ final class BigQuery private (val client: Client) { } else { // newSource can be either table or query Try(BigQueryHelpers.parseTableSpec(newSource)).toOption - .map(STable.Ref) + .map(STable.apply) .map(tables.rows) .getOrElse(query.rows(newSource)) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala index ab8395f320..34ee176c4c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala @@ -111,7 +111,7 @@ final private[client] class QueryOps(client: Client, tableService: TableOps, job newQueryJob(config).map { job => jobService.waitForJobs(job) - tableService.rows(STable.Ref(job.table)) + tableService.rows(STable(job.table)) }.get } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala index b17542f267..65eee8b4d3 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala @@ -21,7 +21,7 @@ import com.google.api.services.bigquery.model.TableSchema import com.spotify.scio.bigquery.dynamic._ import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation -import com.spotify.scio.bigquery.{TableRow, Writes} +import com.spotify.scio.bigquery.{BigQueryIO, TableRow} import com.spotify.scio.io.{ClosedTap, EmptyTap} import com.spotify.scio.util.Functions import 
com.spotify.scio.values.SCollection @@ -68,12 +68,14 @@ final class DynamicBigQueryOps[T](private val self: SCollection[T]) extends AnyV .withFormatFunction(Functions.serializableFn(formatFn)) .pipe(w => Option(createDisposition).fold(w)(w.withCreateDisposition)) .pipe(w => Option(writeDisposition).fold(w)(w.withWriteDisposition)) - .pipe(w => Writes.withSuccessfulInsertsPropagation(method, w)(successfulInsertsPropagation)) + .pipe(w => + BigQueryIO.withSuccessfulInsertsPropagation(method, w)(successfulInsertsPropagation) + ) .pipe(w => if (extendedErrorInfo) w.withExtendedErrorInfo() else w) val wr = self.applyInternal(t) val outputs = - Writes.sideOutputs(self, method, successfulInsertsPropagation, extendedErrorInfo, wr) + BigQueryIO.sideOutputs(self, method, successfulInsertsPropagation, extendedErrorInfo, wr) ClosedTap[Nothing](EmptyTap, Some(outputs)) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala index ff2a6ee079..e7962e4c8d 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala @@ -18,11 +18,8 @@ package com.spotify.scio.bigquery.syntax import com.google.api.services.bigquery.model.TableSchema -import com.spotify.scio.bigquery.BigQueryTyped.Table.{WriteParam => TableWriteParam} -import com.spotify.scio.bigquery.BigQueryTypedTable.{Format, WriteParam => TypedTableWriteParam} import com.spotify.scio.bigquery.TableRowJsonIO.{WriteParam => TableRowJsonWriteParam} import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation -import com.spotify.scio.bigquery.coders import com.spotify.scio.bigquery._ import com.spotify.scio.coders.Coder import com.spotify.scio.io._ @@ -36,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ WriteDisposition } import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} import org.joda.time.Duration import scala.reflect.runtime.universe._ @@ -63,23 +61,26 @@ final class SCollectionTableRowOps[T <: TableRow](private val self: SCollection[ */ def saveAsBigQueryTable( table: Table, - schema: TableSchema = TypedTableWriteParam.DefaultSchema, - writeDisposition: WriteDisposition = TypedTableWriteParam.DefaultWriteDisposition, - createDisposition: CreateDisposition = TypedTableWriteParam.DefaultCreateDisposition, - tableDescription: String = TypedTableWriteParam.DefaultTableDescription, - timePartitioning: TimePartitioning = TypedTableWriteParam.DefaultTimePartitioning, - clustering: Clustering = TypedTableWriteParam.DefaultClustering, - method: Method = TypedTableWriteParam.DefaultMethod, - triggeringFrequency: Duration = TypedTableWriteParam.DefaultTriggeringFrequency, - sharding: Sharding = TypedTableWriteParam.DefaultSharding, + schema: TableSchema = BigQueryIO.WriteParam.DefaultSchema, + writeDisposition: WriteDisposition = BigQueryIO.WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = BigQueryIO.WriteParam.DefaultCreateDisposition, + tableDescription: String = BigQueryIO.WriteParam.DefaultTableDescription, + timePartitioning: TimePartitioning = BigQueryIO.WriteParam.DefaultTimePartitioning, + clustering: Clustering = BigQueryIO.WriteParam.DefaultClustering, + method: Method = 
BigQueryIO.WriteParam.DefaultMethod, + triggeringFrequency: Duration = BigQueryIO.WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = BigQueryIO.WriteParam.DefaultSharding, failedInsertRetryPolicy: InsertRetryPolicy = - TypedTableWriteParam.DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = TableWriteParam.DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = TypedTableWriteParam.DefaultExtendedErrorInfo, - configOverride: TypedTableWriteParam.ConfigOverride[TableRow] = - TableWriteParam.DefaultConfigOverride + BigQueryIO.WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = + BigQueryIO.WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = BigQueryIO.WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.WriteParam.DefaultErrorHandler, + configOverride: BigQueryIO.WriteParam.ConfigOverride[TableRow] = + BigQueryIO.WriteParam.DefaultConfigOverride ): ClosedTap[TableRow] = { - val param = TypedTableWriteParam( + val param = BigQueryIO.WriteParam[TableRow]( + BigQueryIO.Format.Default(), method, schema, writeDisposition, @@ -92,12 +93,13 @@ final class SCollectionTableRowOps[T <: TableRow](private val self: SCollection[ failedInsertRetryPolicy, successfulInsertsPropagation, extendedErrorInfo, + errorHandler, configOverride ) self .covary[TableRow] - .write(BigQueryTypedTable(table, Format.TableRow)(coders.tableRowCoder))(param) + .write(BigQueryIO(table))(param) } /** @@ -169,23 +171,26 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC */ def saveAsBigQueryTable( table: Table, - schema: TableSchema = TypedTableWriteParam.DefaultSchema, - writeDisposition: WriteDisposition = TypedTableWriteParam.DefaultWriteDisposition, - createDisposition: CreateDisposition = TypedTableWriteParam.DefaultCreateDisposition, - tableDescription: String = TypedTableWriteParam.DefaultTableDescription, - timePartitioning: TimePartitioning = TypedTableWriteParam.DefaultTimePartitioning, - clustering: Clustering = TypedTableWriteParam.DefaultClustering, - method: Method = TypedTableWriteParam.DefaultMethod, - triggeringFrequency: Duration = TypedTableWriteParam.DefaultTriggeringFrequency, - sharding: Sharding = TypedTableWriteParam.DefaultSharding, + schema: TableSchema = BigQueryIO.WriteParam.DefaultSchema, + writeDisposition: WriteDisposition = BigQueryIO.WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = BigQueryIO.WriteParam.DefaultCreateDisposition, + tableDescription: String = BigQueryIO.WriteParam.DefaultTableDescription, + timePartitioning: TimePartitioning = BigQueryIO.WriteParam.DefaultTimePartitioning, + clustering: Clustering = BigQueryIO.WriteParam.DefaultClustering, + method: Method = BigQueryIO.WriteParam.DefaultMethod, + triggeringFrequency: Duration = BigQueryIO.WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = BigQueryIO.WriteParam.DefaultSharding, failedInsertRetryPolicy: InsertRetryPolicy = - TypedTableWriteParam.DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = TableWriteParam.DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = TypedTableWriteParam.DefaultExtendedErrorInfo, - configOverride: TypedTableWriteParam.ConfigOverride[GenericRecord] = - TypedTableWriteParam.DefaultConfigOverride + BigQueryIO.WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = + BigQueryIO.WriteParam.DefaultSuccessfulInsertsPropagation, + 
extendedErrorInfo: Boolean = BigQueryIO.WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.WriteParam.DefaultErrorHandler, + configOverride: BigQueryIO.WriteParam.ConfigOverride[GenericRecord] = + BigQueryIO.WriteParam.DefaultConfigOverride ): ClosedTap[GenericRecord] = { - val param = TypedTableWriteParam( + val param = BigQueryIO.WriteParam[GenericRecord]( + BigQueryIO.Format.Avro(), method, schema, writeDisposition, @@ -198,15 +203,12 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC failedInsertRetryPolicy, successfulInsertsPropagation, extendedErrorInfo, + errorHandler, configOverride ) self .covary[GenericRecord] - .write( - BigQueryTypedTable(table, Format.GenericRecord)( - self.coder.asInstanceOf[Coder[GenericRecord]] - ) - )(param) + .write(BigQueryIO[GenericRecord](table)(self.coder.asInstanceOf[Coder[GenericRecord]]))(param) } } @@ -227,7 +229,7 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio * case class Result(name: String, score: Double) * * val p: SCollection[Result] = // process data and convert elements to Result - * p.saveAsTypedBigQueryTable(Table.Spec("myproject:mydataset.mytable")) + * p.saveAsTypedBigQueryTable(Table("myproject:mydataset.mytable")) * }}} * * It could also be an empty class with schema from @@ -241,7 +243,7 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio * * sc.typedBigQuery[Row]() * .sample(withReplacement = false, fraction = 0.1) - * .saveAsTypedBigQueryTable(Table.Spec("myproject:samples.gsod")) + * .saveAsTypedBigQueryTable(Table("myproject:samples.gsod")) * }}} * * * @@ -260,22 +262,30 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio */ def saveAsTypedBigQueryTable( table: Table, - timePartitioning: TimePartitioning = TableWriteParam.DefaultTimePartitioning, - writeDisposition: WriteDisposition = TableWriteParam.DefaultWriteDisposition, - createDisposition: CreateDisposition = TableWriteParam.DefaultCreateDisposition, - clustering: Clustering = TableWriteParam.DefaultClustering, - method: Method = TableWriteParam.DefaultMethod, - triggeringFrequency: Duration = TableWriteParam.DefaultTriggeringFrequency, - sharding: Sharding = TableWriteParam.DefaultSharding, - failedInsertRetryPolicy: InsertRetryPolicy = TableWriteParam.DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = TableWriteParam.DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = TableWriteParam.DefaultExtendedErrorInfo, - configOverride: TableWriteParam.ConfigOverride[T] = TableWriteParam.DefaultConfigOverride + timePartitioning: TimePartitioning = BigQueryIO.WriteParam.DefaultTimePartitioning, + writeDisposition: WriteDisposition = BigQueryIO.WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = BigQueryIO.WriteParam.DefaultCreateDisposition, + clustering: Clustering = BigQueryIO.WriteParam.DefaultClustering, + method: Method = BigQueryIO.WriteParam.DefaultMethod, + triggeringFrequency: Duration = BigQueryIO.WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = BigQueryIO.WriteParam.DefaultSharding, + failedInsertRetryPolicy: InsertRetryPolicy = + BigQueryIO.WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = + BigQueryIO.WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = BigQueryIO.WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = 
BigQueryIO.WriteParam.DefaultErrorHandler, + configOverride: BigQueryIO.WriteParam.ConfigOverride[T] = + BigQueryIO.WriteParam.DefaultConfigOverride )(implicit tt: TypeTag[T], coder: Coder[T]): ClosedTap[T] = { - val param = TableWriteParam[T]( + val bqt = BigQueryType[T] + val param = BigQueryIO.WriteParam[T]( + BigQueryIO.Format.Avro[T](bqt), method, + bqt.schema, writeDisposition, createDisposition, + BigQueryIO.WriteParam.DefaultTableDescription, timePartitioning, clustering, triggeringFrequency, @@ -283,9 +293,10 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio failedInsertRetryPolicy, successfulInsertsPropagation, extendedErrorInfo, + errorHandler, configOverride ) - self.write(BigQueryTyped.Table[T](table))(param) + self.write(BigQueryIO[T](table))(param) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala index fa82cffe93..72da878457 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala @@ -19,30 +19,26 @@ package com.spotify.scio.bigquery.syntax import com.spotify.scio.ScioContext import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation -import com.spotify.scio.bigquery.{ - BigQuerySelect, - BigQueryStorage, - BigQueryStorageSelect, - BigQueryType, - BigQueryTyped, - Query, - Source, - Table, - TableRow, - TableRowJsonIO -} +import com.spotify.scio.bigquery.{BigQueryIO, Query, Source, Table, TableRow, TableRowJsonIO} import com.spotify.scio.coders.Coder import com.spotify.scio.values._ import scala.reflect.runtime.universe._ -import com.spotify.scio.bigquery.BigQueryTypedTable -import com.spotify.scio.bigquery.BigQueryTypedTable.Format import com.spotify.scio.bigquery.coders.tableRowCoder +import com.spotify.scio.bigquery.types.BigQueryType import org.apache.beam.sdk.io.Compression +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method import org.apache.beam.sdk.io.fs.EmptyMatchTreatment +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} +import org.slf4j.{Logger, LoggerFactory} + +object ScioContextOps { + @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass) +} /** Enhanced version of [[ScioContext]] with BigQuery methods. */ final class ScioContextOps(private val self: ScioContext) extends AnyVal { + import ScioContextOps._ /** * Get an SCollection for a BigQuery SELECT query. Both @@ -51,38 +47,71 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * supported. By default the query dialect will be automatically detected. To override this * behavior, start the query string with `#legacysql` or `#standardsql`. 
*/ - def bigQuerySelect( + def bigQuerySelect[T: Coder]( sqlQuery: Query, - flattenResults: Boolean - ): SCollection[TableRow] = - self.read(BigQuerySelect(sqlQuery))(BigQuerySelect.ReadParam(flattenResults)) + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + val params = BigQueryIO.QueryReadParam( + BigQueryIO.Format.Default(), + Method.DEFAULT, + flattenResults, + errorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](sqlQuery))(params) + } - /** - * Get an SCollection for a BigQuery SELECT query. Both - * [[https://cloud.google.com/bigquery/docs/reference/legacy-sql Legacy SQL]] and - * [[https://cloud.google.com/bigquery/docs/reference/standard-sql/ Standard SQL]] dialects are - * supported. By default the query dialect will be automatically detected. To override this - * behavior, start the query string with `#legacysql` or `#standardsql`. - */ - def bigQuerySelect( - sqlQuery: Query - ): SCollection[TableRow] = - bigQuerySelect(sqlQuery, BigQuerySelect.ReadParam.DefaultFlattenResults) + def bigQuerySelectFormat[T: Coder]( + sqlQuery: Query, + format: BigQueryIO.Format[T], + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + val params = BigQueryIO.QueryReadParam( + format, + Method.DEFAULT, + flattenResults, + errorHandler, + configOverride + ) + self.read(BigQueryIO[T](sqlQuery))(params) + } - /** Get an SCollection for a BigQuery table. */ - def bigQueryTable(table: Table): SCollection[TableRow] = - bigQueryTable(table, BigQueryTypedTable.Format.TableRow)(tableRowCoder) + def bigQueryTable( + table: Table, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + val params = BigQueryIO.TableReadParam( + BigQueryIO.Format.Default(), + Method.DEFAULT, + errorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](table))(params) + } - /** - * Get an SCollection for a BigQuery table using the specified [[Format]]. - * - * Reading records as GenericRecord **should** offer better performance over TableRow records. - * - * Note: When using `Format.GenericRecord` Bigquery types DATE, TIME and DATETIME are read as - * STRING. - */ - def bigQueryTable[F: Coder](table: Table, format: Format[F]): SCollection[F] = - self.read(BigQueryTypedTable(table, format)) + def bigQueryTableFormat[T: Coder]( + table: Table, + format: BigQueryIO.Format[T], + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + val params = BigQueryIO.TableReadParam( + format, + Method.DEFAULT, + errorHandler, + configOverride + ) + self.read(BigQueryIO[T](table))(params) + } /** * Get an SCollection for a BigQuery table using the storage API. 
@@ -103,10 +132,34 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { */ def bigQueryStorage( table: Table, - selectedFields: List[String] = BigQueryStorage.ReadParam.DefaultSelectFields, - rowRestriction: String = null - ): SCollection[TableRow] = - self.read(BigQueryStorage(table, selectedFields, Option(rowRestriction))) + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + val params = BigQueryIO.TableReadParam( + BigQueryIO.Format.Default(), + Method.DIRECT_READ, + errorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](table))(params) + } + + def bigQueryStorageFormat[T: Coder]( + table: Table, + format: BigQueryIO.Format[T], + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + val params = BigQueryIO.TableReadParam( + format, + Method.DIRECT_READ, + errorHandler, + configOverride + ) + self.read(BigQueryIO[T](table))(params) + } /** * Get an SCollection for a BigQuery SELECT query using the storage API. @@ -114,99 +167,115 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * @param query * SQL query */ - def bigQueryStorage(query: Query): SCollection[TableRow] = - self.read(BigQueryStorageSelect(query)) - - def typedBigQuery[T <: HasAnnotation: TypeTag: Coder](): SCollection[T] = - typedBigQuery(None) + def bigQuerySelectStorage( + query: Query, + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + val params = BigQueryIO.QueryReadParam( + BigQueryIO.Format.Default(), + Method.DIRECT_READ, + flattenResults, + errorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](query))(params) + } - def typedBigQuery[T <: HasAnnotation: TypeTag: Coder]( - newSource: Source - ): SCollection[T] = typedBigQuery(Option(newSource)) + def bigQuerySelectStorageFormat[T: Coder]( + query: Query, + format: BigQueryIO.Format[T], + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + val params = BigQueryIO.QueryReadParam( + format, + Method.DIRECT_READ, + flattenResults, + errorHandler, + configOverride + ) + self.read(BigQueryIO[T](query))(params) + } - /** Get a typed SCollection for BigQuery Table or a SELECT query using the Storage API. */ + /** Get a typed SCollection for BigQuery Table or a SELECT query. 
 */
   def typedBigQuery[T <: HasAnnotation: TypeTag: Coder](
-      newSource: Option[Source]
+      source: Source = null,
+      errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler,
+      configOverride: BigQueryIO.ReadParam.ConfigOverride[T] =
+        BigQueryIO.ReadParam.DefaultConfigOverride
   ): SCollection[T] = {
     val bqt = BigQueryType[T]
-    if (bqt.isStorage) {
-      newSource
-        .asInstanceOf[Option[Table]]
-        .map(typedBigQueryStorage(_))
-        .getOrElse(typedBigQueryStorage())
-    } else {
-      self.read(BigQueryTyped.dynamic[T](newSource))
+    val format = BigQueryIO.Format.Avro(bqt)
+    val src = Option(source).getOrElse {
+      if (bqt.isQuery) {
+        Query(bqt.queryRaw.get)
+      } else {
+        if (bqt.isStorage) {
+          logger.warn(
+            "Using BigQueryType.fromStorage with the standard API. " +
+              "selectedFields and rowRestriction are ignored. " +
+              "Use typedBigQueryStorage instead"
+          )
+        }
+        Table(bqt.table.get)
+      }
     }
-  }

-  /**
-   * Get a typed SCollection for a BigQuery storage API.
-   *
-   * Note that `T` must be annotated with
-   * [[com.spotify.scio.bigquery.types.BigQueryType.fromSchema BigQueryType.fromStorage]] or
-   * [[com.spotify.scio.bigquery.types.BigQueryType.fromQuery BigQueryType.fromQuery]]
-   */
-  def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder](): SCollection[T] = {
-    val bqt = BigQueryType[T]
-    if (bqt.isQuery) {
-      self.read(BigQueryTyped.StorageQuery[T](Query(bqt.queryRaw.get)))
-    } else {
-      val table = Table.Spec(bqt.table.get)
-      val rr = bqt.rowRestriction
-      val fields = bqt.selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields)
-      self.read(BigQueryTyped.Storage[T](table, fields, rr))
+    val params = src match {
+      case _: Query =>
+        BigQueryIO.QueryReadParam[T](
+          format,
+          Method.DEFAULT,
+          BigQueryIO.ReadParam.DefaultFlattenResults,
+          errorHandler,
+          configOverride
+        )
+      case t: Table =>
+        if (t.filter.nonEmpty) {
+          logger.warn(
+            "Using a filtered table with the standard API. " +
+              "selectedFields and rowRestriction are ignored. " +
+              "Use typedBigQueryStorage instead"
+          )
+        }
+        BigQueryIO.TableReadParam[T](format, Method.DEFAULT, errorHandler, configOverride)
     }
+    self.read(BigQueryIO[T](src))(params)
   }

+  /** Get a typed SCollection for a BigQuery storage API.
*/ def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - table: Table - ): SCollection[T] = - self.read( - BigQueryTyped.Storage[T]( - table, - BigQueryType[T].selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields), - BigQueryType[T].rowRestriction - ) - ) - - def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - rowRestriction: String + source: Source = null, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride ): SCollection[T] = { - val bqt = BigQueryType[T] - val table = Table.Spec(bqt.table.get) - self.read( - BigQueryTyped.Storage[T]( - table, - bqt.selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields), - Option(rowRestriction) - ) - ) - } + val io = Option(source) match { + case Some(s) => BigQueryIO[T](s) + case None => BigQueryIO[T] + } - def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - table: Table, - rowRestriction: String - ): SCollection[T] = - self.read( - BigQueryTyped.Storage[T]( - table, - BigQueryType[T].selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields), - Option(rowRestriction) - ) - ) + val format = BigQueryIO.Format.Avro(BigQueryType[T]) + val params = io.source match { + case _: Query => + BigQueryIO.QueryReadParam[T]( + format, + Method.DIRECT_READ, + BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler, + configOverride + ) + case _: Table => + BigQueryIO.TableReadParam[T](format, Method.DIRECT_READ, errorHandler, configOverride) + } - def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - table: Table, - selectedFields: List[String], - rowRestriction: String - ): SCollection[T] = - self.read( - BigQueryTyped.Storage[T]( - table, - selectedFields, - Option(rowRestriction) - ) - ) + self.read(io)(params) + } /** Get an SCollection for a BigQuery TableRow JSON file. 
*/ def tableRowJsonFile( @@ -214,10 +283,11 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { compression: Compression = TableRowJsonIO.ReadParam.DefaultCompression, emptyMatchTreatment: EmptyMatchTreatment = TableRowJsonIO.ReadParam.DefaultEmptyMatchTreatment, suffix: String = TableRowJsonIO.ReadParam.DefaultSuffix - ): SCollection[TableRow] = + ): SCollection[TableRow] = { self.read(TableRowJsonIO(path))( TableRowJsonIO.ReadParam(compression, emptyMatchTreatment, suffix) ) + } } trait ScioContextSyntax { diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala index 8c83dd24dd..12b490e6bb 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala @@ -17,21 +17,19 @@ package com.spotify.scio.bigquery +import com.google.api.services.bigquery.model.TableReference import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions -import com.google.api.services.bigquery.model.{TableReference, TableSchema} import com.spotify.scio.ScioContext -import com.spotify.scio.avro._ +import com.spotify.scio.bigquery.BigQueryIO.Format import com.spotify.scio.bigquery.client.BigQuery import com.spotify.scio.coders.Coder import com.spotify.scio.io.{FileStorage, Tap, Taps} import com.spotify.scio.values.SCollection -import org.apache.avro.generic.GenericRecord +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method -import scala.jdk.CollectionConverters._ import scala.concurrent.Future import scala.reflect.runtime.universe._ -import com.spotify.scio.bigquery.BigQueryTypedTable.Format -import com.twitter.chill.Externalizer +import scala.jdk.CollectionConverters._ /** Tap for BigQuery TableRow JSON files. */ final case class TableRowJsonTap(path: String, params: TableRowJsonIO.ReadParam) @@ -42,43 +40,19 @@ final case class TableRowJsonTap(path: String, params: TableRowJsonIO.ReadParam) sc.read(TableRowJsonIO(path))(params) } -final case class BigQueryTypedTap[T: Coder](table: Table, fn: (GenericRecord, TableSchema) => T) +/** Tap for BigQuery tables. */ +final case class BigQueryTap[T: Coder](table: Table, params: BigQueryIO.ReadParam[T]) extends Tap[T] { - lazy val client: BigQuery = BigQuery.defaultInstance() - lazy val ts: TableSchema = client.tables.table(table.spec).getSchema - - override def value: Iterator[T] = - client.tables.avroRows(table).map(gr => fn(gr, ts)) - - override def open(sc: ScioContext): SCollection[T] = { - val ser = Externalizer(ts) - // TODO this is inefficient. Migrate to TableRow API ? - val coder = avroGenericRecordCoder - sc.read(BigQueryTypedTable(table, Format.GenericRecord)(coder)).map(gr => fn(gr, ser.get)) + override def value: Iterator[T] = { + val tables = BigQuery.defaultInstance().tables + params.format match { + case f: Format.Default[T] => tables.rows(table).map(f.from) + case f: Format.Avro[T] => tables.avroRows(table).map(f.from) + } } -} - -/** Tap for BigQuery tables. */ -final case class BigQueryTap(table: TableReference) extends Tap[TableRow] { - override def value: Iterator[TableRow] = - BigQuery.defaultInstance().tables.rows(Table.Ref(table)) - override def open(sc: ScioContext): SCollection[TableRow] = - sc.read(BigQueryTypedTable(Table.Ref(table), Format.TableRow)) -} -/** Tap for BigQuery tables using storage api. 
*/ -final case class BigQueryStorageTap(table: Table, readOptions: TableReadOptions) - extends Tap[TableRow] { - override def value: Iterator[TableRow] = - BigQuery.defaultInstance().tables.storageRows(table, readOptions) - override def open(sc: ScioContext): SCollection[TableRow] = - sc.read( - BigQueryStorage( - table, - readOptions.getSelectedFieldsList.asScala.toList, - Option(readOptions.getRowRestriction) - ) - ) + override def open(sc: ScioContext): SCollection[T] = + sc.read(BigQueryIO[T](table))(params) } final case class BigQueryTaps(self: Taps) { @@ -90,11 +64,13 @@ final case class BigQueryTaps(self: Taps) { private lazy val bqc = BigQuery.defaultInstance() /** Get a `Future[Tap[TableRow]]` for BigQuery SELECT query. */ - def bigQuerySelect(sqlQuery: String, flattenResults: Boolean = false): Future[Tap[TableRow]] = + def bigQuerySelect(sqlQuery: String): Future[Tap[TableRow]] = mkTap( s"BigQuery SELECT: $sqlQuery", () => isQueryDone(sqlQuery), - () => BigQuerySelect(Query(sqlQuery)).tap(BigQuerySelect.ReadParam(flattenResults)) + () => + BigQueryIO[TableRow](Query(sqlQuery)) + .tap(BigQueryIO.QueryReadParam(BigQueryIO.Format.Default(), Method.DEFAULT)) ) /** Get a `Future[Tap[TableRow]]` for BigQuery table. */ @@ -102,7 +78,9 @@ final case class BigQueryTaps(self: Taps) { mkTap( s"BigQuery Table: $table", () => bqc.tables.exists(table), - () => BigQueryTypedTable(Table.Ref(table), Format.TableRow).tap(()) + () => + BigQueryIO[TableRow](Table(table)) + .tap(BigQueryIO.TableReadParam(BigQueryIO.Format.Default(), Method.DEFAULT)) ) /** Get a `Future[Tap[TableRow]]` for BigQuery table. */ @@ -111,25 +89,15 @@ final case class BigQueryTaps(self: Taps) { /** Get a `Future[Tap[T]]` for typed BigQuery source. */ def typedBigQuery[T <: HasAnnotation: TypeTag: Coder]( - newSource: String = null + newSource: Option[Source] = None ): Future[Tap[T]] = { val bqt = BigQueryType[T] - lazy val table = - scala.util.Try(BigQueryHelpers.parseTableSpec(newSource)).toOption - val rows = - newSource match { - // newSource is missing, T's companion object must have either table or query - case null if bqt.isTable => - bigQueryTable(bqt.table.get) - case null if bqt.isQuery => - bigQuerySelect(bqt.queryRaw.get) - case null => - throw new IllegalArgumentException(s"Missing table or query field in companion object") - case _ if table.isDefined => - bigQueryTable(table.get) - case _ => - bigQuerySelect(newSource) - } + val rows = newSource match { + case Some(q: Query) => bigQuerySelect(q.underlying) + case Some(t: Table) => bigQueryTable(t.ref) + case None if bqt.isQuery => bigQuerySelect(bqt.queryRaw.get) + case _ => bigQueryTable(bqt.table.get) + } import scala.concurrent.ExecutionContext.Implicits.global rows.map(_.map(bqt.fromTableRow)) } @@ -145,6 +113,12 @@ final case class BigQueryTaps(self: Taps) { () => TableRowJsonTap(path, params) ) + def bigQueryStorage( + tableSpec: String, + readOptions: TableReadOptions + ): Future[Tap[TableRow]] = + bigQueryStorage(BigQueryHelpers.parseTableSpec(tableSpec), readOptions) + def bigQueryStorage( table: TableReference, readOptions: TableReadOptions @@ -153,9 +127,12 @@ final case class BigQueryTaps(self: Taps) { s"BigQuery direct read table: $table", () => bqc.tables.exists(table), () => { + val format = BigQueryIO.Format.Default() val selectedFields = readOptions.getSelectedFieldsList.asScala.toList val rowRestriction = Option(readOptions.getRowRestriction) - BigQueryStorage(Table.Ref(table), selectedFields, rowRestriction).tap(()) + val filter = 
Table.Filter(selectedFields, rowRestriction) + val source = Table(table, filter) + BigQueryIO[TableRow](source).tap(BigQueryIO.TableReadParam(format, Method.DIRECT_READ)) } ) @@ -163,16 +140,16 @@ final case class BigQueryTaps(self: Taps) { table: TableReference, readOptions: TableReadOptions ): Future[Tap[T]] = { - val fn = BigQueryType[T].fromTableRow mkTap( s"BigQuery direct read table: $table", () => bqc.tables.exists(table), () => { + val format = BigQueryIO.Format.Avro(BigQueryType[T]) val selectedFields = readOptions.getSelectedFieldsList.asScala.toList val rowRestriction = Option(readOptions.getRowRestriction) - BigQueryStorage(Table.Ref(table), selectedFields, rowRestriction) - .tap(()) - .map(fn) + val filter = Table.Filter(selectedFields, rowRestriction) + val source = Table(table, filter) + BigQueryIO[T](source).tap(BigQueryIO.TableReadParam(format, Method.DIRECT_READ)) } ) } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala index 45fbe21966..76bc35bd5c 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala @@ -18,7 +18,6 @@ package com.spotify.scio.bigquery import com.spotify.scio.avro._ -import com.spotify.scio.bigquery.BigQueryTypedTable.Format import com.spotify.scio.coders.Coder import com.spotify.scio.{ContextAndArgs, ScioContext} import com.spotify.scio.testing._ @@ -87,11 +86,11 @@ final class BigQueryIOTest extends ScioIOSpec { val desc = "table-description" val sc = ScioContext() implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder - val io = BigQueryTypedTable[GenericRecord]( - table = Table.Spec("project:dataset.out_table"), - format = Format.GenericRecord + val io = BigQueryIO[GenericRecord]( + Table("project:dataset.out_table") ) - val params = BigQueryTypedTable.WriteParam[GenericRecord]( + val params = BigQueryIO.WriteParam[GenericRecord]( + format = BigQueryIO.Format.Avro(), createDisposition = CreateDisposition.CREATE_NEVER, configOverride = _.withTableDescription(desc) ) @@ -111,9 +110,7 @@ final class BigQueryIOTest extends ScioIOSpec { val xs = (1 to 100).map(x => TableRow("x" -> x.toString)) testJobTest(xs, in = "project:dataset.in_table", out = "project:dataset.out_table")( BigQueryIO(_) - )((sc, s) => sc.bigQueryTable(Table.Spec(s)))((coll, s) => - coll.saveAsBigQueryTable(Table.Spec(s)) - ) + )((sc, s) => sc.bigQueryTable(Table(s)))((coll, s) => coll.saveAsBigQueryTable(Table(s))) } /** @@ -132,7 +129,7 @@ final class BigQueryIOTest extends ScioIOSpec { val context = ScioContext() context .parallelize(xs) - .saveAsBigQueryTable(Table.Spec("project:dataset.dummy"), createDisposition = CREATE_NEVER) + .saveAsBigQueryTable(Table("project:dataset.dummy"), createDisposition = CREATE_NEVER) // We want to validate on the job graph, and we need not actually execute the pipeline. unconsumedReads(context) shouldBe empty @@ -145,7 +142,7 @@ final class BigQueryIOTest extends ScioIOSpec { context .parallelize(xs) .saveAsTypedBigQueryTable( - Table.Spec("project:dataset.dummy"), + Table("project:dataset.dummy"), createDisposition = CREATE_NEVER ) // We want to validate on the job graph, and we need not actually execute the pipeline. 
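For readers following the API change, here is a minimal sketch (not part of the patch) of the source-centric read calls that the tests above exercise. The table spec, field list, and row restriction are illustrative placeholders; the call shapes `bigQueryTable(Table(...))` and `bigQueryStorage(Table(spec, fields, restriction))` are the ones introduced by this refactor.

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection

// Sketch only: illustrative table specs, assuming the API surface of this patch.
def readExamples(sc: ScioContext): Unit = {
  // Plain table read returning TableRow values.
  val rows: SCollection[TableRow] =
    sc.bigQueryTable(Table("project:dataset.events"))

  // Storage API read: the projection and row restriction now travel with the
  // Table source instead of being passed as separate method arguments.
  val filtered: SCollection[TableRow] =
    sc.bigQueryStorage(Table("project:dataset.events", List("user_id"), "user_id > 0"))

  // Both reads are lazy; nothing executes until sc.run() is invoked.
  ()
}
```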
@@ -154,35 +151,31 @@ final class BigQueryIOTest extends ScioIOSpec { } it should "read the same input table with different predicate and projections using bigQueryStorage" in { - JobTest[JobWithDuplicateInput.type] .args("--input=table.in") .input( - BigQueryIO[TableRow]("table.in", List("a"), Some("a > 0")), + BigQueryIO[TableRow](Table("table.in", List("a"), "a > 0")), (1 to 3).map(x => TableRow("x" -> x.toString)) ) .input( - BigQueryIO[TableRow]("table.in", List("b"), Some("b > 0")), + BigQueryIO[TableRow](Table("table.in", List("b"), "b > 0")), (1 to 3).map(x => TableRow("x" -> x.toString)) ) .run() - } it should "read the same input table with different predicate and projections using typedBigQueryStorage" in { - JobTest[TypedJobWithDuplicateInput.type] .args("--input=table.in") .input( - BigQueryIO[BQRecord]("table.in", List("a"), Some("a > 0")), + BigQueryIO[BQRecord](Table("table.in", List("a"), "a > 0")), (1 to 3).map(x => BQRecord(x, x.toString, (1 to x).map(_.toString).toList)) ) .input( - BigQueryIO[BQRecord]("table.in", List("b"), Some("b > 0")), + BigQueryIO[BQRecord](Table("table.in", List("b"), "b > 0")), (1 to 3).map(x => BQRecord(x, x.toString, (1 to x).map(_.toString).toList)) ) .run() - } "TableRowJsonIO" should "work" in { @@ -196,8 +189,8 @@ final class BigQueryIOTest extends ScioIOSpec { object JobWithDuplicateInput { def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.bigQueryStorage(Table.Spec(args("input")), List("a"), "a > 0") - sc.bigQueryStorage(Table.Spec(args("input")), List("b"), "b > 0") + sc.bigQueryStorage(Table(args("input"), List("a"), "a > 0")) + sc.bigQueryStorage(Table(args("input"), List("b"), "b > 0")) sc.run() () } @@ -208,8 +201,8 @@ object TypedJobWithDuplicateInput { def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.typedBigQueryStorage[BQRecord](Table.Spec(args("input")), List("a"), "a > 0") - sc.typedBigQueryStorage[BQRecord](Table.Spec(args("input")), List("b"), "b > 0") + sc.typedBigQueryStorage[BQRecord](Table(args("input"), List("a"), "a > 0")) + sc.typedBigQueryStorage[BQRecord](Table(args("input"), List("b"), "b > 0")) sc.run() () } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala index 137eb836c6..b15297b21a 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala @@ -21,7 +21,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.flatspec.AnyFlatSpec class BigQueryTypesTest extends AnyFlatSpec with Matchers { - "Table.Spec" should "fail malformed spec" in { - an[IllegalArgumentException] shouldBe thrownBy(Table.Spec("bad spec")) + "Table" should "fail malformed spec" in { + an[IllegalArgumentException] shouldBe thrownBy(Table("bad spec")) } } diff --git a/site/src/main/paradox/FAQ.md b/site/src/main/paradox/FAQ.md index 9db67a0bfe..82fcf9674b 100644 --- a/site/src/main/paradox/FAQ.md +++ b/site/src/main/paradox/FAQ.md @@ -553,7 +553,7 @@ def main(cmdlineArgs: Array[String]): Unit = { val p: SCollection[(String, Int)] = ??? 
p.map(kv => Result(kv._1, kv._2)) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) } ``` diff --git a/site/src/main/paradox/io/BigQuery.md b/site/src/main/paradox/io/BigQuery.md index 7bf3b718cf..65fd4afcca 100644 --- a/site/src/main/paradox/io/BigQuery.md +++ b/site/src/main/paradox/io/BigQuery.md @@ -222,7 +222,7 @@ def main(cmdlineArgs: Array[String]): Unit = { .flatMap(r => if (r.tornado.getOrElse(false)) Seq(r.month) else Nil) .countByValue .map(kv => Result(kv._1, kv._2)) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) // schema from Row.schema + .saveAsTypedBigQueryTable(Table(args("output"))) // schema from Row.schema sc.run() () }
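To close out the documentation updates, a self-contained sketch (not part of the patch) of the typed write path that both site pages now demonstrate; the `Result` case class, the sample values, and the `--output` argument are illustrative.

```scala
import com.spotify.scio.ContextAndArgs
import com.spotify.scio.bigquery._

object TypedWriteExample {
  // Annotated case class: the BigQuery schema is derived at compile time.
  @BigQueryType.toTable
  case class Result(month: Long, tornado_count: Long)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.parallelize(Seq(Result(1L, 3L), Result(2L, 5L)))
      // Table(...) replaces the former Table.Spec(...) constructor.
      .saveAsTypedBigQueryTable(Table(args("output")))
    sc.run()
    ()
  }
}
```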