diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java index c96d9d36..af71aad8 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java @@ -20,9 +20,12 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; +import cz.seznam.euphoria.core.client.operator.hint.OutputHintAware; + +import javax.annotation.Nullable; import java.io.Serializable; import java.util.Collection; -import javax.annotation.Nullable; /** * A dataset abstraction. @@ -30,7 +33,7 @@ * @param type of elements of this data set */ @Audience(Audience.Type.CLIENT) -public interface Dataset extends Serializable { +public interface Dataset extends OutputHintAware, Serializable { /** * @return the flow associated with this data set diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java index b8d8fe35..019054e1 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java @@ -19,6 +19,9 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; + +import java.util.Set; /** * Various dataset related utils. @@ -39,9 +42,9 @@ public class Datasets { * @return a dataset representing the output of the given operator */ public static Dataset createOutputFor( - Flow flow, Dataset input, Operator op) { + Flow flow, Dataset input, Operator op, Set outputHints) { - return new OutputDataset<>(flow, op, input.isBounded()); + return new OutputDataset<>(flow, op, input.isBounded(), outputHints); } /** diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java index a478c6a4..bc27d316 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/InputDataset.java @@ -20,8 +20,12 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; -import java.util.Collection; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; + import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; /** * {@code PCollection} that is input of a {@code Flow}. @@ -57,6 +61,15 @@ public void persist(DataSink sink) { "The input dataset is already stored."); } + /** + * Input Dataset doesn't have hints + * @return empty set + */ + @Override + public Set getHints() { + return Collections.emptySet(); + } + @Override public Flow getFlow() { return flow; diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java index a7b518e5..12b1a06a 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/OutputDataset.java @@ -20,8 +20,11 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.Operator; -import java.util.Collection; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; + import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Set; /** * {@code PCollection} that is output of some operator. @@ -32,13 +35,15 @@ class OutputDataset implements Dataset { private final Flow flow; private final Operator producer; private final boolean bounded; + private final Set outputHints; private DataSink outputSink = null; - public OutputDataset(Flow flow, Operator producer, boolean bounded) { + public OutputDataset(Flow flow, Operator producer, boolean bounded, Set outputHints) { this.flow = flow; this.producer = producer; this.bounded = bounded; + this.outputHints = outputHints; } @Nullable @@ -79,5 +84,8 @@ public boolean isBounded() { return flow.getConsumersOf(this); } - + @Override + public Set getHints() { + return outputHints; + } } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java index 2695d5fb..a7b72504 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/AssignEventTime.java @@ -16,14 +16,17 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.annotation.audience.Audience; -import cz.seznam.euphoria.core.client.functional.ExtractEventTime; import cz.seznam.euphoria.core.annotation.operator.Derived; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.ExtractEventTime; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import java.util.Objects; +import java.util.Set; /** A convenient alias for assignment of event time. * @@ -87,9 +90,10 @@ public static class OutputBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - AssignEventTime op = new AssignEventTime<>(name, flow, input, eventTimeFn); + AssignEventTime op = new AssignEventTime<>(name, flow, input, eventTimeFn, Sets.newHashSet + (outputHints)); flow.add(op); return op.output(); } @@ -98,8 +102,8 @@ public Dataset output() { private final ExtractEventTime eventTimeFn; AssignEventTime(String name, Flow flow, Dataset input, - ExtractEventTime eventTimeFn) { - super(name, flow, input); + ExtractEventTime eventTimeFn, Set outputHints) { + super(name, flow, input, outputHints); this.eventTimeFn = eventTimeFn; } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java index ce0773e7..485a0a8f 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Builders.java @@ -20,10 +20,9 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; -import java.util.Set; - /** * Common methods used in operator builders to share related javadoc * descriptions.

@@ -94,7 +93,7 @@ public interface Output { * * @return the dataset representing the new operator's output */ - Dataset output(); + Dataset output(OutputHint... outputHints); } public interface OutputValues extends Output> { @@ -106,24 +105,13 @@ public interface OutputValues extends Output> { * * @return the dataset representing the new operator's output */ - default Dataset outputValues() { + default Dataset outputValues(OutputHint... outputHints) { return MapElements .named("extract-values") .of(output()) .using(Pair::getSecond) - .output(); + .output(outputHints); } } - public interface OutputWithHint extends Output { - - /** - * Add runtime specific hints for the operator - * - * @param hints runtime specific hints - * @return output builder - */ - Output withHints(Set hints); - } - } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java index 96f868f8..f5c55e2b 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java @@ -23,11 +23,14 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; import java.util.Objects; +import java.util.Set; /** * Operator counting elements with same key. @@ -100,8 +103,8 @@ public static class WindowingBuilder } @Override - public Dataset> output() { - return windowBy(null).output(); + public Dataset> output(OutputHint... outputHints) { + return windowBy(null).output(outputHints); } } @@ -122,10 +125,10 @@ public static class OutputBuilder } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); CountByKey count = new CountByKey<>( - name, flow, input, keyExtractor, windowing); + name, flow, input, keyExtractor, windowing, Sets.newHashSet(outputHints)); flow.add(count); return count.output(); } @@ -163,9 +166,10 @@ public static OfBuilder named(String name) { Flow flow, Dataset input, UnaryFunction extractor, - @Nullable Windowing windowing) { + @Nullable Windowing windowing, + Set outputHints) { - super(name, flow, input, extractor, windowing); + super(name, flow, input, extractor, windowing, outputHints); } @Override diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java index 7129dade..ec7db07a 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java @@ -24,11 +24,14 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; import java.util.Objects; +import java.util.Set; /** * Operator outputting distinct (based on {@link Object#equals}) elements. @@ -126,8 +129,7 @@ private WindowingBuilder( return new OutputBuilder<>(name, input, mapper, windowing); } - @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { return new OutputBuilder<>(name, input, mapper, null).output(); } } @@ -149,10 +151,10 @@ public static class OutputBuilder } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); Distinct distinct = new Distinct<>( - name, flow, input, mapper, windowing); + name, flow, input, mapper, windowing, Sets.newHashSet(outputHints)); flow.add(distinct); return distinct.output(); } @@ -190,9 +192,10 @@ public static OfBuilder named(String name) { Flow flow, Dataset input, UnaryFunction mapper, - @Nullable Windowing windowing) { + @Nullable Windowing windowing, + Set outputHints) { - super(name, flow, input, mapper, windowing); + super(name, flow, input, mapper, windowing, outputHints); } @Override diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java index 7c967833..ab3b5b5a 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ElementWiseOperator.java @@ -18,6 +18,9 @@ import cz.seznam.euphoria.core.annotation.audience.Audience; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; + +import java.util.Set; /** * Operator working element-wise, with no context between elements. @@ -29,9 +32,9 @@ public abstract class ElementWiseOperator protected final Dataset output; - protected ElementWiseOperator(String name, Flow flow, Dataset input) { + protected ElementWiseOperator(String name, Flow flow, Dataset input, Set outputHints) { super(name, flow, input); - this.output = createOutput(input); + this.output = createOutput(input, outputHints); } @Override diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java index 10ebd126..3d2b44fc 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Filter.java @@ -21,9 +21,12 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryPredicate; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import java.util.Objects; +import java.util.Set; /** * Operator performing a filter operation. @@ -94,9 +97,9 @@ private OutputBuilder(String name, Dataset input, UnaryPredicate predica } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - Filter filter = new Filter<>(name, flow, input, predicate); + Filter filter = new Filter<>(name, flow, input, predicate, Sets.newHashSet(outputHints)); flow.add(filter); return filter.output(); @@ -134,8 +137,9 @@ public static OfBuilder named(String name) { final UnaryPredicate predicate; - Filter(String name, Flow flow, Dataset input, UnaryPredicate predicate) { - super(name, flow, input); + Filter(String name, Flow flow, Dataset input, UnaryPredicate predicate, Set + outputHints) { + super(name, flow, input, outputHints); this.predicate = predicate; } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java index c7db0553..b7c7a068 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java @@ -16,16 +16,20 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.annotation.audience.Audience; -import cz.seznam.euphoria.core.client.functional.ExtractEventTime; import cz.seznam.euphoria.core.annotation.operator.Basic; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.ExtractEventTime; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * A transformation of a dataset from one type into another allowing user code @@ -142,9 +146,9 @@ public static class EventTimeBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { return new OutputBuilder<>( - this.using.name, this.using.input, this.functor, null).output(); + this.using.name, this.using.input, this.functor, null).output(outputHints); } } @@ -166,9 +170,10 @@ public static class OutputBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - FlatMap map = new FlatMap<>(name, flow, input, functor, evtTimeFn); + FlatMap map = new FlatMap<>(name, flow, input, functor, evtTimeFn, Sets.newHashSet + (outputHints)); flow.add(map); return map.output(); } @@ -207,12 +212,19 @@ public static OfBuilder named(String name) { FlatMap(String name, Flow flow, Dataset input, UnaryFunctor functor, - @Nullable ExtractEventTime evtTimeFn) { - super(name, flow, input); + @Nullable ExtractEventTime evtTimeFn, + Set outputHints) { + super(name, flow, input, outputHints); this.functor = functor; this.eventTimeFn = evtTimeFn; } + FlatMap(String name, Flow flow, Dataset input, + UnaryFunctor functor, + @Nullable ExtractEventTime evtTimeFn) { + this(name, flow, input, functor, evtTimeFn, Collections.emptySet()); + } + /** * Retrieves the user defined map function to be applied to this operator's * input elements. diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java index 005e9672..0afa10a3 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FullJoin.java @@ -40,7 +40,6 @@ *

  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index 335819e6..72b42e8f 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -26,6 +26,7 @@ import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; @@ -35,11 +36,11 @@ import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -58,7 +59,6 @@ *
  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * @@ -75,7 +75,7 @@ public class Join extends StateAwareWindowWiseOperator, Either, KEY, Pair, W, Join> - implements HintAware, Builders.OutputValues { + implements Builders.OutputValues { public enum Type { INNER, @@ -156,7 +156,7 @@ public Join.WindowingBuilder using( } public static class WindowingBuilder - implements Builders.OutputWithHint, JoinHint>, + implements Builders.Output>, Builders.OutputValues, OptionalMethodBuilder> { @@ -186,69 +186,19 @@ public static class WindowingBuilder } @Override - public Dataset> output() { - return windowBy(null).withHints(Collections.emptySet()).output(); + public Dataset> output(OutputHint... outputHints) { + return windowBy(null).output(outputHints); } - @Override - public OutputBuilder withHints(Set hints) { - return windowBy(null).withHints(hints); - } - - public HintBuilderOutput windowBy( + public OutputBuilder windowBy( Windowing, W> windowing) { - return new HintBuilderOutput<>(name, left, right, leftKeyExtractor, - rightKeyExtractor, joinFunc, type, windowing); - } - } - - public static class HintBuilderOutput - implements Builders.OutputWithHint, JoinHint>, - Builders.OutputValues { - - private final String name; - private final Dataset left; - private final Dataset right; - private final UnaryFunction leftKeyExtractor; - private final UnaryFunction rightKeyExtractor; - private final BinaryFunctor joinFunc; - private final Type type; - - @Nullable - private final Windowing, W> windowing; - - HintBuilderOutput(String name, - Dataset left, - Dataset right, - UnaryFunction leftKeyExtractor, - UnaryFunction rightKeyExtractor, - BinaryFunctor joinFunc, - Type type, - @Nullable Windowing, W> windowing) { - this.name = Objects.requireNonNull(name); - this.left = Objects.requireNonNull(left); - this.right = Objects.requireNonNull(right); - this.leftKeyExtractor = Objects.requireNonNull(leftKeyExtractor); - this.rightKeyExtractor = Objects.requireNonNull(rightKeyExtractor); - this.joinFunc = Objects.requireNonNull(joinFunc); - this.type = Objects.requireNonNull(type); - this.windowing = windowing; - } - - @Override - public Dataset> output() { - return withHints(Collections.emptySet()).output(); - } - - @Override - public OutputBuilder withHints(Set hints) { return new OutputBuilder<>(name, left, right, leftKeyExtractor, - rightKeyExtractor, joinFunc, type, windowing, hints); + rightKeyExtractor, joinFunc, type, windowing); } } public static class OutputBuilder - implements Builders.OutputValues { + implements Builders.OutputValues, Builders.Output> { private final String name; private final Dataset left; @@ -260,7 +210,6 @@ public static class OutputBuilder @Nullable private final Windowing, W> windowing; - private final Set hints; OutputBuilder(String name, Dataset left, @@ -269,8 +218,7 @@ public static class OutputBuilder UnaryFunction rightKeyExtractor, BinaryFunctor joinFunc, Type type, - @Nullable Windowing, W> windowing, - Set hints) { + @Nullable Windowing, W> windowing) { this.name = Objects.requireNonNull(name); this.left = Objects.requireNonNull(left); this.right = Objects.requireNonNull(right); @@ -279,15 +227,14 @@ public static class OutputBuilder this.joinFunc = Objects.requireNonNull(joinFunc); this.type = Objects.requireNonNull(type); this.windowing = windowing; - this.hints = Objects.requireNonNull(hints); } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { final Flow flow = left.getFlow(); final Join join = new Join<>( name, flow, left, right, leftKeyExtractor, - rightKeyExtractor, joinFunc, type, windowing, hints); + rightKeyExtractor, joinFunc, type, windowing, Sets.newHashSet(outputHints)); flow.add(join); return join.output(); } @@ -302,7 +249,6 @@ public Dataset> output() { @VisibleForTesting final UnaryFunction rightKeyExtractor; private final Type type; - private final Set hints; Join(String name, Flow flow, @@ -312,7 +258,7 @@ public Dataset> output() { BinaryFunctor functor, Type type, @Nullable Windowing, W> windowing, - Set hints) { + Set outputHints) { super(name, flow, windowing, (Either elem) -> { if (elem.isLeft()) { return leftKeyExtractor.apply(elem.left()); @@ -325,10 +271,9 @@ public Dataset> output() { this.rightKeyExtractor = rightKeyExtractor; this.functor = functor; @SuppressWarnings("unchecked") - Dataset> output = createOutput((Dataset) left); + Dataset> output = createOutput((Dataset) left, outputHints); this.output = output; this.type = type; - this.hints = Objects.requireNonNull(hints); } @Override @@ -342,6 +287,11 @@ public Dataset> output() { return output; } + @Override + public Dataset> output(OutputHint... outputHints) { + return output; + } + @SuppressWarnings("unchecked") private static final ListStorageDescriptor LEFT_STATE_DESCR = ListStorageDescriptor.of("left", (Class) Object.class); @@ -563,11 +513,6 @@ public BinaryFunctor getJoiner() { return functor; } - @Override - public Set getHints() { - return hints; - } - @Override @SuppressWarnings("unchecked") public DAG> getBasicOps() { @@ -603,5 +548,4 @@ public Set getHints() { dag.add(reduce, union); return dag; } - } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHints.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHints.java deleted file mode 100644 index 02b30ea1..00000000 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHints.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2016-2018 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.core.client.operator; - -import cz.seznam.euphoria.core.annotation.audience.Audience; - -@Audience(Audience.Type.CLIENT) -public class JoinHints { - - private static final BroadcastHashJoin BROADCAST_HASH_JOIN = new BroadcastHashJoin(); - - public static BroadcastHashJoin broadcastHashJoin() { - return BROADCAST_HASH_JOIN; - } - - /** - * Broadcasts optional join side to all executors. - */ - public static class BroadcastHashJoin implements JoinHint { - - private BroadcastHashJoin() { - - } - } - -} diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java index b2ebc28c..03039f41 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/LeftJoin.java @@ -40,7 +40,6 @@ *
  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java index 85274370..4c4bf906 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java @@ -22,9 +22,13 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctionEnv; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * Simple one-to-one transformation of input elements. It is a special case of @@ -107,9 +111,9 @@ public static class OutputBuilder implements Builders.Output { } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { Flow flow = input.getFlow(); - MapElements map = new MapElements<>(name, flow, input, mapper); + MapElements map = new MapElements<>(name, flow, input, mapper, Sets.newHashSet(outputHints)); flow.add(map); return map.output(); @@ -150,14 +154,14 @@ public static OfBuilder named(String name) { Flow flow, Dataset input, UnaryFunction mapper) { - this(name, flow, input, (el, ctx) -> mapper.apply(el)); + this(name, flow, input, (el, ctx) -> mapper.apply(el), Collections.emptySet()); } MapElements(String name, Flow flow, Dataset input, - UnaryFunctionEnv mapper) { - super(name, flow, input); + UnaryFunctionEnv mapper, Set outputHints) { + super(name, flow, input, outputHints); this.mapper = mapper; } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java index ff8431e4..d95f5d20 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Operator.java @@ -19,10 +19,12 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.Datasets; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.executor.graph.DAG; import java.io.Serializable; import java.util.Collection; +import java.util.Set; /** * An operator base class. All operators inherit his class. @@ -69,9 +71,9 @@ public final Flow getFlow() { * * @return a newly created dataset associated with this operator as its output */ - final Dataset createOutput(final Dataset input) { + final Dataset createOutput(final Dataset input, Set outputHints) { Flow flow = input.getFlow(); - return Datasets.createOutputFor(flow, input, this); + return Datasets.createOutputFor(flow, input, this, outputHints); } /** diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index 81aed03c..87e08218 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -27,10 +27,10 @@ import cz.seznam.euphoria.core.client.functional.ReduceFunction; import cz.seznam.euphoria.core.client.functional.ReduceFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ExternalIterable; import cz.seznam.euphoria.core.client.io.SpillTools; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; @@ -40,11 +40,15 @@ import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.executor.util.SingleValueContext; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Comparator; import java.util.Objects; +import java.util.Set; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -97,6 +101,7 @@ public class ReduceByKey ReduceByKey> implements Builders.OutputValues { + public static class OfBuilder implements Builders.Of { private final String name; @@ -279,14 +284,15 @@ public static class DatasetBuilder4 } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new DatasetBuilder5<>( name, input, keyExtractor, valueExtractor, - reducer, null, valuesComparator).output(); + reducer, null, valuesComparator).output(outputHints); } } + public static class SortableDatasetBuilder4 extends DatasetBuilder4 { @@ -319,7 +325,6 @@ public DatasetBuilder4 withSortedValues( } - public static class DatasetBuilder5 extends DatasetBuilder4 implements Builders.OutputValues { @@ -341,11 +346,11 @@ public static class DatasetBuilder5 } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); ReduceByKey reduce = new ReduceByKey<>( name, flow, input, keyExtractor, valueExtractor, - windowing, reducer, valuesComparator); + windowing, reducer, valuesComparator, Sets.newHashSet(outputHints)); flow.add(reduce); return reduce.output(); } @@ -395,10 +400,9 @@ public static OfBuilder named(String name) { this( name, flow, input, keyExtractor, valueExtractor, windowing, (ReduceFunctor) toReduceFunctor(reducer), - null); + null, Collections.emptySet()); } - ReduceByKey(String name, Flow flow, Dataset input, @@ -407,8 +411,21 @@ public static OfBuilder named(String name) { @Nullable Windowing windowing, ReduceFunctor reducer, @Nullable BinaryFunction valueComparator) { + this(name, flow, input, keyExtractor, valueExtractor, windowing, reducer, valueComparator, + Collections.emptySet()); + } + + ReduceByKey(String name, + Flow flow, + Dataset input, + UnaryFunction keyExtractor, + UnaryFunction valueExtractor, + @Nullable Windowing windowing, + ReduceFunctor reducer, + @Nullable BinaryFunction valueComparator, + Set outputHints) { - super(name, flow, input, keyExtractor, windowing); + super(name, flow, input, keyExtractor, windowing, outputHints); this.reducer = reducer; this.valueExtractor = valueExtractor; this.valueComparator = valueComparator; @@ -426,6 +443,11 @@ public UnaryFunction getValueExtractor() { return valueExtractor; } + @Override + public Dataset> output(OutputHint... outputHints) { + return output(); + } + @SuppressWarnings("unchecked") @Override public DAG> getBasicOps() { diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index 4c1778c7..f7bdb953 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -23,13 +23,17 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.StateMerger; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * A {@link ReduceStateByKey} operator is a stateful, complex, lower-level-api, @@ -277,10 +281,10 @@ public static class DatasetBuilder5< } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor, stateFactory, stateMerger, null) - .output(); + .output(outputHints); } } @@ -303,13 +307,13 @@ public static class DatasetBuilder6< } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); ReduceStateByKey reduceStateByKey = new ReduceStateByKey<>(name, flow, input, keyExtractor, valueExtractor, - windowing, stateFactory, stateMerger); + windowing, stateFactory, stateMerger, Sets.newHashSet(outputHints)); flow.add(reduceStateByKey); return reduceStateByKey.output(); @@ -355,9 +359,21 @@ public static OfBuilder named(String name) { UnaryFunction valueExtractor, @Nullable Windowing windowing, StateFactory stateFactory, - StateMerger stateMerger) - { - super(name, flow, input, keyExtractor, windowing); + StateMerger stateMerger) { + this(name, flow, input, keyExtractor, valueExtractor, windowing, stateFactory, stateMerger, + Collections.emptySet()); + } + + ReduceStateByKey(String name, + Flow flow, + Dataset input, + UnaryFunction keyExtractor, + UnaryFunction valueExtractor, + @Nullable Windowing windowing, + StateFactory stateFactory, + StateMerger stateMerger, + Set outputHints) { + super(name, flow, input, keyExtractor, windowing, outputHints); this.stateFactory = stateFactory; this.valueExtractor = valueExtractor; this.stateCombiner = stateMerger; diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index 9afe5343..c6e5768d 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -27,12 +27,13 @@ import cz.seznam.euphoria.core.client.functional.ReduceFunction; import cz.seznam.euphoria.core.client.functional.ReduceFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.util.Pair; -import java.util.stream.Stream; +import cz.seznam.euphoria.core.executor.graph.DAG; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.stream.Stream; /** * Reduces all elements in a window. The operator corresponds to @@ -278,7 +279,7 @@ private ReduceWindow( ReduceFunctor reducer, @Nullable BinaryFunction valueComparator) { - super(name, flow, input, e -> B_ZERO, windowing); + super(name, flow, input, e -> B_ZERO, windowing, Collections.emptySet()); this.reducer = reducer; this.valueExtractor = valueExtractor; this.valueComparator = valueComparator; diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java index 0c2043d6..c665d098 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/RightJoin.java @@ -40,7 +40,6 @@ *
  • {@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys *
  • {@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window *
  • {@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing - *
  • {@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint} *
  • {@code (output | outputValues) ..} build output dataset * * diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java index 62e944c3..3459ebe7 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java @@ -17,14 +17,16 @@ import cz.seznam.euphoria.core.annotation.audience.Audience; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import javax.annotation.Nullable; import java.util.Collection; import java.util.Collections; +import java.util.Set; /** * Operator operating on window level with state information. @@ -39,14 +41,14 @@ public class StateAwareWindowWiseSingleInputOperator< private final Dataset output; protected StateAwareWindowWiseSingleInputOperator( - String name, - Flow flow, Dataset input, - UnaryFunction extractor, - @Nullable Windowing windowing) { + String name, + Flow flow, Dataset input, + UnaryFunction extractor, + @Nullable Windowing windowing, Set outputHints) { super(name, flow, windowing, extractor); this.input = input; - this.output = createOutput(input); + this.output = createOutput(input, outputHints); } @Override diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java index 9ab036a0..58ee7c0b 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java @@ -23,12 +23,16 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Sums; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * Operator for summing of long values extracted from elements. The sum is operated upon @@ -93,7 +97,7 @@ public ValueByBuilder keyBy(UnaryFunction keyExtractor) } public static class ValueByBuilder implements Builders.WindowBy>, - Builders.Output>, + Builders.Output>, Builders.OutputValues { @@ -118,9 +122,9 @@ public ByBuilder2 valueBy(UnaryFunction valueExtractor) { } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new OutputBuilder<>(name, input, keyExtractor, e -> 1L, null) - .output(); + .output(outputHints); } } @@ -154,7 +158,7 @@ public static class ByBuilder2 } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new OutputBuilder<>( name, input, keyExtractor, valueExtractor, null) .output(); @@ -177,12 +181,12 @@ public static class OutputBuilder this.windowing = windowing; } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); SumByKey sumByKey = new SumByKey<>( name, flow, input, keyExtractor, valueExtractor, - windowing); + windowing, Sets.newHashSet(outputHints)); flow.add(sumByKey); return sumByKey.output(); } @@ -223,9 +227,18 @@ public static OfBuilder named(String name) { Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - @Nullable Windowing windowing) - { - super(name, flow, input, keyExtractor, windowing); + @Nullable Windowing windowing) { + this(name, flow, input, keyExtractor, valueExtractor, windowing, Collections.emptySet()); + } + + SumByKey(String name, + Flow flow, + Dataset input, + UnaryFunction keyExtractor, + UnaryFunction valueExtractor, + @Nullable Windowing windowing, + Set outputHints) { + super(name, flow, input, keyExtractor, windowing, outputHints); this.valueExtractor = valueExtractor; } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index da9ed2e3..48e76e76 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -23,8 +23,8 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.executor.graph.DAG; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StateContext; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; @@ -32,8 +32,11 @@ import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Triple; +import cz.seznam.euphoria.core.executor.graph.DAG; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import javax.annotation.Nullable; +import java.util.Set; import static java.util.Objects.requireNonNull; @@ -222,9 +225,9 @@ public static class WindowByBuilder> } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { return new OutputBuilder<>( - name, input, keyFn, valueFn, scoreFn, null).output(); + name, input, keyFn, valueFn, scoreFn, null).output(outputHints); } } @@ -247,11 +250,11 @@ public static class OutputBuilder< } @Override - public Dataset> output() { + public Dataset> output(OutputHint... outputHints) { Flow flow = input.getFlow(); TopPerKey top = new TopPerKey<>(flow, name, input, keyFn, valueFn, - scoreFn, windowing); + scoreFn, windowing, Sets.newHashSet(outputHints)); flow.add(top); return top.output(); } @@ -296,8 +299,9 @@ public static OfBuilder named(String name) { UnaryFunction keyFn, UnaryFunction valueFn, UnaryFunction scoreFn, - @Nullable Windowing windowing) { - super(name, flow, input, keyFn, windowing); + @Nullable Windowing windowing, + Set outputHints) { + super(name, flow, input, keyFn, windowing, outputHints); this.valueFn = valueFn; this.scoreFn = scoreFn; diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java index d12fde7c..f21fa5b3 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java @@ -20,12 +20,16 @@ import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions; +import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; /** * The union of at least two datasets of the same type.

    @@ -119,9 +123,9 @@ public static class OutputBuilder } @Override - public Dataset output() { + public Dataset output(OutputHint... outputHints) { final Flow flow = dataSets.get(0).getFlow(); - final Union union = new Union<>(name, flow, dataSets); + final Union union = new Union<>(name, flow, dataSets, Sets.newHashSet(outputHints)); flow.add(union); return union.output(); } @@ -176,6 +180,11 @@ public static OfBuilder named(String name) { @SuppressWarnings("unchecked") Union(String name, Flow flow, List> dataSets) { + this(name, flow, dataSets, Collections.emptySet()); + } + + @SuppressWarnings("unchecked") + Union(String name, Flow flow, List> dataSets, Set outputHints) { super(name, flow); Preconditions.checkArgument( dataSets.size() > 1, @@ -184,7 +193,7 @@ public static OfBuilder named(String name) { dataSets.stream().map(Dataset::getFlow).distinct().count() == 1, "Only data sets from the same flow can be passed to Union."); this.dataSets = dataSets; - this.output = createOutput(dataSets.get(0)); + this.output = createOutput(dataSets.get(0), outputHints); } /** diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Hint.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHint.java similarity index 81% rename from euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Hint.java rename to euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHint.java index 18d2bab6..65b831ef 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Hint.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHint.java @@ -13,13 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.operator; +package cz.seznam.euphoria.core.client.operator.hint; import cz.seznam.euphoria.core.annotation.audience.Audience; import java.io.Serializable; +/** + * Specify hints about runtime data characteristics + */ @Audience(Audience.Type.INTERNAL) -public interface Hint extends Serializable { +public interface OutputHint extends Serializable { } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/HintAware.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHintAware.java similarity index 78% rename from euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/HintAware.java rename to euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHintAware.java index 6fec4b70..f1c6bc16 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/HintAware.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/OutputHintAware.java @@ -13,18 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.operator; +package cz.seznam.euphoria.core.client.operator.hint; import cz.seznam.euphoria.core.annotation.audience.Audience; + import java.util.Set; @Audience(Audience.Type.INTERNAL) -public interface HintAware { +public interface OutputHintAware { /** - * Returns all hints for the operator. + * Returns all hints for the operator or Dataset. * - * @return hints for the operator + * @return hints for the operator or Dataset */ Set getHints(); } diff --git a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHint.java b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java similarity index 65% rename from euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHint.java rename to euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java index 63d67aac..f14cf50f 100644 --- a/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/JoinHint.java +++ b/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/hint/SizeHint.java @@ -13,11 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.operator; - -import cz.seznam.euphoria.core.annotation.audience.Audience; - -@Audience(Audience.Type.INTERNAL) -public interface JoinHint extends Hint { +package cz.seznam.euphoria.core.client.operator.hint; +/** + * Extra information for runner about Dataset size + */ +public enum SizeHint implements OutputHint { + /** + * Indicate to runner that dataset can fit in memory and this information + * could be used for optimization (e.g. Broadcast hash join) + */ + FITS_IN_MEMORY } diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java index 903cb45e..c8339a25 100644 --- a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java @@ -19,8 +19,8 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.operator.hint.OutputHint; import cz.seznam.euphoria.core.client.util.Pair; -import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import org.junit.Test; import java.time.Duration; @@ -198,51 +198,58 @@ public void testBuild_Windowing() { assertTrue(join.getWindowing() instanceof Time); } + @Test + @SuppressWarnings("unchecked") public void testBuild_Hints() { Flow flow = Flow.create("TEST"); Dataset left = Util.createMockDataset(flow, 1); Dataset right = Util.createMockDataset(flow, 1); Join.named("Join1") - .of(left, right) + .of(MapElements.of(left).using(i -> i).output(new TestHint(), new TestHint2()), + right) .by(String::length, String::length) .using((String l, String r, Collector c) -> { // no-op }) - .withHints(Sets.newHashSet(new TestHint(), new TestHint2(), new TestHint2())) .output(); - Join join = (Join) flow.operators().iterator().next(); - assertTrue(join.getHints().contains(new TestHint())); - assertTrue(join.getHints().contains(new TestHint2())); - assertEquals(2, join.getHints().size()); + Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get(); + assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new + TestHint()))); + assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new + TestHint2()))); + assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getHints().size()); } @Test + @SuppressWarnings("unchecked") public void testBuild_Hints_afterWindowing() { Flow flow = Flow.create("TEST"); Dataset left = Util.createMockDataset(flow, 1); Dataset right = Util.createMockDataset(flow, 1); Join.named("Join1") - .of(left, right) + .of(MapElements.of(left).using(i -> i).output(new TestHint(), new TestHint2(), new TestHint2()), + right) .by(String::length, String::length) .using((String l, String r, Collector c) -> { // no-op }) .windowBy(Time.of(Duration.ofHours(1))) - .withHints(Sets.newHashSet(new TestHint(), new TestHint2(), new TestHint2())) .output(); - Join join = (Join) flow.operators().iterator().next(); - assertTrue(join.getHints().contains(new TestHint())); - assertTrue(join.getHints().contains(new TestHint2())); - assertEquals(2, join.getHints().size()); + Join join = (Join) flow.operators().stream().filter(op -> op instanceof Join).findFirst().get(); + assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new + TestHint()))); + assertTrue(join.listInputs().stream().anyMatch(input -> ((Dataset) input).getHints().contains(new + TestHint2()))); + assertEquals(2, ((Dataset) join.listInputs().stream().findFirst().get()).getHints().size()); assertTrue(join.getWindowing() instanceof Time); } - private static class TestHint implements JoinHint { + private static class TestHint implements OutputHint { @Override public int hashCode() { @@ -255,7 +262,7 @@ public boolean equals(Object obj) { } } - private static class TestHint2 implements JoinHint { + private static class TestHint2 implements OutputHint { @Override diff --git a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java index c8577920..2e1fe4f3 100644 --- a/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java +++ b/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java @@ -17,11 +17,10 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; public class MapElementsTest { @@ -82,4 +81,18 @@ public void testBuild_ImplicitName() { MapElements map = (MapElements) flow.operators().iterator().next(); assertEquals("MapElements", map.getName()); } + + @Test + public void testBuild_Hints() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset dataSetWithHint = MapElements.of(dataset).using(i -> i).output(SizeHint.FITS_IN_MEMORY); + + assertTrue(dataSetWithHint.getHints().contains(SizeHint.FITS_IN_MEMORY)); + assertEquals(1, dataSetWithHint.getHints().size()); + + Dataset dataSetWithoutHint = MapElements.of(dataset).using(i -> i).output(); + assertEquals(0, dataSetWithoutHint.getHints().size()); + } } \ No newline at end of file diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java index 392b93b0..0017a87c 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BroadcastHashJoinTranslator.java @@ -15,13 +15,14 @@ */ package cz.seznam.euphoria.flink.batch; +import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.Join; -import cz.seznam.euphoria.core.client.operator.JoinHints; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.util.MultiValueContext; import cz.seznam.euphoria.flink.FlinkOperator; @@ -38,8 +39,11 @@ public class BroadcastHashJoinTranslator implements BatchOperatorTranslator { + @SuppressWarnings("unchecked") static boolean wantTranslate(Join o) { - return o.getHints().contains(JoinHints.broadcastHashJoin()) + return o.listInputs() + .stream() + .anyMatch(input -> ((Dataset) input).getHints().contains(SizeHint.FITS_IN_MEMORY)) && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) && !(o.getWindowing() instanceof MergingWindowing); } diff --git a/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java b/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java index 60b7f865..51efbe81 100644 --- a/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java +++ b/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java @@ -18,11 +18,11 @@ import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.DataSink; +import cz.seznam.euphoria.core.client.operator.Builders.Output; import cz.seznam.euphoria.core.client.operator.Distinct; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.Union; -import cz.seznam.euphoria.core.client.operator.Builders.Output; import cz.seznam.euphoria.core.executor.Executor; import static java.util.Objects.requireNonNull; diff --git a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java index 607fb476..f82e0c3d 100644 --- a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java +++ b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/BroadcastHashJoinTest.java @@ -17,13 +17,13 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.io.Collector; -import cz.seznam.euphoria.core.client.operator.JoinHints; import cz.seznam.euphoria.core.client.operator.LeftJoin; +import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.RightJoin; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; import cz.seznam.euphoria.operator.test.junit.Processing; -import cz.seznam.euphoria.shadow.com.google.common.collect.Sets; import org.junit.Test; import java.util.Arrays; @@ -40,11 +40,10 @@ public void leftBroadcastHashJoin() { @Override protected Dataset> getOutput( Dataset left, Dataset right) { - return LeftJoin.of(left, right) + return LeftJoin.of(left, MapElements.of(right).using(i -> i).output(SizeHint.FITS_IN_MEMORY)) .by(e -> e, e -> (int) (e % 10)) .using((Integer l, Optional r, Collector c) -> c.collect(l + "+" + r.orElse(null))) - .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) .output(); } @@ -81,11 +80,10 @@ public void rightBroadcastHashJoin() { @Override protected Dataset> getOutput( Dataset left, Dataset right) { - return RightJoin.of(left, right) + return RightJoin.of(MapElements.of(left).using(i -> i).output(SizeHint.FITS_IN_MEMORY), right) .by(e -> e, e -> (int) (e % 10)) .using((Optional l, Long r, Collector c) -> c.collect(l.orElse(null) + "+" + r)) - .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) .output(); } @@ -123,11 +121,10 @@ public void keyHashCollisionBroadcastHashJoin() { @Override protected Dataset> getOutput( Dataset left, Dataset right) { - return LeftJoin.of(left, right) + return LeftJoin.of(left, MapElements.of(right).using(i -> i).output(SizeHint.FITS_IN_MEMORY)) .by(e -> e, e -> e % 2 == 0 ? sameHashCodeKey2 : sameHashCodeKey1) .using((String l, Optional r, Collector c) -> c.collect(l + "+" + r.orElse(null))) - .withHints(Sets.newHashSet(JoinHints.broadcastHashJoin())) .output(); } diff --git a/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java b/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java index 0121c230..9e3c7f9f 100644 --- a/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java +++ b/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/BroadcastHashJoinTranslator.java @@ -18,12 +18,13 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.Join; -import cz.seznam.euphoria.core.client.operator.JoinHints; +import cz.seznam.euphoria.core.client.operator.hint.SizeHint; import cz.seznam.euphoria.core.client.util.Either; import cz.seznam.euphoria.core.client.util.Pair; import org.apache.spark.api.java.JavaPairRDD; @@ -47,14 +48,17 @@ * map side join with lookups to in memory hash table on non-optional side. *

    *

    - * In order to use this translator, you need to pass {@link JoinHints.BroadcastHashJoin} hint + * In order to use this translator, you need to have on one Dataset {@link SizeHint#FITS_IN_MEMORY} hint * to the {@link Join} operator. *

    */ public class BroadcastHashJoinTranslator implements SparkOperatorTranslator { + @SuppressWarnings("unchecked") static boolean wantTranslate(Join o) { - return o.getHints().contains(JoinHints.broadcastHashJoin()) + return o.listInputs() + .stream() + .anyMatch(input -> ((Dataset) input).getHints().contains(SizeHint.FITS_IN_MEMORY)) && (o.getType() == Join.Type.LEFT || o.getType() == Join.Type.RIGHT) && !(o.getWindowing() instanceof MergingWindowing); } @@ -65,7 +69,9 @@ public JavaRDD translate(Join operator, SparkExecutorContext context) { // ~ sanity check Preconditions.checkArgument( - operator.getHints().contains(JoinHints.broadcastHashJoin()), + operator.listInputs() + .stream() + .anyMatch(input -> ((Dataset) input).getHints().contains(SizeHint.FITS_IN_MEMORY)), "Missing broadcastHashJoin hint"); Preconditions.checkArgument( operator.getType() == Join.Type.LEFT || operator.getType() == Join.Type.RIGHT,