Commit

[euphoria-flink] #260 reformatting and corrections
mareksimunek committed Feb 13, 2018
1 parent 48d9b6a commit f84b9b8
Showing 5 changed files with 320 additions and 316 deletions (two of the diffs are reproduced below).
MultiValueContext.java (resulting version; the commit reindents the class, renames getAndResetValue to getAndResetValues, and makes get() return an unmodifiable view of the stored elements):

@@ -25,97 +25,98 @@

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Audience(Audience.Type.EXECUTOR)
public class MultiValueContext<T> implements Context, Collector<T> {

  private final List<T> elements = new ArrayList<>(1);
  @Nullable
  final Context wrap;

  public MultiValueContext() {
    this(null);
  }

  public MultiValueContext(Context wrap) {
    this.wrap = wrap;
  }

  /**
   * Add a given element to the stored values.
   *
   * @param elem the element to store
   */
  @Override
  public void collect(T elem) {
    elements.add(elem);
  }

  @Override
  public Context asContext() {
    return this;
  }

  /**
   * Retrieve the window associated with the stored elements.
   */
  @Override
  public Window<?> getWindow() throws UnsupportedOperationException {
    if (wrap == null) {
      throw new UnsupportedOperationException(
          "The window is unknown in this context");
    }
    return wrap.getWindow();
  }

  @Override
  public Counter getCounter(String name) {
    if (wrap == null) {
      throw new UnsupportedOperationException(
          "Accumulators not supported in this context");
    }
    return wrap.getCounter(name);
  }

  @Override
  public Histogram getHistogram(String name) {
    if (wrap == null) {
      throw new UnsupportedOperationException(
          "Accumulators not supported in this context");
    }
    return wrap.getHistogram(name);
  }

  @Override
  public Timer getTimer(String name) {
    if (wrap == null) {
      throw new UnsupportedOperationException(
          "Accumulators not supported in this context");
    }
    return wrap.getTimer(name);
  }

  /**
   * Retrieve and reset the stored elements.
   *
   * @return a copy of the stored elements
   */
  public List<T> getAndResetValues() {
    List<T> copiedElements = new ArrayList<>(elements);
    elements.clear();
    return copiedElements;
  }

  /**
   * Retrieve the values collected in this context.
   *
   * @return an unmodifiable view of the stored elements
   */
  public List<T> get() {
    return Collections.unmodifiableList(elements);
  }
}
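A minimal usage sketch (hypothetical driver code, not part of the commit) of the contract above; it assumes imports of MultiValueContext and java.util.List. Since get() now returns a live unmodifiable view of the backing list, a later getAndResetValues() empties that view while the returned copy keeps the elements:

MultiValueContext<String> ctx = new MultiValueContext<>();
ctx.collect("a");
ctx.collect("b");

List<String> view = ctx.get();                   // unmodifiable live view: [a, b]
List<String> drained = ctx.getAndResetValues();  // defensive copy: [a, b]

assert view.isEmpty();      // the internal clear() shows through the view
assert drained.size() == 2; // the returned copy is unaffected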

BatchFlowTranslator.java (resulting version of the changed hunks; the commit reformats indentation and brace placement and documents the first-match translation rule):

@@ -44,10 +44,14 @@

import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
 * Translates a flow for Flink batch mode. Only the first matching
 * translation is used for each operator in the flow.
 */
public class BatchFlowTranslator extends FlowTranslator {

  public interface SplitAssignerFactory
      extends BiFunction<LocatableInputSplit[], Integer, InputSplitAssigner>, Serializable {
  }

  public static final SplitAssignerFactory DEFAULT_SPLIT_ASSIGNER_FACTORY =
      (splits, partitions) -> new LocatableInputSplitAssigner(splits);
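Because SplitAssignerFactory is just a serializable BiFunction, callers can plug in a different assignment strategy. A hedged sketch (hypothetical, not part of the commit), assuming Flink's DefaultInputSplitAssigner is available on the classpath; it hands out splits in request order with no locality awareness:

// Hypothetical alternative factory: assign splits in order, ignoring locality.
SplitAssignerFactory inOrderFactory =
    (splits, parallelism) -> new DefaultInputSplitAssigner(splits);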
@@ -63,16 +67,14 @@ private Translation(
  }

  static <O extends Operator<?, ?>> void add(
      Map<Class, List<Translation>> idx,
      Class<O> type, BatchOperatorTranslator<O> translator) {
    add(idx, type, translator, null);
  }

  static <O extends Operator<?, ?>> void add(
      Map<Class, List<Translation>> idx,
      Class<O> type, BatchOperatorTranslator<O> translator, UnaryPredicate<O> accept) {
    idx.putIfAbsent(type, new ArrayList<>());
    idx.get(type).add(new Translation<>(translator, accept));
  }
Expand Down Expand Up @@ -100,7 +102,7 @@ public BatchFlowTranslator(Settings settings,

// basic operators
Translation.add(translations, FlowUnfolder.InputOperator.class, new InputTranslator
(splitAssignerFactory));
(splitAssignerFactory));
Translation.add(translations, FlatMap.class, new FlatMapTranslator());
Translation.add(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator());
Translation.add(translations, Union.class, new UnionTranslator());
Expand All @@ -111,16 +113,16 @@ public BatchFlowTranslator(Settings settings,

// ~ batch broadcast join for a very small left side
Translation.add(translations, Join.class, new BroadcastHashJoinTranslator(),
BroadcastHashJoinTranslator::wantTranslate);
BroadcastHashJoinTranslator::wantTranslate);
}

@SuppressWarnings("unchecked")
@Override
protected Collection<TranslateAcceptor> getAcceptors() {
return translations.entrySet().stream()
.flatMap((entry) -> entry.getValue()
.stream()
.map(translator -> new TranslateAcceptor(entry.getKey(), translator.accept)))
.flatMap((entry) -> entry.getValue()
.stream()
.map(translator -> new TranslateAcceptor(entry.getKey(), translator.accept)))
.collect(Collectors.toList());
}

@@ -131,27 +133,33 @@ protected FlowOptimizer createOptimizer() {
    return opt;
  }

  /**
   * Translates the given flow, applying only the first matching translation
   * to each operator.
   *
   * @param flow the user defined flow to be translated
   * @return all output sinks of the translated flow
   */
  @Override
  @SuppressWarnings("unchecked")
  public List<DataSink<?>> translateInto(Flow flow) {
    // transform flow to acyclic graph of supported operators
    DAG<FlinkOperator<Operator<?, ?>>> dag = flowToDag(flow);

    BatchExecutorContext executorContext = new BatchExecutorContext(env, (DAG) dag,
        accumulatorFactory, settings);

    // translate each operator to proper Flink transformation
    dag.traverse().map(Node::get).forEach(op -> {
      Operator<?, ?> originalOp = op.getOriginalOperator();
      List<Translation> txs = this.translations.get(originalOp.getClass());
      if (txs.isEmpty()) {
        throw new UnsupportedOperationException(
            "Operator " + op.getClass().getSimpleName() + " not supported");
      }
      // ~ verify the flowToDag translation
      Translation<Operator<?, ?>> firstMatch = null;
      for (Translation tx : txs) {
        if (tx.accept == null || (boolean) tx.accept.apply(originalOp)) {
          firstMatch = tx;
          break;
        }
@@ -169,18 +177,18 @@ public List<DataSink<?>> translateInto(Flow flow) {

    // process all sinks in the DAG (leaf nodes)
    final List<DataSink<?>> sinks = new ArrayList<>();
    dag.getLeafs()
        .stream()
        .map(Node::get)
        .filter(op -> op.output().getOutputSink() != null)
        .forEach(op -> {
          final DataSink<?> sink = op.output().getOutputSink();
          sinks.add(sink);
          DataSet<?> flinkOutput =
              Objects.requireNonNull(executorContext.getOutputStream(op));
          flinkOutput.output(new DataSinkWrapper<>((DataSink) sink));
        });

    return sinks;
  }
