[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287105#comment-17287105 ]
Piotr Nowojski edited comment on FLINK-21133 at 2/19/21, 5:14 PM:
------------------------------------------------------------------

[~becket_qin] I think you are missing the *Stop-with-Savepoint, with pipeline draining* case, which [~sewen] mentioned. In that case we emit {{MAX_WATERMARK}} (the {{advanceToEndOfEventTime}} case), so we should probably follow the standard clean shutdown procedure. I don't know whether we should also invoke {{endOfInput}} on the operators in that case... I would guess yes: if we are flushing window operators, we would also want to flush all other buffered records. And if we never intend to resume this job, {{endOfInput}} kind of makes sense for the downstream operators?

[~sewen], generally speaking I like the idea of changing stop-with-savepoint (without the drain) to cancel-with-savepoint. As [~roman_khachatryan] and I mentioned previously, we would like to avoid controlling the flow with exceptions, but that should be as easy as replacing the thrown {{CancelTaskException}} with a plain {{StreamTask#cancel}} call.

However, I still do not see how one would solve the problem of a fully backpressured legacy source task. In this case, the source thread can be perpetually blocked while holding the {{checkpointLock}}, thus preventing {{notifyCheckpointComplete}} from ever being executed. Maybe we would need to spin using {{StreamTask#runSynchronousSavepointMailboxLoop}} while triggering the checkpoint, and thus also block the source thread from making any progress after the checkpoint is triggered?

Secondly, [~sewen], as we discussed offline, we would have to make sure that downstream/upstream tasks cancel correctly, without misleading error messages, if their network connection is closed before they process {{notifyCheckpointComplete()}}. I haven't looked at how this would affect the code; maybe it's not an issue and it works as it is, but it has to be verified.
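To make the drain case concrete, here is a hypothetical, self-contained Java model. None of these classes are Flink APIs: {{ToyWindowOperator}}, {{processElement}}, {{endOfInput}} and {{stopWithDrain}} are invented for illustration. It assumes that a drain first delivers {{MAX_WATERMARK}} (timestamp {{Long.MAX_VALUE}}, the same value Flink's {{Watermark.MAX_WATERMARK}} carries) and then calls an {{endOfInput}}-style hook, as suggested above: the watermark fires event-time windows, while the end-of-input hook flushes records that no watermark would ever release.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink's API) of a drained stop-with-savepoint:
 * MAX_WATERMARK first fires all event-time windows, then an endOfInput-style
 * hook flushes records that no watermark would ever release.
 */
public class DrainShutdownSketch {

    // Same value as the timestamp of Flink's Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Toy operator with one event-time window plus a non-event-time buffer. */
    static final class ToyWindowOperator {
        private final long windowEnd;
        private final List<String> windowBuffer = new ArrayList<>();
        // Records buffered for other reasons (e.g. batching); watermarks do not flush them.
        private final List<String> otherBuffer = new ArrayList<>();
        final List<String> output = new ArrayList<>();

        ToyWindowOperator(long windowEnd) {
            this.windowEnd = windowEnd;
        }

        void processElement(String record, boolean inEventTimeWindow) {
            if (inEventTimeWindow) {
                windowBuffer.add(record);
            } else {
                otherBuffer.add(record);
            }
        }

        /** Fires the window once the watermark reaches its end. */
        void processWatermark(long watermark) {
            if (watermark >= windowEnd) {
                output.addAll(windowBuffer);
                windowBuffer.clear();
            }
        }

        /** Hypothetical end-of-input hook: flushes everything else that is buffered. */
        void endOfInput() {
            output.addAll(otherBuffer);
            otherBuffer.clear();
        }
    }

    /** Drain order assumed here: MAX_WATERMARK first, then end of input. */
    static List<String> stopWithDrain(ToyWindowOperator op) {
        op.processWatermark(MAX_WATERMARK);
        op.endOfInput();
        return op.output;
    }

    public static void main(String[] args) {
        ToyWindowOperator op = new ToyWindowOperator(1_000L);
        op.processElement("windowed", true);
        op.processElement("batched", false);
        System.out.println(stopWithDrain(op)); // [windowed, batched]
    }
}
```

In this model, skipping the {{endOfInput}} step would leave "batched" unemitted forever, which is the argument for invoking it when the job is never meant to be resumed.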
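The backpressured legacy source problem can be reproduced in miniature. The following is a hypothetical, self-contained Java model, not Flink code: a "source" thread takes the checkpoint lock and then blocks on a full, never-drained bounded queue (standing in for a fully backpressured output), while the "task" thread tries to take the same lock to run {{notifyCheckpointComplete}}. Legacy sources actually synchronize on a plain monitor object; a {{ReentrantLock}} is swapped in here only so that the task side can give up after a timeout instead of hanging. The method name {{notifyCheckpointCompleteCanRun}} is invented for this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical model (not Flink code) of a fully backpressured legacy source:
 * the source thread blocks on a full output buffer while holding the checkpoint
 * lock, so the task thread cannot acquire the lock to run notifyCheckpointComplete.
 */
public class BackpressuredLegacySourceDemo {

    /** Returns true if the "task thread" acquired the checkpoint lock within timeoutMillis. */
    static boolean notifyCheckpointCompleteCanRun(long timeoutMillis) throws InterruptedException {
        // Real legacy sources use a plain monitor; ReentrantLock is used here only for tryLock(timeout).
        ReentrantLock checkpointLock = new ReentrantLock();
        // Capacity-1 buffer that nobody drains: models full backpressure from downstream.
        BlockingQueue<Integer> outputBuffer = new ArrayBlockingQueue<>(1);
        CountDownLatch sourceHoldsLock = new CountDownLatch(1);

        Thread sourceThread = new Thread(() -> {
            checkpointLock.lock(); // the source emits records while holding the checkpoint lock
            try {
                sourceHoldsLock.countDown();
                for (int i = 0; ; i++) {
                    outputBuffer.put(i); // blocks indefinitely once the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancellation path of the model
            } finally {
                checkpointLock.unlock();
            }
        }, "legacy-source");
        sourceThread.start();
        sourceHoldsLock.await(); // make sure the source owns the lock before the task tries

        // Task thread: needs the lock to run notifyCheckpointComplete.
        boolean acquired = checkpointLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            checkpointLock.unlock();
        }

        sourceThread.interrupt(); // only interrupting (cancelling) the source releases the lock
        sourceThread.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(notifyCheckpointCompleteCanRun(200)); // false
    }
}
```

In the model, the lock is released only when the source thread is interrupted, which mirrors why waiting for {{notifyCheckpointComplete}} cannot finish while the source stays blocked on backpressure.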
> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
> Key: FLINK-21133
> URL: https://issues.apache.org/jira/browse/FLINK-21133
> Project: Flink
> Issue Type: Bug
> Components: API / Core, API / DataStream, Runtime / Checkpointing
> Affects Versions: 1.11.3, 1.12.1
> Reporter: Kezhu Wang
> Priority: Critical
> Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> failed due to timeout.
>
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].

--
This message was sent by Atlassian Jira
(v8.3.4#803005)