Skip to content

Commit

Permalink
Add top to collections with fanout (#5487)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Sep 16, 2024
1 parent 88d07bb commit a464955
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -962,11 +962,9 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
* a new SCollection whose single value is an `Iterable` of the top k
* @group transform
*/
def top(
num: Int
)(implicit ord: Ordering[T]): SCollection[Iterable[T]] =
def top(num: Int)(implicit ord: Ordering[T]): SCollection[Iterable[T]] =
this.transform {
_.pApply(Top.of(num, ord)).map((l: JIterable[T]) => l.asScala)
_.pApply(Top.of[T, Ordering[T]](num, ord)).map((l: JIterable[T]) => l.asScala)
}

// =======================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.util.Functions
import com.spotify.scio.coders.Coder
import com.twitter.algebird.{Aggregator, Monoid, MonoidAggregator, Semigroup}
import org.apache.beam.sdk.transforms.Combine
import org.apache.beam.sdk.transforms.{Combine, Top}
import org.apache.beam.sdk.values.PCollection

import java.lang.{Iterable => JIterable}

import scala.jdk.CollectionConverters._

/**
* An enhanced SCollection that uses an intermediate node to combine parts of the data to reduce
* load on the final global combine step.
Expand Down Expand Up @@ -111,4 +115,12 @@ class SCollectionWithFanout[T] private[values] (coll: SCollection[T], fanout: In
Combine.globally(Functions.reduceFn(context, sg)).withoutDefaults().withFanout(fanout)
)
}

/** [[SCollection.top]] with fan out. */
def top(num: Int)(implicit ord: Ordering[T]): SCollection[Iterable[T]] = {
coll.transform { in =>
in.pApply(Top.of[T, Ordering[T]](num, ord).withFanout(fanout))
.map((l: JIterable[T]) => l.asScala)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.spotify.scio.util.Functions
import com.spotify.scio.util.TupleFunctions._
import com.twitter.algebird.{Aggregator, Monoid, MonoidAggregator, Semigroup}
import org.apache.beam.sdk.transforms.Combine.PerKeyWithHotKeyFanout
import org.apache.beam.sdk.transforms.Top.TopCombineFn
import org.apache.beam.sdk.transforms.{Combine, SerializableFunction}

/**
Expand Down Expand Up @@ -140,4 +141,10 @@ class SCollectionWithHotKeyFanout[K, V] private[values] (
)
self.applyPerKey(withFanout(Combine.perKey(Functions.reduceFn(context, sg))))(kvToTuple)
}

/** [[PairSCollectionFunctions.topByKey]] with hot key fanout. */
def topByKey(num: Int)(implicit ord: Ordering[V]): SCollection[(K, Iterable[V])] =
self.applyPerKey(withFanout(Combine.perKey(new TopCombineFn[V, Ordering[V]](num, ord))))(
kvListToTuple
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ class SCollectionWithFanoutTest extends NamedTransformSpec {
}
}

it should "support top()" in {
runWithContext { sc =>
def top3[T: Ordering: Coder](elems: T*): SCollection[Iterable[T]] =
sc.parallelize(elems).withFanout(10).top(3)

top3(1, 2, 3, 4) should containSingleValue(Iterable(4, 3, 2))
top3(1L, 2L, 3L, 4L) should containSingleValue(Iterable(4L, 3L, 2L))
top3(1f, 2f, 3f, 4f) should containSingleValue(Iterable(4f, 3f, 2f))
top3(1.0, 2.0, 3.0, 4.0) should containSingleValue(Iterable(4.0, 3.0, 2.0))
}
}

private def shouldFanOut[T](fn: SCollectionWithFanout[Int] => SCollection[T]) = {
runWithContext { sc =>
val p = fn(sc.parallelize(1 to 100).withFanout(10))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ class SCollectionWithHotKeyFanoutTest extends NamedTransformSpec {
}
}

it should "support topByKey()" in {
runWithContext { sc =>
val p = sc.parallelize(
List(
("a", 1),
("a", 2),
("a", 3),
("a", 4),
("b", 5),
("b", 6),
("b", 7),
("b", 8)
)
)
val r1 = p.withHotKeyFanout(10).topByKey(3)
val r2 = p.withHotKeyFanout(_.hashCode).topByKey(3)
r1 should containInAnyOrder(Seq(("a", Iterable(4, 3, 2)), ("b", Iterable(8, 7, 6))))
r2 should containInAnyOrder(Seq(("a", Iterable(4, 3, 2)), ("b", Iterable(8, 7, 6))))
}
}

private def shouldFanOut[T](
fn: SCollectionWithHotKeyFanout[String, Int] => SCollection[T]
) = {
Expand Down
2 changes: 1 addition & 1 deletion site/src/main/paradox/extras/Fanout.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Scio ships with two `SCollection` variants that provide _fanout_ over aggregatio
The interim step pairs the data to be aggregated with a synthetic key, then aggregates within this artificial keyspace before passing the partial aggregations on to the final aggregation step.
The interim step requires an additional shuffle but can make the aggregation more parallelizable and reduces the impact of a hot key.

The `aggregate`, `combine`, `fold`, `reduce`, `sum` transforms and their keyed variants are supported.
The `aggregate`, `combine`, `fold`, `reduce`, `sum`, `top` transforms and their keyed variants are supported.

## WithFanout

Expand Down

0 comments on commit a464955

Please sign in to comment.