[ 
https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881175#comment-17881175
 ] 

Yu-Lin Chen edited comment on KAFKA-17515 at 9/12/24 5:44 AM:
--------------------------------------------------------------

I found two issues in the flaky tests:
 # The local state is purged after kafkaStreams.close() times out (current 
timeout = 5 sec). This is the root cause of the [flaky 
test|https://ge.apache.org/s/havqcr7zu2tbk/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?expanded-stacktrace=WyIwIl0&page=eyJvdXRwdXQiOnsiMCI6MSwiMSI6Mn19&top-execution=1]
 that occurred on Sep 10 2024 16:55:51 CST. ([Code 
Link|https://github.com/apache/kafka/blob/a1f28570afb9499fb0cdb9b2928afb50a8a489ff/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582-L583])
 # Some tasks assigned to the first KafkaStreams instance (ks-1) are reassigned to 
the second instance (ks-2) after rebalancing. However, the reassigned tasks had 
not started restoring on ks-1, so they never enter the suspended state there. This is the root cause of 
the [flaky 
test|https://ge.apache.org/s/hdpapdbvngcts/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]
 that occurred on Sep 10 2024 11:44:34 CST.

 
Analysis result from the flaky logs (TL;DR):

For flaky #1, the log shows:
2024-09-10 04:35:{*}14,227{*} [ks1-StreamThread-1] Informed to shut down
2024-09-10 04:35:{*}19,349{*} [ks1-StreamThread-1] Shutdown completed
-> Shutdown took 5.122 secs, exceeding the 5 sec close() timeout
-> The retry run took 2.429 secs
-> My local env took 0.2 secs
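
The shutdown duration can be checked directly from the two log timestamps. Below is a minimal sketch using only java.time; the class and method names (ShutdownTiming, elapsed, exceededTimeout) are hypothetical helpers for illustration and are not part of the Kafka test code. It assumes the log timestamp layout shown above and the 5 sec close() timeout mentioned in issue 1.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper, for illustration only. */
final class ShutdownTiming {
    // Timestamp layout of the quoted log lines (comma before the milliseconds).
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Elapsed time between two log timestamps. */
    static Duration elapsed(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, LOG_TIME),
                                LocalDateTime.parse(to, LOG_TIME));
    }

    /** True when the shutdown took strictly longer than the close() timeout. */
    static boolean exceededTimeout(String from, String to, Duration timeout) {
        return elapsed(from, to).compareTo(timeout) > 0;
    }

    public static void main(String[] args) {
        String informed = "2024-09-10 04:35:14,227";   // "Informed to shut down"
        String completed = "2024-09-10 04:35:19,349";  // "Shutdown completed"
        Duration closeTimeout = Duration.ofSeconds(5); // timeout stated in issue 1

        System.out.println("shutdown took " + elapsed(informed, completed).toMillis() + " ms");
        System.out.println("exceeded timeout: "
                + exceededTimeout(informed, completed, closeTimeout));
    }
}
```

If the shutdown outlasts the timeout, the state directory may still be in use when it is purged, which would be consistent with the DirectoryNotEmptyException in the stack trace.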

For flaky #2, below is the timeline of the failed run:
time 1: ks-1 (Assigned 0_0, 0_1, 0_2, 0_3, 0_4)
time 2: ks-1 (0_4 restore started)
time 3: ks-1 (0_2 restore started)
time 4: ks-1 (0_0 restore started)
time 5: ks-2 (After rebalancing, assigned 0_1, 0_3); those tasks hadn't started 
restoring on ks-1
time 6: ks-2 (0_1 restore started)
time 7: ks-2 (0_3 restore started)

In this case, 
`kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends()` will never 
complete, since there is no "Suspended from RUNNING" log for tasks 0_1 and 0_3 on ks-1.

The log of a successful run for flaky #2:

time 1: ks-1 (Assigned 0_0, 0_1, 0_2, 0_3, 0_4)
time 2: ks-1 (0_1 restore started)
time 3: ks-1 (0_3 restore started)
time 4: ks-1 (0_4 restore started)
time 5: ks-1 (0_2 restore started)
time 6: ks-1 (0_0 restore started)
time 7: ks-2 (After rebalancing, assigned 0_0, 0_2, 0_4)
time 8: ks-1 (task 0_4 suspended from RESTORING) 
time 9: ks-1 (task 0_2 suspended from RESTORING)
time 10: ks-1 (task 0_0 suspended from RESTORING)
time 11: ks-2 (0_4 restore started)
time 12: ks-2 (0_2 restore started)
time 13: ks-2 (0_0 restore started)

kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends() passed at 
time 8. 
kafkaStreams2StateRestoreListener.awaitUntilRestorationStarts() passed at time 
11.
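
To make the difference between the two runs concrete, here is a small replay of both timelines. This is only a sketch: RestoreTimeline is a hypothetical stand-in, not the listener used in RestoreIntegrationTest, and it encodes the assumption from the analysis above that a task revoked from ks-1 produces a suspend event only if its restore had already started there.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the restore events seen by ks-1; illustration only. */
final class RestoreTimeline {
    private final Set<String> restoring = new LinkedHashSet<>();
    private final List<String> suspended = new ArrayList<>();

    /** A task began restoring on this instance. */
    void restoreStarted(String taskId) {
        restoring.add(taskId);
    }

    /** Tasks moved away by a rebalance; assumed to suspend only if already restoring. */
    void revoked(String... taskIds) {
        for (String taskId : taskIds) {
            if (restoring.remove(taskId)) {
                suspended.add(taskId);
            }
        }
    }

    /** Stand-in for the condition that awaitUntilRestorationSuspends() waits on. */
    boolean sawSuspension() {
        return !suspended.isEmpty();
    }

    List<String> suspendedTasks() {
        return suspended;
    }

    /** Failed run: 0_1 and 0_3 move to ks-2 before their restore starts on ks-1. */
    static RestoreTimeline failingRun() {
        RestoreTimeline ks1 = new RestoreTimeline();
        ks1.restoreStarted("0_4");
        ks1.restoreStarted("0_2");
        ks1.restoreStarted("0_0");
        ks1.revoked("0_1", "0_3");
        return ks1;
    }

    /** Successful run: all five tasks are restoring when 0_4, 0_2, 0_0 are revoked. */
    static RestoreTimeline passingRun() {
        RestoreTimeline ks1 = new RestoreTimeline();
        for (String taskId : new String[] {"0_1", "0_3", "0_4", "0_2", "0_0"}) {
            ks1.restoreStarted(taskId);
        }
        ks1.revoked("0_4", "0_2", "0_0");
        return ks1;
    }

    public static void main(String[] args) {
        System.out.println("failed run saw suspension: " + failingRun().sawSuspension());
        System.out.println("successful run saw suspension: " + passingRun().sawSuspension());
    }
}
```

In this model the failed run never records a suspension on ks-1, so any wait on that condition can only end by timing out.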
 
[~chia7712] If you're not working on this, I could submit a PR to fix these 
known issues.


> Fix flaky 
> RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> {code:java}
> Stacktrace
> java.nio.file.DirectoryNotEmptyException: 
> /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0
>       at 
> java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289)
>       at 
> java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109)
>       at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191)
>       at 
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898)
>       at 
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870)
>       at java.base/java.nio.file.Files.walkFileTree(Files.java:2803)
>       at java.base/java.nio.file.Files.walkFileTree(Files.java:2857)
>       at org.apache.kafka.common.utils.Utils.delete(Utils.java:870)
>       at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266)
>       at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278)
>       at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>       at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>       at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
