[ https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881175#comment-17881175 ]
Yu-Lin Chen edited comment on KAFKA-17515 at 9/12/24 5:44 AM:
--------------------------------------------------------------

I found two issues in the flaky tests:
# The local state is purged after kafkaStreams.close() times out (current timeout = 5 sec), while the stream thread may still be shutting down. This is the root cause of the [flaky test|https://ge.apache.org/s/havqcr7zu2tbk/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?expanded-stacktrace=WyIwIl0&page=eyJvdXRwdXQiOnsiMCI6MSwiMSI6Mn19&top-execution=1] that occurred on Sep 10 2024 16:55:51 CST. ([Code Link|https://github.com/apache/kafka/blob/a1f28570afb9499fb0cdb9b2928afb50a8a489ff/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582-L583])
# Some partition tasks assigned to the first KafkaStreams instance (ks-1) are reassigned to the second instance (ks-2) after rebalancing. However, those tasks had not yet started restoring on ks-1, so they never enter the suspended state there. This is the root cause of the [flaky test|https://ge.apache.org/s/hdpapdbvngcts/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1] that occurred on Sep 10 2024 11:44:34 CST.
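Issue #1 boils down to "don't delete the state directory while a thread may still be writing into it". The stack trace fits that reading: `Utils.delete` empties `0_0`, but by the time `postVisitDirectory` deletes the directory itself, it is (presumably) no longer empty. Below is a minimal stdlib-only model of a guard that purges only after the worker has really stopped. `CloseThenPurge` and all of its methods are hypothetical illustrations, not code from RestoreIntegrationTest or Kafka Streams. A plain `Thread` stands in for the stream thread, and `join(timeout)` stands in for a bounded close.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical model: only purge local state once the worker thread has really stopped. */
final class CloseThenPurge {

    /**
     * Asks the worker to stop and waits at most {@code timeout} (like a bounded close()).
     * Deletes {@code stateDir} and returns true only if the worker terminated in time;
     * otherwise leaves the directory alone, because the worker may still be writing to it.
     */
    static boolean closeAndPurge(Thread worker, Duration timeout, Path stateDir) {
        worker.interrupt();
        try {
            worker.join(timeout.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (worker.isAlive()) {
            return false; // timed out: purging now could race with the worker's writes
        }
        deleteRecursively(stateDir);
        return true;
    }

    /** Deletes a directory tree, children before parents. */
    static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : paths) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a temp state directory containing one task sub-directory with a file. */
    static Path newStateDir(String prefix) {
        try {
            Path dir = Files.createTempDirectory(prefix);
            Path task = Files.createDirectories(dir.resolve("0_0"));
            Files.write(task.resolve(".checkpoint"), new byte[] {1});
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Starts a daemon worker that, once interrupted, needs {@code shutdownMillis} to stop. */
    static Thread slowWorker(long shutdownMillis) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE); // "processing" until asked to stop
            } catch (InterruptedException stopRequested) {
                long deadline = System.nanoTime() + shutdownMillis * 1_000_000L;
                while (System.nanoTime() < deadline) { // simulate flushing/closing stores
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException ignored) {
                        // keep shutting down
                    }
                }
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

In the real test, the analogous guard would presumably be to check the boolean returned by `KafkaStreams#close(Duration)` before calling `IntegrationTestUtils.purgeLocalStreamsState`, or to give close() more time. That method returns `false` when the timeout elapses before all threads have stopped.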
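The ordering problem in issue #2 can be replayed with a small stdlib-only model, using the task orders from the failing and successful runs analyzed in this comment. `RestoreTimelineModel`, `RecordingListener`, and `Outcome` are hypothetical stand-ins; the test's real listener classes are not shown here. The model makes two assumptions:
* ks-1 reports a suspension only for a reassigned task it had already started restoring.
* Each await completes on the first matching callback.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical replay of the issue #2 race; not the real test classes. */
final class RestoreTimelineModel {

    /** Stand-in for a recording restore listener: each await completes on the first callback. */
    static final class RecordingListener {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch suspended = new CountDownLatch(1);

        void onRestoreStart(String task) { started.countDown(); }
        void onRestoreSuspended(String task) { suspended.countDown(); }

        boolean awaitUntilRestorationStarts(long timeoutMs) { return await(started, timeoutMs); }
        boolean awaitUntilRestorationSuspends(long timeoutMs) { return await(suspended, timeoutMs); }

        private static boolean await(CountDownLatch latch, long timeoutMs) {
            try {
                return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Outcome of the two waits the test performs. */
    static final class Outcome {
        final boolean ks1Suspended;
        final boolean ks2Started;

        Outcome(boolean ks1Suspended, boolean ks2Started) {
            this.ks1Suspended = ks1Suspended;
            this.ks2Started = ks2Started;
        }
    }

    /**
     * @param startedOnKs1 tasks ks-1 began restoring before the rebalance, in log order
     * @param movedToKs2   tasks the rebalance reassigned from ks-1 to ks-2
     */
    static Outcome replay(List<String> startedOnKs1, List<String> movedToKs2, long timeoutMs) {
        RecordingListener ks1 = new RecordingListener();
        RecordingListener ks2 = new RecordingListener();
        startedOnKs1.forEach(ks1::onRestoreStart);
        for (String task : movedToKs2) {
            // Assumption: ks-1 only suspends (and reports) a task it was already restoring.
            if (startedOnKs1.contains(task)) {
                ks1.onRestoreSuspended(task);
            }
        }
        movedToKs2.forEach(ks2::onRestoreStart);
        return new Outcome(ks1.awaitUntilRestorationSuspends(timeoutMs),
                           ks2.awaitUntilRestorationStarts(timeoutMs));
    }
}
```

Replaying the failing run (ks-1 started 0_4, 0_2, 0_0; ks-2 received 0_1, 0_3) leaves `ks1Suspended` false after the timeout, while `ks2Started` is true. With the successful run's order, both waits pass. In this model, the suspend-wait can only succeed if at least one task moved to ks-2 had already started restoring on ks-1 before the rebalance.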
Analysis results from the flaky logs (TL;DR):

For flaky test #1, the log shows:
2024-09-10 04:35:{*}14,227{*} [ks1-StreamThread-1] Informed to shut down
2024-09-10 04:35:{*}19,349{*} [ks1-StreamThread-1] Shutdown completed
-> Shutdown took 5.122 secs, longer than the 5 sec close timeout
-> In the retried run it took 2.429 secs
-> In my local environment it took 0.2 secs

For the failed run of flaky test #2, the timeline is:
time 1: ks-1 (assigned 0_0, 0_1, 0_2, 0_3, 0_4)
time 2: ks-1 (0_4 restore started)
time 3: ks-1 (0_2 restore started)
time 4: ks-1 (0_0 restore started)
time 5: ks-2 (after rebalancing, assigned 0_1, 0_3); those tasks had not started on ks-1
time 6: ks-2 (0_1 restore started)
time 7: ks-2 (0_3 restore started)

In this case, `kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends()` will never return true, since there is no "Suspended from RUNNING" for tasks 0_1 and 0_3 in ks-1.

The log of a successful run of flaky test #2:
time 1: ks-1 (assigned 0_0, 0_1, 0_2, 0_3, 0_4)
time 2: ks-1 (0_1 restore started)
time 3: ks-1 (0_3 restore started)
time 4: ks-1 (0_4 restore started)
time 5: ks-1 (0_2 restore started)
time 6: ks-1 (0_0 restore started)
time 7: ks-2 (after rebalancing, assigned 0_0, 0_2, 0_4)
time 8: ks-1 (task 0_4 suspended from RESTORING)
time 9: ks-1 (task 0_2 suspended from RESTORING)
time 10: ks-1 (task 0_0 suspended from RESTORING)
time 11: ks-2 (0_4 restore started)
time 12: ks-2 (0_2 restore started)
time 13: ks-2 (0_0 restore started)

kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends() passed at time 8.
kafkaStreams2StateRestoreListener.awaitUntilRestorationStarts() passed at time 11.

[~chia7712] If you're not working on this, I could submit a PR to fix these issues.

> Fix flaky RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-17515
> URL: https://issues.apache.org/jira/browse/KAFKA-17515
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
>
> {code:java}
> Stacktrace
> java.nio.file.DirectoryNotEmptyException: /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0
>     at java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289)
>     at java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109)
>     at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191)
>     at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898)
>     at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870)
>     at java.base/java.nio.file.Files.walkFileTree(Files.java:2803)
>     at java.base/java.nio.file.Files.walkFileTree(Files.java:2857)
>     at org.apache.kafka.common.utils.Utils.delete(Utils.java:870)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278)
>     at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)