[ https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882672#comment-17882672 ]

Yu-Lin Chen edited comment on KAFKA-17515 at 9/18/24 11:14 AM:
---------------------------------------------------------------

The other flaky failure was caused by slow initialization of the stream tasks. Initialization took longer than 60 seconds, so no record was processed before the timeout. ([flaky link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1])

Error:
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 60000. 
Did not receive all [KeyValue(0, 0), KeyValue(1, 1), KeyValue(2, 2), 
KeyValue(3, 3), KeyValue(4, 4), KeyValue(5, 5), KeyValue(6, 6), KeyValue(7, 7), 
KeyValue(8, 8), KeyValue(9, 9), KeyValue(10, 10), KeyValue(11, 11), 
KeyValue(12, 12), KeyValue(13, 13), KeyValue(14, 14), KeyValue(15, 15), 
KeyValue(16, 16), KeyValue(17, 17), KeyValue(18, 18), KeyValue(19, 19), 
KeyValue(20, 20), KeyValue(21, 21), KeyValue(22, 22), KeyValue(23, 23), 
KeyValue(24, 24), KeyValue(25, 25), KeyValue(26, 26), KeyValue(27, 27), 
KeyValue(28, 28), KeyValue(29, 29), KeyValue(30, 30), KeyValue(31, 31), 
KeyValue(32, 32), KeyValue(33, 33), KeyValue(34, 34), KeyValue(35, 35), 
KeyValue(36, 36), KeyValue(37, 37), KeyValue(38, 38), KeyValue(39, 39), 
KeyValue(40, 40), KeyValue(41, 41), KeyValue(42, 42), KeyValue(43, 43), 
KeyValue(44, 44), KeyValue(45, 45), KeyValue(46, 46), KeyValue(47, 47), 
KeyValue(48, 48), KeyValue(49, 49), KeyValue(50, 50), KeyValue(51, 51), 
KeyValue(52, 52), KeyValue(53, 53), KeyValue(54, 54), KeyValue(55, 55), 
KeyValue(56, 56), KeyValue(57, 57), KeyValue(58, 58), KeyValue(59, 59), 
KeyValue(60, 60), KeyValue(61, 61), KeyValue(62, 62), KeyValue(63, 63), 
KeyValue(64, 64), KeyValue(65, 65), KeyValue(66, 66), KeyValue(67, 67), 
KeyValue(68, 68), KeyValue(69, 69), KeyValue(70, 70), KeyValue(71, 71), 
KeyValue(72, 72), KeyValue(73, 73), KeyValue(74, 74), KeyValue(75, 75), 
KeyValue(76, 76), KeyValue(77, 77), KeyValue(78, 78), KeyValue(79, 79), 
KeyValue(80, 80), KeyValue(81, 81), KeyValue(82, 82), KeyValue(83, 83), 
KeyValue(84, 84), KeyValue(85, 85), KeyValue(86, 86), KeyValue(87, 87), 
KeyValue(88, 88), KeyValue(89, 89), KeyValue(90, 90), KeyValue(91, 91), 
KeyValue(92, 92), KeyValue(93, 93), KeyValue(94, 94), KeyValue(95, 95), 
KeyValue(96, 96), KeyValue(97, 97), KeyValue(98, 98), KeyValue(99, 99)] records 
from topic outputTopic (got []) ==> expected: <true> but was: <false> {code}
Logs:
{code:java}
[2024-09-12 18:50:29,644] INFO ... ks1-StreamThread-1] State transition from CREATED to STARTING
[2024-09-12 18:50:38,263] INFO ... ks1-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
[2024-09-12 18:50:48,813] INFO ... ks1-StreamThread-1] task [0_3] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:02,038] INFO ... ks1-StreamThread-1] task [0_0] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:14,316] INFO ... ks1-StreamThread-1] task [0_4] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:25,080] INFO ... ks1-StreamThread-1] task [0_1] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:37,862] INFO ... ks1-StreamThread-1] task [0_2] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277) {code}
In the failed run, the individual startup steps took the following amounts of time:
 # 8.619 sec for rebalancing
 # 10.550 sec for initializing task [0_3]
 # 13.225 sec for initializing task [0_0]
 # 12.278 sec for initializing task [0_4]
 # 10.764 sec for initializing task [0_1]
 # 12.782 sec for initializing task [0_2]

In total this took 68.218 seconds, longer than the 60-second timeout, whereas in most runs it completes in less than 10 seconds.
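The per-step numbers above can be re-derived from the log timestamps. A minimal sketch, assuming the {{yyyy-MM-dd HH:mm:ss,SSS}} timestamp prefix shown in the log excerpt; {{InitTimeline}}, {{stepDurations}} and {{sum}} are hypothetical names, not Kafka code:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class InitTimeline {
    // Matches the "2024-09-12 18:50:29,644" timestamp inside the log brackets.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    /** Returns the elapsed time between each pair of consecutive log timestamps. */
    static List<Duration> stepDurations(List<String> timestamps) {
        List<Duration> steps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), LOG_TS);
            LocalDateTime curr = LocalDateTime.parse(timestamps.get(i), LOG_TS);
            steps.add(Duration.between(prev, curr));
        }
        return steps;
    }

    /** Adds up all step durations. */
    static Duration sum(List<Duration> steps) {
        return steps.stream().reduce(Duration.ZERO, Duration::plus);
    }

    public static void main(String[] args) {
        // CREATED->STARTING, STARTING->PARTITIONS_ASSIGNED, then tasks 0_3, 0_0, 0_4, 0_1, 0_2 initialized.
        List<String> timestamps = List.of(
                "2024-09-12 18:50:29,644",
                "2024-09-12 18:50:38,263",
                "2024-09-12 18:50:48,813",
                "2024-09-12 18:51:02,038",
                "2024-09-12 18:51:14,316",
                "2024-09-12 18:51:25,080",
                "2024-09-12 18:51:37,862");
        List<Duration> steps = stepDurations(timestamps);
        steps.forEach(d -> System.out.println(d.toMillis() + " ms"));
        Duration total = sum(steps);
        System.out.println("total: " + total.toMillis() + " ms, over 60 s timeout: "
                + (total.compareTo(Duration.ofSeconds(60)) > 0));
    }
}
```

Summing the six gaps gives 68,218 ms, which matches the 68.218 sec total listed for that execution.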

Total task initialization time across recent executions of the flaky test:
 # Failed execution ([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]): 68.218 sec (started Sep 13 2024 at 02:50:08 CST)
 # Passed execution ([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]): 6.718 sec (started Sep 13 2024 at 05:40:21 CST)
 # Failed execution ([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]): 59.632 sec (started Sep 14 2024 at 04:05:41 CST)
 # Passed execution ([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]): 7.891 sec (started Sep 14 2024 at 07:36:26 CST)

This failure hasn't occurred in the last three days, so I think we can fix the known issue in the PR first, then keep monitoring the flaky rate over the following days to decide whether we should loosen the timeout again.
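The assertion message ("Condition not met within timeout 60000") appears to come from a poll-until-deadline check in the test utilities. The sketch below illustrates that pattern, assuming the deadline is fixed before the application produces any output; {{WaitSketch}} and {{waitUntil}} are hypothetical names, not Kafka's {{TestUtils}} API. It shows why slow task initialization alone can exhaust the budget, and why a looser timeout lets the same condition pass:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class WaitSketch {
    /**
     * Hypothetical helper: polls the condition until it returns true or the timeout elapses.
     * The deadline is fixed when the call starts, so any time spent before the condition
     * can possibly hold (e.g. task initialization) counts against the same budget.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            final long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the poll interval, but not much past the deadline.
            final long sleepMs = Math.min(pollInterval.toMillis(), Math.max(1L, remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated app whose first output appears only after a 300 ms "initialization" phase.
        final long readyAt = System.nanoTime() + Duration.ofMillis(300).toNanos();
        final BooleanSupplier outputReady = () -> System.nanoTime() >= readyAt;

        // Budget shorter than initialization: times out, like the 60 s run that needed ~68 s.
        System.out.println("100 ms budget met: "
                + waitUntil(outputReady, Duration.ofMillis(100), Duration.ofMillis(10)));
        // Looser budget: the same condition is eventually met.
        System.out.println("2 s budget met: "
                + waitUntil(outputReady, Duration.ofSeconds(2), Duration.ofMillis(10)));
    }
}
```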


> Fix flaky 
> RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Yu-Lin Chen
>            Priority: Major
>         Attachments: Reproduced screenshoot in my env (Loop 7).png
>
>
> {code:java}
> Stacktrace
> java.nio.file.DirectoryNotEmptyException: 
> /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0
>       at 
> java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289)
>       at 
> java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109)
>       at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191)
>       at 
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898)
>       at 
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870)
>       at java.base/java.nio.file.Files.walkFileTree(Files.java:2803)
>       at java.base/java.nio.file.Files.walkFileTree(Files.java:2857)
>       at org.apache.kafka.common.utils.Utils.delete(Utils.java:870)
>       at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266)
>       at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278)
>       at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>       at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>       at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}
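The stack trace above fails in {{Files.deleteIfExists}} called from {{postVisitDirectory}}, i.e. after the walk had already deleted every file it saw in {{0_0}}. One way this can happen, assuming another thread (for example a stream thread that has not fully shut down) creates a file in the directory mid-walk, is sketched below with plain {{java.nio.file}}. The concurrent writer is simulated inline, the file names are illustrative, and {{RecursiveDeleteRace}} is a hypothetical name; this is an assumed mechanism, not a confirmed root cause:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class RecursiveDeleteRace {
    /**
     * Recursively deletes root the way a walkFileTree-based delete does:
     * files in visitFile, directories in postVisitDirectory.
     * If lateWriter is non-null it runs once, just before the first directory is removed,
     * to simulate another thread writing into the tree during the walk (an assumption).
     */
    static void delete(Path root, Runnable lateWriter) throws IOException {
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            private boolean writerRan = false;

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.deleteIfExists(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                if (lateWriter != null && !writerRan) {
                    writerRan = true;
                    lateWriter.run();
                }
                // Throws DirectoryNotEmptyException if an entry appeared after the children were visited.
                Files.deleteIfExists(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("state-dir");
        Path task = Files.createDirectory(root.resolve("0_0"));
        Files.createFile(task.resolve(".lock"));

        try {
            delete(root, () -> {
                try {
                    Files.createFile(task.resolve("late-file")); // simulated concurrent write
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (DirectoryNotEmptyException e) {
            System.out.println("DirectoryNotEmptyException: " + e.getFile());
        }
        delete(root, null); // a second attempt succeeds once nothing else writes to the tree
        System.out.println("root still exists: " + Files.exists(root));
    }
}
```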



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
