abbccdda commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r434107127
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ########## @@ -1799,8 +1799,8 @@ public void shouldThrowIfClosingOnIllegalState() { task.closeClean(checkpoint); // close call are not idempotent since we are already in closed Review comment: nit: call -> calls ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ########## @@ -247,6 +247,9 @@ private void close(final boolean clean) { "state manager close", log ); + } else if (state() == State.CLOSED) { + log.trace("Skip closing since state is {}", state()); Review comment: We could just say `Skip closing since state is closed` here ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -493,28 +542,45 @@ public void closeAndRecycleState() { private Map<TopicPartition, Long> prepareClose(final boolean clean) { final Map<TopicPartition, Long> checkpoint; - if (state() == State.CREATED) { - // the task is created and not initialized, just re-write the checkpoint file - checkpoint = Collections.emptyMap(); - } else if (state() == State.RUNNING) { - closeTopology(clean); + switch (state()) { + case CREATED: + // the task is created and not initialized, just re-write the checkpoint file + checkpoint = Collections.emptyMap(); - if (clean) { - stateMgr.flush(); - recordCollector.flush(); - checkpoint = checkpointableOffsets(); - } else { + break; + + case RUNNING: + closeTopology(clean); + + if (clean) { + stateMgr.flush(); + recordCollector.flush(); + checkpoint = checkpointableOffsets(); + } else { + checkpoint = null; // `null` indicates to not write a checkpoint + executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); + } + + break; + + case RESTORING: + executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); + checkpoint = Collections.emptyMap(); + + break; + + case SUSPENDED: + // if `SUSPENDED` do not 
need to checkpoint, since when suspending we've already committed the state checkpoint = null; // `null` indicates to not write a checkpoint - executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); - } - } else if (state() == State.RESTORING) { - executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); - checkpoint = Collections.emptyMap(); - } else if (state() == State.SUSPENDED) { - // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state - checkpoint = null; // `null` indicates to not write a checkpoint - } else { - throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id); + + break; + case CLOSED: Review comment: Could we merge the `CLOSED` and `CREATED` cases? Also, could you elaborate on why we use an empty checkpoint map instead of null? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) { final long offset = changelogEntry.getValue(); - offsetSum += offset; - if (offsetSum < 0) { - log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); - return Long.MAX_VALUE; + if (offset == Task.LATEST_OFFSET) { Review comment: Should we also check `task.isActive` here? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org