pnowojski commented on a change in pull request #15820: URL: https://github.com/apache/flink/pull/15820#discussion_r623819184
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ########## @@ -1619,6 +1616,34 @@ public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep } } + @Test + public void testCleanUpResourcesWhenFailingDuringInit() throws Exception { + // given: Configured SourceStreamTask with source which fails during initialization. + Configuration taskManagerConfig = new Configuration(); + taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); + + StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setStateKeySerializer(mock(TypeSerializer.class)); + cfg.setOperatorID(new OperatorID(4712L, 43L)); + + cfg.setStreamOperator(new TestStreamSource<>(new InitFailedSource())); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + try (NettyShuffleEnvironment shuffleEnv = new NettyShuffleEnvironmentBuilder().build()) { + Task task = createTask(SourceStreamTask.class, shuffleEnv, cfg, taskManagerConfig); + + // when: Task starts. + task.startTaskThread(); + + // wait for clean termination. + task.getExecutingThread().join(); + + // then: The task should clean up all resources even when it failed on init. + assertEquals(ExecutionState.FAILED, task.getExecutionState()); + assertFalse(InitFailedSource.resourceAllocated); Review comment: nit: rename and invert boolean logic to `assertTrue(InitFailedSource.wasClosed)`? It would be more accurate compared to "allocated". ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -609,42 +615,54 @@ private void ensureNotCanceled() { @Override public final void invoke() throws Exception { - try { - // Allow invoking method 'invoke' without having to call 'restore' before it. 
- if (!isRunning) { - LOG.debug("Restoring during invoke will be called."); - restore(); - } + runWithCleanUpOnFail(this::executeInvoke); + + cleanUpInvoke(); + } + + private void executeInvoke() throws Exception { + // Allow invoking method 'invoke' without having to call 'restore' before it. + if (!isRunning) { + LOG.debug("Restoring during invoke will be called."); + restore(); Review comment: `executeRestore()` + add a test for not swallowing exceptions if `restore` hasn't been invoked? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -609,42 +615,54 @@ private void ensureNotCanceled() { @Override public final void invoke() throws Exception { - try { - // Allow invoking method 'invoke' without having to call 'restore' before it. - if (!isRunning) { - LOG.debug("Restoring during invoke will be called."); - restore(); - } + runWithCleanUpOnFail(this::executeInvoke); + + cleanUpInvoke(); + } + + private void executeInvoke() throws Exception { + // Allow invoking method 'invoke' without having to call 'restore' before it. 
+ if (!isRunning) { + LOG.debug("Restoring during invoke will be called."); + restore(); + } - // final check to exit early before starting to run - ensureNotCanceled(); + // final check to exit early before starting to run + ensureNotCanceled(); - // let the task do its work - runMailboxLoop(); + // let the task do its work + runMailboxLoop(); - // if this left the run() method cleanly despite the fact that this was canceled, - // make sure the "clean shutdown" is not attempted - ensureNotCanceled(); + // if this left the run() method cleanly despite the fact that this was canceled, + // make sure the "clean shutdown" is not attempted + ensureNotCanceled(); - afterInvoke(); + afterInvoke(); + } + + private void runWithCleanUpOnFail(RunnableWithException run) throws Exception { + try { + run.run(); } catch (Throwable invokeException) { failing = !canceled; try { if (!canceled) { - cancelTask(); + try { + cancelTask(); + } catch (Throwable ex) { + invokeException = firstOrSuppressed(ex, invokeException); + } Review comment: 1. shouldn't this be a separate commit? 2. does it have test coverage? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ########## @@ -1619,6 +1616,34 @@ public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep } } + @Test + public void testCleanUpResourcesWhenFailingDuringInit() throws Exception { + // given: Configured SourceStreamTask with source which fails during initialization.
+ Configuration taskManagerConfig = new Configuration(); + taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); + + StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setStateKeySerializer(mock(TypeSerializer.class)); Review comment: Generally speaking, we are trying really hard to avoid Mockito. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ########## @@ -1619,6 +1616,34 @@ public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep } } + @Test + public void testCleanUpResourcesWhenFailingDuringInit() throws Exception { + // given: Configured SourceStreamTask with source which fails during initialization. + Configuration taskManagerConfig = new Configuration(); + taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); + + StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setStateKeySerializer(mock(TypeSerializer.class)); + cfg.setOperatorID(new OperatorID(4712L, 43L)); + + cfg.setStreamOperator(new TestStreamSource<>(new InitFailedSource())); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + try (NettyShuffleEnvironment shuffleEnv = new NettyShuffleEnvironmentBuilder().build()) { + Task task = createTask(SourceStreamTask.class, shuffleEnv, cfg, taskManagerConfig); + + // when: Task starts. + task.startTaskThread(); + + // wait for clean termination. + task.getExecutingThread().join(); Review comment: I would suggest simplifying this test and implementing it at a lower level, in a more unit-test style, bypassing the `Task` class and without spawning extra threads. You can see how this is done in, for example, `StreamTaskTest#testNotifyCheckpointOnClosedOperator`.
Something like: ``` StreamTaskMailboxTestHarnessBuilder<Integer> builder = new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO); StreamTaskMailboxTestHarness<Integer> harness = builder.setupOutputForSingletonOperatorChain(new YourTestOperator()).build(); harness.streamTask.restore(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org