[ https://issues.apache.org/jira/browse/KAFKA-17646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885561#comment-17885561 ]
Yu-Lin Chen edited comment on KAFKA-17646 at 9/28/24 3:14 PM:
--------------------------------------------------------------

The first issue, "Unexpected state transition from ERROR to PENDING_SHUTDOWN" [1], was caused by a race between streams.close() and the shutdownHelper() thread.

After the GlobalThread dies, the shutdownHelper() thread is triggered asynchronously. [2] The state of KafkaStreams changes in this order: RUNNING -> PENDING_ERROR -> ERROR

However, the test called streams.close() in parallel [3], which performs a non-atomic check-and-set flow as shown below: [4]
# state.hasCompletedShutdown() (passes, i.e. does not return early, if the state is PENDING_ERROR)
# state.isShuttingDown() (passes, i.e. does not return early, if the state is ERROR)
# setState(State.PENDING_SHUTDOWN)

The third step throws an error because the state transition "ERROR to PENDING_SHUTDOWN" is illegal.

This race can be simulated with the attached patch. [^0001-reproduce-unexpected-state-transition.patch]

[1] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L343]
[2] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L696]
[3] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java#L550C1-L551C1]
[4] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1565]

was (Author: yu-lin chen):

The first issue, "Unexpected state transition from ERROR to PENDING_SHUTDOWN" [1], was caused by a race between streams.close() and the shutdownHelper() thread.

After the GlobalThread dies, the shutdownHelper() thread is triggered. [2] The state of KafkaStreams changes in this order: RUNNING -> PENDING_ERROR -> ERROR

However, the test called streams.close() asynchronously [3], which performs a non-atomic check-and-set flow as shown below: [4]
# state.hasCompletedShutdown() (passes, i.e. does not return early, if the state is PENDING_ERROR)
# state.isShuttingDown() (passes, i.e. does not return early, if the state is ERROR)
# setState(State.PENDING_SHUTDOWN)

The third step throws an error because the state transition "ERROR to PENDING_SHUTDOWN" is illegal.

This race can be simulated with the attached patch. [^0001-reproduce-unexpected-state-transition.patch]

[1] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L343]
[2] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L696]
[3] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java#L550C1-L551C1]
[4] [https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1565]

> Fix flaky KafkaStreamsTest.testStateGlobalThreadClose
> -----------------------------------------------------
>
>                 Key: KAFKA-17646
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17646
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: Yu-Lin Chen
>            Assignee: Yu-Lin Chen
>            Priority: Major
>         Attachments: 0001-reproduce-unexpected-state-transition.patch, 0002-reproduce-waiting-timeout-for-PENDING_ERROR.patch
>
>
> 22 flaky builds out of 584 in the past 28 days.
> (13 from jenkins, 9 from github) ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1727517159423&search.startTimeMin=1725033600000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=org.apache.kafka.streams.KafkaStreamsTest&tests.test=testStateGlobalThreadClose()])
> Two types of error messages:
> 1. Unexpected state transition from ERROR to PENDING_SHUTDOWN ([Sep 24 2024 at 15:45:33 CST|https://ge.apache.org/s/ewblfqjcre6gu/tests/task/:streams:test/details/org.apache.kafka.streams.KafkaStreamsTest/testStateGlobalThreadClose()?expanded-stacktrace=WyIwIl0&top-execution=1])
> {code:java}
> java.lang.IllegalStateException: Stream-client test-client: Unexpected state transition from ERROR to PENDING_SHUTDOWN
>     at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:343)
>     at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:1566)
>     at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:1459)
>     at org.apache.kafka.streams.KafkaStreamsTest.testStateGlobalThreadClose(KafkaStreamsTest.java:550)
>     at java.lang.reflect.Method.invoke(Method.java:569)
>     at java.util.ArrayList.forEach(ArrayList.java:1511)
>     at java.util.ArrayList.forEach(ArrayList.java:1511)
> {code}
> 2. org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Thread never stopped. ==> expected: <true> but was: <false> ([Sep 26 2024 at 09:36:22 CST|https://ge.apache.org/s/hetqkmasks5ve/tests/task/:streams:test/details/org.apache.kafka.streams.KafkaStreamsTest/testStateGlobalThreadClose()?expanded-stacktrace=WyIwIl0&top-execution=1])
> {code:java}
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350)
>     at org.apache.kafka.streams.KafkaStreamsTest.testStateGlobalThreadClose(KafkaStreamsTest.java:546)
>     at java.lang.reflect.Method.invoke(Method.java:569)
>     at java.util.ArrayList.forEach(ArrayList.java:1511)
>     at java.util.ArrayList.forEach(ArrayList.java:1511)
> {code}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
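
For illustration, the non-atomic check-and-set flow described in the comment above can be sketched in a few lines of Java. This is a simplified stand-in and not the KafkaStreams source: the class CloseRaceSketch, its reduced State enum, the shutdownHelperStep() hook, and the rule that nothing may leave ERROR are all assumptions made for the sketch. The lock-based atomicClose() is only one possible way to close the window, not necessarily the fix adopted for this ticket.

```java
public class CloseRaceSketch {
    /** Reduced, hypothetical state set; the real client has more states. */
    public enum State {
        RUNNING, PENDING_ERROR, ERROR, PENDING_SHUTDOWN;

        boolean hasCompletedShutdown() { return this == ERROR; }
        boolean isShuttingDown() { return this == PENDING_ERROR || this == PENDING_SHUTDOWN; }
    }

    private final Object lock = new Object();
    // Start where the scenario starts: the global thread has already died.
    private State state = State.PENDING_ERROR;

    public State state() {
        synchronized (lock) { return state; }
    }

    /** Stand-in for the asynchronous shutdown helper: PENDING_ERROR -> ERROR. */
    public void shutdownHelperStep() {
        synchronized (lock) { state = State.ERROR; }
    }

    /** Assumed rule for this sketch: nothing may leave the terminal ERROR state. */
    private void setState(State next) {
        synchronized (lock) {
            if (state == State.ERROR) {
                throw new IllegalStateException(
                    "Unexpected state transition from " + state + " to " + next);
            }
            state = next;
        }
    }

    /** Non-atomic flow: each step re-reads the state, so it can change in between. */
    public boolean racyClose(Runnable interleaved) {
        if (state().hasCompletedShutdown()) return true;  // sees PENDING_ERROR, continues
        interleaved.run();                                // the other thread moves to ERROR
        if (state().isShuttingDown()) return false;       // sees ERROR, continues
        setState(State.PENDING_SHUTDOWN);                 // ERROR -> PENDING_SHUTDOWN: throws
        return true;
    }

    /** Atomic variant: both checks and the transition share one lock. */
    public boolean atomicClose() {
        synchronized (lock) {
            if (state.hasCompletedShutdown()) return true;  // already stopped
            if (state.isShuttingDown()) return false;       // someone else is stopping it
            setState(State.PENDING_SHUTDOWN);               // lock is re-entrant
            return true;
        }
    }

    public static void main(String[] args) {
        CloseRaceSketch racy = new CloseRaceSketch();
        try {
            // The hook plays the helper thread slipping in between the two checks.
            racy.racyClose(racy::shutdownHelperStep);
        } catch (IllegalStateException e) {
            System.out.println("racy close failed: " + e.getMessage());
        }

        CloseRaceSketch safe = new CloseRaceSketch();
        System.out.println("atomic close while PENDING_ERROR: " + safe.atomicClose());
        safe.shutdownHelperStep();
        System.out.println("atomic close after ERROR: " + safe.atomicClose());
    }
}
```

The interleaved hook makes the bad ordering deterministic in a single thread; in the real test the same ordering only happens occasionally, which matches the flaky failure rate reported in the issue.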