cadonna commented on a change in pull request #11455:
URL: https://github.com/apache/kafka/pull/11455#discussion_r740921447



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -655,10 +656,11 @@ public synchronized void onChange(final Thread thread,
                     // global stream thread has different invariants
                     final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
                     globalThreadState = newState;
+                    final GlobalStreamThread.State oldState = (GlobalStreamThread.State) abstractOldState;
 
                     if (newState == GlobalStreamThread.State.RUNNING) {
                         maybeSetRunning();
-                    } else if (newState == GlobalStreamThread.State.DEAD) {
+                    } else if (newState == GlobalStreamThread.State.DEAD && oldState != GlobalStreamThread.State.PENDING_SHUTDOWN) {

Review comment:
       Why does this lead to an error state?
   AFAICS, an exceptional shutdown of the global stream thread also moves the 
thread to `PENDING_SHUTDOWN` first and only then to `DEAD` (see the `catch` 
clauses in `run()` in `GlobalStreamThread`).
   In any case, tests are needed to verify this code.
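
To make the concern concrete, here is a minimal, self-contained sketch that replays the state sequence described above against the condition from the diff. The `State` enum and the class and method names are hypothetical stand-ins, not the real `GlobalStreamThread` or `KafkaStreams` code, and the sequence `CREATED`, `RUNNING`, `PENDING_SHUTDOWN`, `DEAD` is assumed from the comment, not verified against `run()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in; this is NOT the Kafka Streams class.
final class ListenerConditionSketch {

    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    // The condition proposed in the diff: error only if DEAD is reached
    // from a state other than PENDING_SHUTDOWN.
    static boolean takesErrorBranch(final State oldState, final State newState) {
        return newState == State.DEAD && oldState != State.PENDING_SHUTDOWN;
    }

    // Replays a sequence of states and collects every transition
    // (old -> new) for which the error branch would be taken.
    static List<String> errorTransitions(final State... sequence) {
        final List<String> hits = new ArrayList<>();
        for (int i = 1; i < sequence.length; i++) {
            if (takesErrorBranch(sequence[i - 1], sequence[i])) {
                hits.add(sequence[i - 1] + " -> " + sequence[i]);
            }
        }
        return hits;
    }

    public static void main(final String[] args) {
        // Assumed sequence for an exceptional shutdown, per the review comment.
        final List<String> hits = errorTransitions(
            State.CREATED, State.RUNNING, State.PENDING_SHUTDOWN, State.DEAD);
        System.out.println("error-branch transitions: " + hits);
    }
}
```

Under that assumed sequence the list stays empty, so the error branch would not be taken even when the thread dies exceptionally. A test in the PR could assert the same thing against the real listener.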
    

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -655,10 +656,11 @@ public synchronized void onChange(final Thread thread,
                     // global stream thread has different invariants
                     final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
                     globalThreadState = newState;
+                    final GlobalStreamThread.State oldState = (GlobalStreamThread.State) abstractOldState;
 
                     if (newState == GlobalStreamThread.State.RUNNING) {
                         maybeSetRunning();
-                    } else if (newState == GlobalStreamThread.State.DEAD) {
+                    } else if (newState == GlobalStreamThread.State.DEAD && oldState != GlobalStreamThread.State.PENDING_SHUTDOWN) {
                         log.error("Global thread has died. The streams application or client will now close to ERROR.");
                         closeToError();

Review comment:
       The global stream thread should already call `closeToError()` when the 
Streams uncaught exception handler is called in `run()` in 
`GlobalStreamThread`, shouldn't it?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -655,10 +656,11 @@ public synchronized void onChange(final Thread thread,
                     // global stream thread has different invariants
                     final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
                     globalThreadState = newState;
+                    final GlobalStreamThread.State oldState = (GlobalStreamThread.State) abstractOldState;
 
                     if (newState == GlobalStreamThread.State.RUNNING) {
                         maybeSetRunning();
-                    } else if (newState == GlobalStreamThread.State.DEAD) {
+                    } else if (newState == GlobalStreamThread.State.DEAD && oldState != GlobalStreamThread.State.PENDING_SHUTDOWN) {

Review comment:
       Actually, a transition to `DEAD` from any state other than 
`PENDING_SHUTDOWN` is illegal according to the state machine in 
`GlobalStreamThread`.
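
As a rough illustration of that point, the sketch below models a transition table and lists the states from which `DEAD` can legally be reached. The table is an assumption written to match the statement above (only `PENDING_SHUTDOWN` may precede `DEAD`); the class name, the map, and both helper methods are hypothetical and should be checked against the actual `State` enum in `GlobalStreamThread`.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the global thread's state machine; NOT the Kafka class.
final class GlobalThreadStateSketch {

    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    // Assumed transition table: each state maps to the states it may move to.
    private static final Map<State, Set<State>> VALID_NEXT = new EnumMap<>(State.class);

    static {
        VALID_NEXT.put(State.CREATED, EnumSet.of(State.RUNNING, State.PENDING_SHUTDOWN));
        VALID_NEXT.put(State.RUNNING, EnumSet.of(State.PENDING_SHUTDOWN));
        VALID_NEXT.put(State.PENDING_SHUTDOWN, EnumSet.of(State.DEAD));
        VALID_NEXT.put(State.DEAD, EnumSet.noneOf(State.class));
    }

    static boolean isValidTransition(final State from, final State to) {
        return VALID_NEXT.get(from).contains(to);
    }

    // Collects every state that is allowed to transition into the target.
    static Set<State> predecessorsOf(final State target) {
        final Set<State> result = EnumSet.noneOf(State.class);
        for (final State from : State.values()) {
            if (isValidTransition(from, target)) {
                result.add(from);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println("legal predecessors of DEAD: " + predecessorsOf(State.DEAD));
    }
}
```

If the real state machine matches this table, `oldState != GlobalStreamThread.State.PENDING_SHUTDOWN` can never be true when `newState` is `DEAD`, so the new `else if` branch would be unreachable.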



