[ https://issues.apache.org/jira/browse/KAFKA-17379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ao Li reassigned KAFKA-17379:
-----------------------------

    Assignee: Ao Li

> KafkaStreams: Unexpected state transition from ERROR to PENDING_SHUTDOWN
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-17379
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17379
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Ao Li
>            Assignee: Ao Li
>            Priority: Major
>
> I saw a failing test: `KafkaStreamsTest::shouldNotAddThreadWhenError`
> {code}
> Stream-client test-client: Unexpected state transition from ERROR to PENDING_SHUTDOWN
> java.lang.IllegalStateException: Stream-client test-client: Unexpected state transition from ERROR to PENDING_SHUTDOWN
> 	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:344)
> 	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:1558)
> 	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:1456)
> 	at org.apache.kafka.streams.KafkaStreamsTest.shouldNotAddThreadWhenError(KafkaStreamsTest.java:708)
> 	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> 	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> 	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}
> You may use the following branch to reproduce the failure.
> https://github.com/aoli-al/kafka/tree/KAFKA-218
> The root cause of the failure is that the close function in KafkaStreams is not atomic.
> If the state changes while `close` is running, the failure occurs. `close` reads `state` separately for each check, so another thread can move the client from PENDING_ERROR to ERROR between the two checks below:
> {code}
> // state is PENDING_ERROR: hasCompletedShutdown() is false, so this branch is skipped
> if (state.hasCompletedShutdown()) {
>     log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
>     return true;
> }
> // state is now ERROR: isShuttingDown() is false, so this branch is skipped as well
> if (state.isShuttingDown()) {
>     log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
>     if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
>         log.info("Streams client stopped to ERROR completely");
>         return true;
>     } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
>         log.info("Streams client stopped to NOT_RUNNING completely");
>         return true;
>     } else {
>         log.warn("Streams client cannot transition to {} completely within the timeout",
>             state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : State.ERROR);
>         return false;
>     }
> }
> {code}
> With both checks skipped, `close` continues to the transition to PENDING_SHUTDOWN, and `setState` rejects ERROR -> PENDING_SHUTDOWN with the IllegalStateException shown in the stack trace.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
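The race can be illustrated with a small, self-contained model. This is a hypothetical sketch, not the actual `KafkaStreams` code or the fix merged for this issue: the class `CloseRaceSketch`, its `Client`, the methods `beginClose` and `racyBeginClose`, the `betweenChecks` hook, and the reduced transition table are all assumptions for illustration. `racyBeginClose` mirrors the reported pattern (each check reads `state` separately), and the hook injects the PENDING_ERROR -> ERROR transition at the bad moment. `beginClose` shows one possible remedy: make the decision and the transition to PENDING_SHUTDOWN under the same lock that guards every state change.

```java
// Hypothetical, simplified model of the KafkaStreams client state machine.
// Not the real org.apache.kafka.streams.KafkaStreams implementation.
public class CloseRaceSketch {

    enum State {
        RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR;

        boolean hasCompletedShutdown() {
            return this == NOT_RUNNING || this == ERROR;
        }

        boolean isShuttingDown() {
            return this == PENDING_SHUTDOWN || this == PENDING_ERROR;
        }

        // Assumed transition table, reduced to the transitions this sketch needs.
        boolean isValidTransition(State next) {
            switch (this) {
                case RUNNING: return next == PENDING_SHUTDOWN || next == PENDING_ERROR;
                case PENDING_SHUTDOWN: return next == NOT_RUNNING;
                case PENDING_ERROR: return next == ERROR;
                default: return false; // NOT_RUNNING and ERROR are terminal in this model
            }
        }
    }

    static final class Client {
        private final Object stateLock = new Object();
        private State state = State.RUNNING;

        State state() {
            synchronized (stateLock) {
                return state;
            }
        }

        void setState(State next) {
            synchronized (stateLock) {
                if (!state.isValidTransition(next)) {
                    throw new IllegalStateException(
                        "Unexpected state transition from " + state + " to " + next);
                }
                state = next;
                stateLock.notifyAll(); // wake any thread waiting on a state change
            }
        }

        // Mirrors the reported pattern: each read is locked, but the sequence of
        // reads is not, so the state can change between the two checks.
        // `betweenChecks` is a hypothetical hook used only to force that interleaving.
        boolean racyBeginClose(Runnable betweenChecks) {
            if (state().hasCompletedShutdown()) return false;
            betweenChecks.run();
            if (state().isShuttingDown()) return false;
            setState(State.PENDING_SHUTDOWN); // throws if the state is now ERROR
            return true;
        }

        // Possible remedy: check and transition while holding stateLock, so no other
        // thread can move the client to ERROR in between. Returns true only if this
        // call started the shutdown.
        boolean beginClose() {
            synchronized (stateLock) {
                if (state.hasCompletedShutdown() || state.isShuttingDown()) {
                    return false; // already terminal, or a shutdown is in progress
                }
                setState(State.PENDING_SHUTDOWN); // intrinsic locks are reentrant
                return true;
            }
        }
    }

    public static void main(String[] args) {
        // Racy pattern: the client reaches ERROR between the two checks.
        Client racy = new Client();
        racy.setState(State.PENDING_ERROR);
        try {
            racy.racyBeginClose(() -> racy.setState(State.ERROR));
        } catch (IllegalStateException e) {
            System.out.println("racy close: " + e.getMessage());
        }

        // Atomic pattern: the same end state is handled without an exception.
        Client safe = new Client();
        safe.setState(State.PENDING_ERROR);
        safe.setState(State.ERROR);
        System.out.println("safe close started: " + safe.beginClose() + ", state: " + safe.state());
    }
}
```

Running `main` should print `racy close: Unexpected state transition from ERROR to PENDING_SHUTDOWN` followed by `safe close started: false, state: ERROR`. Holding one lock across the check and the transition is only one option; reading `state` once into a local variable narrows the window but does not by itself stop another thread from changing the state before `setState` runs.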