[ https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882672#comment-17882672 ]
Yu-Lin Chen edited comment on KAFKA-17515 at 9/18/24 11:14 AM:
---------------------------------------------------------------

Another flaky failure was caused by slow initialization of the stream tasks. Initialization took longer than 60 seconds, so no records were processed before the test timed out. ([flaky link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1])

Error:
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 60000. Did not receive all [KeyValue(0, 0), KeyValue(1, 1), KeyValue(2, 2), KeyValue(3, 3), KeyValue(4, 4), KeyValue(5, 5), KeyValue(6, 6), KeyValue(7, 7), KeyValue(8, 8), KeyValue(9, 9), KeyValue(10, 10), KeyValue(11, 11), KeyValue(12, 12), KeyValue(13, 13), KeyValue(14, 14), KeyValue(15, 15), KeyValue(16, 16), KeyValue(17, 17), KeyValue(18, 18), KeyValue(19, 19), KeyValue(20, 20), KeyValue(21, 21), KeyValue(22, 22), KeyValue(23, 23), KeyValue(24, 24), KeyValue(25, 25), KeyValue(26, 26), KeyValue(27, 27), KeyValue(28, 28), KeyValue(29, 29), KeyValue(30, 30), KeyValue(31, 31), KeyValue(32, 32), KeyValue(33, 33), KeyValue(34, 34), KeyValue(35, 35), KeyValue(36, 36), KeyValue(37, 37), KeyValue(38, 38), KeyValue(39, 39), KeyValue(40, 40), KeyValue(41, 41), KeyValue(42, 42), KeyValue(43, 43), KeyValue(44, 44), KeyValue(45, 45), KeyValue(46, 46), KeyValue(47, 47), KeyValue(48, 48), KeyValue(49, 49), KeyValue(50, 50), KeyValue(51, 51), KeyValue(52, 52), KeyValue(53, 53), KeyValue(54, 54), KeyValue(55, 55), KeyValue(56, 56), KeyValue(57, 57), KeyValue(58, 58), KeyValue(59, 59), KeyValue(60, 60), KeyValue(61, 61), KeyValue(62, 62), KeyValue(63, 63), KeyValue(64, 64), KeyValue(65, 65), KeyValue(66, 66), KeyValue(67, 67), KeyValue(68, 68), KeyValue(69, 69), KeyValue(70, 70), KeyValue(71, 71), KeyValue(72, 72), KeyValue(73, 73), KeyValue(74, 74), KeyValue(75, 75), KeyValue(76, 76), KeyValue(77, 77), KeyValue(78, 78), KeyValue(79, 79), KeyValue(80, 80), KeyValue(81, 81), KeyValue(82, 82), KeyValue(83, 83), KeyValue(84, 84), KeyValue(85, 85), KeyValue(86, 86), KeyValue(87, 87), KeyValue(88, 88), KeyValue(89, 89), KeyValue(90, 90), KeyValue(91, 91), KeyValue(92, 92), KeyValue(93, 93), KeyValue(94, 94), KeyValue(95, 95), KeyValue(96, 96), KeyValue(97, 97), KeyValue(98, 98), KeyValue(99, 99)] records from topic outputTopic (got []) ==> expected: <true> but was: <false>
{code}

Logs:
{code:java}
[2024-09-12 18:50:29,644] INFO ... ks1-StreamThread-1] State transition from CREATED to STARTING
[2024-09-12 18:50:38,263] INFO ... ks1-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
[2024-09-12 18:50:48,813] INFO ... ks1-StreamThread-1] task [0_3] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:02,038] INFO ... ks1-StreamThread-1] task [0_0] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:14,316] INFO ... ks1-StreamThread-1] task [0_4] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:25,080] INFO ... ks1-StreamThread-1] task [0_1] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:37,862] INFO ... ks1-StreamThread-1] task [0_2] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
{code}

In the flaky test run, each step of stream task initialization took the following amount of time:
# 8.619 sec for rebalancing
# 10.550 sec for initializing task [0_3]
# 13.225 sec for initializing task [0_0]
# 12.278 sec for initializing task [0_4]
# 10.764 sec for initializing task [0_1]
# 12.782 sec for initializing task [0_2]

In total, initialization took longer than 60 seconds, whereas it usually completes in less than 10 seconds.
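For anyone who wants to re-check the per-step numbers, here is a minimal sketch that derives them from the log timestamps quoted above. The class and method names ({{InitTimings}}, {{stepMillis}}, {{totalMillis}}) are made up for illustration and are not part of Kafka; only the seven timestamps come from the log excerpt.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: recomputes the step durations
// from the timestamps in the log excerpt of the failed run.
class InitTimings {
    // Timestamp layout used in the quoted log lines, e.g. "2024-09-12 18:50:29,644".
    private static final DateTimeFormatter LOG_TS =
        DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    // Timestamps copied from the log excerpt, in order of appearance.
    public static final List<String> LOG_TIMESTAMPS = List.of(
        "2024-09-12 18:50:29,644", // CREATED -> STARTING
        "2024-09-12 18:50:38,263", // STARTING -> PARTITIONS_ASSIGNED
        "2024-09-12 18:50:48,813", // task [0_3] Initialized
        "2024-09-12 18:51:02,038", // task [0_0] Initialized
        "2024-09-12 18:51:14,316", // task [0_4] Initialized
        "2024-09-12 18:51:25,080", // task [0_1] Initialized
        "2024-09-12 18:51:37,862"  // task [0_2] Initialized
    );

    // Returns the elapsed milliseconds between each pair of consecutive timestamps.
    public static List<Long> stepMillis(List<String> timestamps) {
        List<Long> steps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), LOG_TS);
            LocalDateTime curr = LocalDateTime.parse(timestamps.get(i), LOG_TS);
            steps.add(Duration.between(prev, curr).toMillis());
        }
        return steps;
    }

    // Sums the step durations.
    public static long totalMillis(List<Long> steps) {
        long total = 0;
        for (long step : steps) {
            total += step;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> steps = stepMillis(LOG_TIMESTAMPS);
        for (long step : steps) {
            System.out.printf("%.3f sec%n", step / 1000.0);
        }
        System.out.printf("total: %.3f sec%n", totalMillis(steps) / 1000.0);
    }
}
```

The six steps sum to 68.218 sec, which is above the 60000 ms timeout reported in the assertion error.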
The following are the total task initialization times across failed and passed executions:
# Failed Execution ([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]): 68.218 sec (Started Sep 13 2024 at 02:50:08 CST)
# Passed Execution ([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]): 6.718 sec (Started Sep 13 2024 at 05:40:21 CST)
# Failed Execution ([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]): 59.632 sec (Started Sep 14 2024 at 04:05:41 CST)
# Passed Execution ([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]): 7.891 sec (Started Sep 14 2024 at 07:36:26 CST)

This issue hasn't occurred in the last three days, so I think we could fix the known issue in the PR first, then keep monitoring the flaky rate over the following days to see whether we should loosen the timeout again.

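To illustrate why slow setup alone is enough to produce "Condition not met within timeout" with an empty result, here is a rough sketch of a poll-with-deadline loop. It is a generic stand-in, not the helper the test actually uses; {{TimeoutPolling}} and {{pollUntil}} are hypothetical names, and the millisecond values in {{main}} are scaled-down placeholders.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a poll-until-deadline wait; not Kafka's test utility.
class TimeoutPolling {
    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition was met in time, false otherwise.
    public static boolean pollUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and treat the wait as failed.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated setup that only becomes "ready" after 200 ms.
        long start = System.currentTimeMillis();
        BooleanSupplier ready = () -> System.currentTimeMillis() - start >= 200;

        // A 50 ms timeout expires before setup finishes, so nothing is observed.
        System.out.println("short timeout met: " + pollUntil(ready, 50, 10));
        // A longer timeout leaves room for the slow setup.
        System.out.println("long timeout met: " + pollUntil(ready, 1000, 10));
    }
}
```

Loosening the timeout only hides the symptom if initialization time is unbounded, which is why watching the flaky rate first seems reasonable.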
> Fix flaky RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Yu-Lin Chen
>            Priority: Major
>         Attachments: Reproduced screenshoot in my env (Loop 7).png
>
>
> {code:java}
> Stacktrace
> java.nio.file.DirectoryNotEmptyException: /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0
>     at java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289)
>     at java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109)
>     at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191)
>     at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898)
>     at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870)
>     at java.base/java.nio.file.Files.walkFileTree(Files.java:2803)
>     at java.base/java.nio.file.Files.walkFileTree(Files.java:2857)
>     at org.apache.kafka.common.utils.Utils.delete(Utils.java:870)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278)
>     at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)