diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index 01e9f2b6..a74f28b1 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -348,7 +348,7 @@ public static class DatasetBuilder5 public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); ReduceByKey reduce = new ReduceByKey<>( - name, flow, input, keyExtractor, valueExtractor, + name, flow, input, keyExtractor, valueExtractor, windowing, reducer, valuesComparator, Sets.newHashSet(outputHints)); flow.add(reduce); return reduce.output(); diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java index c19880aa..0c5fbea3 100644 --- a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java @@ -198,7 +198,6 @@ public void testBuild_Windowing() { assertTrue(join.getWindowing() instanceof Time); } - @Test @SuppressWarnings("unchecked") public void testBuild_Hints() { @@ -218,12 +217,27 @@ public void testBuild_Hints() { assertTrue(outputDataset.getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY)); - Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get(); - assertTrue(join.listInputs().stream().anyMatch(input -> - ((Dataset) input).getProducer().getHints().contains(new Util.TestHint()))); - assertTrue(join.listInputs().stream().anyMatch(input -> - ((Dataset) input).getProducer().getHints().contains(new Util.TestHint2()))); - assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getProducer().getHints().size()); + Join join = (Join) flow.operators() + .stream() + .filter(op -> op instanceof Join) + .findFirst() + .get(); + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint()))); + + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint2()))); + + assertEquals(2, + ((Dataset) join.listInputs() + .stream() + .findFirst() + .get() + ).getProducer().getHints().size()); } @@ -235,7 +249,9 @@ public void testBuild_Hints_afterWindowing() { Dataset right = Util.createMockDataset(flow, 1); Join.named("Join1") - .of(MapElements.of(left).using(i -> i).output(new Util.TestHint(), new Util.TestHint2(), new Util.TestHint2()), + .of(MapElements.of(left) + .using(i -> i) + .output(new Util.TestHint(), new Util.TestHint2(), new Util.TestHint2()), right) .by(String::length, String::length) .using((String l, String r, Collector c) -> { @@ -244,12 +260,31 @@ public void testBuild_Hints_afterWindowing() { .windowBy(Time.of(Duration.ofHours(1))) .output(); - Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get(); - assertTrue(join.listInputs().stream().anyMatch(input -> - ((Dataset)input).getProducer().getHints().contains(new Util.TestHint()))); - assertTrue(join.listInputs().stream().anyMatch(input -> - ((Dataset)input).getProducer().getHints().contains(new Util.TestHint2()))); - assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getProducer().getHints().size()); + Join join = (Join) flow.operators() + .stream() + .filter(op -> op instanceof Join) + .findFirst() + .get(); + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input).getProducer().getHints().contains(new Util.TestHint()))); + + assertTrue(join.listInputs() + .stream() + .anyMatch(input -> + ((Dataset) input) + .getProducer() + .getHints() + .contains(new Util.TestHint2()))); + + assertEquals(2, + ((Dataset) join.listInputs() + .stream() + .findFirst() + .get() + ).getProducer().getHints().size()); + assertTrue(join.getWindowing() instanceof Time); } } \ No newline at end of file