Skip to content

Commit

Permalink
(fix #4970) Default to Parquet-SplittableDoFn if RunnerV2 is enabled (#4973)
Browse files Browse the repository at this point in the history

* (fix #4970) Default to Parquet-SplittableDoFn if RunnerV2 is enabled

* remove private modifier

* Add test case

* Wrap getExperiments in optional

* Extract helper function

* fmt
  • Loading branch information
clairemcginty authored Aug 31, 2023
1 parent dd3e9f3 commit b97c4dd
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,7 @@ object ParquetAvroIO {
}
}

val useSplittableDoFn = jobConf.getBoolean(
ParquetReadConfiguration.UseSplittableDoFn,
ParquetReadConfiguration.UseSplittableDoFnDefault
)

if (useSplittableDoFn) {
if (ParquetReadConfiguration.getUseSplittableDoFn(jobConf, sc.options)) {
readSplittableDoFn(sc, jobConf, path)
} else {
readLegacy(sc, jobConf, path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

package com.spotify.scio.parquet.read

import org.apache.beam.sdk.options.{ExperimentalOptions, PipelineOptions}
import org.apache.hadoop.conf.Configuration
import org.slf4j.LoggerFactory

object ParquetReadConfiguration {
private val log = LoggerFactory.getLogger(getClass)

// Key
val SplitGranularity = "scio.parquet.read.splitgranularity"
Expand All @@ -36,4 +41,21 @@ object ParquetReadConfiguration {
// SplittableDoFn
val UseSplittableDoFn = "scio.parquet.read.useSplittableDoFn"
private[scio] val UseSplittableDoFnDefault = false

/**
 * Resolves whether Parquet reads should use the SplittableDoFn-based implementation.
 *
 * Resolution order: an explicit `scio.parquet.read.useSplittableDoFn` entry in `conf`
 * always wins; otherwise the read defaults to SDF when the Dataflow Runner V2
 * experiment is enabled (logging the decision), and to `UseSplittableDoFnDefault`
 * in all remaining cases.
 */
private[scio] def getUseSplittableDoFn(conf: Configuration, opts: PipelineOptions): Boolean =
  Option(conf.get(UseSplittableDoFn)).map(_.toBoolean).getOrElse {
    // No explicit user setting; fall back on the Runner V2 heuristic
    if (dataflowRunnerV2Enabled(opts)) {
      log.info(
        "Defaulting to SplittableDoFn-based Parquet read as Dataflow Runner V2 is enabled. To opt out, " +
          "set `scio.parquet.read.useSplittableDoFn -> false` in your read Configuration."
      )
      true
    } else {
      UseSplittableDoFnDefault
    }
  }

/** True iff the pipeline's experiment list is present and contains `use_runner_v2`. */
private def dataflowRunnerV2Enabled(opts: PipelineOptions): Boolean = {
  val experiments = opts.as(classOf[ExperimentalOptions]).getExperiments
  experiments != null && experiments.contains("use_runner_v2")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,8 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] {

override protected def read(sc: ScioContext, params: ReadP): SCollection[Example] = {
val conf = ParquetConfiguration.ofNullable(params.conf)
val useSplittableDoFn = conf.getBoolean(
ParquetReadConfiguration.UseSplittableDoFn,
ParquetReadConfiguration.UseSplittableDoFnDefault
)

if (useSplittableDoFn) {
if (ParquetReadConfiguration.getUseSplittableDoFn(conf, sc.options)) {
readSplittableDoFn(sc, conf, params)
} else {
readLegacy(sc, conf, params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType](

override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val conf = ParquetConfiguration.ofNullable(params.conf)
val useSplittableDoFn = conf.getBoolean(
ParquetReadConfiguration.UseSplittableDoFn,
ParquetReadConfiguration.UseSplittableDoFnDefault
)

if (useSplittableDoFn) {
if (ParquetReadConfiguration.getUseSplittableDoFn(conf, sc.options)) {
readSplittableDoFn(sc, conf, params)
} else {
readLegacy(sc, conf, params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.spotify.scio.parquet.types._
import com.spotify.scio.testing.PipelineSpec
import org.apache.commons.io.FileUtils
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.util.SerializableUtils
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.io.api.Binary
Expand Down Expand Up @@ -318,6 +319,36 @@ class ParquetReadFnTest extends PipelineSpec with BeforeAndAfterAll {
)
}

"ParquetReadConfiguration" should "default to using splittableDoFn only if RunnerV2 experiment is enabled" in {
// Default to true if RunnerV2 is set and user hasn't configured SDF explicitly
ParquetReadConfiguration.getUseSplittableDoFn(
ParquetConfiguration.empty(),
PipelineOptionsFactory.fromArgs("--experiments=use_runner_v2,another_experiment").create()
) shouldBe true

// Default to false if RunnerV2 is not set
ParquetReadConfiguration.getUseSplittableDoFn(
ParquetConfiguration.empty(),
PipelineOptionsFactory.fromArgs("--experiments=another_experiment").create()
) shouldBe false

ParquetReadConfiguration.getUseSplittableDoFn(
ParquetConfiguration.empty(),
PipelineOptionsFactory.fromArgs().create()
) shouldBe false

// Respect user's configuration, if set
ParquetReadConfiguration.getUseSplittableDoFn(
ParquetConfiguration.of(ParquetReadConfiguration.UseSplittableDoFn -> false),
PipelineOptionsFactory.fromArgs("--experiments=use_runner_v2").create()
) shouldBe false

ParquetReadConfiguration.getUseSplittableDoFn(
ParquetConfiguration.of(ParquetReadConfiguration.UseSplittableDoFn -> true),
PipelineOptionsFactory.fromArgs().create()
) shouldBe true
}

private def listFiles(dir: String): Seq[String] =
Files
.list(Paths.get(dir))
Expand Down

0 comments on commit b97c4dd

Please sign in to comment.