[ 
https://issues.apache.org/jira/browse/FLINK-37747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17948051#comment-17948051
 ] 

Arvid Heise commented on FLINK-37747:
-------------------------------------

Thank you for raising the issue and your detailed analysis. Could you please 
add the exact Flink version? Flink 1.20.1 has some fixes around the sink.

>From a look at your PR, I don't think that your fix resolves the issue 
>sufficiently well. I guess a proper IT Case should help clarify that. Do you 
>need help to craft one?

I'll comment on the PR.

> GlobalCommitterOperator cannot commit after scaling writer/committer
> --------------------------------------------------------------------
>
>                 Key: FLINK-37747
>                 URL: https://issues.apache.org/jira/browse/FLINK-37747
>             Project: Flink
>          Issue Type: Bug
>            Reporter: David
>            Priority: Blocker
>              Labels: pull-request-available
>
> Hey,
> Our FLINK job stopped writing into Delta table with FLINK Delta connector 
> frequently. After checking the issue, we found in GlobalCommitterOperator, in 
> [commit|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L207]
>  function, it was returned directly when checking some checkpoint has 
> finished or not(this 
> [code|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L211]).
>  The issue was happened when:
>  * auto-scaler scales up chained writer/committer(the direct upstream 
> operator of GlobalCommitterOperator)
>  * job ran limited TM first with lower parallelism for writer/committer, and 
> then writer/committer was scaled up to higher parallelism
> After debugging with more logs, we found the cause of the issue. An example 
> is:
>  * for checkpoint 3, FLINK job completed successfully with 3 writer/committer 
> in parallel
>  ** All committable objects in writer/committer were saved into checkpoint 
> state in checkpoint 3
>  * writer/committer was scaled up to 5 parallel tasks
>  * writer/committer restore state from checkpoint 3, they will emit 
> committable objects from checkpoint 3. code is 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L139]
>  ** latest parallelism of writer/committer is used, which is 5 in 
> CommittableSummary. Code is 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L186]
>  * GlobalCommitterOpeartor received committable summary from 
> writer/committer, it knows:
>  ** 5 parallel writer/committer from upstreams
>  ** it will look for committable summary from 5 upstream writer/committer
>  * 3 writer/committers emit CommittableSummary to global committer operator 
> as only 3 restore state from checkpoint 3
>  * Global committer operator stuck here forever as it looks for committable 
> summary for 5 subtasks from upstream operator
> We have a quick solution for this case and raise a PR to fix this. 
> We are using FLINK 1.20 but we found the issue is still existed in master 
> branch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to