pnowojski commented on a change in pull request #15425:
URL: https://github.com/apache/flink/pull/15425#discussion_r603854862



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -97,6 +97,11 @@ public void init() throws Exception {
         inputProcessor = new StreamOneInputProcessor<>(input, output, 
operatorChain);
     }
 
+    @Override
+    protected void finishTask() throws Exception {
+        stopMailboxProcessor();

Review comment:
       I think I agree with @kezhuw , but I'm worried this is just a bug and it 
would brake clean shutdown of `stop-with-savepoint WITH drain`. As far as I can 
tell, `cancelables.close()` is closing some state backend resources. If they 
are closed, I wouldn't be surprised that operators/user functions would start 
failing during the clean shutdown. Calling 
`mailboxProcessor.allActionsCompleted()` here should be enough.
   
   Probably this is missing a test coverage.
   

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -730,12 +730,7 @@ public final void cancel() throws Exception {
                             (unusedResult, unusedError) -> {
                                 // WARN: the method is called from the task 
thread but the callback
                                 // can be invoked from a different thread
-                                mailboxProcessor.allActionsCompleted();
-                                try {
-                                    cancelables.close();
-                                } catch (IOException e) {
-                                    throw new CompletionException(e);
-                                }
+                                stopMailboxProcessor();

Review comment:
       After your change `stopMailboxProcessor()` has an implicit contract that 
it should be thread safe and because of that I'm not sure if extracting this 
code to separate method as it is is a good idea. There is `WARN` comment that 
you have missed. 
   
   (Previously that was a private implementation detail of this lambda method 
that it has to be thread safe. Now you are moving it to the `StreamTask` 
interface)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to