lucasbru commented on code in PR #18765:
URL: https://github.com/apache/kafka/pull/18765#discussion_r1936939161
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
             timeoutMs = Long.MAX_VALUE;
         }
-        if (state.hasCompletedShutdown()) {
-            log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
-            return true;
-        }
-        if (state.isShuttingDown()) {
-            log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
-            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
-                log.info("Streams client stopped to ERROR completely");
-                return true;
-            } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
-                log.info("Streams client stopped to NOT_RUNNING completely");
+        if (!setState(State.PENDING_SHUTDOWN)) {
+
+            if (state.isShuttingDown()) {

Review Comment:
If the transition to `PENDING_SHUTDOWN` fails, we know that we are in a shutting-down or shut-down state, so either `isShuttingDown` or `hasCompletedShutdown` will be true. However, it is important that we check `isShuttingDown` before `hasCompletedShutdown`; otherwise we can hit a race condition where both return false (if another thread transitions from `PENDING_SHUTDOWN` to `NOT_RUNNING`, or from `PENDING_ERROR` to `ERROR`, between the two checks).
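A minimal, self-contained sketch of why the check order matters, assuming a simplified copy of the shutdown-related states. The names `CheckOrderSketch`, `Outcome`, `shuttingDownFirst`, `completedFirst` and `replay` are hypothetical and not part of Kafka's API. Each check re-reads the state, as two reads of a volatile field would, and `replay` feeds a fixed sequence of observed states so the race with another thread becomes deterministic.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class CheckOrderSketch {

    // Hypothetical subset of the shutdown-related states of a Streams client.
    enum State {
        RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR;

        boolean isShuttingDown() {
            return this == PENDING_SHUTDOWN || this == PENDING_ERROR;
        }

        boolean hasCompletedShutdown() {
            return this == NOT_RUNNING || this == ERROR;
        }
    }

    enum Outcome { SHUTTING_DOWN, COMPLETED, NEITHER }

    // Order recommended in the review: check isShuttingDown() first.
    static Outcome shuttingDownFirst(final Supplier<State> read) {
        if (read.get().isShuttingDown()) {
            return Outcome.SHUTTING_DOWN;
        } else if (read.get().hasCompletedShutdown()) {
            return Outcome.COMPLETED;
        }
        return Outcome.NEITHER;
    }

    // Reversed order, which the review warns against.
    static Outcome completedFirst(final Supplier<State> read) {
        if (read.get().hasCompletedShutdown()) {
            return Outcome.COMPLETED;
        } else if (read.get().isShuttingDown()) {
            return Outcome.SHUTTING_DOWN;
        }
        return Outcome.NEITHER;
    }

    // Returns the given states one per read, then keeps repeating the last one,
    // simulating another thread finishing the shutdown between two reads.
    static Supplier<State> replay(final State... observed) {
        final Iterator<State> it = Arrays.asList(observed).iterator();
        final State[] last = {observed[0]};
        return () -> {
            if (it.hasNext()) {
                last[0] = it.next();
            }
            return last[0];
        };
    }

    public static void main(final String[] args) {
        // The other thread moves PENDING_SHUTDOWN -> NOT_RUNNING between the two reads.
        System.out.println(shuttingDownFirst(replay(State.PENDING_SHUTDOWN, State.NOT_RUNNING))); // SHUTTING_DOWN
        System.out.println(completedFirst(replay(State.PENDING_SHUTDOWN, State.NOT_RUNNING)));    // NEITHER
    }
}
```

Checking `isShuttingDown()` first is safe because a client only moves forward from a pending state to its terminal state, and the terminal states never change afterwards. If the first check fails, the client was already terminal, so the second check must succeed.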
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -347,8 +347,9 @@ private boolean setState(final State newState) {
         } else if (state == State.REBALANCING && newState == State.REBALANCING) {
             // when the state is already in REBALANCING, it should not transit to REBALANCING again
             return false;
-        } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) {
-            // when the state is already in ERROR, its transition to PENDING_ERROR or ERROR (due to consecutive close calls)
+        } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR || newState == State.PENDING_SHUTDOWN)) {

Review Comment:
We need transitions from all shutdown states (`PENDING_SHUTDOWN`, `NOT_RUNNING`, `ERROR`, `PENDING_ERROR`) to `PENDING_SHUTDOWN` to be non-fatal (the transition is rejected, but no exception is thrown), so that `close()` can simply attempt the transition and check the result. The transition from `ERROR` to `PENDING_SHUTDOWN` was the only fatal combination. The original code never triggered it, but the fix would.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
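A simplified sketch of the non-fatal rejection rule from the second review comment, assuming a reduced state set and transition table. `TransitionSketch`, `VALID` and `SHUTDOWN_STATES` are hypothetical names; the actual `setState` in `KafkaStreams` expresses the rule as the chain of `else if` branches shown in the diff. A request to move any shutdown state to `PENDING_SHUTDOWN` returns `false` instead of throwing, while other invalid transitions still throw.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class TransitionSketch {

    // Reduced state set for this sketch only.
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Hypothetical, reduced table of valid transitions.
    private static final Map<State, Set<State>> VALID = Map.of(
        State.RUNNING, EnumSet.of(State.PENDING_SHUTDOWN, State.PENDING_ERROR),
        State.PENDING_SHUTDOWN, EnumSet.of(State.NOT_RUNNING),
        State.PENDING_ERROR, EnumSet.of(State.ERROR),
        State.NOT_RUNNING, EnumSet.noneOf(State.class),
        State.ERROR, EnumSet.noneOf(State.class));

    // States from which a request to move to PENDING_SHUTDOWN is rejected quietly.
    private static final Set<State> SHUTDOWN_STATES =
        EnumSet.of(State.PENDING_SHUTDOWN, State.NOT_RUNNING, State.PENDING_ERROR, State.ERROR);

    private State state;

    TransitionSketch(final State initial) {
        this.state = initial;
    }

    synchronized State state() {
        return state;
    }

    synchronized boolean setState(final State newState) {
        if (newState == State.PENDING_SHUTDOWN && SHUTDOWN_STATES.contains(state)) {
            // Non-fatal: already shutting down or shut down, so reject without throwing.
            return false;
        }
        if (!VALID.get(state).contains(newState)) {
            // Any other invalid transition is still treated as a bug.
            throw new IllegalStateException("Unexpected state transition from " + state + " to " + newState);
        }
        state = newState;
        return true;
    }

    public static void main(final String[] args) {
        final TransitionSketch errored = new TransitionSketch(State.ERROR);
        System.out.println(errored.setState(State.PENDING_SHUTDOWN)); // false
        System.out.println(errored.state());                           // ERROR

        final TransitionSketch running = new TransitionSketch(State.RUNNING);
        System.out.println(running.setState(State.PENDING_SHUTDOWN)); // true
        System.out.println(running.setState(State.NOT_RUNNING));      // true
        System.out.println(running.setState(State.PENDING_SHUTDOWN)); // false
    }
}
```

In this sketch, removing `State.ERROR` from `SHUTDOWN_STATES` reproduces the fatal combination the comment describes: `ERROR` to `PENDING_SHUTDOWN` would fall through to the `IllegalStateException`.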