Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor BQ to expose all beam's configurations #5456

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

RustedBones
Copy link
Contributor

Here are the main changes:

  • the BQ Table source has a single normalized definition, with multiple constrictors (form string spec or TableReference). It nows includes an optional Table.Filter that can be used is the storage read API to project and filter.

  • read API changes with

API method returned type
bigQuerySelect export TableRow
bigQuerySelectFormat export T
bigQueryTable export TableRow
bigQueryTableFormat export T
bigQueryStorage direct TableRow
bigQueryStorageFormat direct T
typedBigQuery export T
typedBigQueryStorage direct T

Format API take a BigqueryIO.Format object allowing to convert either from GenericRecord (this should be prefered) or TableRow

  • The Storage Api allow to pass an ErrorHandler. In order to preserve a flat structure ScioContext.errorSink(): ErrorSink has been added. This allow to do the following
val errorSink = sc.errorSink()
sc.bigQueryStorageFormat[MyType](
  table,
  format,
  errorHandler = errorSink.handler
)
val errors: SCollection[BadRecord] = errorSink.sink()

The handler can be passed to multiple IOs before sink is materialized. The sink will flatten the errors from the IOs.

Copy link

codecov bot commented Aug 20, 2024

Codecov Report

Attention: Patch coverage is 48.12030% with 138 lines in your changes missing coverage. Please review.

Project coverage is 61.50%. Comparing base (27fd3ca) to head (2d66440).
Report is 12 commits behind head on main.

Files Patch % Lines
...otify/scio/bigquery/syntax/ScioContextSyntax.scala 30.15% 44 Missing ⚠️
...n/scala/com/spotify/scio/bigquery/BigQueryIO.scala 67.76% 39 Missing ⚠️
...rc/main/scala/com/spotify/scio/bigquery/taps.scala 0.00% 22 Missing ⚠️
...cala/com/spotify/scio/bigquery/BigQueryTypes.scala 46.42% 15 Missing ⚠️
...otify/scio/bigquery/syntax/SCollectionSyntax.scala 25.00% 9 Missing ⚠️
...io/bigquery/dynamic/syntax/SCollectionSyntax.scala 0.00% 3 Missing ⚠️
...scala/com/spotify/scio/bigquery/MockBigQuery.scala 0.00% 2 Missing ⚠️
...la/com/spotify/scio/bigquery/client/BigQuery.scala 0.00% 2 Missing ⚠️
.../src/main/scala/com/spotify/scio/ScioContext.scala 50.00% 1 Missing ⚠️
...la/com/spotify/scio/bigquery/client/QueryOps.scala 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5456      +/-   ##
==========================================
+ Coverage   61.31%   61.50%   +0.19%     
==========================================
  Files         312      313       +1     
  Lines       11068    11084      +16     
  Branches      792      784       -8     
==========================================
+ Hits         6786     6817      +31     
+ Misses       4282     4267      -15     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

import org.apache.beam.sdk.values.{PCollection, PCollectionTuple, TupleTag}

/**
* A sink for error records.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit more explanation on error records could be helpful, maybe:

Suggested change
* A sink for error records.
* A sink for error records.
*
* An error record is produced by certain PTransforms that catch processing exceptions and transform the resulting (element, exception) pair into a [[BadRecord]] instance.
* When an ErrorSink is configured (via ScioContext#errorSink), these BadRecords can be accessed as an SCollection by invoking the ErrorSink#sink method.
* An ErrorSink is useful if you'd like to set up special handling of exceptions (incrementing Counters, logging the exceptions in a database, etc).

* Once the [[sink]] is materialized, the [[handler]] must not be used anymore.
*/
sealed trait ErrorSink {
def handler: ErrorHandler[BadRecord, _]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could def handler be private[scio]? not sure when a user would need to access this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the API exposed by beam. As mentioned in the description we do not pass the ErrorSink directly.

sc.bigQueryStorageFormat[MyType](
  table,
  format,
  errorHandler = errorSink.handler
)

I was thinking of adding to the ScioContext a beam java like API too

def registerBadRecordErrorHandler[T](handler: PTransform[PCollection[BadRecord], T] sinkTransform): BadRecordErrorHandler[OutputT]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants