Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36546] Handle batch sources in DataSinkTranslator #3646

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

morozov
Copy link
Contributor

@morozov morozov commented Oct 16, 2024

There's no public API in Flink to detect the boundedness of a stream, so this patch duplicates the code from StreamGraphGenerator that Flink itself uses to instantiate CommitterOperatorFactory.

@lvyanquan
Copy link
Contributor

I think a test for this is still necessary.
We have two bound sources(values, MySQL when setting scan.startup.mode to snapshot), and I don't get the point that you said "the sink never receives the end-of-input signal", can you provide a more detailed description? we can add some logs in flush method if endOfInput is true to verify this.

@morozov
Copy link
Contributor Author

morozov commented Oct 16, 2024

@lvyanquan please see the temporary reproducer in the second commit.

and I don't get the point that you said "the sink never receives the end-of-input signal", can you provide a more detailed description?

I got confused. The problem is not that the sink never receives the end-of-input signal but that the two-phase committing sink doesn't commit the last checkpoint.

In order to reproduce the issue I had to make the following temporary changes:

  1. Make ValuesSink a TwoPhaseCommittingSink.
  2. Bypass the usage of reflection in DataSinkTranslator. Otherwise, the test would fail with an IllegalAccessException, which I didn't know how to address.

The point is that with the fix from the first commit, when DataSinkTranslatorBatchModeIT runs, it produces the following log message:

4890 [PostPartition -> Sink Writer: Value Sink -> Sink Committer: Value Sink (1/1)#0] INFO org.apache.flink.cdc.connectors.values.sink.ValuesDataSink$ValuesSink - Find me in the logs. Committing 1 committables.

If the first commit is reverted, this message won't be produced because the sink won't commit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants