[ https://issues.apache.org/jira/browse/KAFKA-17697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-17697.
-------------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

> Fix flaky DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-17697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17697
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: Yu-Lin Chen
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 4.0.0
>
>         Attachments: 0001-reproduce-the-racing-issue.patch
>
>
> 28 flaky runs out of 253 trunk builds in the past 28 days. (github) ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1727973081200&search.startTimeMin=1725552000000&search.tags=trunk,github&search.timeZoneId=Asia%2FTaipei&tests.container=org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest&tests.test=shouldRestoreSingleActiveStatefulTask()])
> The issue can be reproduced in my local env within 20 loops. It can also be reproduced with the attached patch: [^0001-reproduce-the-racing-issue.patch]
> ([Oct 2 2024 at 05:39:43 CST|https://ge.apache.org/s/5gsvq5esvbouc/tests/task/:streams:test/details/org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest/shouldRestoreSingleActiveStatefulTask()?expanded-stacktrace=WyIwIl0&top-execution=1])
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Did not get all restored active task within the given timeout!
> ==> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
>     at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.verifyRestoredActiveTasks(DefaultStateUpdaterTest.java:1715)
>     at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask(DefaultStateUpdaterTest.java:340)
>     at java.lang.reflect.Method.invoke(Method.java:566)
>     at java.util.ArrayList.forEach(ArrayList.java:1541)
>     at java.util.ArrayList.forEach(ArrayList.java:1541)
> {code}
> {*}Root Cause{*}: A race between the following two threads:
> 1. stateUpdater.add(task) in the test thread [1]
> 2. runOnce() in the StateUpdaterThread loop [2]
> In the scenario below, the StateUpdaterThread hangs even though there are updatingTasks.
> {*}Flaky scenario{*}: If stateUpdater.add(task) lags behind the first runOnce() loop, the second loop hangs [3][4]. allWorkDone() in the second loop of runOnce() is true [5], even though updatingTasks.isEmpty() is false [6].
> The flow of the flaky scenario is:
> # runOnce() loop 1: completedChangelogs() returns an empty set.
> # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is empty, allWorkDone() is false.
> {color:#de350b}Called tasksAndActionsCondition.await(){color}. (Will be notified by stateUpdater.add(task) [1][7])
> # The test thread calls stateUpdater.add(task).
> # runOnce() loop 1: allChangelogsCompleted() returns false again before the while loop exits. allWorkDone() is false because tasksAndActions is not empty. [8]
> # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
> # runOnce() loop 2: allChangelogsCompleted() returns true, allWorkDone() is true, {color:#de350b}calls tasksAndActionsCondition.await() again{color} and is never notified.
>
> The happy path is: (stateUpdater.add(task) ran before the end of the first runOnce() loop)
> # runOnce() loop 1: completedChangelogs() returns an empty set.
> # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is not empty, allWorkDone() is false.
> # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
> # runOnce() loop 2: allChangelogsCompleted() returns false, updatingTasks is not empty, allWorkDone() is false.
> # runOnce() loop 3: completedChangelogs() returns 2 topic partitions, {color:#4c9aff}+moves the task to restoredActiveTasks+{color} [9]
> # runOnce() loop 3: allChangelogsCompleted() returns true, allWorkDone() is true, {color:#4c9aff}calls tasksAndActionsCondition.await(){color}
>
> [1]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java#L338]
> [2]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L177-L198]
> [3]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L191]
> [4]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L435]
> [5]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L433]
> [6]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L447]
> [7]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L837]
> [8]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L451]
> [9]
> [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L675]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
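
The flaky and happy flows listed in the issue can be replayed in a small single-threaded Java model. This is only a sketch under assumptions and not the DefaultStateUpdater code: the class name, the method name, and the two scripted arrays are hypothetical. It also assumes, as the step lists suggest, that the answers of completedChangelogs() and allChangelogsCompleted() depend only on how many times each one has been called.

```java
/**
 * Hypothetical, single-threaded replay of the two flows described in the issue.
 * This is NOT the DefaultStateUpdater code. Assumption: the answers of
 * completedChangelogs() and allChangelogsCompleted() depend only on how many
 * times each has been called, as the step lists suggest.
 */
class ChangelogCheckRace {

    // Assumed sizes of completedChangelogs(), one answer per runOnce() loop.
    private static final int[] COMPLETED_PARTITIONS = {0, 1, 2};

    // Assumed answers of allChangelogsCompleted(), one answer per call.
    private static final boolean[] ALL_COMPLETED = {false, false, true};

    /**
     * @param recheckInFirstLoop true models the flaky flow, where the wait
     *        condition is evaluated a second time in loop 1 after add(task)
     *        wakes the updater thread
     * @return true if the task reaches the restored state, false if the
     *         updater would park before that happens
     */
    static boolean taskGetsRestored(boolean recheckInFirstLoop) {
        int checks = 0; // number of allChangelogsCompleted() calls so far
        for (int loop = 0; loop < COMPLETED_PARTITIONS.length; loop++) {
            if (COMPLETED_PARTITIONS[loop] == 2) {
                return true; // both partitions completed: task is restored
            }
            boolean allCompleted = ALL_COMPLETED[checks++];
            if (loop == 0 && recheckInFirstLoop) {
                // The extra evaluation consumes one more scripted answer.
                allCompleted = ALL_COMPLETED[checks++];
            }
            if (allCompleted) {
                return false; // would await() with nobody left to signal
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("happy path restored: " + taskGetsRestored(false)); // true
        System.out.println("flaky path restored: " + taskGetsRestored(true));  // false
    }
}
```

In this model the extra evaluation in loop 1 uses up the second `false`, so loop 2 sees `true` one iteration early and parks before the task is restored.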