diff --git a/site/src/main/paradox/extras/Sort-Merge-Bucket.md b/site/src/main/paradox/extras/Sort-Merge-Bucket.md
index 776aa0b31d..910f218b9a 100644
--- a/site/src/main/paradox/extras/Sort-Merge-Bucket.md
+++ b/site/src/main/paradox/extras/Sort-Merge-Bucket.md
@@ -97,8 +97,8 @@ be manually read by a downstream user.

## Avro String keys

-If you're using `AvroSortedBucketIO`, be aware of how Avro String fields are decoded. Configuration
-errors can result in the following runtime exception:
+As of **Scio 0.14.0**, Avro `CharSequence` fields are backed by `String` instead of the default `Utf8`.
+With previous versions, you may encounter the following exception when using Avro `CharSequence` keys:

```bash
Cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String
@@ -106,90 +106,17 @@ Cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be c
[info] at org.apache.beam.sdk.extensions.smb.BucketMetadata.encodeKeyBytes(BucketMetadata.java:222)
```

-### SpecificRecords
-
-Scio 0.10.4 specifically has a bug in the default String decoding behavior for `SpecificRecords`: by default,
-they're decoded at runtime into `org.apache.avro.util.Utf8` objects, rather than `java.lang.String`s
-(the generated getter/setter signatures use `CharSequence` as an umbrella type). This bug has been fixed in
-Scio 0.11+. If you cannot upgrade, you can mitigate this by ensuring your `SpecificRecord` schema has the property
-`java-class: java.lang.String` set in the key field. This can be done either in the avsc/avdl schema or in
-Java/Scala code:
-
-```scala
-val mySchema: org.apache.avro.Schema = ???
-mySchema
-  .getField("keyField")
-  .schema()
-  .addProp(
-    org.apache.avro.specific.SpecificData.CLASS_PROP,
-    "java.lang.String".asInstanceOf[Object]
-  )
-```
-
-Note: If you're using [sbt-avro](https://github.com/sbt/sbt-avro#examples) for schema generation, you can
-just set the SBT property `avroStringType := "String"` instead.
-
-### GenericRecords
-
-For GenericRecords, `org.apache.avro.util.Utf8` decoding has always been the default. If you're reading
-Avro GenericRecords in your SMB join, set the `avro.java.string: String` property in the Schema of the key field.
-
-```scala
-val mySchema: org.apache.avro.Schema = ???
-mySchema
-  .getField("keyField")
-  .schema()
-  .addProp(
-    org.apache.avro.generic.GenericData.STRING_PROP,
-    "String".asInstanceOf[Object]
-  )
-```
+You'll have to either recompile your Avro schema with the `String` type,
+or set `GenericData.StringType.String` on your Avro schema with [setStringType](https://avro.apache.org/docs/1.11.1/api/java/org/apache/avro/generic/GenericData.html#setStringType-org.apache.avro.Schema-org.apache.avro.generic.GenericData.StringType-).

## Parquet

SMB supports Parquet reads and writes in both Avro and case class formats.

-As of **Scio 0.14.0** and above, Scio supports logical types in parquet-avro out of the box.
-
-Earlier versions of Scio require you to manually supply a _logical type supplier_ in your Parquet `Configuration` parameter:
-
-```scala mdoc:fail:silent
-import org.apache.avro.specific.SpecificRecordBase
-
-import org.apache.beam.sdk.extensions.smb.{AvroLogicalTypeSupplier, ParquetAvroSortedBucketIO}
-import org.apache.beam.sdk.values.TupleTag
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}
-import com.spotify.scio.avro.TestRecord
-
-// Reads
-val readConf = new Configuration()
-readConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
-
-ParquetAvroSortedBucketIO
-  .read[TestRecord](new TupleTag[TestRecord], classOf[TestRecord])
-  .withConfiguration(readConf)
-
-// Writes
-val writeConf = new Configuration()
-writeConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
-
-ParquetAvroSortedBucketIO
-  .write(classOf[String], "myKeyField", classOf[TestRecord])
-  .withConfiguration(writeConf)
-
-// Transforms
-val transformConf = new Configuration()
-transformConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
-transformConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
-
-ParquetAvroSortedBucketIO
-  .transformOutput(classOf[String], "myKeyField", classOf[TestRecord])
-  .withConfiguration(transformConf)
-```
+As of **Scio 0.14.0**, Scio supports specific record logical types in parquet-avro out of the box.
-Note that if you're using a non-default Avro version (i.e. Avro 1.11), you'll have to supply a custom logical type supplier
-using Avro 1.11 classes. See @ref:[Logical Types in Parquet](../io/Parquet.md#logical-types) for more information.
+When using generic records, you have to manually supply a _data supplier_ in your Parquet `Configuration` parameter.
+See @ref:[Logical Types in Parquet](../io/Parquet.md#logical-types) for more information.

## Tuning parameters for SMB transforms

diff --git a/site/src/main/paradox/io/Parquet.md b/site/src/main/paradox/io/Parquet.md
index 47d2035a3e..245256455d 100644
--- a/site/src/main/paradox/io/Parquet.md
+++ b/site/src/main/paradox/io/Parquet.md
@@ -86,7 +86,7 @@ object ParquetJob {

### Write Avro to Parquet files

-Both Avro [generic](https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/generic/GenericData.Record.html) and [specific](https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/specific/package-summary.html) records are supported when writing.
+Both Avro [generic](https://avro.apache.org/docs/current/api/java/org/apache/avro/generic/GenericData.Record.html) and [specific](https://avro.apache.org/docs/current/api/java/org/apache/avro/specific/package-summary.html) records are supported when writing.

Type of Avro specific records will hold information about schema, therefore Scio will figure out the schema by itself:
@@ -117,66 +117,52 @@ def result = input.saveAsParquetAvroFile("gs://path-to-data/lake/output", schema

### Logical Types

-As of **Scio 0.14.0** and above, Scio supports logical types in parquet-avro out of the box.
+As of **Scio 0.14.0**, Scio supports specific record logical types in parquet-avro out of the box.
-If you're on an earlier version of Scio and your Avro schema contains a logical type, you'll need to supply an additional Configuration parameter for your reads and writes.
+When using generic records, you'll need to supply an additional `Configuration` parameter to use logical types:
+`AvroReadSupport.AVRO_DATA_SUPPLIER` for reads, or `AvroWriteSupport.AVRO_DATA_SUPPLIER` for writes.

-If you're using the default version of Avro (1.8), you can use Scio's pre-built logical type conversions:
-
-```scala mdoc:fail:silent
+```scala mdoc:compile-only
import com.spotify.scio._
-import com.spotify.scio.values.SCollection
+import com.spotify.scio.avro._
+import com.spotify.scio.coders.Coder
import com.spotify.scio.parquet.avro._
-import com.spotify.scio.avro.TestRecord
+import com.spotify.scio.parquet.ParquetConfiguration
+import com.spotify.scio.values.SCollection
+import org.apache.avro.Conversions
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.data.TimeConversions
+import org.apache.avro.generic.GenericData
+import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

val sc: ScioContext = ???
-val data: SCollection[TestRecord] = sc.parallelize(List[TestRecord]())
+implicit val coder: Coder[GenericRecord] = ???
+val data: SCollection[GenericRecord] = ???

-// Reads
-import com.spotify.scio.parquet.ParquetConfiguration
+class AvroLogicalTypeSupplier extends AvroDataSupplier {
+  override def get(): GenericData = {
+    val data = GenericData.get()

-import org.apache.parquet.avro.AvroReadSupport
+    // Add conversions as needed
+    data.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())
+
+    data
+  }
+}

+// Reads
sc.parquetAvroFile(
  "somePath",
-  conf = ParquetConfiguration.of(AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier])
+  conf = ParquetConfiguration.of(AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier])
)

// Writes
-import org.apache.parquet.avro.AvroWriteSupport
-
data.saveAsParquetAvroFile(
  "somePath",
-  conf = ParquetConfiguration.of(AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier])
+  conf = ParquetConfiguration.of(AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier])
)
```

-(If you're using `scio-smb`, you can use the provided class `org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier` instead.)
-
-If you're using Avro 1.11, you'll have to create your own logical type supplier class, as Scio's `LogicalTypeSupplier` uses
-classes present in Avro 1.8 but not 1.11. A sample Avro 1.11 logical-type supplier might look like:
-
-```scala
-import org.apache.avro.Conversions;
-import org.apache.avro.data.TimeConversions;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificData;
-import org.apache.parquet.avro.AvroDataSupplier;
-
-case class AvroLogicalTypeSupplier() extends AvroDataSupplier {
-  override def get(): GenericData = {
-    val specificData = SpecificData.get()
-
-    // Add conversions as needed
-    specificData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())
-
-    specificData
-  }
-}
-```
-
-Then, you'll have to specify your logical type supplier class in your `Configuration` as outlined above.
-
## Case classes

Scio uses [magnolify-parquet](https://github.com/spotify/magnolify/blob/master/docs/parquet.md) to derive Parquet reader and writer for case classes at compile time, similar to how @ref:[coders](../internals/Coders.md) work.
See this [mapping table](https://github.com/spotify/magnolify/blob/master/docs/mapping.md) for how Scala and Parquet types map; enum type mapping is also specifically [documented](https://github.com/spotify/magnolify/blob/main/docs/enums.md).
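
As an illustration of the `setStringType` option mentioned in the Avro String keys change above, here is a minimal sketch. The `MyRecord` schema and `keyField` name are hypothetical; in practice you would apply this to the schema of your own key field.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// Hypothetical record schema whose key field is an Avro string.
val mySchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "MyRecord", "fields": [{"name": "keyField", "type": "string"}]}"""
)

// Mark the key field's string schema as java.lang.String-backed
// (this sets the avro.java.string=String property on that schema),
// so the key decodes as String rather than Utf8.
GenericData.setStringType(
  mySchema.getField("keyField").schema(),
  GenericData.StringType.String
)
```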