cadonna commented on a change in pull request #11455: URL: https://github.com/apache/kafka/pull/11455#discussion_r740921447
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -655,10 +656,11 @@ public synchronized void onChange(final Thread thread,
                 // global stream thread has different invariants
                 final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
                 globalThreadState = newState;
+                final GlobalStreamThread.State oldState = (GlobalStreamThread.State) abstractOldState;
                 if (newState == GlobalStreamThread.State.RUNNING) {
                     maybeSetRunning();
-                } else if (newState == GlobalStreamThread.State.DEAD) {
+                } else if (newState == GlobalStreamThread.State.DEAD && oldState != GlobalStreamThread.State.PENDING_SHUTDOWN) {

Review comment:
   Why does this lead to an error state? AFAICS, an exceptional shutdown of the global stream thread would also first put the global stream thread into `PENDING_SHUTDOWN` and then `DEAD` (see `catch` clauses in `run()` in `GlobalStreamThread`).
   In any case, tests are needed to verify this code.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -655,10 +656,11 @@ public synchronized void onChange(final Thread thread,
                 // global stream thread has different invariants
                 final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
                 globalThreadState = newState;
+                final GlobalStreamThread.State oldState = (GlobalStreamThread.State) abstractOldState;
                 if (newState == GlobalStreamThread.State.RUNNING) {
                     maybeSetRunning();
-                } else if (newState == GlobalStreamThread.State.DEAD) {
+                } else if (newState == GlobalStreamThread.State.DEAD && oldState != GlobalStreamThread.State.PENDING_SHUTDOWN) {
                     log.error("Global thread has died. The streams application or client will now close to ERROR.");
                     closeToError();

Review comment:
   The global stream thread should already call `closeToError()` when the Streams uncaught exception handler is called in `run()` in `GlobalStreamThread`, shouldn't it?
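To make the concern in the first comment concrete, here is a minimal, self-contained sketch that replays a shutdown path through both versions of the condition. `ListenerConditionSketch` and `SketchState` are hypothetical stand-ins, not Kafka classes, and the replayed path is an assumption taken from the comment: both a clean and an exceptional shutdown pass through `PENDING_SHUTDOWN` before `DEAD`.

```java
class ListenerConditionSketch {
    // Hypothetical stand-in for GlobalStreamThread.State; this is not the Kafka enum.
    enum SketchState { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    // Condition before the change: every transition to DEAD is treated as an error.
    static boolean originalCondition(SketchState oldState, SketchState newState) {
        return newState == SketchState.DEAD;
    }

    // Condition proposed in the diff: DEAD is an error only when the previous
    // state was not PENDING_SHUTDOWN.
    static boolean proposedCondition(SketchState oldState, SketchState newState) {
        return newState == SketchState.DEAD && oldState != SketchState.PENDING_SHUTDOWN;
    }

    // Replays a path of states and counts the transitions for which the chosen condition holds.
    static int countErrorTransitions(boolean useProposed, SketchState... path) {
        int count = 0;
        for (int i = 1; i < path.length; i++) {
            boolean fires = useProposed
                ? proposedCondition(path[i - 1], path[i])
                : originalCondition(path[i - 1], path[i]);
            if (fires) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Assumed path for a clean as well as an exceptional shutdown (per the review comment).
        SketchState[] shutdown = {
            SketchState.CREATED, SketchState.RUNNING,
            SketchState.PENDING_SHUTDOWN, SketchState.DEAD
        };
        System.out.println("original condition holds: " + countErrorTransitions(false, shutdown));
        System.out.println("proposed condition holds: " + countErrorTransitions(true, shutdown));
    }
}
```

Under that assumption the original condition holds once on the path and the proposed one never does, so the listener alone could not tell an exceptional shutdown from a clean one. A real test would have to drive `KafkaStreams` and `GlobalStreamThread` themselves.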
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -655,10 +656,11 @@ public synchronized void onChange(final Thread thread,
                 // global stream thread has different invariants
                 final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
                 globalThreadState = newState;
+                final GlobalStreamThread.State oldState = (GlobalStreamThread.State) abstractOldState;
                 if (newState == GlobalStreamThread.State.RUNNING) {
                     maybeSetRunning();
-                } else if (newState == GlobalStreamThread.State.DEAD) {
+                } else if (newState == GlobalStreamThread.State.DEAD && oldState != GlobalStreamThread.State.PENDING_SHUTDOWN) {

Review comment:
   Actually, a transition to `DEAD` from a different state than `PENDING_SHUTDOWN` is illegal according to the state machine in global stream thread.
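The state-machine argument can be sketched as a small transition table. This is an illustration only: `GlobalThreadTransitionSketch` and `SketchState` are hypothetical names, and the transitions out of `CREATED` and `RUNNING` are assumptions. The only rule taken from the review comment is that `DEAD` is reachable solely from `PENDING_SHUTDOWN`.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class GlobalThreadTransitionSketch {
    // Hypothetical stand-in for GlobalStreamThread.State; this is not the Kafka enum.
    enum SketchState { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    // Assumed transition table. Only "DEAD is entered from PENDING_SHUTDOWN alone"
    // comes from the review comment; the other rows are illustrative.
    static final Map<SketchState, Set<SketchState>> VALID = new EnumMap<>(SketchState.class);
    static {
        VALID.put(SketchState.CREATED, EnumSet.of(SketchState.RUNNING, SketchState.PENDING_SHUTDOWN));
        VALID.put(SketchState.RUNNING, EnumSet.of(SketchState.PENDING_SHUTDOWN));
        VALID.put(SketchState.PENDING_SHUTDOWN, EnumSet.of(SketchState.DEAD));
        VALID.put(SketchState.DEAD, EnumSet.noneOf(SketchState.class));
    }

    // True if the table allows moving from one state to the other.
    static boolean isValidTransition(SketchState from, SketchState to) {
        return VALID.get(from).contains(to);
    }

    // Collects every state from which the target can be entered directly.
    static Set<SketchState> predecessorsOf(SketchState target) {
        Set<SketchState> result = EnumSet.noneOf(SketchState.class);
        for (Map.Entry<SketchState, Set<SketchState>> entry : VALID.entrySet()) {
            if (entry.getValue().contains(target)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("predecessors of DEAD: " + predecessorsOf(SketchState.DEAD));
        System.out.println("RUNNING -> DEAD valid: "
            + isValidTransition(SketchState.RUNNING, SketchState.DEAD));
    }
}
```

If the real state machine matches this table, `oldState != PENDING_SHUTDOWN` can never be true when `newState == DEAD`, so the guarded branch in the diff would be unreachable.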