diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java index 42d02c88..cffd8959 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java @@ -21,10 +21,8 @@ import cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.batch.BatchFlowTranslator; import cz.seznam.euphoria.flink.streaming.StreamingFlowTranslator; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.core.memory.HeapMemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.api.common.ExecutionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +55,7 @@ public class FlinkExecutor implements Executor { private Duration checkpointInterval; private boolean objectReuse = false; - + // executor to submit flows, if closed all executions should be interrupted private final ExecutorService submitExecutor = Executors.newCachedThreadPool(); @@ -67,12 +65,6 @@ public FlinkExecutor() { public FlinkExecutor(boolean localEnv) { this.localEnv = localEnv; - if (localEnv) { - // flink race condition bug hackfix - if (!MemorySegmentFactory.isInitialized()) { - MemorySegmentFactory.initializeFactory(HeapMemorySegment.FACTORY); - } - } } /** diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/Descriptors.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/Descriptors.java index a08b00d5..4ab20015 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/Descriptors.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/Descriptors.java @@ -21,24 +21,33 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; -/** Helper around storage descriptors. */ +/** + * Converts Euphoria {@link cz.seznam.euphoria.core.client.operator.state.StorageDescriptor} + * to Flink {@link org.apache.flink.api.common.state.StateDescriptor} + */ public class Descriptors { + /** + * Converts the given Euphoria descriptor into its Flink equivalent. + * + * @param descriptor the Euphoria descriptor + * @param the type of the described value + * @return the Flink equivalent of the the given euphoria descriptor + */ public static ReducingStateDescriptor from(ValueStorageDescriptor.MergingValueStorageDescriptor descriptor) { - return new ReducingStateDescriptor( + return new ReducingStateDescriptor<>( descriptor.getName(), new ReducingMerger<>(descriptor.getValueMerger()), descriptor.getValueClass()); } /** - * Converts the given euphoria descriptor into its flink equivalent. - * - * @param the type of the value described - * @param descriptor the euphoria descriptor + * Converts the given Euphoria descriptor into its Flink equivalent. * - * @return the flink equivalent of the the given euphoria descriptor + * @param descriptor the Euphoria descriptor + * @param the type of the described value + * @return the Flink equivalent of the the given euphoria descriptor */ public static ValueStateDescriptor from(ValueStorageDescriptor descriptor) { return new ValueStateDescriptor<>( @@ -48,11 +57,10 @@ public static ValueStateDescriptor from(ValueStorageDescriptor descrip } /** - * Converts the given euphoria descriptor into its flink equivalent. + * Converts the given Euphoria descriptor into its Flink equivalent. * - * @param the type of the value described * @param descriptor the euphoria descriptor - * + * @param the type of the value described * @return the flink equivalent of the given euphoria descriptor */ public static ListStateDescriptor from(ListStorageDescriptor descriptor) { diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkListStorage.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkListStorage.java index 11dca1ba..bd7b23ca 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkListStorage.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkListStorage.java @@ -15,22 +15,29 @@ */ package cz.seznam.euphoria.flink.storage; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.runtime.state.KvState; + +import java.util.Collections; /** * Implementation of {@link ListStorage} using Flink state API */ -public class FlinkListStorage implements ListStorage { +public class FlinkListStorage implements ListStorage { private final ListState state; + private final W window; - public FlinkListStorage(ListState state) { + public FlinkListStorage(ListState state, W window) { this.state = state; + this.window = window; } @Override public void add(T element) { + setNamespace(); try { state.add(element); } catch (Exception ex) { @@ -40,8 +47,13 @@ public void add(T element) { @Override public Iterable get() { + setNamespace(); try { - return state.get(); + Iterable optional = state.get(); + if (optional == null) { + return Collections.emptyList(); + } + return optional; } catch (Exception ex) { throw new RuntimeException(ex); } @@ -49,6 +61,16 @@ public Iterable get() { @Override public void clear() { + setNamespace(); state.clear(); } + + /** + * Make sure that namespace is set correctly in the underlying + * keyed state backend. + */ + @SuppressWarnings("unchecked") + private void setNamespace() { + ((KvState) state).setCurrentNamespace(window); + } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkReducingValueStorage.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkReducingValueStorage.java index 4f3236cb..6ea21a62 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkReducingValueStorage.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkReducingValueStorage.java @@ -15,21 +15,27 @@ */ package cz.seznam.euphoria.flink.storage; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.runtime.state.KvState; -public class FlinkReducingValueStorage implements ValueStorage { +public class FlinkReducingValueStorage implements ValueStorage { private final ReducingState state; private final T defaultValue; - public FlinkReducingValueStorage(ReducingState state, T defaultValue) { + private final W window; + + public FlinkReducingValueStorage(ReducingState state, T defaultValue, W window) { this.state = state; this.defaultValue = defaultValue; + this.window = window; } @Override public void set(T value) { + setNamespace(); try { state.clear(); state.add(value); @@ -40,6 +46,7 @@ public void set(T value) { @Override public T get() { + setNamespace(); try { T s = state.get(); return (s == null) ? defaultValue : s; @@ -50,6 +57,16 @@ public T get() { @Override public void clear() { + setNamespace(); state.clear(); } + + /** + * Make sure that namespace window is set correctly in the underlying + * keyed state backend. + */ + @SuppressWarnings("unchecked") + private void setNamespace() { + ((KvState) state).setCurrentNamespace(window); + } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkValueStorage.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkValueStorage.java index 764e0568..0c9fe0b7 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkValueStorage.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/storage/FlinkValueStorage.java @@ -15,22 +15,27 @@ */ package cz.seznam.euphoria.flink.storage; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.runtime.state.KvState; /** * Implementation of {@link ValueStorage} using Flink state API */ -public class FlinkValueStorage implements ValueStorage { +public class FlinkValueStorage implements ValueStorage { private final ValueState state; + private final W window; - public FlinkValueStorage(ValueState state) { + public FlinkValueStorage(ValueState state, W window) { this.state = state; + this.window = window; } @Override public void set(T value) { + setNamespace(); try { state.update(value); } catch (Exception ex) { @@ -40,6 +45,7 @@ public void set(T value) { @Override public T get() { + setNamespace(); try { return state.value(); } catch (Exception ex) { @@ -49,6 +55,16 @@ public T get() { @Override public void clear() { + setNamespace(); state.clear(); } + + /** + * Make sure that namespace window is set correctly in the underlying + * keyed state backend. + */ + @SuppressWarnings("unchecked") + private void setNamespace() { + ((KvState) state).setCurrentNamespace(window); + } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java index fec563d3..b192b13c 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java @@ -15,6 +15,7 @@ */ package cz.seznam.euphoria.flink.streaming; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.flink.FlinkOperator; @@ -31,7 +32,7 @@ public DataStream translate(FlinkOperator operator, UnaryFunctor mapper = operator.getOriginalOperator().getFunctor(); return input .flatMap(new StreamingUnaryFunctorWrapper<>(mapper)) - .returns((Class) StreamingWindowedElement.class) + .returns((Class) WindowedElement.class) .name(operator.getName()) .setParallelism(operator.getParallelism()); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlinkStreamingStateStorageProvider.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlinkStreamingStateStorageProvider.java deleted file mode 100644 index 493d34cb..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlinkStreamingStateStorageProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming; - -import cz.seznam.euphoria.core.client.operator.state.ListStorage; -import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; -import cz.seznam.euphoria.core.client.operator.state.StorageProvider; -import cz.seznam.euphoria.core.client.operator.state.ValueStorage; -import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; -import cz.seznam.euphoria.flink.storage.Descriptors; -import cz.seznam.euphoria.flink.storage.FlinkListStorage; -import cz.seznam.euphoria.flink.storage.FlinkValueStorage; -import org.apache.flink.api.common.functions.RuntimeContext; - -import java.io.Serializable; - -/** - * Storage provider using flink's state API. - */ -class FlinkStreamingStateStorageProvider implements StorageProvider, Serializable { - - private transient RuntimeContext context; - - void initialize(RuntimeContext context) { - this.context = context; - } - - @Override - public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) { - return new FlinkValueStorage<>(context.getState(Descriptors.from(descriptor))); - } - - @Override - public ListStorage getListStorage(ListStorageDescriptor descriptor) { - return new FlinkListStorage<>(context.getListState(Descriptors.from(descriptor))); - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceByKeyTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceByKeyTranslator.java deleted file mode 100644 index 1a1e3d06..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceByKeyTranslator.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.client.operator.ReduceByKey; -import cz.seznam.euphoria.core.client.util.Pair; -import cz.seznam.euphoria.flink.FlinkOperator; -import cz.seznam.euphoria.flink.functions.IteratorIterable; -import cz.seznam.euphoria.flink.functions.PartitionerWrapper; -import cz.seznam.euphoria.flink.streaming.windowing.FlinkWindow; -import cz.seznam.euphoria.flink.streaming.windowing.MultiWindowedElement; -import cz.seznam.euphoria.flink.streaming.windowing.MultiWindowedElementWindowFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.WindowedStream; -import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.util.Collector; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.Set; - -class ReduceByKeyTranslator implements StreamingOperatorTranslator { - - @Override - @SuppressWarnings("unchecked") - public DataStream translate(FlinkOperator operator, - StreamingExecutorContext context) { - DataStream input = - Iterables.getOnlyElement(context.getInputStreams(operator)); - - ReduceByKey origOperator = operator.getOriginalOperator(); - final UnaryFunction reducer = origOperator.getReducer(); - final UnaryFunction keyExtractor = origOperator.getKeyExtractor(); - final UnaryFunction valueExtractor = origOperator.getValueExtractor(); - final Windowing windowing = origOperator.getWindowing(); - final UnaryFunction eventTimeAssigner = origOperator.getEventTimeAssigner(); - - // apply windowing first - SingleOutputStreamOperator> reduced; - if (windowing == null) { - WindowedStream windowed = - context.attachedWindowStream((DataStream) input, keyExtractor, valueExtractor); - if (origOperator.isCombinable()) { - // reduce incrementally - reduced = windowed.apply( - new StreamingWindowedElementIncrementalReducer(reducer), new PassThroughWindowFunction<>()); - } else { - // reduce all elements at once when the window is fired - reduced = windowed.apply( - new StreamingWindowedElementWindowedReducer(reducer, new PassThroughWindowFunction<>())); - } - } else { - WindowedStream windowed = context.flinkWindow( - (DataStream) input, keyExtractor, valueExtractor, windowing, eventTimeAssigner); - if (origOperator.isCombinable()) { - // reduce incrementally - reduced = windowed.apply( - new MultiWindowedElementIncrementalReducer(reducer), new MultiWindowedElementWindowFunction()); - - } else { - // reduce all elements at once when the window is fired - reduced = windowed.apply( - new MultiWindowedElementWindowedReducer(reducer, new MultiWindowedElementWindowFunction())); - } - } - - DataStream> out = - reduced.name(operator.getName()) - .setParallelism(operator.getParallelism()); - - // FIXME partitioner should be applied during "reduce" to avoid - // unnecessary shuffle, but there is no (known) way how to set custom - // partitioner to "keyBy" transformation - - // apply custom partitioner if different from default - if (!origOperator.getPartitioning().hasDefaultPartitioner()) { - out = out.partitionCustom( - new PartitionerWrapper<>(origOperator.getPartitioning().getPartitioner()), - p -> p.getElement().getKey()); - } - - return out; - } - - /** - * Performs incremental reduction (in case of combining reduce). - */ - private static class StreamingWindowedElementIncrementalReducer - implements ReduceFunction>, - ResultTypeQueryable> { - - final UnaryFunction reducer; - - public StreamingWindowedElementIncrementalReducer(UnaryFunction reducer) { - this.reducer = reducer; - } - - @Override - public StreamingWindowedElement reduce( - StreamingWindowedElement p1, - StreamingWindowedElement p2) { - - Object v1 = p1.getElement().getSecond(); - Object v2 = p2.getElement().getSecond(); - return new StreamingWindowedElement<>( - p1.getWindow(), - p1.getTimestamp(), - Pair.of(p1.getElement().getKey(), reducer.apply(Arrays.asList(v1, v2)))); - } - - @Override - @SuppressWarnings("unchecked") - public TypeInformation> getProducedType() { - return TypeInformation.of((Class) StreamingWindowedElement.class); - } - } // ~ end of StreamingWindowedElementIncrementalReducer - - /** - * Performs non-incremental reduction (in case of non-combining reduce). - */ - private static class StreamingWindowedElementWindowedReducer - implements WindowFunction< - StreamingWindowedElement, - StreamingWindowedElement, - Object, - Window> { - - private final UnaryFunction reducer; - private final WindowFunction, - StreamingWindowedElement, Object, Window> emissionFunction; - - @SuppressWarnings("unchecked") - public StreamingWindowedElementWindowedReducer(UnaryFunction reducer, - WindowFunction emissionFunction) { - this.reducer = reducer; - this.emissionFunction = emissionFunction; - } - - @Override - public void apply(Object key, - Window window, - Iterable> input, - Collector> collector) - throws Exception { - - Iterator> it = input.iterator(); - - // read the first element to obtain window metadata - StreamingWindowedElement element = it.next(); - cz.seznam.euphoria.core.client.dataset.windowing.Window wid = element.getWindow(); - long emissionWatermark = element.getTimestamp(); - - // concat the already read element with rest of the opened iterator - Iterator> concatIt = - Iterators.concat(Iterators.singletonIterator(element), it); - - // unwrap all elements to be used in user defined reducer - Iterator unwrapped = - Iterators.transform(concatIt, e -> e.getElement().getValue()); - - Object reduced = reducer.apply(new IteratorIterable<>(unwrapped)); - - StreamingWindowedElement out = - new StreamingWindowedElement<>(wid, emissionWatermark, Pair.of(key, reduced)); - - // decorate resulting item with emission watermark from fired window - emissionFunction.apply(key, window, Collections.singletonList(out), collector); - } - } // ~ end of StreamingWindowedElementWindowedReducer - - - /** - * Performs incremental reduction (in case of combining reduce) on - * {@link MultiWindowedElement}s. Assumes the result is emitted using - * {@link MultiWindowedElementWindowFunction}. - */ - private static class MultiWindowedElementIncrementalReducer< - WID extends cz.seznam.euphoria.core.client.dataset.windowing.Window, KEY, VALUE> - implements ReduceFunction>>, - ResultTypeQueryable>> { - - final UnaryFunction, VALUE> reducer; - - public MultiWindowedElementIncrementalReducer(UnaryFunction, VALUE> reducer) { - this.reducer = reducer; - } - - @Override - public MultiWindowedElement> reduce( - MultiWindowedElement> p1, - MultiWindowedElement> p2) { - - VALUE v1 = p1.getElement().getSecond(); - VALUE v2 = p2.getElement().getSecond(); - Set s = Collections.emptySet(); - return new MultiWindowedElement<>(s, - Pair.of(p1.getElement().getFirst(), reducer.apply(Arrays.asList(v1, v2)))); - } - - @Override - @SuppressWarnings("unchecked") - public TypeInformation>> - getProducedType() { - return TypeInformation.of((Class) MultiWindowedElement.class); - } - } // ~ end of MultiWindowedElementIncrementalReducer - - /** - * Performs non-incremental reduction (in case of non-combining reduce). - */ - private static class MultiWindowedElementWindowedReducer< - WID extends cz.seznam.euphoria.core.client.dataset.windowing.Window, - KEY, VALUEIN, VALUEOUT> - implements WindowFunction< - MultiWindowedElement>, - StreamingWindowedElement>, - KEY, - FlinkWindow> { - - private final UnaryFunction, VALUEOUT> reducer; - private final MultiWindowedElementWindowFunction emissionFunction; - - public MultiWindowedElementWindowedReducer( - UnaryFunction, VALUEOUT> reducer, - MultiWindowedElementWindowFunction emissionFunction) { - this.reducer = reducer; - this.emissionFunction = emissionFunction; - } - - @Override - public void apply(KEY key, - FlinkWindow window, - Iterable>> input, - Collector>> collector) - throws Exception { - - VALUEOUT reducedValue = reducer.apply(new IteratorIterable<>( - Iterators.transform(input.iterator(), e -> e.getElement().getValue()))); - MultiWindowedElement> reduced = - new MultiWindowedElement<>(Collections.emptySet(), Pair.of(key, reducedValue)); - this.emissionFunction.apply(key, window, - Collections.singletonList(reduced), collector); - } - } // ~ end of MultiWindowedElementWindowedReducer - -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index 631430d5..978b6cc3 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -16,26 +16,29 @@ package cz.seznam.euphoria.flink.streaming; import com.google.common.collect.Iterables; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; import cz.seznam.euphoria.core.client.functional.StateFactory; import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; -import cz.seznam.euphoria.flink.streaming.windowing.FlinkWindow; -import cz.seznam.euphoria.flink.streaming.windowing.MultiWindowedElement; -import cz.seznam.euphoria.flink.streaming.windowing.WindowProperties; -import org.apache.flink.configuration.Configuration; +import cz.seznam.euphoria.flink.streaming.windowing.AttachedWindowing; +import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement; +import cz.seznam.euphoria.flink.streaming.windowing.WindowOperator; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.WindowedStream; -import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.util.Collector; +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; -import java.util.Iterator; +import java.time.Duration; +import java.util.Objects; +import java.util.Set; class ReduceStateByKeyTranslator implements StreamingOperatorTranslator { @@ -44,114 +47,124 @@ class ReduceStateByKeyTranslator implements StreamingOperatorTranslator translate(FlinkOperator operator, StreamingExecutorContext context) { - DataStream input = + DataStream input = Iterables.getOnlyElement(context.getInputStreams(operator)); ReduceStateByKey origOperator = operator.getOriginalOperator(); StateFactory stateFactory = origOperator.getStateFactory(); + CombinableReduceFunction stateCombiner = origOperator.getStateCombiner(); + + Windowing windowing = origOperator.getWindowing(); + if (windowing == null) { + // use attached windowing when no windowing explicitly defined + windowing = new AttachedWindowing<>(); + } - final Windowing windowing = origOperator.getWindowing(); final UnaryFunction keyExtractor = origOperator.getKeyExtractor(); final UnaryFunction valueExtractor = origOperator.getValueExtractor(); final UnaryFunction eventTimeAssigner = origOperator.getEventTimeAssigner(); - FlinkStreamingStateStorageProvider storageProvider - = new FlinkStreamingStateStorageProvider(); - - DataStream> folded; - // apply windowing first - if (windowing == null) { - WindowedStream windowed = - context.attachedWindowStream((DataStream) input, keyExtractor, valueExtractor); - // equivalent operation to "left fold" - folded = windowed.apply(new RSBKWindowFunction(storageProvider, stateFactory)) - .name(operator.getName()) - .setParallelism(operator.getParallelism()); - } else { - WindowedStream, Object, FlinkWindow> - windowed = context.flinkWindow( - (DataStream) input, keyExtractor, valueExtractor, windowing, eventTimeAssigner); - // equivalent operation to "left fold" - folded = windowed.apply(new RSBKWindowFunction(storageProvider, stateFactory)) - .name(operator.getName()) - .setParallelism(operator.getParallelism()); + if (eventTimeAssigner != null) { + input = input.assignTimestampsAndWatermarks( + new EventTimeAssigner(context.getAllowedLateness(), eventTimeAssigner)); } + // assign windows + DataStream windowed = input.map(new WindowAssigner( + windowing, keyExtractor, valueExtractor, eventTimeAssigner)) + .setParallelism(operator.getParallelism()); + + DataStream> reduced = (DataStream) windowed.keyBy(new KeyExtractor()) + .transform(operator.getName(), TypeInformation.of(WindowedElement.class), new WindowOperator<>( + windowing, stateFactory, stateCombiner, context.isLocalMode())) + .setParallelism(operator.getParallelism()); + // FIXME partitioner should be applied during "keyBy" to avoid // unnecessary shuffle, but there is no (known) way how to set custom // partitioner to "keyBy" transformation // apply custom partitioner if different from default if (!origOperator.getPartitioning().hasDefaultPartitioner()) { - folded = folded.partitionCustom( + reduced = reduced.partitionCustom( new PartitionerWrapper<>(origOperator.getPartitioning().getPartitioner()), p -> p.getElement().getKey()); } - return folded; + return reduced; } - private static class RSBKWindowFunction< - WID extends cz.seznam.euphoria.core.client.dataset.windowing.Window, - KEY, VALUEIN, VALUEOUT, - W extends Window & WindowProperties> - extends RichWindowFunction>, - StreamingWindowedElement>, - KEY, W> { + private static class EventTimeAssigner + extends BoundedOutOfOrdernessTimestampExtractor + { + private final UnaryFunction eventTimeFn; - private final StateFactory stateFactory; - private final FlinkStreamingStateStorageProvider storageProvider; + EventTimeAssigner(Duration allowedLateness, UnaryFunction eventTimeFn) { + super(millisTime(allowedLateness.toMillis())); + this.eventTimeFn = Objects.requireNonNull(eventTimeFn); + } - RSBKWindowFunction( - FlinkStreamingStateStorageProvider storageProvider, - StateFactory stateFactory) { + @Override + public long extractTimestamp(WindowedElement element) { + return eventTimeFn.apply(element.getElement()); + } - this.stateFactory = stateFactory; - this.storageProvider = storageProvider; + private static org.apache.flink.streaming.api.windowing.time.Time + millisTime(long millis) { + return org.apache.flink.streaming.api.windowing.time.Time.milliseconds(millis); } + } - @Override - public void open(Configuration parameters) throws Exception { - storageProvider.initialize(getRuntimeContext()); + private static class WindowAssigner implements MapFunction, + ResultTypeQueryable { + + private final Windowing windowing; + private final UnaryFunction keyExtractor; + private final UnaryFunction valueExtractor; + private final UnaryFunction eventTimeAssigner; + + public WindowAssigner(Windowing windowing, + UnaryFunction keyExtractor, + UnaryFunction valueExtractor, + UnaryFunction eventTimeAssigner) { + this.windowing = windowing; + this.keyExtractor = keyExtractor; + this.valueExtractor = valueExtractor; + this.eventTimeAssigner = eventTimeAssigner; } @Override - public void apply( - KEY key, - W window, - Iterable>> input, - Collector>> out) - throws Exception { - - Iterator>> it = input.iterator(); - // read the first element to obtain window metadata and key - ElementProvider> element = it.next(); - WID wid = window.getWindowID(); - long emissionWatermark = window.getEmissionWatermark(); - - @SuppressWarnings("unchecked") - State state = stateFactory.apply( - new Context() { - @Override - public void collect(Object elem) { - out.collect(new StreamingWindowedElement(wid, emissionWatermark, Pair.of(key, elem))); - } - @Override - public Object getWindow() { - return wid; - } - }, - storageProvider); - - // add the first element to the state - state.add(element.getElement().getValue()); - - while (it.hasNext()) { - state.add(it.next().getElement().getValue()); + @SuppressWarnings("unchecked") + public KeyedMultiWindowedElement map(WindowedElement el) throws Exception { + if (eventTimeAssigner != null) { + el.setTimestamp((long) eventTimeAssigner.apply(el.getElement())); } - state.flush(); - state.close(); + Set windows = windowing.assignWindowsToElement(el); + + return new KeyedMultiWindowedElement<>( + keyExtractor.apply(el.getElement()), + valueExtractor.apply(el.getElement()), + el.getTimestamp(), + windows); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(KeyedMultiWindowedElement.class); + } + } + + private static class KeyExtractor implements KeySelector, + ResultTypeQueryable { + + @Override + public Object getKey(KeyedMultiWindowedElement el) throws Exception { + return el.getKey(); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Object.class); } } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java index c96f0ed5..ae5000e1 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java @@ -16,6 +16,7 @@ package cz.seznam.euphoria.flink.streaming; import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.operator.Repartition; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; @@ -28,8 +29,8 @@ class RepartitionTranslator implements StreamingOperatorTranslator public DataStream translate(FlinkOperator operator, StreamingExecutorContext context) { - DataStream input = - (DataStream) context.getSingleInputStream(operator); + DataStream input = + (DataStream) context.getSingleInputStream(operator); Partitioning partitioning = operator.getOriginalOperator().getPartitioning(); PartitionerWrapper flinkPartitioner = diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java index b95d8042..637ba65f 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java @@ -15,92 +15,37 @@ */ package cz.seznam.euphoria.flink.streaming; -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.graph.DAG; -import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.flink.ExecutorContext; import cz.seznam.euphoria.flink.FlinkOperator; -import cz.seznam.euphoria.flink.streaming.windowing.AttachedWindow; -import cz.seznam.euphoria.flink.streaming.windowing.FlinkWindow; -import cz.seznam.euphoria.flink.streaming.windowing.MultiWindowedElement; -import cz.seznam.euphoria.flink.streaming.windowing.MultiWindowedElementWindowFunction; -import cz.seznam.euphoria.flink.streaming.windowing.StreamWindower; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import java.util.Objects; +import java.time.Duration; public class StreamingExecutorContext extends ExecutorContext> { - private final StreamWindower windower; + private final Duration allowedLateness; + + /** True when running in local (test) mode */ + private final boolean localMode; public StreamingExecutorContext(StreamExecutionEnvironment env, DAG> dag, - StreamWindower streamWindower) - { + Duration allowedLateness, + boolean localMode) { super(env, dag); - this.windower = Objects.requireNonNull(streamWindower); + this.allowedLateness = allowedLateness; + this.localMode = localMode; } - /** - * Creates a windowed stream based on euphoria windowing and key assigner. - *

- * The returned windowed stream must be post processed using - * {@link MultiWindowedElementWindowFunction}. Attached windowing is relying - * on its effects. - * - * @param the type of input elements - * @param the type of windows produced - * @param the type of the elements' keys - * @param the type of the elements' values - * - * @param input the input stream to be windowed - * @param keyFn the key extraction function - * @param valFn the value extraction function - * @param windowing the windowing strategy to apply - * @param eventTimeAssigner the event time extraction function - * - * @return a windowed stream prepared for consumption by {@link MultiWindowedElementWindowFunction} - */ - public - WindowedStream>, KEY, FlinkWindow> - flinkWindow(DataStream> input, - UnaryFunction keyFn, - UnaryFunction valFn, - Windowing windowing, - UnaryFunction eventTimeAssigner) { - - return windower.window(input, keyFn, valFn, windowing, eventTimeAssigner); + public Duration getAllowedLateness() { + return allowedLateness; } - /** - * Creates an attached window stream, presuming a preceding non-attached - * windowing on the input data stream forwarding - * {@link StreamingWindowedElement#getTimestamp} of the windows to attach to. - * - * @param the type of input elements - * @param the type of windows produced - * @param the type of the elements' keys - * @param the type of the elements' values - * - * @param input the input stream to be windowed - * @param keyFn the key extraction function - * @param valFn the value extraction function - * - * @return a windowed stream attached to the windowing of the original input stream - */ - - WindowedStream>, - KEY, AttachedWindow> - attachedWindowStream(DataStream> input, - UnaryFunction keyFn, - UnaryFunction valFn) - { - return windower.attachedWindow(input, keyFn, valFn); + public boolean isLocalMode() { + return localMode; } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java index 4dc4f16b..6f1f5f4d 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java @@ -21,7 +21,6 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.Operator; -import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.Repartition; import cz.seznam.euphoria.core.client.operator.Union; @@ -29,9 +28,9 @@ import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.FlowTranslator; import cz.seznam.euphoria.flink.streaming.io.DataSinkWrapper; -import cz.seznam.euphoria.flink.streaming.windowing.StreamWindower; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.time.Duration; @@ -61,8 +60,6 @@ private static void defineTranslators() { TRANSLATORS.put(Repartition.class, new RepartitionTranslator()); TRANSLATORS.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator()); TRANSLATORS.put(Union.class, new UnionTranslator()); - - TRANSLATORS.put(ReduceByKey.class, new ReduceByKeyTranslator()); } // ~ ------------------------------------------------------------------------------ @@ -98,7 +95,10 @@ public List> translateInto(Flow flow) { env.getConfig().setAutoWatermarkInterval(autoWatermarkInterval.toMillis()); StreamingExecutorContext executorContext = - new StreamingExecutorContext(env, (DAG) dag, new StreamWindower(allowedLateness)); + new StreamingExecutorContext(env, + (DAG) dag, + allowedLateness, + env instanceof LocalStreamEnvironment); // translate each operator to proper Flink transformation dag.traverse().map(Node::get).forEach(op -> { diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java index 11f9df69..2517e80a 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java @@ -16,6 +16,7 @@ package cz.seznam.euphoria.flink.streaming; import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.Context; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -26,9 +27,9 @@ import java.util.Objects; public class StreamingUnaryFunctorWrapper - implements FlatMapFunction, - StreamingWindowedElement>, - ResultTypeQueryable> { + implements FlatMapFunction, + WindowedElement>, + ResultTypeQueryable> { private final UnaryFunctor f; @@ -37,14 +38,14 @@ public StreamingUnaryFunctorWrapper(UnaryFunctor f) { } @Override - public void flatMap(StreamingWindowedElement value, - Collector> out) + public void flatMap(WindowedElement value, + Collector> out) throws Exception { f.apply(value.getElement(), new Context() { @Override public void collect(OUT elem) { - out.collect(new StreamingWindowedElement<>( + out.collect(new WindowedElement<>( value.getWindow(), value.getTimestamp(), elem)); } @Override @@ -56,7 +57,7 @@ public Object getWindow() { @SuppressWarnings("unchecked") @Override - public TypeInformation> getProducedType() { - return TypeInformation.of((Class) StreamingWindowedElement.class); + public TypeInformation> getProducedType() { + return TypeInformation.of((Class) WindowedElement.class); } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingWindowedElement.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingWindowedElement.java deleted file mode 100644 index 8906411a..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingWindowedElement.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; - -/** - * An extension to {@link WindowedElement} to carry flink specific information - * along the elements. All elements flowing through the streaming flink executor are - * of this type. - */ -public class StreamingWindowedElement - extends WindowedElement - implements ElementProvider -{ - public StreamingWindowedElement(W window, long timestamp, T element) { - super(window, timestamp, element); - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java index faeb4c69..5823fd1b 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java @@ -15,9 +15,9 @@ */ package cz.seznam.euphoria.flink.streaming.io; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.Writer; -import cz.seznam.euphoria.flink.streaming.StreamingWindowedElement; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; @@ -26,7 +26,7 @@ import java.io.Serializable; public class DataSinkWrapper - extends RichSinkFunction> + extends RichSinkFunction> implements Checkpointed { private DataSink dataSink; @@ -56,7 +56,7 @@ public void close() throws Exception { } @Override - public void invoke(StreamingWindowedElement elem) throws Exception { + public void invoke(WindowedElement elem) throws Exception { writer.write(elem.getElement()); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java index 84f0229d..55d401da 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java @@ -17,10 +17,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import cz.seznam.euphoria.core.client.dataset.windowing.Batch; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.io.Partition; import cz.seznam.euphoria.core.client.io.Reader; -import cz.seznam.euphoria.flink.streaming.StreamingWindowedElement; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit; public class DataSourceWrapper - extends RichParallelSourceFunction> - implements ResultTypeQueryable> + extends RichParallelSourceFunction> + implements ResultTypeQueryable> { private final DataSource dataSource; private volatile boolean isRunning = true; @@ -49,7 +49,7 @@ public DataSourceWrapper(DataSource dataSource) { } @Override - public void run(SourceContext> ctx) + public void run(SourceContext> ctx) throws Exception { StreamingRuntimeContext runtimeContext = @@ -107,9 +107,9 @@ public void run(SourceContext> ct } } - private StreamingWindowedElement toWindowedElement(T elem) { + private WindowedElement toWindowedElement(T elem) { // assign ingestion timestamp to elements - return new StreamingWindowedElement<>(Batch.BatchWindow.get(), System.currentTimeMillis(), elem); + return new WindowedElement<>(Batch.BatchWindow.get(), System.currentTimeMillis(), elem); } @Override @@ -122,8 +122,8 @@ public void cancel() { @Override @SuppressWarnings("unchecked") - public TypeInformation> getProducedType() { - return TypeInformation.of((Class) StreamingWindowedElement.class); + public TypeInformation> getProducedType() { + return TypeInformation.of((Class) WindowedElement.class); } private ThreadPoolExecutor createThreadPool() { diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindow.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindow.java deleted file mode 100644 index 5acaab69..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindow.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - - -import cz.seznam.euphoria.flink.streaming.StreamingWindowedElement; -import org.apache.flink.streaming.api.windowing.windows.Window; - -import java.util.Objects; - -public class AttachedWindow - extends Window - implements WindowProperties { - - private final WID wid; - - // ~ transient on purpose not to affect serialized form - // because RocksDB uses that form as a key in state backend - private transient final long emissionWatermark; - - public AttachedWindow(StreamingWindowedElement element) { - this(element.getWindow(), element.getTimestamp()); - } - - public AttachedWindow(WID wid, long emissionWatermark) { - this.wid = Objects.requireNonNull(wid); - this.emissionWatermark = emissionWatermark; - } - - @Override - public WID getWindowID() { - return wid; - } - - @Override - public long getEmissionWatermark() { - return emissionWatermark; - } - - @Override - public long maxTimestamp() { - return Long.MAX_VALUE; - } - - @Override - public boolean equals(Object o) { - if (o instanceof AttachedWindow) { - AttachedWindow that = (AttachedWindow) o; - return this.wid.equals(that.wid); - } - return false; - } - - @Override - public int hashCode() { - return wid.hashCode(); - } - - @Override - public String toString() { - return "AttachedWindow{" + - "wid=" + wid + - ", emissionWatermark=" + emissionWatermark + - '}'; - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowAssigner.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowAssigner.java deleted file mode 100644 index c8906428..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowAssigner.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.flink.streaming.StreamingWindowedElement; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; - -import java.util.Collection; -import java.util.Collections; - -public class AttachedWindowAssigner - extends WindowAssigner, - AttachedWindow> -{ - @Override - public Collection> - assignWindows(StreamingWindowedElement element, - long timestamp, - WindowAssignerContext context) { - return Collections.singleton(new AttachedWindow<>(element)); - } - - @Override - public Trigger, AttachedWindow> - getDefaultTrigger(StreamExecutionEnvironment env) { - return new AttachedWindowTrigger<>(); - } - - @SuppressWarnings("unchecked") - @Override - public TypeSerializer> getWindowSerializer(ExecutionConfig executionConfig) { - return new KryoSerializer<>((Class) AttachedWindow.class, executionConfig); - } - - @Override - public boolean isEventTime() { - return true; - } -} \ No newline at end of file diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowTrigger.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowTrigger.java deleted file mode 100644 index 368e506f..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowTrigger.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; - -public class AttachedWindowTrigger - extends Trigger> -{ - @Override - public TriggerResult onElement(T element, - long timestamp, - AttachedWindow window, - TriggerContext ctx) - throws Exception - { - ctx.registerEventTimeTimer(window.getEmissionWatermark()); - return TriggerResult.CONTINUE; - } - - @Override - public TriggerResult onProcessingTime(long time, AttachedWindow window, TriggerContext ctx) - throws Exception - { - throw new UnsupportedOperationException("processing time not supported!"); - } - - @Override - public TriggerResult onEventTime(long time, AttachedWindow window, TriggerContext ctx) - throws Exception - { - if (window.getEmissionWatermark() == time) { - return TriggerResult.FIRE_AND_PURGE; - } else { - // attached windows are registered _only_ for the maxTimestamp() - throw new IllegalStateException("Invalid timer for attached window"); - } - } - - @Override - public void clear(AttachedWindow window, TriggerContext ctx) - throws Exception - { - // ~ attached-windows are purged only when their trigger fires in which case - // the trigger itself get's clear; however, attached windows have - // maxTimestamp == Long.MAX_VALUE and we need to clean-up the registered - // clean-up trigger to avoid mem-leak in long running streams - ctx.deleteEventTimeTimer(window.maxTimestamp()); - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowing.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowing.java new file mode 100644 index 00000000..a3519669 --- /dev/null +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AttachedWindowing.java @@ -0,0 +1,68 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming.windowing; + +import cz.seznam.euphoria.core.client.dataset.windowing.Batch; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.triggers.Trigger; +import cz.seznam.euphoria.core.client.triggers.TriggerContext; + +import java.util.Collections; +import java.util.Set; + +public class AttachedWindowing implements Windowing { + + @Override + @SuppressWarnings("unchecked") + public Set assignWindowsToElement(WindowedElement el) { + return Collections.singleton((WID) el.getWindow()); + } + + @Override + public Trigger getTrigger() { + return new AttachedWindowTrigger<>(); + } + + private static class AttachedWindowTrigger implements Trigger { + + @Override + public TriggerResult onElement(long time, WID window, TriggerContext ctx) { + // FIXME batch window shouldn't be used in stream flow in the future + // issue #38 on GitHub + if (window instanceof Batch.BatchWindow) return TriggerResult.NOOP; + + ctx.registerTimer(time, window); + return TriggerResult.NOOP; + } + + @Override + public TriggerResult onTimer(long time, WID window, TriggerContext ctx) { + return TriggerResult.FLUSH_AND_PURGE; + } + + @Override + public void onClear(WID window, TriggerContext ctx) { + + } + + @Override + public TriggerResult onMerge(WID window, TriggerContext.TriggerMergeContext ctx) { + throw new UnsupportedOperationException("Merging of attached windows not allowed"); + } + } +} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkMergingWindowAssigner.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkMergingWindowAssigner.java deleted file mode 100644 index 88af2f30..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkMergingWindowAssigner.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.util.Pair; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; - -import java.util.Collection; - -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; - -public class FlinkMergingWindowAssigner> - extends MergingWindowAssigner, FlinkWindow> -{ - private final FlinkWindowAssigner wrap; - - public FlinkMergingWindowAssigner(MergingWindowing windowing) { - this.wrap = new FlinkWindowAssigner<>(windowing); - } - - @Override - public void mergeWindows(Collection> windows, - MergeCallback> callback) { - @SuppressWarnings("unchecked") - Collection, WID>> ms = - ((MergingWindowing) this.wrap.getWindowing()).mergeWindows( - windows.stream().map(FlinkWindow::getWindowID).collect(toSet())); - for (Pair, WID> m : ms) { - callback.merge( - m.getFirst().stream().map(FlinkWindow::new).collect(toList()), - new FlinkWindow<>(m.getSecond())); - } - } - - @Override - public Collection> assignWindows( - MultiWindowedElement element, - long timestamp, - WindowAssignerContext context) { - - return this.wrap.assignWindows(element, timestamp, context); - } - - @Override - public Trigger, FlinkWindow> getDefaultTrigger( - StreamExecutionEnvironment env) { - return this.wrap.getDefaultTrigger(env); - } - - @Override - public TypeSerializer> getWindowSerializer(ExecutionConfig executionConfig) { - return this.wrap.getWindowSerializer(executionConfig); - } - - @Override - public boolean isEventTime() { - return this.wrap.isEventTime(); - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindow.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindow.java deleted file mode 100644 index 5fde4136..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindow.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - - -import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; -import org.apache.flink.streaming.api.windowing.windows.Window; - -/** - * A presentation of {@link cz.seznam.euphoria.core.client.dataset.windowing.Window} - * to Flink. - */ -public class FlinkWindow - extends Window implements WindowProperties -{ - - private final WID wid; - - private transient long emissionWatermark = Long.MIN_VALUE; - private transient long maxTimestamp = Long.MIN_VALUE; - - public FlinkWindow(WID wid) { - this.wid = wid; - } - - @Override - public long getEmissionWatermark() { - return emissionWatermark; - } - - public void setEmissionWatermark(long emissionWatermark) { - this.emissionWatermark = emissionWatermark; - } - - @Override - public long maxTimestamp() { - // see #overrideMaxTimestamp(long) - if (maxTimestamp != Long.MIN_VALUE) { - long mx = maxTimestamp; - this.maxTimestamp = Long.MIN_VALUE; - return mx; - } - - if (this.wid instanceof TimedWindow) { - return ((TimedWindow) this.wid).maxTimestamp(); - } - return Long.MAX_VALUE; - } - - // emh ... a temporary hack to override the value served by - // maxTimestamp(); the value specified here will be served - // the next time - and only the next time - maxTimestamp() is - // called; this allows transferring the aligned time the window - // was fired to the emitted elements: - // see http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowOperator-element-s-timestamp-td10038.html - void overrideMaxTimestamp(long maxTimestamp) { - this.maxTimestamp = maxTimestamp; - } - - @Override - public WID getWindowID() { - return wid; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) return true; - if (!(obj instanceof FlinkWindow)) return false; - @SuppressWarnings("unchecked") - WID thatTindowID = ((FlinkWindow) obj).getWindowID(); - return thatTindowID.equals(this.getWindowID()); - } - - @Override - public int hashCode() { - return wid.hashCode(); - } - - @Override - public String toString() { - return "FlinkWindow{" + - "wid=" + wid + - ", emissionWatermark=" + emissionWatermark + - '}'; - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindowAssigner.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindowAssigner.java deleted file mode 100644 index 662193c5..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindowAssigner.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; - -import java.util.Collection; -import java.util.stream.Collectors; - -public class FlinkWindowAssigner - extends WindowAssigner, FlinkWindow> { - - private final Windowing windowing; - - public FlinkWindowAssigner(Windowing windowing) { - this.windowing = windowing; - } - - Windowing getWindowing() { - return this.windowing; - } - - @Override - public Collection> assignWindows( - MultiWindowedElement element, - long timestamp, WindowAssignerContext context) { - - // map collection of Euphoria WIDs to FlinkWindows - return element.windows().stream().map( - FlinkWindow::new).collect(Collectors.toList()); - } - - @Override - @SuppressWarnings("unchecked") - public Trigger, FlinkWindow> getDefaultTrigger( - StreamExecutionEnvironment env) { - return new FlinkWindowTrigger(windowing.getTrigger()); - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer> getWindowSerializer(ExecutionConfig executionConfig) { - return new KryoSerializer<>((Class) FlinkWindow.class, executionConfig); - } - - @Override - public boolean isEventTime() { - return true; - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindowTrigger.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindowTrigger.java deleted file mode 100644 index 2f25798f..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/FlinkWindowTrigger.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; - - -public class FlinkWindowTrigger extends Trigger> { - - private final cz.seznam.euphoria.core.client.triggers.Trigger euphoriaTrigger; - - public FlinkWindowTrigger( - cz.seznam.euphoria.core.client.triggers.Trigger trigger) { - this.euphoriaTrigger = trigger; - } - - @Override - public TriggerResult onElement( - T element, long timestamp, FlinkWindow window, TriggerContext ctx) - throws Exception { - return trackEmissionWatermark( - window, onElementImpl(element, timestamp, window, ctx), - ctx.getCurrentWatermark()); - } - - @Override - public TriggerResult onProcessingTime(long time, - FlinkWindow window, - TriggerContext ctx) throws Exception { - return trackEmissionWatermark(window, onTimeEvent(time, window, ctx), time); - } - - @Override - public TriggerResult onEventTime(long time, - FlinkWindow window, - TriggerContext ctx) throws Exception { - - return trackEmissionWatermark(window, onTimeEvent(time, window, ctx), time); - } - - private TriggerResult onElementImpl( - T element, long timestamp, FlinkWindow window, TriggerContext ctx) - throws Exception { - - // pass onElement event to the original euphoria trigger - return translateResult( - euphoriaTrigger.onElement( - timestamp, window.getWindowID(), new TriggerContextWrapper(ctx))); - } - - private TriggerResult onTimeEvent(long time, - FlinkWindow window, - TriggerContext ctx) throws Exception { - - if (time >= window.maxTimestamp()) { - // fire all windows at the final watermark - return TriggerResult.FIRE_AND_PURGE; - } - - // pass onTimer to the original euphoria trigger - return translateResult( - euphoriaTrigger.onTimer( - time, window.getWindowID(), new TriggerContextWrapper(ctx))); - } - - private TriggerResult translateResult( - cz.seznam.euphoria.core.client.triggers.Trigger.TriggerResult euphoriaResult) { - - switch (euphoriaResult) { - case FLUSH: - return TriggerResult.FIRE; - case FLUSH_AND_PURGE: - return TriggerResult.FIRE_AND_PURGE; - case NOOP: - return TriggerResult.CONTINUE; - case PURGE: - return TriggerResult.PURGE; - default: - throw new IllegalStateException("Unknown result:" + euphoriaResult.name()); - } - } - - private TriggerResult trackEmissionWatermark( - FlinkWindow window, TriggerResult r, long watermark) - { - if (r.isFire()) { - window.setEmissionWatermark(watermark); - window.overrideMaxTimestamp(watermark); - } - return r; - } - - @Override - public void clear(FlinkWindow window, TriggerContext ctx) throws Exception { - euphoriaTrigger.onClear(window.getWindowID(), new TriggerContextWrapper(ctx)); - - // ~ our flink-window-ids windows have maxTimestamp == Long.MAX_VALUE; need to - // clean-up the registered clean-up trigger to avoid mem-leak in long running - // streams - - ctx.deleteEventTimeTimer(window.maxTimestamp()); - } - - @Override - public boolean canMerge() { - return true; - } - - @Override - public TriggerResult onMerge(FlinkWindow window, OnMergeContext ctx) - throws Exception { - return translateResult(euphoriaTrigger.onMerge( - window.getWindowID(), new TriggerMergeContextWrapper(ctx))); - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElement.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElement.java new file mode 100644 index 00000000..c5ddde98 --- /dev/null +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElement.java @@ -0,0 +1,84 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming.windowing; + +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; + +import java.util.Set; + +/** + * Single data element associated with multiple windows. It's rather used + * for performance optimization as the same thing could be expressed + * as a set of {@link WindowedElement} instances. + * + * @param Type of used window + * @param Type of element's key + * @param Type of element's data + */ +public class KeyedMultiWindowedElement { + + private KEY key; + private VALUE value; + private long timestamp; + private Set windows; + + public KeyedMultiWindowedElement() { + } + + public KeyedMultiWindowedElement(KEY key, + VALUE value, + long timestamp, + Set windows) + { + this.key = key; + this.value = value; + this.timestamp = timestamp; + this.windows = windows; + } + + public KEY getKey() { + return key; + } + + public void setKey(KEY key) { + this.key = key; + } + + public VALUE getValue() { + return value; + } + + public void setValue(VALUE value) { + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public Set getWindows() { + return windows; + } + + public void setWindows(Set windows) { + this.windows = windows; + } +} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java new file mode 100644 index 00000000..09b35593 --- /dev/null +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java @@ -0,0 +1,182 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming.windowing; + +import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Utility class to keep track of currently existing {@link Window} instances + * when using {@link MergingWindowing}. + * + * @param type of the window + */ +class MergingWindowSet { + + private final MergingWindowing windowing; + + /** + * Mapping from window to the window that keeps the window state. When + * we are incrementally merging windows starting from some window we keep that starting + * window as the state window to prevent costly state juggling. + * Inspired by the idea from Flink, + * see {@link org.apache.flink.api.common.state.MergingState} + */ + private final Map mapping; + + /** + * Snapshot of initial state during WindowSet creation. Used to decide + * if there are changes that needs to be persisted. + */ + private final Map initialMapping; + + private final ListState> state; + + public MergingWindowSet(MergingWindowing windowing, + ListState> state) throws Exception { + this.windowing = windowing; + this.state = state; + + mapping = new HashMap<>(); + + // load windows from persistent state storage + Iterable> it = state.get(); + if (it != null) { + for (Tuple2 window : it) { + mapping.put(window.f0, window.f1); + } + } + + initialMapping = new HashMap<>(); + initialMapping.putAll(mapping); + } + + /** + * Persist current internal state of the {@link MergingWindowSet} to the + * Flink state storage. + */ + public void persist() throws Exception { + if (!mapping.equals(initialMapping)) { + state.clear(); + for (Map.Entry e : mapping.entrySet()) { + state.add(new Tuple2<>(e.getKey(), e.getValue())); + } + } + } + + public void removeWindow(W window) throws Exception { + if (mapping.remove(window) == null) { + throw new IllegalStateException("Non-existing window " + window); + } + } + + public W getStateWindow(W window) { + return mapping.get(window); + } + + /** + * Add a new {@link Window} to the window set. It may trigger merging. + * + * @param newWindow The window that added window ended up in. Can be result + * of merge or the original itself. + * @return Window that added window ended up in after merge. + */ + public W addWindow(W newWindow, MergeCallback callback) throws Exception { + Set windows = new HashSet<>(this.mapping.keySet()); + windows.add(newWindow); + + @SuppressWarnings("unchecked") + Collection, W>> mergeResults = windowing.mergeWindows(windows); + + W resultWindow = newWindow; + boolean mergedNewWindow = false; + + for (Pair, W> merges : mergeResults) { + W mergeResult = merges.getSecond(); + Collection mergedWindows = merges.getFirst(); + + // If our new window is in the merged windows make the merge result the + // result window + if (mergedWindows.remove(newWindow)) { + mergedNewWindow = true; + resultWindow = mergeResult; + } + + // Remove mergeResult result from mergedWindows if present. + // This covers the case when windows (1, 2) are merged to (1) + mergedWindows.remove(mergeResult); + + // If no window is actually merged we can skip merging phase. + if (mergedWindows.isEmpty()) continue; + + // Pick any of the merged windows and choose that window's state window + // as the state window for the merge result + W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next()); + + // Compute the state windows that we are merging + List mergedStateWindows = new ArrayList<>(); + for (W mergedWindow: mergedWindows) { + W res = this.mapping.remove(mergedWindow); + if (res != null) { + mergedStateWindows.add(res); + } + } + + this.mapping.put(mergeResult, mergedStateWindow); + + // Don't put the target state window into the merged windows + mergedStateWindows.remove(mergedStateWindow); + + callback.merge(mergeResult, + mergedWindows, + mapping.get(mergeResult), + mergedStateWindows); + } + + if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) { + mapping.put(resultWindow, resultWindow); + } + + return resultWindow; + } + + public interface MergeCallback { + + /** + * This gets called when a merge occurs. + * + * @param mergeResult Result window of merging. + * @param mergedWindows ist of windows that have been removed by merge operation. + * @param stateResultWindow Window actually holding the current result window state. + * @param mergedStateWindows List of state windows that have been merged/removed. + */ + void merge(W mergeResult, + Iterable mergedWindows, + W stateResultWindow, + Iterable mergedStateWindows) throws Exception; + } +} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MultiWindowedElement.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MultiWindowedElement.java deleted file mode 100644 index fb53f460..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MultiWindowedElement.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.flink.streaming.ElementProvider; - -import java.util.Set; - -public final class MultiWindowedElement implements ElementProvider { - private final Set windows; - private final T element; - - public MultiWindowedElement(Set windows, T element) { - this.windows = windows; - this.element = element; - } - - @Override - public T getElement() { - return element; - } - - public Set windows() { - return windows; - } - - @Override - public String toString() { - return "MultiWindowedElement(" - + windows + ", " + element + ")"; - } - - -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MultiWindowedElementWindowFunction.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MultiWindowedElementWindowFunction.java deleted file mode 100644 index 12b401e2..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MultiWindowedElementWindowFunction.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.util.Pair; -import cz.seznam.euphoria.flink.streaming.StreamingWindowedElement; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; -import org.apache.flink.util.Collector; - -/** - * Windowing function to extract the emission watermark from the window being - * emitted and forward it along the emitted element(s). Further ensures that - * the emitted window-id stored on the elements corresponds correctly to the - * emitted window. - */ -public class MultiWindowedElementWindowFunction - implements WindowFunction< - MultiWindowedElement>, - StreamingWindowedElement>, - KEY, - FlinkWindow> { - - @Override - public void apply( - KEY key, - FlinkWindow window, - Iterable>> input, - Collector>> out) { - for (MultiWindowedElement> i : input) { - WID wid = window.getWindowID(); - out.collect( - new StreamingWindowedElement<>( - wid, - window.getEmissionWatermark(), - Pair.of(i.getElement().getFirst(), i.getElement().getSecond()))); - } - } -} \ No newline at end of file diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamWindower.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamWindower.java deleted file mode 100644 index 760eb979..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamWindower.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import cz.seznam.euphoria.core.client.functional.UnaryFunction; -import cz.seznam.euphoria.core.client.util.Pair; -import cz.seznam.euphoria.flink.Utils; -import cz.seznam.euphoria.flink.streaming.StreamingWindowedElement; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.datastream.WindowedStream; -import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; -import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; - -import javax.annotation.Nullable; -import java.time.Duration; -import java.util.Objects; - -/** - * Class creating {@code WindowedStream} from {@code DataStream}s. - */ -public class StreamWindower { - - private final Duration allowedLateness; - - public StreamWindower(Duration allowedLateness) { - this.allowedLateness = Objects.requireNonNull(allowedLateness); - } - - @SuppressWarnings("unchecked") - public - WindowedStream>, - KEY, - AttachedWindow> - attachedWindow(DataStream> input, - UnaryFunction keyFn, - UnaryFunction valFn) - { - DataStream>> mapped - = input.map(i -> { - T elem = i.getElement(); - KEY key = keyFn.apply(elem); - VALUE val = valFn.apply(elem); - WID wid = i.getWindow(); - return new StreamingWindowedElement<>( - wid, - // ~ forward the emission watermark - i.getTimestamp(), - Pair.of(key, val)); - }) - .setParallelism(input.getParallelism()) - .returns((Class) StreamingWindowedElement.class); - final KeyedStream>, KEY> keyed; - keyed = mapped.keyBy(Utils.wrapQueryable(new WeKeySelector<>())); - return keyed.window(new AttachedWindowAssigner<>()); - } - - @SuppressWarnings("unchecked") - public - WindowedStream>, KEY, FlinkWindow> - window(DataStream> input, - UnaryFunction keyFn, - UnaryFunction valFn, - Windowing windowing, - @Nullable UnaryFunction eventTimeAssigner) { - - if (eventTimeAssigner != null) { - input = input.assignTimestampsAndWatermarks( - new EventTimeAssigner<>(allowedLateness, (UnaryFunction) eventTimeAssigner)); - } - - DataStream>> - elementsWithWindow = - input.map(i -> { - if (eventTimeAssigner != null) { - i.setTimestamp(eventTimeAssigner.apply(i.getElement())); - } - - return new MultiWindowedElement<>( - windowing.assignWindowsToElement(i), - Pair.of(keyFn.apply(i.getElement()), valFn.apply(i.getElement()))); - }) - .setParallelism(input.getParallelism()) - .returns((Class) MultiWindowedElement.class); - - KeyedStream>, KEY> keyed - = elementsWithWindow.keyBy( - Utils.wrapQueryable((MultiWindowedElement> in) - -> in.getElement().getFirst())); - - WindowAssigner wassign; - if (windowing instanceof MergingWindowing) { - wassign = new FlinkMergingWindowAssigner((MergingWindowing) windowing); - } else { - wassign = new FlinkWindowAssigner<>(windowing); - } - return keyed.window(wassign); - } - - private static org.apache.flink.streaming.api.windowing.time.Time - millisTime(long millis) { - return org.apache.flink.streaming.api.windowing.time.Time.milliseconds(millis); - } - - static final class WeKeySelector implements - KeySelector>, KEY> - { - @Override - public KEY getKey(StreamingWindowedElement> value) - throws Exception - { - return value.getElement().getKey(); - } - } - - static class EventTimeAssigner - extends BoundedOutOfOrdernessTimestampExtractor> - { - private final UnaryFunction eventTimeFn; - - EventTimeAssigner(Duration allowedLateness, UnaryFunction eventTimeFn) { - super(millisTime(allowedLateness.toMillis())); - this.eventTimeFn = Objects.requireNonNull(eventTimeFn); - } - - @Override - public long extractTimestamp(StreamingWindowedElement element) { - return eventTimeFn.apply(element.getElement()); - } - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/TriggerContextWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/TriggerContextWrapper.java deleted file mode 100644 index 8443d7f9..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/TriggerContextWrapper.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.operator.state.ListStorage; -import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; -import cz.seznam.euphoria.core.client.operator.state.ValueStorage; -import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; -import cz.seznam.euphoria.core.client.triggers.TriggerContext; -import cz.seznam.euphoria.flink.storage.Descriptors; -import cz.seznam.euphoria.flink.storage.FlinkListStorage; -import cz.seznam.euphoria.flink.storage.FlinkReducingValueStorage; -import cz.seznam.euphoria.flink.storage.FlinkValueStorage; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; - -/** - * Adapts Flink {@link Trigger.TriggerContext} to be used as trigger context in - * Euphoria API - */ -public class TriggerContextWrapper implements TriggerContext { - - private final Trigger.TriggerContext flinkContext; - - public TriggerContextWrapper(Trigger.TriggerContext flinkContext) { - this.flinkContext = flinkContext; - } - - Trigger.TriggerContext getFlinkContext() { - return flinkContext; - } - - @Override - public boolean registerTimer(long stamp, Window window) { - if (stamp <= getCurrentTimestamp()) { - return false; - } - - flinkContext.registerEventTimeTimer(stamp); - return true; - } - - @Override - public void deleteTimer(long stamp, Window window) { - flinkContext.deleteEventTimeTimer(stamp); - } - - @Override - public long getCurrentTimestamp() { - return flinkContext.getCurrentWatermark(); - } - - @Override - public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) { - if (descriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor) { - @SuppressWarnings("unchecked") - ReducingStateDescriptor from = Descriptors.from( - (ValueStorageDescriptor.MergingValueStorageDescriptor) descriptor); - ReducingState state = getFlinkContext().getPartitionedState(from); - return new FlinkReducingValueStorage<>(state, descriptor.getDefaultValue()); - } - return new FlinkValueStorage<>( - flinkContext.getPartitionedState(Descriptors.from(descriptor))); - } - - @Override - public ListStorage getListStorage(ListStorageDescriptor descriptor) { - return new FlinkListStorage<>( - flinkContext.getPartitionedState(Descriptors.from(descriptor))); - } -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/TriggerMergeContextWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/TriggerMergeContextWrapper.java deleted file mode 100644 index 21e99add..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/TriggerMergeContextWrapper.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor; -import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor; -import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; -import cz.seznam.euphoria.core.client.triggers.TriggerContext; -import cz.seznam.euphoria.flink.storage.Descriptors; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; - -public class TriggerMergeContextWrapper - extends TriggerContextWrapper - implements TriggerContext.TriggerMergeContext { - - public TriggerMergeContextWrapper(Trigger.OnMergeContext flinkContext) { - super(flinkContext); - } - - @SuppressWarnings("unchecked") - @Override - public void mergeStoredState(StorageDescriptor descriptor) { - if (!(descriptor instanceof MergingStorageDescriptor)) { - throw new IllegalStateException( - "Storage descriptor '" + descriptor.getName() + "' must support merging!"); - } - - if (descriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor) { - ((Trigger.OnMergeContext) getFlinkContext()) - .mergePartitionedState( - Descriptors.from((ValueStorageDescriptor.MergingValueStorageDescriptor) descriptor)); - return; - } - - throw new UnsupportedOperationException(descriptor + " is not supported for merging yet!"); - } -} \ No newline at end of file diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowOperator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowOperator.java new file mode 100644 index 00000000..d2ab5ec2 --- /dev/null +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowOperator.java @@ -0,0 +1,468 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming.windowing; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Table; +import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; +import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; +import cz.seznam.euphoria.core.client.functional.StateFactory; +import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.operator.state.ListStorage; +import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; +import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor; +import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor; +import cz.seznam.euphoria.core.client.operator.state.ValueStorage; +import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; +import cz.seznam.euphoria.core.client.triggers.Trigger; +import cz.seznam.euphoria.core.client.triggers.TriggerContext; +import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.flink.storage.Descriptors; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.HeapInternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +public class WindowOperator + extends AbstractStreamOperator>> + implements OneInputStreamOperator, WindowedElement>>, + Triggerable { + + private final Windowing windowing; + private final Trigger trigger; + private final StateFactory stateFactory; + private final CombinableReduceFunction stateCombiner; + + + // FIXME Arguable hack that ensures all remaining opened windows + // are flushed to output when end of stream is reached. This is intended + // behavior in test environment only. In cluster environment reaching + // end of stream without killing the job is not likely. + // Tracking of opened windows have a huge performance impact. + /** True when executor is running in local test (mode) */ + private final boolean localMode; + + private transient InternalTimerService timerService; + + // tracks existing windows to flush them in case of end of stream is reached + private transient InternalTimerService endOfStreamTimerService; + + private transient TriggerContextAdapter triggerContext; + private transient OutputContext outputContext; + private transient WindowedStorageProvider storageProvider; + + private transient ListStateDescriptor> mergingWindowsDescriptor; + + private transient TypeSerializer windowSerializer; + + // cached window states by key+window + private transient Table windowStates; + + public WindowOperator(Windowing windowing, + StateFactory stateFactory, + CombinableReduceFunction stateCombiner, + boolean localMode) { + this.windowing = Objects.requireNonNull(windowing); + this.trigger = windowing.getTrigger(); + this.stateFactory = Objects.requireNonNull(stateFactory); + this.stateCombiner = Objects.requireNonNull(stateCombiner); + this.localMode = localMode; + } + + @Override + @SuppressWarnings("unchecked") + public void open() throws Exception { + super.open(); + + this.windowSerializer = + (TypeSerializer) TypeExtractor.createTypeInfo(Window.class) + .createSerializer(getRuntimeContext().getExecutionConfig()); + + this.timerService = + getInternalTimerService("window-timers", windowSerializer, this); + this.endOfStreamTimerService = + getInternalTimerService("end-of-stream-timers", windowSerializer, this); + this.triggerContext = new TriggerContextAdapter(); + this.outputContext = new OutputContext(); + this.storageProvider = new WindowedStorageProvider<>( + getKeyedStateBackend(), windowSerializer); + + this.windowStates = HashBasedTable.create(); + + if (windowing instanceof MergingWindowing) { + TupleSerializer> tupleSerializer = + new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[]{windowSerializer, windowSerializer}); + this.mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer); + } + } + + private void setupEnvironment(Object key, WID window) { + outputContext.setKey(key); + outputContext.setWindow(window); + storageProvider.setWindow(window); + } + + @Override + @SuppressWarnings("unchecked") + public void processElement(StreamRecord> record) + throws Exception { + + KeyedMultiWindowedElement element = record.getValue(); + + // drop late-comers immediately + if (element.getTimestamp() < timerService.currentWatermark()) { + return; + } + + if (windowing instanceof MergingWindowing) { + MergingWindowSet mergingWindowSet = getMergingWindowSet(); + + for (WID window : element.getWindows()) { + + // when window is added to the set it may result in window merging + WID currentWindow = mergingWindowSet.addWindow(window, + (mergeResult, mergedWindows, stateResultWindow, mergedStateWindows) -> { + + setupEnvironment(getCurrentKey(), mergeResult); + triggerContext.setWindow(mergeResult); + + processTriggerResult(mergeResult, + triggerContext.onMerge(mergedWindows), + mergingWindowSet); + + // clear all merged windows + for (WID merged : mergedWindows) { + storageProvider.setWindow(merged); + trigger.onClear(merged, triggerContext); + removeWindow(merged, null); + } + + // FIXME This implementation relies on fact that + // stateCombiner function is actually "fold left". + // That means the result state will end up in the + // first state given to the combiner function. + + // merge all mergedStateWindows into stateResultWindow + List states = new ArrayList<>(); + states.add(getWindowState(stateResultWindow)); + mergedStateWindows.forEach(sw -> states.add(getWindowState(sw))); + stateCombiner.apply(states); + + // remove merged window states + mergedStateWindows.forEach(sw -> { + getWindowState(sw).close(); + removeState(sw); + }); + }); + + setupEnvironment(getCurrentKey(), currentWindow); + + if (localMode) { + endOfStreamTimerService.registerEventTimeTimer(currentWindow, Long.MAX_VALUE); + } + + // process trigger + Trigger.TriggerResult triggerResult = trigger.onElement( + element.getTimestamp(), + currentWindow, + triggerContext); + + WID stateWindow = mergingWindowSet.getStateWindow(currentWindow); + setupEnvironment(getCurrentKey(), stateWindow); + getWindowState(stateWindow).add(element.getValue()); + + processTriggerResult(currentWindow, triggerResult, mergingWindowSet); + } + mergingWindowSet.persist(); + + } else { + for (WID window : element.getWindows()) { + setupEnvironment(getCurrentKey(), window); + + if (localMode) { + // register cleanup timer for each window + endOfStreamTimerService.registerEventTimeTimer(window, Long.MAX_VALUE); + } + + getWindowState(window).add(element.getValue()); + + // process trigger + Trigger.TriggerResult triggerResult = trigger.onElement( + element.getTimestamp(), + window, + triggerContext); + + processTriggerResult(window, triggerResult, null); + } + } + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + WID window = timer.getNamespace(); + Object key = timer.getKey(); + + setupEnvironment(key, window); + + Trigger.TriggerResult triggerResult; + if (timer.getTimestamp() == Long.MAX_VALUE) { + // at the end of time flush window immediately + triggerResult = Trigger.TriggerResult.FLUSH_AND_PURGE; + } else { + triggerResult = trigger.onTimer( + timer.getTimestamp(), + window, + triggerContext); + } + + MergingWindowSet mergingWindowSet = null; + if (windowing instanceof MergingWindowing) { + mergingWindowSet = getMergingWindowSet(); + } + + processTriggerResult(window, triggerResult, mergingWindowSet); + + if (mergingWindowSet != null) { + mergingWindowSet.persist(); + } + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + throw new UnsupportedOperationException("We are not using processing time at all"); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + // FIXME relies heavily on Flink internal implementation + // which may change in the future and break this + + // we need to advance watermark of timer services in given order + // timerService first to fire all scheduled triggers + ((HeapInternalTimerService) timerService).advanceWatermark(mark.getTimestamp()); + if (localMode) { + ((HeapInternalTimerService) endOfStreamTimerService).advanceWatermark(mark.getTimestamp()); + } + + output.emitWatermark(mark); + } + + private void processTriggerResult(WID window, + Trigger.TriggerResult tr, + @Nullable MergingWindowSet mergingWindowSet) { + + if (tr.isFlush() || tr.isPurge()) { + WID stateWindow = window; + State windowState; + + if (windowing instanceof MergingWindowing) { + Objects.requireNonNull(mergingWindowSet); + stateWindow = mergingWindowSet.getStateWindow(window); + windowState = getWindowState(stateWindow); + } else { + windowState = getWindowState(window); + } + + if (tr.isFlush() || tr.isPurge()) { + if (tr.isFlush()) { + windowState.flush(); + } + + if (tr.isPurge()) { + windowState.close(); + trigger.onClear(window, triggerContext); + removeWindow(window, mergingWindowSet); + removeState(stateWindow); + } + } + } + } + + @SuppressWarnings("unchecked") + private State getWindowState(WID window) { + State windowState = windowStates.get(getCurrentKey(), window); + if (windowState == null) { + windowState = stateFactory.apply(outputContext, storageProvider); + windowStates.put((KEY) getCurrentKey(), window, windowState); + } + + return windowState; + } + + private MergingWindowSet getMergingWindowSet() { + try { + ListState> mergeState = + getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, + mergingWindowsDescriptor); + + return new MergingWindowSet<>((MergingWindowing) windowing, mergeState); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void removeWindow(WID window, @Nullable MergingWindowSet windowSet) { + try { + if (localMode) { + endOfStreamTimerService.deleteEventTimeTimer(window, Long.MAX_VALUE); + } + if (windowSet != null) { + windowSet.removeWindow(window); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void removeState(WID stateWindow) { + windowStates.remove(getCurrentKey(), stateWindow); + } + + // ------------------- + + private class TriggerContextAdapter implements TriggerContext, TriggerContext.TriggerMergeContext { + + private WID window; + + private Collection mergedWindows; + + @Override + @SuppressWarnings("unchecked") + public boolean registerTimer(long stamp, Window window) { + if (stamp <= getCurrentTimestamp()) { + return false; + } + + timerService.registerEventTimeTimer((WID) window, stamp); + return true; + } + + @Override + @SuppressWarnings("unchecked") + public void deleteTimer(long stamp, Window window) { + timerService.deleteEventTimeTimer((WID) window, stamp); + } + + @Override + public long getCurrentTimestamp() { + return timerService.currentWatermark(); + } + + @Override + @SuppressWarnings("unchecked") + public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) { + return storageProvider.getValueStorage(descriptor); + } + + @Override + @SuppressWarnings("unchecked") + public ListStorage getListStorage(ListStorageDescriptor descriptor) { + return storageProvider.getListStorage(descriptor); + } + + @Override + @SuppressWarnings("unchecked") + public void mergeStoredState(StorageDescriptor descriptor) { + try { + Objects.requireNonNull(mergedWindows); + if (!(descriptor instanceof MergingStorageDescriptor)) { + throw new IllegalStateException( + "Storage descriptor '" + descriptor.getName() + "' must support merging!"); + } + + if (!mergedWindows.isEmpty()) { + if (descriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor) { + getKeyedStateBackend().mergePartitionedStates(window, + mergedWindows, + windowSerializer, + Descriptors.from((ValueStorageDescriptor.MergingValueStorageDescriptor) descriptor)); + return; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + throw new UnsupportedOperationException(descriptor + " is not supported for merging yet!"); + } + + private Trigger.TriggerResult onMerge(Iterable mergedWindows) { + this.mergedWindows = Lists.newArrayList(mergedWindows); + return trigger.onMerge(window, this); + } + + private void setWindow(WID window) { + this.window = window; + } + } + + private class OutputContext implements Context { + + private Object key; + private Window window; + + private final StreamRecord reuse = new StreamRecord<>(null); + + @Override + @SuppressWarnings("unchecked") + public void collect(Object elem) { + long stamp = (window instanceof TimedWindow) + ? ((TimedWindow) window).maxTimestamp() + : timerService.currentWatermark(); + + // FIXME timestamp is duplicated here + output.collect(reuse.replace( + new WindowedElement<>(window, stamp, Pair.of(key, elem)), + stamp)); + } + + @Override + public Object getWindow() { + return window; + } + + public void setWindow(Window window) { + this.window = window; + } + + public void setKey(Object key) { + this.key = key; + } + } +} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowProperties.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowProperties.java deleted file mode 100644 index cf75c966..00000000 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowProperties.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright 2016 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.flink.streaming.windowing; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; - -public interface WindowProperties { - - WID getWindowID(); - - long getEmissionWatermark(); - -} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java new file mode 100644 index 00000000..a1f61cea --- /dev/null +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java @@ -0,0 +1,91 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming.windowing; + +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.operator.state.ListStorage; +import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; +import cz.seznam.euphoria.core.client.operator.state.StorageProvider; +import cz.seznam.euphoria.core.client.operator.state.ValueStorage; +import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; +import cz.seznam.euphoria.flink.storage.Descriptors; +import cz.seznam.euphoria.flink.storage.FlinkListStorage; +import cz.seznam.euphoria.flink.storage.FlinkReducingValueStorage; +import cz.seznam.euphoria.flink.storage.FlinkValueStorage; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; + +/** + * Storage provider using flink's state API. All states are namespaced by window. + */ +class WindowedStorageProvider implements StorageProvider { + + private final KeyedStateBackend keyedStateBackend; + private final TypeSerializer windowSerializer; + private Window window; + + + public WindowedStorageProvider(KeyedStateBackend keyedStateBackend, + TypeSerializer windowSerializer) { + this.keyedStateBackend = keyedStateBackend; + this.windowSerializer = windowSerializer; + } + + public void setWindow(Window window) { + this.window = window; + } + + @Override + @SuppressWarnings("unchecked") + public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) { + try { + if (descriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor) { + ReducingStateDescriptor flinkDescriptor = Descriptors.from( + (ValueStorageDescriptor.MergingValueStorageDescriptor) descriptor); + + return new FlinkReducingValueStorage<>((ReducingState) + keyedStateBackend.getPartitionedState(window, + windowSerializer, flinkDescriptor), + descriptor.getDefaultValue(), + window); + } else { + return new FlinkValueStorage<>((ValueState) + keyedStateBackend.getPartitionedState(window, + windowSerializer, Descriptors.from(descriptor)), + window); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("unchecked") + public ListStorage getListStorage(ListStorageDescriptor descriptor) { + try { + return new FlinkListStorage<>((ListState) + keyedStateBackend.getPartitionedState(window, + windowSerializer, Descriptors.from(descriptor)), + window); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementTest.java b/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementTest.java new file mode 100644 index 00000000..d326e956 --- /dev/null +++ b/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementTest.java @@ -0,0 +1,37 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class KeyedMultiWindowedElementTest { + + @Test + public void testSerializer() { + TypeSerializer serializer = + TypeInformation.of(KeyedMultiWindowedElement.class) + .createSerializer(new ExecutionConfig()); + + // must be POJO serializer for performance reasons + assertTrue(serializer instanceof PojoSerializer); + } +} \ No newline at end of file diff --git a/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java b/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java index 794c7ee6..d39a764c 100644 --- a/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java +++ b/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java @@ -35,16 +35,14 @@ class WindowedElementCollector implements Context { @Override public void collect(T elem) { - long endWindowStamp = (window instanceof TimedWindow) - ? ((TimedWindow) window).maxTimestamp() - : Long.MAX_VALUE; - // ~ timestamp assigned to element can be either end of window // or current watermark supplied by triggering // ~ this is a workaround for NoopTriggerScheduler // used for batch processing that fires all windows // at the end of bounded input - long stamp = Math.min(endWindowStamp, stampSupplier.get()); + long stamp = (window instanceof TimedWindow) + ? ((TimedWindow) window).maxTimestamp() + : stampSupplier.get(); wrap.collect(Datum.of(window, elem, stamp)); } diff --git a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java index eb559e79..80a00142 100644 --- a/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java +++ b/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java @@ -655,104 +655,4 @@ public void validate(Partitions partitions) { } }); } - - static List TETETS_SEEN_TIMES_TRIGGER = Collections.synchronizedList(new ArrayList<>()); - static List TETETS_SEEN_TIMES_ASSIGNER = Collections.synchronizedList(new ArrayList<>()); - - @Processing(Processing.Type.UNBOUNDED) - @Test - public void testElementTimestampEarlyTriggeredStreaming() { - class TimeCollectingWindowing implements Windowing { - @Override - public Set assignWindowsToElement(WindowedElement el) { - TETETS_SEEN_TIMES_ASSIGNER.add(el.getTimestamp()); - return Collections.singleton(new TimeInterval(0, Long.MAX_VALUE)); - } - - @SuppressWarnings("unchecked") - @Override - public Trigger getTrigger() { - return new CountTrigger(1) { - @Override - public boolean isStateful() { - return false; - } - @Override - public TriggerResult onElement(long time, Window window, TriggerContext ctx) { - TETETS_SEEN_TIMES_TRIGGER.add(time); - return super.onElement(time, window, ctx); - } - }; - } - } - - TETETS_SEEN_TIMES_TRIGGER.clear(); - TETETS_SEEN_TIMES_ASSIGNER.clear(); - execute(new AbstractTestCase, Integer>() { - @Override - protected Partitions> getInput() { - return Partitions.add( - // ~ Pair.of(value, time) - Pair.of(1, 10_123L), - Pair.of(2, 11_234L), - Pair.of(8, 16_345L), - Pair.of(9, 17_789L), - // ~ note: exactly one element for the window on purpose (to test out - // all is well even in case our `.combineBy` user function is not called.) - Pair.of(50, 21_456L)) - .build(); - } - - @Override - protected Dataset getOutput(Dataset> input) { - // ~ this operator is supposed to emit elements internally with a - // timestamp which equals the emission of the time windows - Dataset> reduced = - ReduceByKey.of(input) - .keyBy(e -> "") - .valueBy(Pair::getFirst) - .combineBy(Sums.ofInts()) - .windowBy(Time.of(Duration.ofSeconds(10)) - .earlyTriggering(Duration.ofSeconds(5)), Pair::getSecond) - .output(); - // ~ now use a custom windowing with a trigger which does - // the assertions subject to this test (use RSBK which has to - // use triggering, unlike an optimized RBK) - Dataset> output = - ReduceStateByKey.of(reduced) - .keyBy(Pair::getFirst) - .valueBy(Pair::getSecond) - .stateFactory(SumState::new) - .combineStateBy(SumState::combine) - .windowBy(new TimeCollectingWindowing<>()) - .output(); - return FlatMap.of(output) - .using((UnaryFunctor, Integer>) - (elem, context) -> context.collect(elem.getSecond())) - .output(); - } - - @Override - public int getNumOutputPartitions() { - return 1; - } - - @Override - public void validate(Partitions partitions) { - // ~ the last window (containing the last element) gets emitted twice due to - // being "triggered" twice (early triggering plus the end of window) and being - // at the "end of the stream". this is an abnormal situation as streams are - // never-ending. - assertTrue(partitions.get(0).size() > 3); - - ArrayList triggerTimes = new ArrayList<>(TETETS_SEEN_TIMES_TRIGGER); - triggerTimes.sort(Comparator.naturalOrder()); - assertEquals(asList(15_000L, 19_999L, 25_000L, 29_999L), triggerTimes); - - ArrayList assignerTimes = new ArrayList<>(TETETS_SEEN_TIMES_ASSIGNER); - assignerTimes.sort(Comparator.naturalOrder()); - assertEquals(asList(15_000L, 19_999L, 25_000L, 29_999L), assignerTimes); - } - }); - } } diff --git a/pom.xml b/pom.xml index 50ebf463..cc080629 100644 --- a/pom.xml +++ b/pom.xml @@ -605,7 +605,7 @@ - 1.1.3 + 1.2.0 2.1.0