[ https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ming li updated FLINK-21990:
----------------------------
    Description: 
If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and 
its {{snapshotState}} method throws an exception, the {{SourceStreamTask}} 
hangs indefinitely.

The root cause is that the checkpoint is executed in the mailbox thread, while 
the source runs in its own thread. When {{CheckpointedFunction#snapshotState}} 
of the source throws an exception, {{StreamTask#cleanUpInvoke}} is called, and 
it waits for the {{LegacySourceFunctionThread}} of the source to finish. 
However, the source thread does not finish by itself (stopping it is up to the 
user's source implementation), so the {{Task}} hangs at this point and the 
JobMaster never notices.
{code:java}
protected void cleanUpInvoke() throws Exception {
    // wait for the end of the source thread
    getCompletionFuture().exceptionally(unused -> null).join();
    // clean up everything we initialized
    isRunning = false;
    // ...
}{code}
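To make the hang concrete, here is a minimal, Flink-free sketch using plain {{java.util.concurrent}}. {{HangDemo}}, {{LoopingSource}} and {{cleanUpWithoutCancel}} are hypothetical names, not Flink classes: the looping source stands in for a legacy source that only stops when cancelled, its throwing {{snapshotState}} stands in for a failing {{CheckpointedFunction#snapshotState}}, and a bounded {{get}} replaces {{join()}} so the demo terminates instead of blocking forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HangDemo {
    // Hypothetical stand-in for a user source: it keeps "emitting" until cancel() flips the flag.
    static final class LoopingSource {
        private volatile boolean running = true;

        void run() throws InterruptedException {
            while (running) {
                Thread.sleep(10); // pretend to emit a record
            }
        }

        void cancel() {
            running = false;
        }

        // Hypothetical snapshot hook that fails, like a throwing CheckpointedFunction#snapshotState.
        void snapshotState() throws Exception {
            throw new Exception("snapshot failed");
        }
    }

    /** Returns true if the source thread finished within the timeout, false if cleanup would hang. */
    static boolean cleanUpWithoutCancel(long timeoutMillis) {
        LoopingSource source = new LoopingSource();
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread sourceThread = new Thread(() -> {
            try {
                source.run();
                completion.complete(null);
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        });
        sourceThread.setDaemon(true);
        sourceThread.start();

        try {
            source.snapshotState(); // runs on the caller ("mailbox") thread and throws
        } catch (Exception snapshotFailure) {
            // Cleanup path: wait for the source thread without cancelling it first.
            try {
                completion.exceptionally(unused -> null).get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException te) {
                return false; // with join() instead of a timeout, this would block forever
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ie);
            } catch (ExecutionException ee) {
                throw new IllegalStateException(ee);
            } finally {
                source.cancel(); // only so this demo does not leave a running thread behind
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("source finished: " + cleanUpWithoutCancel(200)); // prints "source finished: false"
    }
}
```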
I think we should first call the source's {{cancel}} method and only then wait 
for the source thread to finish.
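A sketch of the proposed order, under the same assumptions (hypothetical {{CancelThenWaitDemo}} and {{LoopingSource}}, plain JDK only). In Flink the corresponding call would presumably be the legacy {{SourceFunction#cancel()}}; this only illustrates that cancelling before waiting lets the wait complete, not how the actual fix should be wired into {{StreamTask}}.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelThenWaitDemo {
    // Hypothetical stand-in for a legacy source that only stops when cancelled.
    static final class LoopingSource {
        private volatile boolean running = true;

        void run() throws InterruptedException {
            while (running) {
                Thread.sleep(10); // pretend to emit a record
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Starts the source thread, then cleans up by cancelling first and waiting second. */
    static boolean cleanUpWithCancel(long timeoutMillis) {
        LoopingSource source = new LoopingSource();
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread sourceThread = new Thread(() -> {
            try {
                source.run();
                completion.complete(null);
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        });
        sourceThread.setDaemon(true);
        sourceThread.start();

        // Proposed order: ask the source to stop, then wait for its thread to end.
        source.cancel();
        try {
            completion.exceptionally(unused -> null).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException te) {
            return false;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        } catch (ExecutionException ee) {
            throw new IllegalStateException(ee);
        }
    }

    public static void main(String[] args) {
        System.out.println("source finished: " + cleanUpWithCancel(5000)); // prints "source finished: true"
    }
}
```

Because {{cancel()}} flips the flag that the run loop polls, the source thread exits and the completion future is done long before the timeout.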

 

  was:
If the source in SourceStreamTask implements {{CheckpointedFunction}} and its 
snapshotState method throws an exception, the SourceStreamTask hangs 
indefinitely.

The root cause is that the checkpoint is executed in the mailbox thread, while 
the source runs in its own thread. When CheckpointedFunction#snapshotState of 
the source throws an exception, StreamTask#cleanUpInvoke is called, and it 
waits for the {{LegacySourceFunctionThread}} of the source to finish. However, 
the source thread does not finish by itself (stopping it is up to the user's 
source implementation), so the Task hangs at this point and the JobMaster 
never notices.

 


> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState 
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21990
>                 URL: https://issues.apache.org/jira/browse/FLINK-21990
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: ming li
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
