pnowojski commented on a change in pull request #13000: URL: https://github.com/apache/flink/pull/13000#discussion_r462289373
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
 	protected void afterInvoke() throws Exception {
 		LOG.debug("Finished task {}", getName());
+		getCompletionFuture().exceptionally(unused -> null).join();

Review comment:
1.
> It's already reported in SourceStreamTask.processInput using mailboxProcessor.reportThrowable

I think you are right, it should be correct now. But I would rephrase it as "it SHOULD be reported in...". What about handling our own bugs, when it isn't reported? Could we make it less error prone? For example, could we make sure that no exception ever reaches `StreamTask#getCompletionFuture().join()`, so that anything that does reach it is treated as the equivalent of an `IllegalStateException`?

Maybe `SourceStreamTask#getCompletionFuture()` could wrap the future somehow? `SourceStreamTask` owns the logic for handling and forwarding the exceptions, so it would be a slightly better place to ignore them than `StreamTask`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -562,6 +567,7 @@ private void runMailboxLoop() throws Exception {
 	protected void afterInvoke() throws Exception {
 		LOG.debug("Finished task {}", getName());
+		getCompletionFuture().exceptionally(unused -> null).join();

Review comment:
OK, let's keep it as it is.
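
For reference, the alternative floated in the review comment (wrapping the future in the source task and treating anything that still reaches the base task's `join()` as an `IllegalStateException`) could look roughly like the sketch below. This is not Flink code: `BaseTask`, `SourceTask`, `completion`, and `failSourceThread` are hypothetical stand-ins, and the sketch assumes the source task has already reported its failure through some other channel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionFutureSketch {

    /** Hypothetical stand-in for StreamTask; not the actual Flink class. */
    static class BaseTask {
        final CompletableFuture<Void> completion = new CompletableFuture<>();

        CompletableFuture<Void> getCompletionFuture() {
            return completion;
        }

        void afterInvoke() {
            try {
                // No exception is expected here; subclasses must have handled it.
                getCompletionFuture().join();
            } catch (CompletionException e) {
                // An exception that reaches this point indicates a bug in the task.
                throw new IllegalStateException(
                        "Unreported failure reached afterInvoke()", e.getCause());
            }
        }
    }

    /** Hypothetical stand-in for SourceStreamTask. */
    static class SourceTask extends BaseTask {
        @Override
        CompletableFuture<Void> getCompletionFuture() {
            // Assumption: the failure was already reported elsewhere,
            // so the owner of that logic swallows it here.
            return completion.exceptionally(alreadyReported -> null);
        }

        void failSourceThread(Throwable cause) {
            completion.completeExceptionally(cause);
        }
    }

    public static void main(String[] args) {
        SourceTask source = new SourceTask();
        source.failSourceThread(new RuntimeException("already reported"));
        source.afterInvoke(); // returns normally, the wrapper ignored the failure

        BaseTask base = new BaseTask();
        base.completion.completeExceptionally(new RuntimeException("never reported"));
        try {
            base.afterInvoke();
        } catch (IllegalStateException e) {
            System.out.println("Caught: " + e.getMessage());
        }
    }
}
```

The design point is only where the `exceptionally(...)` call lives: in the subclass that owns the exception forwarding, instead of in the shared `afterInvoke()`.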