mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434216773
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +542,45 @@ public void closeAndRecycleState() {
private Map<TopicPartition, Long> prepareClose(final boolean clean) {
final Map<TopicPartition, Long> checkpoint;
- if (state() == State.CREATED) {
- // the task is created and not initialized, just re-write the
checkpoint file
- checkpoint = Collections.emptyMap();
- } else if (state() == State.RUNNING) {
- closeTopology(clean);
+ switch (state()) {
+ case CREATED:
+ // the task is created and not initialized, just re-write the
checkpoint file
+ checkpoint = Collections.emptyMap();
- if (clean) {
- stateMgr.flush();
- recordCollector.flush();
- checkpoint = checkpointableOffsets();
- } else {
+ break;
+
+ case RUNNING:
+ closeTopology(clean);
+
+ if (clean) {
+ stateMgr.flush();
+ recordCollector.flush();
+ checkpoint = checkpointableOffsets();
+ } else {
+ checkpoint = null; // `null` indicates to not write a
checkpoint
+ executeAndMaybeSwallow(false, stateMgr::flush, "state
manager flush", log);
+ }
+
+ break;
+
+ case RESTORING:
+ executeAndMaybeSwallow(clean, stateMgr::flush, "state manager
flush", log);
+ checkpoint = Collections.emptyMap();
+
+ break;
+
+ case SUSPENDED:
+ // if `SUSPENDED` do not need to checkpoint, since when
suspending we've already committed the state
checkpoint = null; // `null` indicates to not write a
checkpoint
- executeAndMaybeSwallow(false, stateMgr::flush, "state manager
flush", log);
- }
- } else if (state() == State.RESTORING) {
- executeAndMaybeSwallow(clean, stateMgr::flush, "state manager
flush", log);
- checkpoint = Collections.emptyMap();
- } else if (state() == State.SUSPENDED) {
- // if `SUSPENDED` do not need to checkpoint, since when suspending
we've already committed the state
- checkpoint = null; // `null` indicates to not write a checkpoint
- } else {
- throw new IllegalStateException("Illegal state " + state() + "
while prepare closing active task " + id);
+
+ break;
+ case CLOSED:
Review comment:
Not sure if we can merge CLOSE and CREATED -- but I plan to do follow up
PRs to change state handling further. Hence, I would like to keep it
out-of-scope for this PR.
`emptyMap()` is not an empty checkpoint: the map we return is some
additional data we write into the checkpoint. `null` on the other hand means to
_not_ write any checkpoint but in a clean-close case we want to write a
checkpoint.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]