pnowojski commented on a change in pull request #15425: URL: https://github.com/apache/flink/pull/15425#discussion_r603854862
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java ########## @@ -97,6 +97,11 @@ public void init() throws Exception { inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain); } + @Override + protected void finishTask() throws Exception { + stopMailboxProcessor(); Review comment: I think I agree with @kezhuw , but I'm worried this is just a bug and it would brake clean shutdown of `stop-with-savepoint WITH drain`. As far as I can tell, `cancelables.close()` is closing some state backend resources. If they are closed, I wouldn't be surprised that operators/user functions would start failing during the clean shutdown. Calling `mailboxProcessor.allActionsCompleted()` here should be enough. Probably this is missing a test coverage. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -730,12 +730,7 @@ public final void cancel() throws Exception { (unusedResult, unusedError) -> { // WARN: the method is called from the task thread but the callback // can be invoked from a different thread - mailboxProcessor.allActionsCompleted(); - try { - cancelables.close(); - } catch (IOException e) { - throw new CompletionException(e); - } + stopMailboxProcessor(); Review comment: After your change `stopMailboxProcessor()` has an implicit contract that it should be thread safe and because of that I'm not sure if extracting this code to separate method as it is is a good idea. There is `WARN` comment that you have missed. (Previously that was a private implementation detail of this lambda method that it has to be thread safe. Now you are moving it to the `StreamTask` interface) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org