[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285319#comment-17285319
 ] 

Kezhu Wang edited comment on FLINK-21133 at 2/16/21, 4:21 PM:
--------------------------------------------------------------

{quote}But this assumption doesn't hold with the legacy sources. Maybe it would 
also cause problems with iterations/cyclic graphs in the future
{quote}
Hi [~pnowojski], I commented about [legacy source|https://issues.apache.org/jira/browse/FLINK-21132?focusedCommentId=17276558&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17276558] in [FLINK-21132|https://issues.apache.org/jira/browse/FLINK-21132?focusedCommentId=17276558&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17276558].
Legacy sources always require some special treatment in the implementation. I
have not evaluated iterations/cyclic graphs. My implementation assumes that
when one task hits "notifyCheckpointComplete" on the "stop-with-savepoint"
path, all tasks will hit that point sooner or later; all we have to decide is
how to exit (cooperatively or independently). Up to "notifyCheckpointComplete",
the code path could be the same for all approaches.

I am not trying to cling to one approach, or even to my own approach; I want to
figure out the best approach by evaluating and comparing all emerging approaches.
 # {{EndOfPartition}}: It works, but it is not that elegant and requires extra
work from sources (chained or not).
 # {{EndOfData}}: I have raised the same concerns as you.
 # Stop tasks individually: a downstream task could be terminated before its
upstream task. This could cause errors in the network stack and potential deadlocks.
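To make the exit-ordering concern in the list above concrete, here is a toy model, not Flink runtime code: two threads stand in for an upstream and a downstream task, a small {{ArrayBlockingQueue}} stands in for the network channel, and a marker object stands in for {{EndOfPartition}}. All names ({{StopOrderSketch}}, {{cooperativeStop}}, {{independentStopStalls}}) are hypothetical. In the cooperative variant the downstream exits only after consuming the upstream's end marker; in the independent variant the downstream stops on its own and the upstream, blocked on a full channel, can no longer make progress. The real network stack may surface this as an error rather than a hang.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model only (hypothetical, not Flink code): two "tasks" joined by a bounded channel. */
final class StopOrderSketch {
    // Hypothetical stand-in for an EndOfPartition event.
    private static final Object END_OF_PARTITION = new Object();

    /** Cooperative exit: downstream finishes only after consuming upstream's end marker. */
    static List<Integer> cooperativeStop(int upstreamRecords) {
        BlockingQueue<Object> channel = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread upstream = new Thread(() -> {
            try {
                for (int i = 0; i < upstreamRecords; i++) {
                    channel.put(i); // blocks while the channel is full
                }
                channel.put(END_OF_PARTITION); // upstream is done, tell downstream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread downstream = new Thread(() -> {
            try {
                Object next;
                while ((next = channel.take()) != END_OF_PARTITION) {
                    received.add((Integer) next);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        startAndJoin(upstream, downstream);
        return received;
    }

    /**
     * Independent exit: downstream stops on its own after downstreamStopsAfter records.
     * Upstream uses a timed offer so the sketch reports the stall instead of hanging.
     */
    static boolean independentStopStalls(int upstreamRecords, int downstreamStopsAfter) {
        BlockingQueue<Object> channel = new ArrayBlockingQueue<>(2);
        AtomicBoolean stalled = new AtomicBoolean(false);

        Thread upstream = new Thread(() -> {
            try {
                for (int i = 0; i < upstreamRecords; i++) {
                    if (!channel.offer(i, 200, TimeUnit.MILLISECONDS)) {
                        stalled.set(true); // nobody drains the channel any more
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread downstream = new Thread(() -> {
            try {
                for (int i = 0; i < downstreamStopsAfter; i++) {
                    channel.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        startAndJoin(upstream, downstream);
        return stalled.get();
    }

    private static void startAndJoin(Thread... threads) {
        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
    }
}
```

Under these assumptions, {{cooperativeStop(5)}} returns all five records, while {{independentStopStalls(10, 1)}} reports that the upstream got stuck once its only consumer had exited.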

I will also evaluate FLINK-21086 for further information.

Besides my proposed approach for stop-with-savepoint, I also want to raise a
separate question:

Should {{StreamOperator.close}} be called *only* "after all records have been
added to the operators"?

Currently, stop-with-savepoint is an exception: {{StreamOperator.close}} is
called there even though not all records have been added. This may also be
related to [~pnowojski]'s recent 2PC concerns in the FLIP-147 discussion, since,
according to its javadoc after FLINK-2647, {{StreamOperator.close}} is just
{{flush-after-all-input-consumed}}.
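A toy illustration of why the {{close}} contract matters, assuming a hypothetical, heavily simplified operator interface ({{SketchOperator}}, not Flink's {{StreamOperator}}) whose {{close()}} emits a result meant to be final, here a count. The names {{CountingOperator}} and {{CloseContractSketch}} are also hypothetical. If {{close()}} also runs at stop-with-savepoint, before input is exhausted, downstream sees one "final" count at the stop and another after the job resumes from the savepoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical contract, NOT Flink's StreamOperator: close() = "flush after all input consumed". */
interface SketchOperator {
    void processElement(long value);

    void close();
}

/** Emits one aggregate on close(); the aggregate is only final if input was really exhausted. */
final class CountingOperator implements SketchOperator {
    private final List<Long> output;
    private long count;

    CountingOperator(List<Long> output, long restoredCount) {
        this.output = output;
        this.count = restoredCount;
    }

    @Override
    public void processElement(long value) {
        count++;
    }

    @Override
    public void close() {
        output.add(count); // the "final" flush
    }

    long snapshotState() {
        return count;
    }
}

final class CloseContractSketch {
    /** Bounded input fully consumed, then close(): the single emitted count is final. */
    static List<Long> endOfInput(int totalRecords) {
        List<Long> out = new ArrayList<>();
        CountingOperator op = new CountingOperator(out, 0);
        for (int i = 0; i < totalRecords; i++) {
            op.processElement(i);
        }
        op.close();
        return out;
    }

    /** Stop after stopAt records with close() still invoked, then resume from the savepoint. */
    static List<Long> stopWithSavepointThenResume(int totalRecords, int stopAt) {
        List<Long> out = new ArrayList<>();
        CountingOperator first = new CountingOperator(out, 0);
        for (int i = 0; i < stopAt; i++) {
            first.processElement(i);
        }
        long savepoint = first.snapshotState();
        first.close(); // input NOT exhausted, yet the "final" flush still happens

        CountingOperator resumed = new CountingOperator(out, savepoint);
        for (int i = stopAt; i < totalRecords; i++) {
            resumed.processElement(i);
        }
        resumed.close();
        return out;
    }
}
```

Under these assumptions, {{endOfInput(5)}} emits a single count of 5, while {{stopWithSavepointThenResume(5, 3)}} emits 3 and then 5, i.e. two results that both look final.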


> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed the branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> to my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> failed due to a timeout.
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
