Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#27 [euphoria-flink] Rewrite windowing to native implementation of StreamOperator #50

Merged
merged 3 commits into from
Mar 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.batch.BatchFlowTranslator;
import cz.seznam.euphoria.flink.streaming.StreamingFlowTranslator;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.api.common.ExecutionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,7 +55,7 @@ public class FlinkExecutor implements Executor {
private Duration checkpointInterval;

private boolean objectReuse = false;

// executor to submit flows, if closed all executions should be interrupted
private final ExecutorService submitExecutor = Executors.newCachedThreadPool();

Expand All @@ -67,12 +65,6 @@ public FlinkExecutor() {

public FlinkExecutor(boolean localEnv) {
this.localEnv = localEnv;
if (localEnv) {
// flink race condition bug hackfix
if (!MemorySegmentFactory.isInitialized()) {
MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,33 @@
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;

/** Helper around storage descriptors. */
/**
* Converts Euphoria {@link cz.seznam.euphoria.core.client.operator.state.StorageDescriptor}
* to Flink {@link org.apache.flink.api.common.state.StateDescriptor}
*/
public class Descriptors {

/**
* Converts the given Euphoria descriptor into its Flink equivalent.
*
* @param descriptor the Euphoria descriptor
* @param <T> the type of the described value
* @return the Flink equivalent of the the given euphoria descriptor
*/
public static <T> ReducingStateDescriptor<T>
from(ValueStorageDescriptor.MergingValueStorageDescriptor<T> descriptor) {
return new ReducingStateDescriptor<T>(
return new ReducingStateDescriptor<>(
descriptor.getName(),
new ReducingMerger<>(descriptor.getValueMerger()),
descriptor.getValueClass());
}

/**
* Converts the given euphoria descriptor into its flink equivalent.
*
* @param <T> the type of the value described
* @param descriptor the euphoria descriptor
* Converts the given Euphoria descriptor into its Flink equivalent.
*
* @return the flink equivalent of the the given euphoria descriptor
* @param descriptor the Euphoria descriptor
* @param <T> the type of the described value
* @return the Flink equivalent of the the given euphoria descriptor
*/
public static <T> ValueStateDescriptor<T> from(ValueStorageDescriptor<T> descriptor) {
return new ValueStateDescriptor<>(
Expand All @@ -48,11 +57,10 @@ public static <T> ValueStateDescriptor<T> from(ValueStorageDescriptor<T> descrip
}

/**
* Converts the given euphoria descriptor into its flink equivalent.
* Converts the given Euphoria descriptor into its Flink equivalent.
*
* @param <T> the type of the value described
* @param descriptor the euphoria descriptor
*
* @param <T> the type of the value described
* @return the flink equivalent of the given euphoria descriptor
*/
public static <T> ListStateDescriptor<T> from(ListStorageDescriptor<T> descriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,29 @@
*/
package cz.seznam.euphoria.flink.storage;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.runtime.state.KvState;

import java.util.Collections;

/**
* Implementation of {@link ListStorage} using Flink state API
*/
public class FlinkListStorage<T> implements ListStorage<T> {
public class FlinkListStorage<T, W extends Window> implements ListStorage<T> {

private final ListState<T> state;
private final W window;

public FlinkListStorage(ListState<T> state) {
public FlinkListStorage(ListState<T> state, W window) {
this.state = state;
this.window = window;
}

@Override
public void add(T element) {
setNamespace();
try {
state.add(element);
} catch (Exception ex) {
Expand All @@ -40,15 +47,30 @@ public void add(T element) {

@Override
public Iterable<T> get() {
setNamespace();
try {
return state.get();
Iterable<T> optional = state.get();
if (optional == null) {
return Collections.emptyList();
}
return optional;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

@Override
public void clear() {
setNamespace();
state.clear();
}

/**
* Make sure that namespace is set correctly in the underlying
* keyed state backend.
*/
@SuppressWarnings("unchecked")
private void setNamespace() {
((KvState) state).setCurrentNamespace(window);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@
*/
package cz.seznam.euphoria.flink.storage;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.runtime.state.KvState;

public class FlinkReducingValueStorage<T> implements ValueStorage<T> {
public class FlinkReducingValueStorage<T, W extends Window> implements ValueStorage<T> {

private final ReducingState<T> state;
private final T defaultValue;

public FlinkReducingValueStorage(ReducingState<T> state, T defaultValue) {
private final W window;

public FlinkReducingValueStorage(ReducingState<T> state, T defaultValue, W window) {
this.state = state;
this.defaultValue = defaultValue;
this.window = window;
}

@Override
public void set(T value) {
setNamespace();
try {
state.clear();
state.add(value);
Expand All @@ -40,6 +46,7 @@ public void set(T value) {

@Override
public T get() {
setNamespace();
try {
T s = state.get();
return (s == null) ? defaultValue : s;
Expand All @@ -50,6 +57,16 @@ public T get() {

@Override
public void clear() {
setNamespace();
state.clear();
}

/**
* Make sure that namespace window is set correctly in the underlying
* keyed state backend.
*/
@SuppressWarnings("unchecked")
private void setNamespace() {
((KvState) state).setCurrentNamespace(window);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,27 @@
*/
package cz.seznam.euphoria.flink.storage;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.runtime.state.KvState;

/**
* Implementation of {@link ValueStorage} using Flink state API
*/
public class FlinkValueStorage<T> implements ValueStorage<T> {
public class FlinkValueStorage<T, W extends Window> implements ValueStorage<T> {

private final ValueState<T> state;
private final W window;

public FlinkValueStorage(ValueState<T> state) {
public FlinkValueStorage(ValueState<T> state, W window) {
this.state = state;
this.window = window;
}

@Override
public void set(T value) {
setNamespace();
try {
state.update(value);
} catch (Exception ex) {
Expand All @@ -40,6 +45,7 @@ public void set(T value) {

@Override
public T get() {
setNamespace();
try {
return state.value();
} catch (Exception ex) {
Expand All @@ -49,6 +55,16 @@ public T get() {

@Override
public void clear() {
setNamespace();
state.clear();
}

/**
* Make sure that namespace window is set correctly in the underlying
* keyed state backend.
*/
@SuppressWarnings("unchecked")
private void setNamespace() {
((KvState) state).setCurrentNamespace(window);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package cz.seznam.euphoria.flink.streaming;

import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;
import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.flink.FlinkOperator;
Expand All @@ -31,7 +32,7 @@ public DataStream<?> translate(FlinkOperator<FlatMap> operator,
UnaryFunctor mapper = operator.getOriginalOperator().getFunctor();
return input
.flatMap(new StreamingUnaryFunctorWrapper<>(mapper))
.returns((Class) StreamingWindowedElement.class)
.returns((Class) WindowedElement.class)
.name(operator.getName())
.setParallelism(operator.getParallelism());
}
Expand Down

This file was deleted.

Loading