[ https://issues.apache.org/jira/browse/KAFKA-17697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu-Lin Chen updated KAFKA-17697:
--------------------------------
    Description: 
28 flaky runs out of 253 trunk builds in the past 28 days (GitHub) ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1727973081200&search.startTimeMin=1725552000000&search.tags=trunk,github&search.timeZoneId=Asia%2FTaipei&tests.container=org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest&tests.test=shouldRestoreSingleActiveStatefulTask()])

The issue can be reproduced in my local environment within 20 runs. It can also be reproduced with the attached patch: [^0001-reproduce-the-racing-issue.patch]

([Oct 2 2024 at 05:39:43 CST|https://ge.apache.org/s/5gsvq5esvbouc/tests/task/:streams:test/details/org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest/shouldRestoreSingleActiveStatefulTask()?expanded-stacktrace=WyIwIl0&top-execution=1])
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Did not get all restored active task within the given timeout! ==> expected: <true> but was: <false>
    at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
    at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.verifyRestoredActiveTasks(DefaultStateUpdaterTest.java:1715)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask(DefaultStateUpdaterTest.java:340)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
{code}
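The trace shows that the failure surfaces as a 30000 ms timeout inside a polling assertion (TestUtils.waitForCondition called from verifyRestoredActiveTasks), not as an exception thrown by the state updater: the test keeps checking for restored active tasks and the StateUpdaterThread never produces one. A minimal sketch of such a poll-until-timeout check, for illustration only; PollingWait and its parameters are hypothetical and this is not Kafka's TestUtils implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical poll-until-timeout helper, for illustration only.
// It is NOT the implementation of org.apache.kafka.test.TestUtils.waitForCondition.
final class PollingWait {
    private PollingWait() {}

    // Re-evaluates `condition` every pollIntervalMs until it returns true or
    // timeoutMs elapses. Returns false on timeout, where the real test helper
    // fails with "Condition not met within timeout ...".
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

With a condition that never becomes true, as when the updater thread is parked with the task still in updatingTasks, the sketch returns false once the timeout expires, which corresponds to the 30-second failure above.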
{*}Root Cause{*}: a race between the following two threads:
1. stateUpdater.add(task) in the test thread [1]
2. the runOnce() loop in the StateUpdaterThread [2]

In the scenario below, the StateUpdaterThread hangs even though updatingTasks is not empty.

{*}Flaky scenario{*}: If stateUpdater.add(task) runs after the first runOnce() loop, the second loop hangs [3][4]. After the second loop, allWorkDone() is always true [5], even though updatingTasks.isEmpty() is false [6].

The flaky flow in detail:
 # runOnce() loop 1: completedChangelogs() returns an empty set.
 # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is empty, and allWorkDone() is false. The thread calls tasksAndActionsCondition.await() (to be notified by stateUpdater.add(task) [1][7]).
 # The test thread calls stateUpdater.add(task).
 # runOnce() loop 1: allChangelogsCompleted() returns false again before exiting the while loop [8] (tasksAndActions is not empty).
 # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
 # runOnce() loop 2: allChangelogsCompleted() returns true; the thread {color:#DE350B}calls tasksAndActionsCondition.await() again and is *never notified*{color}.
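In the last step the thread parks on a condition whose wait predicate does not account for the task sitting in updatingTasks, and no other thread will signal it again. The sketch below is a heavily simplified, hypothetical model of that situation: LostWakeupModel, waitIfAllWorkDone, and the two-term allWorkDone() predicate are assumptions made for illustration, not the DefaultStateUpdater code. It uses a bounded Condition.awaitNanos() so that a hang shows up as a false return instead of blocking forever.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, heavily simplified model of the wait step described in this ticket.
// It is NOT the DefaultStateUpdater source; only some names mirror the ticket.
final class LostWakeupModel {
    private final ReentrantLock tasksAndActionsLock = new ReentrantLock();
    private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition();
    private final Queue<String> tasksAndActions = new ArrayDeque<>();
    final Set<String> updatingTasks = new HashSet<>();
    // Stands in for a stubbed changelogReader.allChangelogsCompleted().
    volatile boolean allChangelogsCompleted = false;

    // Test thread: enqueue a task and wake the updater thread.
    void add(String taskId) {
        tasksAndActionsLock.lock();
        try {
            tasksAndActions.add(taskId);
            tasksAndActionsCondition.signalAll();
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    // Updater thread: move queued tasks into updatingTasks.
    void performActionsOnTasks() {
        tasksAndActionsLock.lock();
        try {
            while (!tasksAndActions.isEmpty()) {
                updatingTasks.add(tasksAndActions.poll());
            }
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    // Assumed predicate: it never looks at updatingTasks.
    private boolean allWorkDone() {
        return allChangelogsCompleted && tasksAndActions.isEmpty();
    }

    // Returns true if the thread may keep looping, false if it stayed parked for
    // the whole timeout. The real thread awaits without a timeout, so false here
    // corresponds to a hang.
    boolean waitIfAllWorkDone(long timeoutMs) {
        tasksAndActionsLock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (allWorkDone()) {
                if (remainingNanos <= 0) {
                    return false;
                }
                remainingNanos = tasksAndActionsCondition.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            tasksAndActionsLock.unlock();
        }
    }
}
```

In the model, once the task is in updatingTasks and allChangelogsCompleted is true, waitIfAllWorkDone(...) returns false: the thread would stay parked although work is pending. A later add() from another thread would still wake it, but in the flaky flow above no further add() happens.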

 
The happy path (stateUpdater.add(task) runs before the end of the first runOnce() loop):
 # runOnce() loop 1: completedChangelogs() returns an empty set.
 # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is not empty, and allWorkDone() is false.
 # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
 # runOnce() loop 2: allChangelogsCompleted() returns false, updatingTasks is not empty, and allWorkDone() is false.
 # runOnce() loop 3: completedChangelogs() returns 2 topic partitions, and {color:#4C9AFF}*+the task is moved to restoredActiveTasks+*{color} [9]
 # runOnce() loop 3: allChangelogsCompleted() returns true (this no longer matters).
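The difference between the two flows is whether a third iteration runs: in the happy path the task is moved to restoredActiveTasks in the iteration where completedChangelogs() reports both of its changelog partitions. The hypothetical model below replays both flows; RestoreProgressModel, the task id 0_0, and the partition names changelog-0 and changelog-1 are made up for illustration, and the rule "move a task once all its changelog partitions are completed" is an assumption of the sketch, not the DefaultStateUpdater code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of how an updating active task becomes "restored".
// Not the DefaultStateUpdater source; task and partition names are made up.
final class RestoreProgressModel {
    // task id -> changelog partitions that must be completely read
    final Map<String, Set<String>> updatingTasks = new LinkedHashMap<>();
    final List<String> restoredActiveTasks = new ArrayList<>();

    void addUpdatingTask(String taskId, Set<String> changelogPartitions) {
        updatingTasks.put(taskId, changelogPartitions);
    }

    // One runOnce() iteration: move every task whose changelog partitions are all
    // contained in completedChangelogs from updatingTasks to restoredActiveTasks.
    void runOnce(Set<String> completedChangelogs) {
        Iterator<Map.Entry<String, Set<String>>> it = updatingTasks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> entry = it.next();
            if (completedChangelogs.containsAll(entry.getValue())) {
                restoredActiveTasks.add(entry.getKey());
                it.remove();
            }
        }
    }
}
```

Calling runOnce with an empty set, then {changelog-0}, then both partitions restores 0_0 only on the third call. In the flaky flow the thread parks after the second call, so the third call never happens and 0_0 stays in updatingTasks until the test's 30-second wait expires.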

 

[1] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java#L338]
[2] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L177-L198]
[3] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L191]
[4] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L435]
[5] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L433]
[6] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L447]
[7] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L837]
[8] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L451]
[9] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L675]

  was:
28 flaky runs out of 253 trunk builds in the past 28 days (GitHub) ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1727973081200&search.startTimeMin=1725552000000&search.tags=trunk,github&search.timeZoneId=Asia%2FTaipei&tests.container=org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest&tests.test=shouldRestoreSingleActiveStatefulTask()])

The issue can be reproduced in my local environment within 20 runs. It can also be reproduced with the attached patch: [^0001-reproduce-the-racing-issue.patch]

([Oct 2 2024 at 05:39:43 CST|https://ge.apache.org/s/5gsvq5esvbouc/tests/task/:streams:test/details/org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest/shouldRestoreSingleActiveStatefulTask()?expanded-stacktrace=WyIwIl0&top-execution=1])
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Did not get all restored active task within the given timeout! ==> expected: <true> but was: <false>
    at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
    at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.verifyRestoredActiveTasks(DefaultStateUpdaterTest.java:1715)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask(DefaultStateUpdaterTest.java:340)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
{code}
{*}Root Cause{*}: a race between the following two threads:
1. stateUpdater.add(task) in the test thread [1]
2. the runOnce() loop in the StateUpdaterThread [2]

In the scenario below, the StateUpdaterThread hangs even though updatingTasks is not empty.

{*}Flaky scenario{*}: If stateUpdater.add(task) runs after the first runOnce() loop, the second loop hangs [3][4]. allWorkDone() is always true [5], even though updatingTasks.isEmpty() is false [6].

The flaky flow in detail:
 # runOnce() loop 1: completedChangelogs() returns an empty set.
 # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is empty, and allWorkDone() is false. The thread calls tasksAndActionsCondition.await() (to be notified by stateUpdater.add(task) [1][7]).
 # runOnce() loop 1: allChangelogsCompleted() returns false again before exiting the while loop [8].
 # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
 # runOnce() loop 2: allChangelogsCompleted() returns true; the thread calls tasksAndActionsCondition.await() again and is never notified.

 
The happy path (stateUpdater.add(task) runs before the end of the first runOnce() loop):
 # runOnce() loop 1: completedChangelogs() returns an empty set.
 # runOnce() loop 1: allChangelogsCompleted() returns false, updatingTasks is not empty, and allWorkDone() is false.
 # runOnce() loop 2: completedChangelogs() returns 1 topic partition.
 # runOnce() loop 2: allChangelogsCompleted() returns false, updatingTasks is not empty, and allWorkDone() is false.
 # runOnce() loop 3: completedChangelogs() returns 2 topic partitions, and +the task is moved to restoredActiveTasks+ [10]
 # runOnce() loop 3: allChangelogsCompleted() returns true (this no longer matters).

 

[1] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java#L338]
[2] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L177-L198]
[3] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L191]
[4] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L435]
[5] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L433]
[6] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L447]
[7] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L433-L436]
[8] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L451]
[9] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L837]
[10] [https://github.com/apache/kafka/blob/0edf5dbd204df9eb62bfea1b56993e95737df5a3/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L675]


> Fix flaky DefaultStateUpdaterTest.shouldRestoreSingleActiveStatefulTask
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-17697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17697
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yu-Lin Chen
>            Assignee: Yu-Lin Chen
>            Priority: Major
>         Attachments: 0001-reproduce-the-racing-issue.patch



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
