rkhachatryan commented on a change in pull request #12525:
URL: https://github.com/apache/flink/pull/12525#discussion_r436595491



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -692,6 +693,7 @@ private void shutdownAsyncThreads() throws Exception {
         */
        private void disposeAllOperators(boolean logOnlyErrors) throws 
Exception {
                if (operatorChain != null && !disposedOperators) {
+                       Exception disposalException = null;
                        for (StreamOperatorWrapper<?, ?> operatorWrapper : 
operatorChain.getAllOperators(true)) {
                                StreamOperator<?> operator = 
operatorWrapper.getStreamOperator();
                                if (!logOnlyErrors) {

Review comment:
       I think this branch could also gain from your change (currently, it 
disposes operators until 1st failure).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors) 
throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {
-                                               LOG.error("Error during 
disposal of stream operator.", e);
+                                               disposalException = 
ExceptionUtils.firstOrSuppressed(e, disposalException);

Review comment:
       I think this can cause problems because in `firstOrSuppressed`:
   ```
   public static <T extends Throwable> T firstOrSuppressed(T newException, 
@Nullable T previous) {
                checkNotNull(newException, "newException");
   ```
   
   WDYT about changing `firstOrSuppressed` to return any of the passed 
arguments if the other one is null? (I also had a similar issue :) ).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -702,11 +704,14 @@ private void disposeAllOperators(boolean logOnlyErrors) 
throws Exception {
                                                operator.dispose();
                                        }
                                        catch (Exception e) {
-                                               LOG.error("Error during 
disposal of stream operator.", e);
+                                               disposalException = 
ExceptionUtils.firstOrSuppressed(e, disposalException);
                                        }
                                }
                        }
                        disposedOperators = true;
+                       if (disposalException != null) {
+                               throw disposalException;

Review comment:
       So we ignore `logOnlyErrors` now and throw the exception anyways.
   This prevents some other cleanup in `cleanUpInvoke`, which is not intended I 
guess.
   
   Instead, we could pass "main exception" to `disposeAllOperators` which would 
add "dispose exceptions" to it as suppressed.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -537,6 +537,7 @@ public final void invoke() throws Exception {
                        afterInvoke();

Review comment:
       This method also disposes operators and will throw encountered (not log) 
errors.
   So we could get a log like this:
   ```
   closing operator x
   closing operator y
   Invoke Error
   operator-y-dispose-error-message
   ```
   Which is confusing to me.
   

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -537,6 +537,7 @@ public final void invoke() throws Exception {
                        afterInvoke();
                }
                catch (Exception invokeException) {
+                       LOG.error("Invoke Error", invokeException);

Review comment:
       I think word "invoke" is specific to code and could confuse users.
   It could be something like `"Error while running task {}", getTaskName()`.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to