mjsax commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r433659175
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -247,48 +251,75 @@ public void completeRestoration() { */ @Override public void prepareSuspend() { - if (state() == State.CREATED || state() == State.SUSPENDED) { - // do nothing - log.trace("Skip prepare suspending since state is {}", state()); - } else if (state() == State.RUNNING) { - closeTopology(true); + switch (state()) { + case CREATED: + case SUSPENDED: + // do nothing + log.trace("Skip prepare suspending since state is {}", state()); - stateMgr.flush(); - recordCollector.flush(); + break; - log.info("Prepare suspending running"); - } else if (state() == State.RESTORING) { - stateMgr.flush(); + case RESTORING: + stateMgr.flush(); + log.info("Prepare suspending restoring"); - log.info("Prepare suspending restoring"); - } else { - throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id); + break; + + case RUNNING: + closeTopology(true); + + stateMgr.flush(); + recordCollector.flush(); + + log.info("Prepare suspending running"); + + break; + + case CLOSED: + throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id); + + default: + throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id); } } @Override public void suspend() { - if (state() == State.CREATED || state() == State.SUSPENDED) { - // do nothing - log.trace("Skip suspending since state is {}", state()); - } else if (state() == State.RUNNING) { - stateMgr.checkpoint(checkpointableOffsets()); - partitionGroup.clear(); - - transitionTo(State.SUSPENDED); - log.info("Suspended running"); - } else if (state() == State.RESTORING) { - // we just checkpoint the position that we've restored up to without - // going through the commit process - stateMgr.checkpoint(emptyMap()); - - // we should also clear any buffered records of a task when suspending it - partitionGroup.clear(); - - transitionTo(State.SUSPENDED); - log.info("Suspended restoring"); - } else { - throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id); + switch (state()) { Review comment: Align code style to use `switch` if all states are used ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org