mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433659175



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {
+            case CREATED:
+            case SUSPENDED:
+                // do nothing
+                log.trace("Skip prepare suspending since state is {}", 
state());
 
-            stateMgr.flush();
-            recordCollector.flush();
+                break;
 
-            log.info("Prepare suspending running");
-        } else if (state() == State.RESTORING) {
-            stateMgr.flush();
+            case RESTORING:
+                stateMgr.flush();
+                log.info("Prepare suspending restoring");
 
-            log.info("Prepare suspending restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " 
while suspending active task " + id);
+                break;
+
+            case RUNNING:
+                closeTopology(true);
+
+                stateMgr.flush();
+                recordCollector.flush();
+
+                log.info("Prepare suspending running");
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " 
while suspending active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " 
while suspending active task " + id);
         }
     }
 
     @Override
     public void suspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            stateMgr.checkpoint(checkpointableOffsets());
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended running");
-        } else if (state() == State.RESTORING) {
-            // we just checkpoint the position that we've restored up to 
without
-            // going through the commit process
-            stateMgr.checkpoint(emptyMap());
-
-            // we should also clear any buffered records of a task when 
suspending it
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " 
while suspending active task " + id);
+        switch (state()) {

Review comment:
       Align code style: use a `switch` statement wherever every state is handled




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to