Ao Li created KAFKA-17162:
-----------------------------
Summary: DefaultTaskManagerTest may leak AwaitingRunnable thread
Key: KAFKA-17162
URL: https://issues.apache.org/jira/browse/KAFKA-17162
Project: Kafka
Issue Type: Bug
Components: streams, unit tests
Affects Versions: 3.9.0
Reporter: Ao Li
The `DefaultTaskManagerTest#shouldReturnFromAwaitOnInterruption` will fail with
the following patch:
{code}
```
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 5d2db3c279..b87a82b85b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -348,6 +348,10 @@ public class DefaultTaskManager implements TaskManager {
}
private <T> T returnWithTasksLocked(final Supplier<T> action) {
+ try {
+ Thread.sleep(1000);
+ } catch (final Exception e) {
+ }
tasksLock.lock();
try {
return action.get();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 98065eae7d..0d8dde7156 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -114,6 +114,10 @@ public class DefaultTaskManagerTest {
@Override
public void run() {
while (!shutdownRequested.get()) {
+ try {
+ Thread.sleep(1000);
+ } catch (final Exception e) {
+ }
try {
taskManager.awaitProcessableTasks();
} catch (final InterruptedException ignored) {
@@ -151,6 +155,8 @@ public class DefaultTaskManagerTest {
assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT,
TimeUnit.MILLISECONDS));
awaitingRunnable.shutdown();
+ Thread.sleep(5000);
+ assertFalse(awaitingThread.isAlive());
}
@Test
```
{code}
Because awatingThread is left unclosed because it was waiting for the signal
{code}
"Thread-3" #25 [26371] prio=5 os_prio=31 cpu=9.68ms elapsed=74.89s
tid=0x00000001250d8600 nid=26371 waiting on condition [0x0000000173d4e000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000007dcd49b88> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:371)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block([email protected]/AbstractQueuedSynchronizer.java:519)
at
java.util.concurrent.ForkJoinPool.unmanagedBlock([email protected]/ForkJoinPool.java:3780)
at
java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3725)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:1707)
at
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
at
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda/0x0000007001305428.get(Unknown
Source)
at
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:357)
at
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
at
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManagerTest$AwaitingRunnable.run(DefaultTaskManagerTest.java:122)
at java.lang.Thread.runWith([email protected]/Thread.java:1596)
at java.lang.Thread.run([email protected]/Thread.java:1583)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)