guozhangwang commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r900312281
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task)
{
restoredActiveTasksLock.unlock();
}
}
+
+ private void maybeCommitRestoringTasks(final long now) {
+ final long elapsedMsSinceLastCommit = now - lastCommitMs;
+ if (elapsedMsSinceLastCommit > commitIntervalMs) {
+ if (log.isDebugEnabled()) {
+ log.debug("Committing all restoring tasks since {}ms has
elapsed (commit interval is {}ms)",
+ elapsedMsSinceLastCommit, commitIntervalMs);
+ }
+
+ for (final Task task : updatingTasks.values()) {
+ // do not enforce checkpointing during restoration if its
position has not advanced much
Review Comment:
Fair enough. I will refactor this piece of code in the follow-up PR (I have
one doing the code refactoring already) such that:
1) We remove the `prepareCommit` and `postCommit` in standby task, instead
we call `writeCheckpoint` directly.
2) We remove the `postCommit` in active task, `enforceCheckpoint` boolean in
the `maybeCheckpoint` function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]