pnowojski commented on a change in pull request #13000:
URL: https://github.com/apache/flink/pull/13000#discussion_r462289373



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
 
        protected void afterInvoke() throws Exception {
                LOG.debug("Finished task {}", getName());
+               getCompletionFuture().exceptionally(unused -> null).join();

Review comment:
       1. 
   > It's already reported in SourceStreamTask.processInput using 
mailboxProcessor.reportThrowable
   
   I would rephrase it that "it SHOULD be reported in...". But what about 
handling our bugs, when it won't be reported? Could we make it more error 
prone? For example make sure that no exceptions should ever reach 
`StreamTask#getCompletionFuture().join()`? And if something would reach it 
here, it would be equivalent of an `IllegalStateException`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
 
        protected void afterInvoke() throws Exception {
                LOG.debug("Finished task {}", getName());
+               getCompletionFuture().exceptionally(unused -> null).join();

Review comment:
       1. 
   > It's already reported in SourceStreamTask.processInput using 
mailboxProcessor.reportThrowable
   
   I would rephrase it that "it SHOULD be reported in...". But what about 
handling our bugs, when it won't be reported? Could we make it more error 
prone? For example make sure that no exceptions should ever reach 
`StreamTask#getCompletionFuture().join()`? And if something would reach it 
here, it would be equivalent of an `IllegalStateException`?
   
   Maybe `SourceStreamTask#getCompletionFuture()` could wrap the future 
somehow? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
 
        protected void afterInvoke() throws Exception {
                LOG.debug("Finished task {}", getName());
+               getCompletionFuture().exceptionally(unused -> null).join();

Review comment:
       1. 
   > It's already reported in SourceStreamTask.processInput using 
mailboxProcessor.reportThrowable
   
   I think you are right, it should be correct now. 
   
   But I would just rephrase it that "it SHOULD be reported in...". what about 
handling our bugs, when it won't be reported? Could we make it more error 
prone? For example make sure that no exceptions should ever reach 
`StreamTask#getCompletionFuture().join()`? And if something would reach it 
here, it would be equivalent of an `IllegalStateException`?
   
   Maybe `SourceStreamTask#getCompletionFuture()` could wrap the future 
somehow? `SourceStreamTask` is the owner of logic handling/forwarding the 
exceptions, so it would be a slightly better place to ignore exceptions 
compared to `StreamTask`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
 
        protected void afterInvoke() throws Exception {
                LOG.debug("Finished task {}", getName());
+               getCompletionFuture().exceptionally(unused -> null).join();

Review comment:
       ok, let's keep it as it is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to