Skip to content

Commit

Permalink
(fix #5472) Set desiredBundleSizeBytes to Long.MaxValue BinaryIO reads to prevent file subrange-splitting (#5473)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty authored Sep 10, 2024
1 parent d74056e commit d485c98
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions scio-core/src/main/scala/com/spotify/scio/io/BinaryIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] {

override protected def read(sc: ScioContext, params: ReadP): SCollection[Array[Byte]] = {
val filePattern = ScioUtil.filePattern(path, params.suffix)
val desiredBundleSizeBytes = 64 * 1024 * 1024L // 64 mb
val coder = ByteArrayCoder.of()
val srcFn = Functions.serializableFn { path: String =>
new BinaryIO.BinarySource(path, params.emptyMatchTreatment, params.reader)
Expand All @@ -74,7 +73,8 @@ final case class BinaryIO(path: String) extends ScioIO[Array[Byte]] {
)
.applyTransform(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource[Array[Byte]](desiredBundleSizeBytes, srcFn, coder)
// Setting desiredBundleSizeBytes to Long.MaxValue prevents Beam from trying to split files
new ReadAllViaFileBasedSource[Array[Byte]](Long.MaxValue, srcFn, coder)
)
}

Expand Down Expand Up @@ -326,6 +326,9 @@ object BinaryIO {
false
}
}

/** Always returns `false`, i.e. reports that dynamic splitting is not allowed. */
override def allowsDynamicSplitting(): Boolean = false
}
}

Expand All @@ -344,8 +347,13 @@ object BinaryIO {
fileMetadata: Metadata,
start: Long,
end: Long
): FileBasedSource[Array[Byte]] =
): FileBasedSource[Array[Byte]] = {
require(
start == 0,
s"Range with offset $start requested, but BinaryIO is unsplittable"
)
new BinarySingleFileSource(binaryFileReader, fileMetadata, start, end)
}

override def createSingleFileReader(
options: PipelineOptions
Expand Down

0 comments on commit d485c98

Please sign in to comment.