Skip to content

Commit

Permalink
[euphoria-core] code style corrections 2
Browse files Browse the repository at this point in the history
  • Loading branch information
mareksimunek committed Mar 8, 2018
1 parent f3a6b99 commit 43ae4b2
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public static class DatasetBuilder5<IN, KEY, VALUE, OUT, W extends Window>
public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
ReduceByKey<IN, KEY, VALUE, OUT, W> reduce = new ReduceByKey<>(
name, flow, input, keyExtractor, valueExtractor,
name, flow, input, keyExtractor, valueExtractor,
windowing, reducer, valuesComparator, Sets.newHashSet(outputHints));
flow.add(reduce);
return reduce.output();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ public void testBuild_Windowing() {
assertTrue(join.getWindowing() instanceof Time);
}


@Test
@SuppressWarnings("unchecked")
public void testBuild_Hints() {
Expand All @@ -218,12 +217,27 @@ public void testBuild_Hints() {

assertTrue(outputDataset.getProducer().getHints().contains(SizeHint.FITS_IN_MEMORY));

Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get();
assertTrue(join.listInputs().stream().anyMatch(input ->
((Dataset) input).getProducer().getHints().contains(new Util.TestHint())));
assertTrue(join.listInputs().stream().anyMatch(input ->
((Dataset) input).getProducer().getHints().contains(new Util.TestHint2())));
assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getProducer().getHints().size());
Join join = (Join) flow.operators()
.stream()
.filter(op -> op instanceof Join)
.findFirst()
.get();
assertTrue(join.listInputs()
.stream()
.anyMatch(input ->
((Dataset) input).getProducer().getHints().contains(new Util.TestHint())));

assertTrue(join.listInputs()
.stream()
.anyMatch(input ->
((Dataset) input).getProducer().getHints().contains(new Util.TestHint2())));

assertEquals(2,
((Dataset) join.listInputs()
.stream()
.findFirst()
.get()
).getProducer().getHints().size());

}

Expand All @@ -235,7 +249,9 @@ public void testBuild_Hints_afterWindowing() {
Dataset<String> right = Util.createMockDataset(flow, 1);

Join.named("Join1")
.of(MapElements.of(left).using(i -> i).output(new Util.TestHint(), new Util.TestHint2(), new Util.TestHint2()),
.of(MapElements.of(left)
.using(i -> i)
.output(new Util.TestHint(), new Util.TestHint2(), new Util.TestHint2()),
right)
.by(String::length, String::length)
.using((String l, String r, Collector<String> c) -> {
Expand All @@ -244,12 +260,31 @@ public void testBuild_Hints_afterWindowing() {
.windowBy(Time.of(Duration.ofHours(1)))
.output();

Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get();
assertTrue(join.listInputs().stream().anyMatch(input ->
((Dataset)input).getProducer().getHints().contains(new Util.TestHint())));
assertTrue(join.listInputs().stream().anyMatch(input ->
((Dataset)input).getProducer().getHints().contains(new Util.TestHint2())));
assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getProducer().getHints().size());
Join join = (Join) flow.operators()
.stream()
.filter(op -> op instanceof Join)
.findFirst()
.get();
assertTrue(join.listInputs()
.stream()
.anyMatch(input ->
((Dataset) input).getProducer().getHints().contains(new Util.TestHint())));

assertTrue(join.listInputs()
.stream()
.anyMatch(input ->
((Dataset) input)
.getProducer()
.getHints()
.contains(new Util.TestHint2())));

assertEquals(2,
((Dataset) join.listInputs()
.stream()
.findFirst()
.get()
).getProducer().getHints().size());

assertTrue(join.getWindowing() instanceof Time);
}
}

0 comments on commit 43ae4b2

Please sign in to comment.