Convert SortMergeBucketExample to Parquet + update tests #5191

Merged · 7 commits · Jan 29, 2024
Changes from all commits
@@ -19,23 +19,30 @@
 // Usage:

 // `sbt runMain "com.spotify.scio.examples.extra.SortMergeBucketWriteExample
-// --outputL=[OUTPUT]--outputR=[OUTPUT]"`
+// --users=[OUTPUT] --accounts=[OUTPUT]"`
 // `sbt runMain "com.spotify.scio.examples.extra.SortMergeBucketJoinExample
-// --inputL=[INPUT]--inputR=[INPUT] --output=[OUTPUT]"`
+// --users=[INPUT] --accounts=[INPUT] --output=[OUTPUT]"`
 // `sbt runMain "com.spotify.scio.examples.extra.SortMergeBucketTransformExample
-// --inputL=[INPUT]--inputR=[INPUT] --output=[OUTPUT]"`
+// --users=[INPUT] --accounts=[INPUT] --output=[OUTPUT]"`
 package com.spotify.scio.examples.extra

 import com.spotify.scio.{Args, ContextAndArgs, ScioContext}
 import com.spotify.scio.avro._
 import com.spotify.scio.coders.Coder
+import com.spotify.scio.io.ClosedTap
+import com.spotify.scio.parquet.ParquetConfiguration
 import com.spotify.scio.values.SCollection
-import org.apache.avro.Schema
-import org.apache.avro.file.CodecFactory
+import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
 import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType
-import org.apache.beam.sdk.extensions.smb.{AvroSortedBucketIO, TargetParallelism}
+import org.apache.beam.sdk.extensions.smb.{
+  ParquetAvroSortedBucketIO,
+  ParquetTypeSortedBucketIO,
+  TargetParallelism
+}
 import org.apache.beam.sdk.values.TupleTag
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop.ParquetOutputFormat

 import scala.util.Random

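Note on the new imports: `ParquetConfiguration.of` (used in the account sink below) builds a plain Hadoop `Configuration` from key/value pairs, so any Parquet writer property can be passed through. A minimal sketch of the idea; the `COMPRESSION` entry is illustrative and not part of this PR:

```scala
import com.spotify.scio.parquet.ParquetConfiguration
import org.apache.parquet.hadoop.ParquetOutputFormat

// Each pair becomes a Hadoop Configuration entry; keys are ordinary
// Parquet/Hadoop property names exposed as constants on ParquetOutputFormat.
val conf = ParquetConfiguration.of(
  ParquetOutputFormat.BLOCK_SIZE -> 512 * 1024 * 1024, // 512 MiB row groups
  ParquetOutputFormat.COMPRESSION -> "ZSTD"            // illustrative codec choice, not from this diff
)
```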
@@ -49,7 +56,7 @@ object SortMergeBucketExample {
     |  "fields": [
     |    {
     |      "name": "userId",
-    |      "type": ["null", {"type": "string", "avro.java.string": "String"}]
+    |      "type": "int"
     |    },
     |    {
     |      "name": "age", "type": "int"
@@ -58,10 +65,11 @@ object SortMergeBucketExample {
     |""".stripMargin
   )

-  def user(id: String, age: Int): GenericRecord = new GenericRecordBuilder(UserDataSchema)
-    .set("userId", id)
-    .set("age", age)
-    .build()
+  def user(id: Int, age: Int): GenericRecord =
+    new GenericRecordBuilder(UserDataSchema)
+      .set("userId", id)
+      .set("age", age)
+      .build()
 }

 object SortMergeBucketWriteExample {
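Since the SMB key changes from a nullable string to an Avro `int` here, the `user` helper now takes an `Int` id, lining up with the `classOf[Integer]` key class used by the Parquet writes below. A quick sanity check of the updated helper (hypothetical snippet, not part of this diff):

```scala
// "userId" is now an Avro int, so it can serve as the Integer SMB bucketing key
val record = SortMergeBucketExample.user(42, 27)
assert(record.get("userId") == 42)
assert(record.get("age") == 27)
```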
@@ -72,46 +80,54 @@ object SortMergeBucketWriteExample {

   def pipeline(cmdLineArgs: Array[String]): ScioContext = {
     val (sc, args) = ContextAndArgs(cmdLineArgs)
+    pipeline(sc, args)
+    sc
+  }

-    sc.parallelize(0 until 500)
-      .map(i => SortMergeBucketExample.user(i.toString, i % 100))
+  def pipeline(sc: ScioContext, args: Args): (ClosedTap[GenericRecord], ClosedTap[Account]) = {
+    val userWriteTap = sc
+      .parallelize(0 until 500)
+      .map(i => SortMergeBucketExample.user(i % 100, i % 100))
       .saveAsSortedBucket(
-        AvroSortedBucketIO
-          .write(classOf[String], "userId", SortMergeBucketExample.UserDataSchema)
+        ParquetAvroSortedBucketIO
+          .write(classOf[Integer], "userId", SortMergeBucketExample.UserDataSchema)
           .to(args("users"))
           .withTempDirectory(sc.options.getTempLocation)
-          .withCodec(CodecFactory.snappyCodec())
           .withHashType(HashType.MURMUR3_32)
           .withFilenamePrefix("example-prefix")
          .withNumBuckets(2)
          .withNumShards(1)
       )

     // #SortMergeBucketExample_sink
-    sc.parallelize(250 until 750)
+    val accountWriteTap = sc
+      .parallelize(250 until 750)
       .map { i =>
         Account
           .newBuilder()
-          .setId(i)
-          .setName(i.toString)
+          .setId(i % 100)
+          .setName(s"name$i")
           .setType(s"type${i % 5}")
           .setAmount(Random.nextDouble() * 1000)
           .build()
       }
       .saveAsSortedBucket(
-        AvroSortedBucketIO
-          .write[String, Account](classOf[String], "name", classOf[Account])
+        ParquetAvroSortedBucketIO
+          .write[Integer, Account](classOf[Integer], "id", classOf[Account])
           .to(args("accounts"))
           .withSorterMemoryMb(128)
           .withTempDirectory(sc.options.getTempLocation)
-          .withCodec(CodecFactory.snappyCodec())
+          .withConfiguration(
+            ParquetConfiguration.of(ParquetOutputFormat.BLOCK_SIZE -> 512 * 1024 * 1024)
+          )
           .withHashType(HashType.MURMUR3_32)
           .withFilenamePrefix("part") // Default is "bucket"
           .withNumBuckets(1)
           .withNumShards(1)
       )
     // #SortMergeBucketExample_sink
-    sc
+
+    (userWriteTap, accountWriteTap)
   }

   def secondaryKeyExample(
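Factoring the write logic into `pipeline(sc: ScioContext, args: Args)` and returning the `ClosedTap`s is what makes the "+ update tests" part of this PR possible: the example can now be driven without going through `main`. A minimal sketch of such a driver, assuming local temp directories (the directory setup is illustrative, not from this diff):

```scala
import java.nio.file.Files
import com.spotify.scio.ContextAndArgs

// Run the SMB writes against local temp directories, e.g. from a test harness
val users = Files.createTempDirectory("smb-users").toString
val accounts = Files.createTempDirectory("smb-accounts").toString
val tmp = Files.createTempDirectory("smb-tmp").toString
val (sc, args) = ContextAndArgs(
  Array(s"--users=$users", s"--accounts=$accounts", s"--tempLocation=$tmp")
)
val (userTap, accountTap) = SortMergeBucketWriteExample.pipeline(sc, args)
sc.run().waitUntilDone() // materializes both taps
```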
@@ -121,11 +137,11 @@
     in
       // #SortMergeBucketExample_sink_secondary
       .saveAsSortedBucket(
-        AvroSortedBucketIO
-          .write[String, String, Account](
+        ParquetAvroSortedBucketIO
+          .write[Integer, String, Account](
             // primary key class and field
-            classOf[String],
-            "name",
+            classOf[Integer],
+            "id",
             // secondary key class and field
             classOf[String],
             "type",
@@ -139,6 +155,7 @@
   def main(cmdLineArgs: Array[String]): Unit = {
     val sc = pipeline(cmdLineArgs)
     sc.run().waitUntilDone()
+    ()
   }
 }

@@ -148,36 +165,45 @@ object SortMergeBucketJoinExample {
   implicit val coder: Coder[GenericRecord] =
     avroGenericRecordCoder(SortMergeBucketExample.UserDataSchema)

-  case class UserAccountData(userId: String, age: Int, balance: Double) {
-    override def toString: String = s"$userId\t$age\t$balance"
-  }
+  case class AccountProjection(id: Int, amount: Double)

   def pipeline(cmdLineArgs: Array[String]): ScioContext = {
     val (sc, args) = ContextAndArgs(cmdLineArgs)
+    pipeline(sc, args)
+    sc
+  }

-    val mapFn: ((String, (GenericRecord, Account))) => UserAccountData = {
-      case (userId, (userData, account)) =>
-        UserAccountData(userId, userData.get("age").toString.toInt, account.getAmount)
-    }
-
+  def pipeline(sc: ScioContext, args: Args): ClosedTap[String] = {
     // #SortMergeBucketExample_join
     sc.sortMergeJoin(
-      classOf[String],
-      AvroSortedBucketIO
-        .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema)
-        // 1. Only 1 user per user ID
-        // 2. Out of key intersection 250-499, only 100 (300-349, 400-499) with age < 50
-        .withPredicate((xs, x) => xs.size() == 0 && x.get("age").asInstanceOf[Int] < 50)
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[GenericRecord](), SortMergeBucketExample.UserDataSchema)
+        .withProjection(
+          SchemaBuilder
+            .record("UserProjection")
+            .fields
+            .requiredInt("userId")
+            .requiredInt("age")
+            .endRecord
+        )
+        // Filter at the Parquet IO level to users under 50.
+        // Prefer filtering at the IO level whenever possible, as it reduces total bytes read
+        .withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
+        // Filter at the SMB cogroup level to a single record per user.
+        // Filter at the cogroup level when the condition depends on the materialized key group
+        .withPredicate((xs, _) => xs.size() == 0)
         .from(args("users")),
-      AvroSortedBucketIO
-        .read(new TupleTag[Account]("rhs"), classOf[Account])
+      ParquetTypeSortedBucketIO
+        .read(new TupleTag[AccountProjection]())
        .from(args("accounts")),
       TargetParallelism.max()
-    ).map(mapFn) // Apply mapping function
+    ).map { case (_, (userData, account)) =>
+      (userData.get("age").asInstanceOf[Int], account.amount)
+    }.groupByKey
+      .mapValues(amounts => amounts.sum / amounts.size)
       .saveAsTextFile(args("output"))
     // #SortMergeBucketExample_join
-
-    sc
   }

   def main(cmdLineArgs: Array[String]): Unit = {
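The two filter hooks here do different work: `withFilterPredicate` is pushed into the Parquet reader, so row groups can be skipped via column statistics before any bytes are decoded, while `withPredicate` runs later, against the key group being materialized by the SMB cogroup. Parquet predicates compose with `FilterApi.and`/`FilterApi.or`; a sketch with a made-up age band:

```scala
import org.apache.parquet.filter2.predicate.FilterApi

// 18 <= age < 50, evaluated inside the Parquet reader; the bounds are
// illustrative and not part of this PR
val agePredicate = FilterApi.and(
  FilterApi.gtEq(FilterApi.intColumn("age"), Int.box(18)),
  FilterApi.lt(FilterApi.intColumn("age"), Int.box(50))
)
```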
@@ -190,48 +216,46 @@ object SortMergeBucketJoinExample {
 object SortMergeBucketTransformExample {
   import com.spotify.scio.smb._

+  // ParquetTypeSortedBucketIO supports case class projections for reading and writing
+  case class AccountProjection(id: Int, amount: Double)
+  case class CombinedAccount(id: Int, age: Int, totalValue: Double)
+
   def pipeline(cmdLineArgs: Array[String]): ScioContext = {
     val (sc, args) = ContextAndArgs(cmdLineArgs)
+    pipeline(sc, args)
+    sc
+  }

-    implicit val coderUserData: Coder[GenericRecord] =
-      avroGenericRecordCoder(SortMergeBucketExample.UserDataSchema)
-    implicit val coderAccount: Coder[Account] =
-      avroSpecificRecordCoder
-
-    // #SortMergeBucketExample_transform
-    val (readLhs, readRhs) = (
-      AvroSortedBucketIO
-        .read(new TupleTag[GenericRecord]("lhs"), SortMergeBucketExample.UserDataSchema)
-        .from(args("users")),
-      AvroSortedBucketIO
-        .read(new TupleTag[Account]("rhs"), classOf[Account])
-        .from(args("accounts"))
+  def pipeline(sc: ScioContext, args: Args): ClosedTap[CombinedAccount] = {
+    implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(
+      SortMergeBucketExample.UserDataSchema
     )

+    // #SortMergeBucketExample_transform
     sc.sortMergeTransform(
-      classOf[String],
-      readLhs,
-      readRhs,
+      classOf[Integer],
+      ParquetAvroSortedBucketIO
+        .read(new TupleTag[GenericRecord](), SortMergeBucketExample.UserDataSchema)
+        // Filter at the Parquet IO level to users under 50
+        .withFilterPredicate(FilterApi.lt(FilterApi.intColumn("age"), Int.box(50)))
+        .from(args("users")),
+      ParquetTypeSortedBucketIO
+        .read(new TupleTag[AccountProjection]())
+        .from(args("accounts")),
       TargetParallelism.auto()
     ).to(
-      AvroSortedBucketIO
-        .transformOutput(classOf[String], "name", classOf[Account])
+      ParquetTypeSortedBucketIO
+        .transformOutput[Integer, CombinedAccount]("id")
         .to(args("output"))
     ).via { case (key, (users, accounts), outputCollector) =>
-      users.foreach { _ =>
+      val sum = accounts.map(_.amount).sum
+      users.foreach { user =>
         outputCollector.accept(
-          Account
-            .newBuilder()
-            .setId(key.toInt)
-            .setName(key)
-            .setType("combinedAmount")
-            .setAmount(accounts.foldLeft(0.0)(_ + _.getAmount))
-            .build()
+          CombinedAccount(key, user.get("age").asInstanceOf[Integer], sum)
         )
       }
     }
     // #SortMergeBucketExample_transform
-    sc
   }

   def secondaryReadExample(cmdLineArgs: Array[String]): Unit = {
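`ParquetTypeSortedBucketIO` derives its Parquet schema from the case class, so a narrower case class doubles as a column projection on read. A hypothetical variant reusing the imports above (`AccountIdOnly` is not in this PR; its field names must match the written schema):

```scala
// Only the "id" column would be read from the account files
case class AccountIdOnly(id: Int)

ParquetTypeSortedBucketIO
  .read(new TupleTag[AccountIdOnly]())
  .from(args("accounts"))
```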
@@ -241,7 +265,7 @@ object SortMergeBucketTransformExample {
     sc.sortMergeGroupByKey(
       classOf[String], // primary key class
       classOf[String], // secondary key class
-      AvroSortedBucketIO
+      ParquetAvroSortedBucketIO
         .read(new TupleTag[Account]("account"), classOf[Account])
         .from(args("accounts"))
     ).map { case ((primaryKey, secondaryKey), elements) =>