guozhangwang commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434892254



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the 
checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the 
checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a 
checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when 
suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a 
checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager 
flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending 
we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " 
while prepare closing active task " + id);
+
+                break;
+            case CLOSED:
+                checkpoint = Collections.emptyMap();

Review comment:
       I think null and emptyMap has different semantics: the former indicates 
do not try to override the checkpoint file, while the latter indicates “just 
writing the checkpoint file as of the current state store maintained offset” 
I.e. in stateMgr.checkpoint(writtenOffsets)  if the map is empty, we would 
still write the checkpoint file but just based on each store’s current 
storeMetadata.offset.
   
   So back to prepareClose: if we are in CREATED, meaning we’ve read the 
checkpoint file into the store, we still need to write that loaded offsets back 
to the file; in SUSPENDED we know we’ve written the offset to the checkpoint 
file already when transiting to that state, so we can return null to indicate 
no need to write again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to