pnowojski commented on a change in pull request #15820:
URL: https://github.com/apache/flink/pull/15820#discussion_r624872007



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1619,6 +1617,50 @@ public void 
testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep
         }
     }
 
+    @Test
+    public void testCleanUpResourcesWhenFailingDuringInit() {
+        // given: Configured SourceStreamTask with source which fails during 
initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task initializing(restoring).
+            builder.setupOutputForSingletonOperatorChain(new 
InitFailOperator<>()).build();
+            fail("The task should fail during the restore");
+        } catch (Exception ignore) {
+            // ignore.

Review comment:
       shouldn't we assert the exception here?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -2508,4 +2550,27 @@ public void awaitRunning() throws InterruptedException {
             runningLatch.await();
         }
     }
+
+    static class InitFailOperator<T> extends AbstractStreamOperator<T>

Review comment:
       optional nitty nit: maybe `OpenFailingOperator`? 

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1619,6 +1617,50 @@ public void 
testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep
         }
     }
 
+    @Test
+    public void testCleanUpResourcesWhenFailingDuringInit() {
+        // given: Configured SourceStreamTask with source which fails during 
initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task initializing(restoring).
+            builder.setupOutputForSingletonOperatorChain(new 
InitFailOperator<>()).build();
+            fail("The task should fail during the restore");
+        } catch (Exception ignore) {
+            // ignore.
+        }
+
+        // then: The task should clean up all resources even when it failed on 
init.
+        assertTrue(InitFailOperator.wasClosed);
+    }
+
+    @Test
+    public void testRethrowExceptionFromRestoreInsideOfInvoke() {
+        // given: Configured SourceStreamTask with source which fails during 
initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task invocation without preceded restoring.
+            StreamTaskMailboxTestHarness<Integer> harness =
+                    builder.setupOutputForSingletonOperatorChain(new 
InitFailOperator<>())
+                            .buildUnrestored();
+
+            harness.streamTask.invoke();
+
+            fail("The task should fail during the restore");
+        } catch (Exception ex) {
+            // then: The task should rethrow exception from initialization.
+            assertThat(ex.getMessage(), is(INIT_FAILED_MESSAGE));

Review comment:
       nit/question: I guess here you are freezing a contract that this 
exception is never wrapped, right? Is this intentional? If not our usual 
pattern is:
   ```
   if (!ExceptionUtils.findThrowable(e, 
ExpectedTestException.class).isPresent()) {
       throw e;
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to