Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#! [euphoria-flink] Avoid extra shuffle when windowing on streaming #52

Merged
merged 1 commit into from
Mar 21, 2017

Conversation

xitep
Copy link
Contributor

@xitep xitep commented Mar 21, 2017

Moves the map function into the operator chain being executed before .keyBy avoiding an extra re-balancing operation.

@xitep xitep requested a review from vanekjar March 21, 2017 08:44
@xitep
Copy link
Contributor Author

xitep commented Mar 21, 2017

I'm sorry, I didn't spot this yesterday as part of the review for #50.

Here's the execution graph with the extra re-balancing:

testapps-graph

Here's the exec graph of the same program with the fix applied:

testapps-fix-graph

.setParallelism(operator.getParallelism());
// ~ execute in the same chain of the input's processing
// so far, thereby, avoiding an unnecessary shuffle
.setParallelism(input.getParallelism());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference of input.getParallelism() and operator.getParallelism()? Shouldn't this be the same provided that to operator didn't define its own parallelism?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • input.getParallelism() returns the parallelism the corresponding flink's dataset
  • operator.getParallism() returns the target parallelism intended for the operator (in this case, rsbk)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, okay, but if the user code does not define specific parallelism (for RSBK), shouldn't this be the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yes. that's correct! in that case input.parallelism will equal operator.parallelism, that's right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I didn't look into details of this PR, but will the RSBK respect the user-supplied parallelism (if any) after this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it will. the attached execution plan shows a program where the RSBK is instructed for a 120 partitions output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, user-supplied parallelism is correctly applied to the operation after shuffle. It's basically map -> keyBy -> setParallelism -> reduceByKey.

@vanekjar
Copy link
Contributor

Good catch! Thanks 👍

@vanekjar vanekjar merged commit 4312951 into master Mar 21, 2017
@vanekjar vanekjar deleted the pete/align-parallelism branch March 21, 2017 11:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants