pnowojski commented on a change in pull request #15425:
URL: https://github.com/apache/flink/pull/15425#discussion_r604613973



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -730,12 +730,7 @@ public final void cancel() throws Exception {
                             (unusedResult, unusedError) -> {
                                 // WARN: the method is called from the task 
thread but the callback
                                 // can be invoked from a different thread
-                                mailboxProcessor.allActionsCompleted();
-                                try {
-                                    cancelables.close();
-                                } catch (IOException e) {
-                                    throw new CompletionException(e);
-                                }
+                                stopMailboxProcessor();

Review comment:
       > Not sure if I fully understand the concern. Can you help me understand 
more? Previously it is possible that the logic in stopMailboxProcessor is run 
in a different thread from the mailbox thread, e.g. the SourceThread in 
SourceStreamTask. So supposedly the stopMailboxProcessor logic should already 
be thread safe. And expose it to subclasses should also be safe. Is that the 
correct?
   
   yes, it's correct.
   
   But previously that was implementation detail of the `cancel()` method. By 
promoting this code to a separate `protected` method, you are making it a part 
of the `StreamTask` interface. That's the part I wasn't sure about. But I 
believe if you fix the first issue with not calling `cancelables.close()`, this 
change here can be safely reverted. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to