Skip to content

Commit

Permalink
[euphoria-flink] #260 Flink - broadcast hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
mareksimunek committed Feb 12, 2018
1 parent 88f365f commit 94c28d4
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.spark;
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.operator.JoinHint;

@Audience(Audience.Type.CLIENT)
public class JoinHints {
Expand All @@ -28,8 +27,7 @@ public static BroadcastHashJoin broadcastHashJoin() {
}

/**
* Broadcasts optional join side to all executors. See {@link BroadcastHashJoinTranslator}
* for more details.
* Broadcasts optional join side to all executors.
*/
public static class BroadcastHashJoin implements JoinHint {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2016-2018 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.executor.util;

import cz.seznam.euphoria.core.annotation.audience.Audience;
import cz.seznam.euphoria.core.client.accumulators.Counter;
import cz.seznam.euphoria.core.client.accumulators.Histogram;
import cz.seznam.euphoria.core.client.accumulators.Timer;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.io.Context;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

@Audience(Audience.Type.EXECUTOR)
public class MultiValueContext<T> implements Context, Collector<T> {

private final List<T> elements = new ArrayList<>(1);
@Nullable
final Context wrap;

public MultiValueContext() {
this(null);
}

public MultiValueContext(Context wrap) {
this.wrap = wrap;
}

/**
* Replace the stored value with given one.
*
* @param elem the element to store
*/
@Override
public void collect(T elem) {
elements.add(elem);
}

@Override
public Context asContext() {
return this;
}

/**
* Retrieve window associated with the stored element.
*/
@Override
public Window<?> getWindow() throws UnsupportedOperationException {
if (wrap == null) {
throw new UnsupportedOperationException(
"The window is unknown in this context");
}
return wrap.getWindow();
}

@Override
public Counter getCounter(String name) {
if (wrap == null) {
throw new UnsupportedOperationException(
"Accumulators not supported in this context");
}
return wrap.getCounter(name);
}

@Override
public Histogram getHistogram(String name) {
if (wrap == null) {
throw new UnsupportedOperationException(
"Accumulators not supported in this context");
}
return wrap.getHistogram(name);

}

@Override
public Timer getTimer(String name) {
if (wrap == null) {
throw new UnsupportedOperationException(
"Accumulators not supported in this context");
}
return wrap.getTimer(name);

}

/**
* Retrieve and reset the stored elements.
*
* @return the stored value
*/
public List<T> getAndResetValue() {
List<T> copiedElements = new ArrayList<>(elements);
elements.clear();
return copiedElements;
}

/**
* Retrieve value of this context.
*
* @return value
*/
public List<T> get() {
return elements;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cz.seznam.euphoria.core.executor.util;/*
* Copyright 2016-2018 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.client.operator.Operator;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class Translation<T, O extends Operator<?, ?>> {
final T translator;
final UnaryPredicate<O> accept;

private Translation(
T translator, UnaryPredicate<O> accept) {
this.translator = Objects.requireNonNull(translator);
this.accept = accept;
}

static <T, O extends Operator<?, ?>> void add(
Map<Class, List<Translation>> idx,
Class<O> type, T translator) {
add(idx, type, translator, null);
}

static <T, O extends Operator<?, ?>> void add(
Map<Class, List<Translation>> idx,
Class<O> type, T translator, UnaryPredicate<O> accept) {
idx.putIfAbsent(type, new ArrayList<>());
idx.get(type).add(new Translation<>(translator, accept));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@
*/
package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.graph.Node;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.Union;
import cz.seznam.euphoria.core.client.operator.*;
import cz.seznam.euphoria.core.executor.FlowUnfolder;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.graph.Node;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.FlowOptimizer;
import cz.seznam.euphoria.flink.FlowTranslator;
import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.flink.batch.io.DataSinkWrapper;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.java.DataSet;
Expand All @@ -40,12 +35,7 @@
import org.apache.flink.core.io.LocatableInputSplit;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

Expand All @@ -67,22 +57,23 @@ private Translation(
this.accept = accept;
}

static <O extends Operator<?, ?>> void set(
Map<Class, Translation> idx,
static <O extends Operator<?, ?>> void add(
Map<Class, List<Translation>> idx,
Class<O> type, BatchOperatorTranslator<O> translator)
{
set(idx, type, translator, null);
add(idx, type, translator, null);
}

static <O extends Operator<?, ?>> void set(
Map<Class, Translation> idx,
static <O extends Operator<?, ?>> void add(
Map<Class, List<Translation>> idx,
Class<O> type, BatchOperatorTranslator<O> translator, UnaryPredicate<O> accept)
{
idx.put(type, new Translation<>(translator, accept));
idx.putIfAbsent(type, new ArrayList<>());
idx.get(type).add(new Translation<>(translator, accept));
}
}

private final Map<Class, Translation> translations = new IdentityHashMap<>();
private final Map<Class, List<Translation>> translations = new IdentityHashMap<>();

private final Settings settings;
private final ExecutionEnvironment env;
Expand All @@ -103,21 +94,28 @@ public BatchFlowTranslator(Settings settings,
this.accumulatorFactory = Objects.requireNonNull(accumulatorFactory);

// basic operators
Translation.set(translations, FlowUnfolder.InputOperator.class, new InputTranslator(splitAssignerFactory));
Translation.set(translations, FlatMap.class, new FlatMapTranslator());
Translation.set(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator());
Translation.set(translations, Union.class, new UnionTranslator());
Translation.add(translations, FlowUnfolder.InputOperator.class, new InputTranslator
(splitAssignerFactory));
Translation.add(translations, FlatMap.class, new FlatMapTranslator());
Translation.add(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator());
Translation.add(translations, Union.class, new UnionTranslator());

// derived operators
Translation.set(translations, ReduceByKey.class, new ReduceByKeyTranslator(),
Translation.add(translations, ReduceByKey.class, new ReduceByKeyTranslator(),
ReduceByKeyTranslator::wantTranslate);

// ~ batch broadcast join for a very small left side
Translation.add(translations, Join.class, new BroadcastHashJoinTranslator(),
BroadcastHashJoinTranslator::wantTranslate);
}

@SuppressWarnings("unchecked")
@Override
protected Collection<TranslateAcceptor> getAcceptors() {
return translations.entrySet().stream()
.map(e -> new TranslateAcceptor(e.getKey(), e.getValue().accept))
.flatMap((entry) -> entry.getValue()
.stream()
.map(translator -> new TranslateAcceptor(entry.getKey(), translator.accept)))
.collect(Collectors.toList());
}

Expand All @@ -140,19 +138,27 @@ public List<DataSink<?>> translateInto(Flow flow) {
// translate each operator to proper Flink transformation
dag.traverse().map(Node::get).forEach(op -> {
Operator<?, ?> originalOp = op.getOriginalOperator();
Translation<Operator<?, ?>> tx = translations.get(originalOp.getClass());
if (tx == null) {
List<Translation> txs = this.translations.get(originalOp.getClass());
if (txs.isEmpty()) {
throw new UnsupportedOperationException(
"Operator " + op.getClass().getSimpleName() + " not supported");
}
// ~ verify the flowToDag translation
Preconditions.checkState(
tx.accept == null || Boolean.TRUE.equals(tx.accept.apply(originalOp)));

DataSet<?> out = tx.translator.translate(op, executorContext);

// save output of current operator to context
executorContext.setOutput(op, out);
Translation<Operator<?, ?>> firstMatch = null;
for (Translation tx : txs) {
if (tx.accept == null || Boolean.TRUE.equals(tx.accept.apply(originalOp))) {
firstMatch = tx;
break;
}
}
final DataSet<?> out;
if (firstMatch != null) {
out = firstMatch.translator.translate(op, executorContext);
// save output of current operator to context
executorContext.setOutput(op, out);
} else {
throw new IllegalStateException("No matching translation.");
}
});

// process all sinks in the DAG (leaf nodes)
Expand Down
Loading

0 comments on commit 94c28d4

Please sign in to comment.