[ https://issues.apache.org/jira/browse/FLINK-21990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ming li updated FLINK-21990:
----------------------------
    Description:
If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and an exception is thrown in the {{snapshotState}} method, then the {{SourceStreamTask}} will always hang.

The main reason is that the checkpoint is executed in the mailbox. When the {{CheckpointedFunction#snapshotState}} of the source throws an exception, {{StreamTask#cleanUpInvoke}} is called, where it waits for the {{LegacySourceFunctionThread}} of the source to end. However, the source thread does not end by itself (the user has to stop it), so the {{Task}} hangs at this point, and the JobMaster is not aware of it.
{code:java}
protected void cleanUpInvoke() throws Exception {
    getCompletionFuture().exceptionally(unused -> null).join(); // wait for the end of the source
    // clean up everything we initialized
    isRunning = false;
    ...
}{code}

I think we should call the cancel method of the source first, and then wait for the end.

  was:
If the source in SourceStreamTask implements {{CheckpointedFunction}} and an exception is thrown in the snapshotState method, then the SourceStreamTask will always hang.

The main reason is that the checkpoint is executed in the mailbox. When the CheckpointedFunction#snapshotState of the source throws an exception, StreamTask#cleanUpInvoke is called, where it waits for the {{LegacySourceFunctionThread}} of the source to end. However, the source thread does not end by itself (the user has to stop it), so the Task hangs at this point, and the JobMaster is not aware of it.


> SourceStreamTask will always hang if the CheckpointedFunction#snapshotState
> throws an exception.
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21990
>                 URL: https://issues.apache.org/jira/browse/FLINK-21990
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: ming li
>            Priority: Major
>
> If the source in {{SourceStreamTask}} implements {{CheckpointedFunction}} and
> an exception is thrown in the {{snapshotState}} method, then the
> {{SourceStreamTask}} will always hang.
> The main reason is that the checkpoint is executed in the mailbox. When the
> {{CheckpointedFunction#snapshotState}} of the source throws an exception,
> {{StreamTask#cleanUpInvoke}} is called, where it waits for the
> {{LegacySourceFunctionThread}} of the source to end. However, the source
> thread does not end by itself (the user has to stop it), so the {{Task}}
> hangs at this point, and the JobMaster is not aware of it.
> {code:java}
> protected void cleanUpInvoke() throws Exception {
>     getCompletionFuture().exceptionally(unused -> null).join(); // wait for the end of the source
>     // clean up everything we initialized
>     isRunning = false;
>     ...
> }{code}
> I think we should call the cancel method of the source first, and then wait
> for the end.
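
The proposal in the description (cancel the source first, then wait for its thread) can be illustrated with a small standalone sketch. This is not Flink code: {{LoopingSource}}, {{startSource}} and {{cleanUp}} are hypothetical names that only imitate a legacy source whose run loop ends on cancel, and the bounded {{get}} replaces the unbounded {{join()}} so that the demo itself cannot hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelBeforeJoinSketch {

    /** Hypothetical stand-in for a legacy source: run() returns only after cancel(). */
    static final class LoopingSource {
        private volatile boolean running = true;

        void run() throws InterruptedException {
            while (running) {
                Thread.sleep(10); // pretend to emit records
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the source on its own thread; the future completes when run() returns. */
    static CompletableFuture<Void> startSource(LoopingSource source) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                source.run();
                completion.complete(null);
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        }, "legacy-source-sketch");
        thread.setDaemon(true);
        thread.start();
        return completion;
    }

    /**
     * Imitates the cleanup path: optionally cancels the source, then waits for
     * the source thread. Returns true if the thread ended within the timeout.
     */
    static boolean cleanUp(boolean cancelFirst, long timeoutMillis) throws Exception {
        LoopingSource source = new LoopingSource();
        CompletableFuture<Void> completion = startSource(source);
        if (cancelFirst) {
            source.cancel(); // the step proposed in this issue
        }
        try {
            // Same shape as the wait in cleanUpInvoke, but bounded for the demo.
            completion.exceptionally(unused -> null).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            source.cancel(); // let the demo thread exit
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("wait only:         finished=" + cleanUp(false, 300));
        System.out.println("cancel, then wait: finished=" + cleanUp(true, 2000));
    }
}
```

In this sketch the wait-only path runs into the timeout, which corresponds to the hang described above, while the cancel-then-wait path returns once the loop observes the flag. Whether an actual fix in {{StreamTask}} should take this shape is left to the discussion on the issue.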