dawidwys commented on a change in pull request #17253: URL: https://github.com/apache/flink/pull/17253#discussion_r713933683
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest.java ########## @@ -56,6 +60,41 @@ /** Tests for the StreamTask cancellation. */ public class StreamTaskCancellationTest extends TestLogger { + @Test + public void testDoNotInterruptWhileClosing() throws Exception { + TestInterruptInCloseOperator testOperator = new TestInterruptInCloseOperator(); + try (StreamTaskMailboxTestHarness<String> harness = + new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) + .addInput(STRING_TYPE_INFO) + .setupOutputForSingletonOperatorChain(testOperator) + .build()) {} + } + + private static class TestInterruptInCloseOperator extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + @Override + public void close() throws Exception { + super.close(); + + AtomicBoolean running = new AtomicBoolean(true); + Thread thread = + new Thread( + () -> { + while (running.get()) {} + }); + thread.start(); + try { + getContainingTask().maybeInterruptOnCancel(thread, null, null); Review comment: just a comment: It is a very white-box style of testing. The test freezes the signature of `maybeInterruptOnCancel` and the internal flag handling. However, given that I don't have a better idea, I am fine with the test. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ########## @@ -776,38 +776,14 @@ private void doRun() { throw new CancelTaskException(); } } catch (Throwable t) { - - // unwrap wrapped exceptions to make stack traces more compact - if (t instanceof WrappingRuntimeException) { - t = ((WrappingRuntimeException) t).unwrap(); - } + t = preProcessException(t); Review comment: nit: move below the block comment ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest.java ########## @@ -95,31 +90,6 @@ public void close() throws Exception { public void processElement(StreamRecord<String> element) throws Exception {} } - @Test - public void testCancellationWaitsForActiveTimers() throws Exception { - StreamTaskWithBlockingTimer.reset(); - ResultPartitionDeploymentDescriptor descriptor = - new ResultPartitionDeploymentDescriptor( - PartitionDescriptorBuilder.newBuilder().build(), - NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), - 1, - false); - Task task = - new TestTaskBuilder(new NettyShuffleEnvironmentBuilder().build()) - .setInvokable(StreamTaskWithBlockingTimer.class) - .setResultPartitions(singletonList(descriptor)) - .build(); - task.startTaskThread(); - - StreamTaskWithBlockingTimer.timerStarted.join(); Review comment: `StreamTaskWithBlockingTimer` is now unused. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org