Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[euphoria-core] #259 Hints are not runtime specific. #268

Merged
merged 2 commits into from
Mar 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Collection;
import javax.annotation.Nullable;

/**
* A dataset abstraction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,4 @@ public boolean isBounded() {
public Collection<Operator<?, ?>> getConsumers() {
return flow.getConsumersOf(this);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.functional.ExtractEventTime;
import cz.seznam.euphoria.core.annotation.operator.Derived;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.ExtractEventTime;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import java.util.Objects;
import java.util.Set;

/** A convenient alias for assignment of event time.
*
Expand Down Expand Up @@ -87,9 +90,10 @@ public static class OutputBuilder<IN> implements Builders.Output<IN> {
}

@Override
public Dataset<IN> output() {
public Dataset<IN> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn);
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn,
Sets.newHashSet(outputHints));
flow.add(op);
return op.output();
}
Expand All @@ -98,16 +102,16 @@ public Dataset<IN> output() {
private final ExtractEventTime<IN> eventTimeFn;

AssignEventTime(String name, Flow flow, Dataset<IN> input,
ExtractEventTime<IN> eventTimeFn) {
super(name, flow, input);
ExtractEventTime<IN> eventTimeFn, Set<OutputHint> outputHints) {
super(name, flow, input, outputHints);
this.eventTimeFn = eventTimeFn;
}

@Override
public DAG<Operator<?, ?>> getBasicOps() {
return DAG.of(new FlatMap<>(
getName(), getFlow(), input,
(i, c) -> c.collect(i), eventTimeFn));
(i, c) -> c.collect(i), eventTimeFn, getHints()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.util.Pair;

import java.util.Set;

/**
* Common methods used in operator builders to share related javadoc
* descriptions.<p>
Expand Down Expand Up @@ -94,7 +93,7 @@ public interface Output<T> {
*
* @return the dataset representing the new operator's output
*/
Dataset<T> output();
Dataset<T> output(OutputHint... outputHints);
}

public interface OutputValues<K, V> extends Output<Pair<K, V>> {
Expand All @@ -106,24 +105,13 @@ public interface OutputValues<K, V> extends Output<Pair<K, V>> {
*
* @return the dataset representing the new operator's output
*/
default Dataset<V> outputValues() {
default Dataset<V> outputValues(OutputHint... outputHints) {
return MapElements
.named("extract-values")
.of(output())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hints should be set here I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't, output Dataset is with values. There would hint get lost. (last operator is MapElements). I added test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the hint get lost? The graph can be arbitrarily complicated and as a general rule, this should yeild the same runtime behavior:

 Dataset input = ...;
 Dataset<Pair> output = ReduceByKey.of(input)
   ...
   .output(hints);
 MapElements.of(input)
   .using(Pair::getSecond)
   .output()
   .persist(...)

it should behave the same way as

 Dataset input = ...;
 ReduceByKey.of(input)
    ...
    .outputValues(hints)
    .persist(...)

That is because non-shuffle operators should forward hints to downstream Datasets (unless overridden). That is due to the construction of the hints. The operator that the hint applies to is really RBK, not the MapElements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion, for now Hints doesn't propagate downstream in flow. Because we don't know if Hint will be still valid in next operator.

.using(Pair::getSecond)
.output();
.output(outputHints);
}
}

public interface OutputWithHint<T, HINT extends Hint> extends Output<T> {

/**
* Add runtime specific hints for the operator
*
* @param hints runtime specific hints
* @return output builder
*/
Output<T> withHints(Set<HINT> hints);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;

/**
* Operator counting elements with same key.
Expand All @@ -49,7 +52,7 @@
)
public class CountByKey<IN, KEY, W extends Window>
extends StateAwareWindowWiseSingleInputOperator<
IN, IN, IN, KEY, Pair<KEY, Long>, W, CountByKey<IN, KEY, W>> {
IN, IN, IN, KEY, Pair<KEY, Long>, W, CountByKey<IN, KEY, W>> {

public static class OfBuilder implements Builders.Of {
private final String name;
Expand Down Expand Up @@ -100,8 +103,8 @@ public static class WindowingBuilder<IN, KEY>
}

@Override
public Dataset<Pair<KEY, Long>> output() {
return windowBy(null).output();
public Dataset<Pair<KEY, Long>> output(OutputHint... outputHints) {
return windowBy(null).output(outputHints);
}
}

Expand All @@ -122,10 +125,10 @@ public static class OutputBuilder<IN, KEY, W extends Window>
}

@Override
public Dataset<Pair<KEY, Long>> output() {
public Dataset<Pair<KEY, Long>> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
CountByKey<IN, KEY, W> count = new CountByKey<>(
name, flow, input, keyExtractor, windowing);
name, flow, input, keyExtractor, windowing, Sets.newHashSet(outputHints));
flow.add(count);
return count.output();
}
Expand Down Expand Up @@ -163,20 +166,22 @@ public static OfBuilder named(String name) {
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, KEY> extractor,
@Nullable Windowing<IN, W> windowing) {
@Nullable Windowing<IN, W> windowing,
Set<OutputHint> outputHints) {

super(name, flow, input, extractor, windowing);
super(name, flow, input, extractor, windowing, outputHints);
}

@Override
public DAG<Operator<?, ?>> getBasicOps() {
SumByKey<IN, KEY, W> sum = new SumByKey<>(
getName(),
input.getFlow(),
input,
keyExtractor,
e -> 1L,
windowing);
getName(),
input.getFlow(),
input,
keyExtractor,
e -> 1L,
windowing,
getHints());
return DAG.of(sum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

/**
* Operator outputting distinct (based on {@link Object#equals}) elements.
Expand Down Expand Up @@ -126,8 +130,7 @@ private WindowingBuilder(
return new OutputBuilder<>(name, input, mapper, windowing);
}

@Override
public Dataset<ELEM> output() {
public Dataset<ELEM> output(OutputHint... outputHints) {
return new OutputBuilder<>(name, input, mapper, null).output();
}
}
Expand All @@ -149,10 +152,10 @@ public static class OutputBuilder<IN, ELEM, W extends Window>
}

@Override
public Dataset<ELEM> output() {
public Dataset<ELEM> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
Distinct<IN, ELEM, W> distinct = new Distinct<>(
name, flow, input, mapper, windowing);
name, flow, input, mapper, windowing, Sets.newHashSet(outputHints));
flow.add(distinct);
return distinct.output();
}
Expand Down Expand Up @@ -190,9 +193,10 @@ public static OfBuilder named(String name) {
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, ELEM> mapper,
@Nullable Windowing<IN, W> windowing) {
@Nullable Windowing<IN, W> windowing,
Set<OutputHint> outputHints) {

super(name, flow, input, mapper, windowing);
super(name, flow, input, mapper, windowing, outputHints);
}

@Override
Expand All @@ -203,10 +207,11 @@ public static OfBuilder named(String name) {
new ReduceByKey<>(name,
flow, input, getKeyExtractor(), e -> null,
windowing,
(CombinableReduceFunction<Void>) e -> null);
(CombinableReduceFunction<Void>) e -> null,
Collections.emptySet());

MapElements format = new MapElements<>(
getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst);
getName() + "::" + "Map", flow, reduce.output(), Pair::getFirst, getHints());

DAG<Operator<?, ?>> dag = DAG.of(reduce);
dag.add(format, reduce);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import java.util.Set;

/**
* Operator working element-wise, with no context between elements.
Expand All @@ -29,9 +32,9 @@ public abstract class ElementWiseOperator<IN, OUT>

protected final Dataset<OUT> output;

protected ElementWiseOperator(String name, Flow flow, Dataset<IN> input) {
protected ElementWiseOperator(String name, Flow flow, Dataset<IN> input, Set<OutputHint> outputHints) {
super(name, flow, input);
this.output = createOutput(input);
this.output = createOutput(input, outputHints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.collect.Sets;

import java.util.Objects;
import java.util.Set;

/**
* Operator performing a filter operation.
Expand Down Expand Up @@ -94,9 +97,9 @@ private OutputBuilder(String name, Dataset<IN> input, UnaryPredicate<IN> predica
}

@Override
public Dataset<IN> output() {
public Dataset<IN> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
Filter<IN> filter = new Filter<>(name, flow, input, predicate);
Filter<IN> filter = new Filter<>(name, flow, input, predicate, Sets.newHashSet(outputHints));
flow.add(filter);

return filter.output();
Expand Down Expand Up @@ -134,8 +137,11 @@ public static OfBuilder named(String name) {

final UnaryPredicate<IN> predicate;

Filter(String name, Flow flow, Dataset<IN> input, UnaryPredicate<IN> predicate) {
super(name, flow, input);
Filter(String name,
Flow flow, Dataset<IN> input,
UnaryPredicate<IN> predicate,
Set<OutputHint> outputHints) {
super(name, flow, input, outputHints);
this.predicate = predicate;
}

Expand All @@ -151,6 +157,8 @@ public UnaryPredicate<IN> getPredicate() {
if (predicate.apply(elem)) {
collector.collect(elem);
}
}, null));
},
null,
getHints()));
}
}
Loading