[
https://issues.apache.org/jira/browse/KAFKA-17697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yu-Lin Chen resolved KAFKA-17697.
---------------------------------
Resolution: Duplicate
Closing this issue, as it was fixed by another PR:
https://github.com/apache/kafka/pull/16562
> Fix flaky DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask
> -----------------------------------------------------------------------
>
> Key: KAFKA-17697
> URL: https://issues.apache.org/jira/browse/KAFKA-17697
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Yu-Lin Chen
> Assignee: Yu-Lin Chen
> Priority: Major
> Attachments: 0001-reproduce-the-racing-issue.patch
>
>
> 28 flaky runs out of 253 trunk builds in the past 28 days (github). ([Report
> Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1727973081200&search.startTimeMin=1725552000000&search.tags=trunk,github&search.timeZoneId=Asia%2FTaipei&tests.container=org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest&tests.test=shouldRestoreSingleActiveStatefulTask()])
> The issue can be reproduced in my local environment within 20 loops. It can
> also be reproduced with the attached patch: [^0001-reproduce-the-racing-issue.patch]
> ([Oct 2 2024 at 05:39:43
> CST|https://ge.apache.org/s/5gsvq5esvbouc/tests/task/:streams:test/details/org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest/shouldRestoreSingleActiveStatefulTask()?expanded-stacktrace=WyIwIl0&top-execution=1])
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Did not get all restored active task within the given timeout! ==> expected: <true> but was: <false>
> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
> at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
> at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.verifyRestoredActiveTasks(DefaultStateUpdaterTest.java:1715)
> at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask(DefaultStateUpdaterTest.java:340)
> at java.lang.reflect.Method.invoke(Method.java:566)
> at java.util.ArrayList.forEach(ArrayList.java:1541)
> at java.util.ArrayList.forEach(ArrayList.java:1541)
> {code}
> {*}Root Cause{*}: A race between the following two threads:
> 1. stateUpdater.add(task) in the test thread [1]
> 2. The runOnce() loop in StateUpdaterThread [2]
> In the scenario below, the StateUpdaterThread hangs even though
> updatingTasks is not empty.
> {*}Flaky scenario{*}: If stateUpdater.add(task) lags behind the first
> runOnce() loop, the second loop hangs [3][4]. allWorkDone() returns true in
> the second runOnce() loop [5], even though updatingTasks.isEmpty() is
> false [6].
> The flow of the flaky scenario is:
> # runOnce() loop 1: completedChangelogs() returns an empty set.
> # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is
> empty, allWorkDone() is false. {color:#de350b}Calls
> tasksAndActionsCondition.await(){color}. (It will be notified by
> stateUpdater.add(task) [1][7].)
> # The test thread calls stateUpdater.add(task).
> # runOnce() loop 1: allChangelogsCompleted() returns false again before
> exiting the while loop. allWorkDone() is false because tasksAndActions is
> not empty. [8]
> # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
> # runOnce() loop 2: allChangelogsCompleted() returns true, allWorkDone() is
> true, {color:#de350b}calls tasksAndActionsCondition.await() again{color} and
> is never notified.
>
> The happy path (stateUpdater.add(task) runs before the end of the first
> runOnce() loop) is:
> # runOnce() loop 1: completedChangelogs() returns an empty set.
> # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is
> not empty, allWorkDone() is false.
> # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
> # runOnce() loop 2: allChangelogsCompleted() returns false, updatingTasks is
> not empty, allWorkDone() is false.
> # runOnce() loop 3: completedChangelogs() returns 2 topic partitions,
> {color:#4c9aff}+moves the task to restoredActiveTasks+{color} [9]
> # runOnce() loop 3: allChangelogsCompleted() returns true, allWorkDone() is
> true, {color:#4c9aff}calls tasksAndActionsCondition.await(){color}.
>
> [1]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java#L338]
> [2]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L177-L198]
> [3]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L191]
> [4]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L435]
> [5]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L433]
> [6]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L447]
> [7]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L837]
> [8]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L451]
> [9]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L675]
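
The two flows in the quoted description differ only in how many times allChangelogsCompleted() is called within each runOnce() loop. The sketch below illustrates that effect in isolation. It assumes the mocked changelog reader in the test hands out a fixed sequence of answers (false, false, true); that sequence is inferred from the two flows, not confirmed from the test source. SequencedStub and loopThatSeesTrue are hypothetical names used for illustration only; they are not Kafka or Mockito APIs, and this is not the change made in the linked PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class SequencedStubSketch {

    /** Hypothetical stand-in for a mock whose answers are consumed one per call. */
    static final class SequencedStub {
        private final Deque<Boolean> answers;
        private boolean last;

        SequencedStub(List<Boolean> answers) {
            this.answers = new ArrayDeque<>(answers);
        }

        boolean next() {
            // Assumption: once the sequence is exhausted, the last answer repeats.
            if (!answers.isEmpty()) {
                last = answers.poll();
            }
            return last;
        }
    }

    /**
     * Returns the 1-based loop iteration in which the stub first answers true,
     * given how many times each iteration calls it, or -1 if it never does.
     */
    static int loopThatSeesTrue(int[] callsPerLoop) {
        // Assumed answer sequence for allChangelogsCompleted(): false, false, true.
        SequencedStub allChangelogsCompleted =
                new SequencedStub(List.of(false, false, true));
        for (int loop = 0; loop < callsPerLoop.length; loop++) {
            for (int call = 0; call < callsPerLoop[loop]; call++) {
                if (allChangelogsCompleted.next()) {
                    return loop + 1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Flaky flow: loop 1 calls the stub twice (before and after await()),
        // so the "true" answer is consumed one loop earlier.
        System.out.println(loopThatSeesTrue(new int[] {2, 1, 1}));
        // Happy path: one call per loop.
        System.out.println(loopThatSeesTrue(new int[] {1, 1, 1}));
    }
}
```

Under these assumptions the flaky flow sees the "true" answer in loop 2 and the happy path sees it in loop 3, which matches the loop numbers in the quoted flows.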
--
This message was sent by Atlassian Jira
(v8.20.10#820010)