pnowojski commented on a change in pull request #15820: URL: https://github.com/apache/flink/pull/15820#discussion_r624872007
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ########## @@ -1619,6 +1617,50 @@ public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep } } + @Test + public void testCleanUpResourcesWhenFailingDuringInit() { + // given: Configured SourceStreamTask with source which fails during initialization. + StreamTaskMailboxTestHarnessBuilder<Integer> builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO); + try { + // when: The task initializing(restoring). + builder.setupOutputForSingletonOperatorChain(new InitFailOperator<>()).build(); + fail("The task should fail during the restore"); + } catch (Exception ignore) { + // ignore. Review comment: shouldn't we assert the exception here? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ########## @@ -2508,4 +2550,27 @@ public void awaitRunning() throws InterruptedException { runningLatch.await(); } } + + static class InitFailOperator<T> extends AbstractStreamOperator<T> Review comment: optional nitty nit: maybe `OpenFailingOperator`? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ########## @@ -1619,6 +1617,50 @@ public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep } } + @Test + public void testCleanUpResourcesWhenFailingDuringInit() { + // given: Configured SourceStreamTask with source which fails during initialization. + StreamTaskMailboxTestHarnessBuilder<Integer> builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO); + try { + // when: The task initializing(restoring). + builder.setupOutputForSingletonOperatorChain(new InitFailOperator<>()).build(); + fail("The task should fail during the restore"); + } catch (Exception ignore) { + // ignore. + } + + // then: The task should clean up all resources even when it failed on init. + assertTrue(InitFailOperator.wasClosed); + } + + @Test + public void testRethrowExceptionFromRestoreInsideOfInvoke() { + // given: Configured SourceStreamTask with source which fails during initialization. + StreamTaskMailboxTestHarnessBuilder<Integer> builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO); + try { + // when: The task invocation without preceded restoring. + StreamTaskMailboxTestHarness<Integer> harness = + builder.setupOutputForSingletonOperatorChain(new InitFailOperator<>()) + .buildUnrestored(); + + harness.streamTask.invoke(); + + fail("The task should fail during the restore"); + } catch (Exception ex) { + // then: The task should rethrow exception from initialization. + assertThat(ex.getMessage(), is(INIT_FAILED_MESSAGE)); Review comment: nit/question: I guess here you are freezing a contract that this exception is never wrapped, right? Is this intentional? If not our usual pattern is: ``` if (!ExceptionUtils.findThrowable(e, ExpectedTestException.class).isPresent()) { throw e; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org