dawidwys commented on a change in pull request #17253:
URL: https://github.com/apache/flink/pull/17253#discussion_r713933683



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest.java
##########
@@ -56,6 +60,41 @@
 /** Tests for the StreamTask cancellation. */
 public class StreamTaskCancellationTest extends TestLogger {
 
+    @Test
+    public void testDoNotInterruptWhileClosing() throws Exception {
+        TestInterruptInCloseOperator testOperator = new TestInterruptInCloseOperator();
+        try (StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
+                        .addInput(STRING_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(testOperator)
+                        .build()) {}
+    }
+
+    private static class TestInterruptInCloseOperator extends AbstractStreamOperator<String>
+            implements OneInputStreamOperator<String, String> {
+        @Override
+        public void close() throws Exception {
+            super.close();
+
+            AtomicBoolean running = new AtomicBoolean(true);
+            Thread thread =
+                    new Thread(
+                            () -> {
+                                while (running.get()) {}
+                            });
+            thread.start();
+            try {
+                getContainingTask().maybeInterruptOnCancel(thread, null, null);

Review comment:
       Just a comment: this is a very white-box style of testing. The test
freezes the signature of `maybeInterruptOnCancel` and the internal flag
handling. However, given that I don't have a better idea, I am fine with the test.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -776,38 +776,14 @@ private void doRun() {
                 throw new CancelTaskException();
             }
         } catch (Throwable t) {
-
-            // unwrap wrapped exceptions to make stack traces more compact
-            if (t instanceof WrappingRuntimeException) {
-                t = ((WrappingRuntimeException) t).unwrap();
-            }
+            t = preProcessException(t);

Review comment:
       nit: move below the block comment

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest.java
##########
@@ -95,31 +90,6 @@ public void close() throws Exception {
         public void processElement(StreamRecord<String> element) throws Exception {}
     }
 
-    @Test
-    public void testCancellationWaitsForActiveTimers() throws Exception {
-        StreamTaskWithBlockingTimer.reset();
-        ResultPartitionDeploymentDescriptor descriptor =
-                new ResultPartitionDeploymentDescriptor(
-                        PartitionDescriptorBuilder.newBuilder().build(),
-                        NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
-                        1,
-                        false);
-        Task task =
-                new TestTaskBuilder(new NettyShuffleEnvironmentBuilder().build())
-                        .setInvokable(StreamTaskWithBlockingTimer.class)
-                        .setResultPartitions(singletonList(descriptor))
-                        .build();
-        task.startTaskThread();
-
-        StreamTaskWithBlockingTimer.timerStarted.join();

Review comment:
       `StreamTaskWithBlockingTimer` is now unused.





