Yu-Lin Chen created KAFKA-17697:
-----------------------------------

Summary: Fix flaky DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask
Key: KAFKA-17697
URL: https://issues.apache.org/jira/browse/KAFKA-17697
Project: Kafka
Issue Type: Bug
Reporter: Yu-Lin Chen
Assignee: Yu-Lin Chen
Attachments: 0001-reproduce-the-racing-issue.patch

28 flaky runs out of 253 trunk builds in the past 28 days (github). ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1727973081200&search.startTimeMin=1725552000000&search.tags=trunk,github&search.timeZoneId=Asia%2FTaipei&tests.container=org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest&tests.test=shouldRestoreSingleActiveStatefulTask()])

The issue can be reproduced in my local environment within 20 loops. It can also be reproduced with the attached patch:

([Oct 2 2024 at 05:39:43 CST|https://ge.apache.org/s/5gsvq5esvbouc/tests/task/:streams:test/details/org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest/shouldRestoreSingleActiveStatefulTask()?expanded-stacktrace=WyIwIl0&top-execution=1])
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Did not get all restored active task within the given timeout! ==> expected: <true> but was: <false>
    at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
    at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.verifyRestoredActiveTasks(DefaultStateUpdaterTest.java:1715)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask(DefaultStateUpdaterTest.java:340)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
{code}

{*}Root Cause{*}: A race between the following two threads:
 1. stateUpdater.add(task) in the test thread [1]
 2. runOnce() in the StateUpdaterThread loop [2]

In the scenario below, the StateUpdaterThread hangs even though updatingTasks is not empty.

{*}Flaky scenario{*}: If stateUpdater.add(task) runs only after the first runOnce() loop is already waiting, the second loop hangs [3][4]. allWorkDone() is then always true [5], even if updatingTasks.isEmpty() is false [6].

The flaky flow in this situation is:
 # runOnce() loop 1: completedChangelogs() returns an empty set.
 # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is empty, allWorkDone() is false. tasksAndActionsCondition.await() is called. (It will be notified by stateUpdater.add(task) [1][7].)
 # runOnce() loop 1: allChangelogsCompleted() returns false again before the while loop exits [8].
 # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
 # runOnce() loop 2: allChangelogsCompleted() returns true, so tasksAndActionsCondition.await() is called again and is never notified.
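
The flaky flow above can be replayed deterministically without threads. The following is a minimal sketch, not the Kafka code: the class StubSequenceSketch, the helper Stub, and the method taskGetsRestored are hypothetical names. It assumes that the test stubs completedChangelogs() and allChangelogsCompleted() with fixed sequences of answers that are consumed one per call (with the last answer repeating), and that the idle check is roughly "allChangelogsCompleted() or updatingTasks is empty". Under these assumptions, the extra allChangelogsCompleted() call made after the wake-up shifts the sequence, so "true" is returned one loop too early.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

class StubSequenceSketch {

    /** Hypothetical stand-in for a mock stubbed with consecutive answers; the last answer repeats. */
    static final class Stub<T> {
        private final Deque<T> answers;

        Stub(List<T> answers) {
            this.answers = new ArrayDeque<>(answers);
        }

        T next() {
            // Consume one answer per call, but keep returning the final one.
            return answers.size() > 1 ? answers.poll() : answers.peek();
        }
    }

    /**
     * Replays the updater loop on a single thread.
     *
     * @param addArrivesDuringFirstWait true models add(task) arriving while loop 1 is waiting
     * @return true if the task is restored, false if the loop would wait forever
     */
    static boolean taskGetsRestored(boolean addArrivesDuringFirstWait) {
        // Assumed stubbing: number of completed partitions and the "all completed" flag.
        Stub<Integer> completedChangelogs = new Stub<>(List.of(0, 1, 2));
        Stub<Boolean> allChangelogsCompleted = new Stub<>(List.of(false, false, true));
        boolean updating = !addArrivesDuringFirstWait; // is the task already in updatingTasks?

        for (int loop = 1; loop <= 3; loop++) {
            int completed = completedChangelogs.next();
            if (updating && completed == 2) {
                return true; // both changelog partitions completed: task is restored
            }
            // Assumed shape of the idle check.
            boolean idle = allChangelogsCompleted.next() || !updating;
            if (!idle) {
                continue; // keep restoring in the next loop
            }
            if (updating) {
                return false; // waiting with a task still updating and nobody left to signal
            }
            // Woken up by add(task): the loop condition is evaluated again,
            // which consumes one more stubbed answer than in the other ordering.
            allChangelogsCompleted.next();
            updating = true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("task present in loop 1 -> restored: " + taskGetsRestored(false));
        System.out.println("add(task) during first wait -> restored: " + taskGetsRestored(true));
    }
}
```

In this sketch the only difference between the two orderings is the number of allChangelogsCompleted() calls made before loop 2, which suggests a fix should not depend on how many times the stubbed method is invoked.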

The happy path is (stateUpdater.add(task) runs before the end of the first runOnce() loop):
 # runOnce() loop 1: completedChangelogs() returns an empty set.
 # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is not empty, allWorkDone() is false.
 # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
 # runOnce() loop 2: allChangelogsCompleted() returns false, updatingTasks is not empty, allWorkDone() is false.
 # runOnce() loop 3: completedChangelogs() returns 2 topic partitions, +moves the task to restoredActiveTasks+ [10].
 # runOnce() loop 3: allChangelogsCompleted() returns true (doesn't matter).

[1] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java#L338]
[2] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L177-L198]
[3] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L191]
[4] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L435]
[5] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L433]
[6] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L447]
[7] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L433-L436]
[8] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L451]
[9] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L837]
[10] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L675]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)