Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#! [euphoria-flink] Avoid extra shuffle when windowing on streaming #52

Merged
merged 1 commit into from
Mar 21, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public DataStream<?> translate(FlinkOperator<ReduceStateByKey> operator,
// assign windows
DataStream<KeyedMultiWindowedElement> windowed = input.map(new WindowAssigner(
windowing, keyExtractor, valueExtractor, eventTimeAssigner))
.setParallelism(operator.getParallelism());
// ~ execute in the same chain of the input's processing
// so far, thereby, avoiding an unnecessary shuffle
.setParallelism(input.getParallelism());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference of input.getParallelism() and operator.getParallelism()? Shouldn't this be the same provided that to operator didn't define its own parallelism?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • input.getParallelism() returns the parallelism the corresponding flink's dataset
  • operator.getParallism() returns the target parallelism intended for the operator (in this case, rsbk)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, okay, but if the user code does not define specific parallelism (for RSBK), shouldn't this be the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yes. that's correct! in that case input.parallelism will equal operator.parallelism, that's right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I didn't look into details of this PR, but will the RSBK respect the user-supplied parallelism (if any) after this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it will. the attached execution plan shows a program where the RSBK is instructed for a 120 partitions output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, user-supplied parallelism is correctly applied to the operation after shuffle. It's basically map -> keyBy -> setParallelism -> reduceByKey.


DataStream<WindowedElement<?, Pair>> reduced = (DataStream) windowed.keyBy(new KeyExtractor())
.transform(operator.getName(), TypeInformation.of(WindowedElement.class), new WindowOperator<>(
Expand Down