[ https://issues.apache.org/jira/browse/FLINK-37747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17948051#comment-17948051 ]
Arvid Heise commented on FLINK-37747:
-------------------------------------

Thank you for raising the issue and for your detailed analysis. Could you please add the exact Flink version? Flink 1.20.1 has some fixes around the sink.

From a look at your PR, I don't think that your fix resolves the issue sufficiently well. I guess a proper IT case should help clarify that. Do you need help to craft one? I'll comment on the PR.

> GlobalCommitterOperator cannot commit after scaling writer/committer
> --------------------------------------------------------------------
>
>                 Key: FLINK-37747
>                 URL: https://issues.apache.org/jira/browse/FLINK-37747
>             Project: Flink
>          Issue Type: Bug
>            Reporter: David
>            Priority: Blocker
>              Labels: pull-request-available
>
> Hey,
> Our Flink job frequently stopped writing into a Delta table with the Flink Delta connector. After investigating, we found that in GlobalCommitterOperator, the [commit|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L207] function returns directly at the check for whether a checkpoint has finished (this [code|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L211]).
> The issue happened when:
> * the auto-scaler scaled up the chained writer/committer (the direct upstream operator of GlobalCommitterOperator)
> * the job first ran on a limited number of TMs with a lower parallelism for the writer/committer, and the writer/committer was then scaled up to a higher parallelism
> After debugging with more logs, we found the cause of the issue.
> An example is:
> * for checkpoint 3, the Flink job completed successfully with 3 parallel writer/committer subtasks
> ** all committable objects in the writer/committer were saved into the checkpoint state in checkpoint 3
> * the writer/committer was scaled up to 5 parallel tasks
> * the writer/committer subtasks restore state from checkpoint 3 and emit the committable objects from checkpoint 3. The code is [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L139]
> ** the latest parallelism of the writer/committer, which is 5, is used in CommittableSummary. The code is [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L186]
> * GlobalCommitterOperator receives a committable summary from the writer/committer, so it knows:
> ** there are 5 parallel writer/committer subtasks upstream
> ** it has to wait for committable summaries from 5 upstream writer/committer subtasks
> * only 3 writer/committer subtasks emit a CommittableSummary to the global committer operator, as only 3 restore state from checkpoint 3
> * the global committer operator is stuck forever, as it waits for committable summaries from 5 upstream subtasks
> We have a quick solution for this case and have raised a PR to fix it.
> We are using Flink 1.20, but we found that the issue still exists in the master branch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
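
The stall described in the quoted report can be modeled with a small standalone sketch. This is not Flink code: `SummaryTracker`, `onSummary`, `hasReceivedAll`, and `replay` are hypothetical names chosen for illustration, and the subtask ids are arbitrary. It only assumes what the report states: the downstream operator waits for as many summaries as the parallelism reported in them, while only the subtasks that restored state actually emit one.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the bookkeeping in the report; not Flink's actual classes. */
public class SummaryTracker {
    // Parallelism reported inside the summaries (5 after rescaling in the report).
    private final int expectedSubtasks;
    // Upstream subtasks from which a summary has arrived so far.
    private final Set<Integer> seenSubtasks = new HashSet<>();

    public SummaryTracker(int expectedSubtasks) {
        this.expectedSubtasks = expectedSubtasks;
    }

    /** Records that a summary arrived from the given upstream subtask. */
    public void onSummary(int subtaskId) {
        seenSubtasks.add(subtaskId);
    }

    /** The checkpoint counts as complete only once every expected subtask has reported. */
    public boolean hasReceivedAll() {
        return seenSubtasks.size() == expectedSubtasks;
    }

    /** Replays a scenario: summaries claim `reported` subtasks, but only `emitting` send one. */
    public static boolean replay(int reported, int emitting) {
        SummaryTracker tracker = new SummaryTracker(reported);
        for (int subtask = 0; subtask < emitting; subtask++) {
            tracker.onSummary(subtask);
        }
        return tracker.hasReceivedAll();
    }

    public static void main(String[] args) {
        System.out.println(replay(3, 3)); // before rescaling: prints true
        System.out.println(replay(5, 3)); // after rescaling: prints false
    }
}
```

In this model, the second case never becomes complete no matter how long it waits, which matches the reported behavior of the commit function returning early on every call.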