Skip to content

Commit

Permalink
[euphoria-core] #259 Hints are now in operator, not Dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
mareksimunek committed Mar 7, 2018
1 parent 83ec83b commit 323457c
Show file tree
Hide file tree
Showing 19 changed files with 185 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;
import cz.seznam.euphoria.core.client.operator.hint.OutputHintAware;

import javax.annotation.Nullable;
import java.io.Serializable;
Expand All @@ -33,7 +31,7 @@
* @param <T> type of elements of this data set
*/
@Audience(Audience.Type.CLIENT)
public interface Dataset<T> extends OutputHintAware<OutputHint>, Serializable {
public interface Dataset<T> extends Serializable {

/**
* @return the flow associated with this data set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import java.util.Set;

/**
* Various dataset related utils.
Expand All @@ -42,9 +39,9 @@ public class Datasets {
* @return a dataset representing the output of the given operator
*/
public static <IN, OUT> Dataset<OUT> createOutputFor(
Flow flow, Dataset<IN> input, Operator<IN, OUT> op, Set<OutputHint> outputHints) {
Flow flow, Dataset<IN> input, Operator<IN, OUT> op) {

return new OutputDataset<>(flow, op, input.isBounded(), outputHints);
return new OutputDataset<>(flow, op, input.isBounded());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;

/**
* {@code PCollection} that is input of a {@code Flow}.
Expand Down Expand Up @@ -61,15 +58,6 @@ public void persist(DataSink<T> sink) {
"The input dataset is already stored.");
}

/**
* Input Dataset doesn't have hints
* @return empty set
*/
@Override
public Set<OutputHint> getHints() {
return Collections.emptySet();
}

@Override
public Flow getFlow() {
return flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.hint.OutputHint;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Set;

/**
* {@code PCollection} that is output of some operator.
Expand All @@ -35,15 +33,13 @@ class OutputDataset<T> implements Dataset<T> {
private final Flow flow;
private final Operator<?, T> producer;
private final boolean bounded;
private final Set<OutputHint> outputHints;

private DataSink<T> outputSink = null;

public OutputDataset(Flow flow, Operator<?, T> producer, boolean bounded, Set<OutputHint> outputHints) {
public OutputDataset(Flow flow, Operator<?, T> producer, boolean bounded) {
this.flow = flow;
this.producer = producer;
this.bounded = bounded;
this.outputHints = outputHints;
}

@Nullable
Expand Down Expand Up @@ -83,9 +79,4 @@ public boolean isBounded() {
public Collection<Operator<?, ?>> getConsumers() {
return flow.getConsumersOf(this);
}

@Override
public Set<OutputHint> getHints() {
return outputHints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public static class OutputBuilder<IN> implements Builders.Output<IN> {
@Override
public Dataset<IN> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn, Sets.newHashSet
(outputHints));
AssignEventTime<IN> op = new AssignEventTime<>(name, flow, input, eventTimeFn,
Sets.newHashSet(outputHints));
flow.add(op);
return op.output();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ public static OfBuilder named(String name) {

final UnaryPredicate<IN> predicate;

Filter(String name, Flow flow, Dataset<IN> input, UnaryPredicate<IN> predicate, Set<OutputHint>
outputHints) {
Filter(String name,
Flow flow, Dataset<IN> input,
UnaryPredicate<IN> predicate,
Set<OutputHint> outputHints) {
super(name, flow, input, outputHints);
this.predicate = predicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public static class OutputBuilder<IN, OUT> implements Builders.Output<OUT> {
@Override
public Dataset<OUT> output(OutputHint... outputHints) {
Flow flow = input.getFlow();
FlatMap<IN, OUT> map = new FlatMap<>(name, flow, input, functor, evtTimeFn, Sets.newHashSet
(outputHints));
FlatMap<IN, OUT> map = new FlatMap<>(name, flow, input, functor, evtTimeFn,
Sets.newHashSet(outputHints));
flow.add(map);
return map.output();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@
)
public class Join<LEFT, RIGHT, KEY, OUT, W extends Window>
extends StateAwareWindowWiseOperator<Object, Either<LEFT, RIGHT>,
Either<LEFT, RIGHT>, KEY, Pair<KEY, OUT>, W, Join<LEFT, RIGHT, KEY, OUT, W>>
implements Builders.OutputValues<KEY, OUT> {
Either<LEFT, RIGHT>, KEY, Pair<KEY, OUT>, W, Join<LEFT, RIGHT, KEY, OUT, W>> {

public enum Type {
INNER,
Expand Down Expand Up @@ -287,11 +286,6 @@ public Dataset<Pair<KEY, OUT>> output() {
return output;
}

@Override
public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
return output;
}

@SuppressWarnings("unchecked")
private static final ListStorageDescriptor LEFT_STATE_DESCR =
ListStorageDescriptor.of("left", (Class) Object.class);
Expand Down Expand Up @@ -519,14 +513,15 @@ public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
final Flow flow = getFlow();

final MapElements<LEFT, Either<LEFT, RIGHT>> leftMap = new MapElements<>(
getName() + "::Map-left", flow, left, Either::left);
getName() + "::Map-left", flow, left, Either::left, this.getHints());

final MapElements<RIGHT, Either<LEFT, RIGHT>> rightMap = new MapElements<>(
getName() + "::Map-right", flow, right, Either::right);
getName() + "::Map-right", flow, right, Either::right, this.getHints());

final Union<Either<LEFT, RIGHT>> union =
new Union<>(getName() + "::Union", flow, Arrays.asList(
leftMap.output(), rightMap.output()));
new Union<>(getName() + "::Union", flow,
Arrays.asList(leftMap.output(), rightMap.output()),
this.getHints());

final ReduceStateByKey<Either<LEFT, RIGHT>, KEY, Either<LEFT, RIGHT>, OUT, StableJoinState, W>
reduce = new ReduceStateByKey(
Expand All @@ -541,7 +536,8 @@ public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
return ctx == null
? new StableJoinState(storages)
: new EarlyEmittingJoinState(storages, ctx);
}, new StateSupport.MergeFromStateMerger<>());
}, new StateSupport.MergeFromStateMerger<>(),
this.getHints());

final DAG<Operator<?, ?>> dag = DAG.of(leftMap, rightMap);
dag.add(union, leftMap, rightMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ public static OfBuilder named(String name) {
this(name, flow, input, (el, ctx) -> mapper.apply(el), Collections.emptySet());
}

MapElements(String name,
Flow flow,
Dataset<IN> input,
UnaryFunction<IN, OUT> mapper,
Set<OutputHint> outputHints) {
this(name, flow, input, (el, ctx) -> mapper.apply(el), outputHints);
}


MapElements(String name,
Flow flow,
Dataset<IN> input,
Expand All @@ -175,7 +184,7 @@ public static OfBuilder named(String name) {
return DAG.of(
// do not use the client API here, because it modifies the Flow!
new FlatMap<IN, OUT>(getName(), getFlow(), input,
(i, c) -> c.collect(mapper.apply(i, c.asContext())), null));
(i, c) -> c.collect(mapper.apply(i, c.asContext())), null, this.getHints()));
}

public UnaryFunctionEnv<IN, OUT> getMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public abstract class Operator<IN, OUT> implements Serializable {
/** Associated Flow. */
private final Flow flow;

private Set<OutputHint> hints;

protected Operator(String name, Flow flow) {
this.name = name;
this.flow = flow;
Expand Down Expand Up @@ -73,7 +75,12 @@ public final Flow getFlow() {
*/
final Dataset<OUT> createOutput(final Dataset<IN> input, Set<OutputHint> outputHints) {
Flow flow = input.getFlow();
return Datasets.createOutputFor(flow, input, this, outputHints);
this.hints = outputHints;
return Datasets.createOutputFor(flow, input, this);
}

public Set<OutputHint> getHints() {
return hints;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@
public class ReduceByKey<IN, KEY, VALUE, OUT, W extends Window>
extends StateAwareWindowWiseSingleInputOperator<
IN, IN, IN, KEY, Pair<KEY, OUT>, W,
ReduceByKey<IN, KEY, VALUE, OUT, W>>
implements Builders.OutputValues<KEY, OUT> {
ReduceByKey<IN, KEY, VALUE, OUT, W>> {


public static class OfBuilder implements Builders.Of {
Expand Down Expand Up @@ -443,11 +442,6 @@ public UnaryFunction<IN, VALUE> getValueExtractor() {
return valueExtractor;
}

@Override
public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
return output();
}

@SuppressWarnings("unchecked")
@Override
public DAG<Operator<?, ?>> getBasicOps() {
Expand All @@ -460,7 +454,7 @@ public Dataset<Pair<KEY, OUT>> output(OutputHint... outputHints) {
Operator reduceState = new ReduceStateByKey(getName(),
flow, input, keyExtractor, valueExtractor,
windowing,
stateFactory, stateCombine);
stateFactory, stateCombine, this.getHints());
return DAG.of(reduceState);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/
package cz.seznam.euphoria.core.client.operator.hint;

import cz.seznam.euphoria.core.annotation.audience.Audience;

/**
* Extra information for runner about Dataset size
*/
@Audience(Audience.Type.CLIENT)
public enum SizeHint implements OutputHint {
/**
* Indicate to runner that dataset can fit in memory and this information
Expand Down
Loading

0 comments on commit 323457c

Please sign in to comment.