Ao Li created KAFKA-17162:
-----------------------------

             Summary: DefaultTaskManagerTest may leak AwaitingRunnable thread
                 Key: KAFKA-17162
                 URL: https://issues.apache.org/jira/browse/KAFKA-17162
             Project: Kafka
          Issue Type: Bug
          Components: streams, unit tests
    Affects Versions: 3.9.0
            Reporter: Ao Li


The `DefaultTaskManagerTest#shouldReturnFromAwaitOnInterruption` will fail with 
the following patch:

{code}
```
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 5d2db3c279..b87a82b85b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -348,6 +348,10 @@ public class DefaultTaskManager implements TaskManager {
     }
 
     private <T> T returnWithTasksLocked(final Supplier<T> action) {
+        try {
+            Thread.sleep(1000);
+        } catch (final Exception e) {
+        }
         tasksLock.lock();
         try {
             return action.get();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 98065eae7d..0d8dde7156 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -114,6 +114,10 @@ public class DefaultTaskManagerTest {
         @Override
         public void run() {
             while (!shutdownRequested.get()) {
+                try {
+                    Thread.sleep(1000);
+                } catch (final Exception e) {
+                }
                 try {
                     taskManager.awaitProcessableTasks();
                 } catch (final InterruptedException ignored) {
@@ -151,6 +155,8 @@ public class DefaultTaskManagerTest {
         assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, 
TimeUnit.MILLISECONDS));
 
         awaitingRunnable.shutdown();
+        Thread.sleep(5000);
+        assertFalse(awaitingThread.isAlive());
     }
 
     @Test
```
{code}

Because awatingThread is left unclosed because it was waiting for the signal

{code}
"Thread-3" #25 [26371] prio=5 os_prio=31 cpu=9.68ms elapsed=74.89s 
tid=0x00000001250d8600 nid=26371 waiting on condition  [0x0000000173d4e000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
        - parking to wait for  <0x00000007dcd49b88> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.park(java.base@21.0.2/LockSupport.java:371)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.2/AbstractQueuedSynchronizer.java:519)
        at 
java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.2/ForkJoinPool.java:3780)
        at 
java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.2/ForkJoinPool.java:3725)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.2/AbstractQueuedSynchronizer.java:1707)
        at 
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
        at 
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda/0x0000007001305428.get(Unknown
 Source)
        at 
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:357)
        at 
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
        at 
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManagerTest$AwaitingRunnable.run(DefaultTaskManagerTest.java:122)
        at java.lang.Thread.runWith(java.base@21.0.2/Thread.java:1596)
        at java.lang.Thread.run(java.base@21.0.2/Thread.java:1583)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to