guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r461266795
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

Review comment:
   Yeah, my major concern is tying the flushing policy to RocksDB -- although it is the default persistent store now, we should avoid coupling to a specific type of store.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }

+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempt to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // the buffer should be flushed to send all records to the changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
   I'm thinking we can remove the whole `flushCache` method.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

     private StateManagerUtil() {}

     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }

+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot after completing state store registration;
+        // if it is null, registration has not finished and we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not, we should always checkpoint;
+        // note that if the task is stateless, or stateful with no logged stores, the snapshot would also be
+        // empty, and hence it is okay not to checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when a checkpoint is enforced, we overwrite it whenever it differs from the old one;
+        // otherwise we only overwrite it when it differs from the old one by more than the threshold
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
   I think we do not need to enforce a checkpoint during suspension, only during closure / recycling: if a suspended task is resumed, we do not need to write a checkpoint in between. But admittedly, moving forward most suspended tasks would be closed or recycled :slightly_smiling_face: So I can change that back.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
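For readers following the thread, the checkpoint decision under review can be sketched as a standalone class. This is a minimal sketch, not the actual Kafka Streams code: the class name `CheckpointDecision` is hypothetical, and plain `String` keys stand in for `TopicPartition` so the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch of the checkpointNeeded(...) rule from the diff above;
// String keys replace TopicPartition to avoid a dependency on kafka-clients.
public class CheckpointDecision {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // a null old snapshot means state store registration has not completed,
        // so the existing checkpoint must not be overwritten
        if (oldOffsetSnapshot == null) {
            return false;
        }
        // first offsets ever seen after an empty snapshot: always checkpoint
        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) {
            return true;
        }
        // sum the absolute per-partition offset movement since the last snapshot
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        // enforced: write on any change; otherwise only when the delta crosses the threshold
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    public static void main(String[] args) {
        Map<String, Long> old = new HashMap<>();
        old.put("t-0", 100L);
        Map<String, Long> small = new HashMap<>();
        small.put("t-0", 150L);       // delta 50: below the 10_000 threshold
        Map<String, Long> large = new HashMap<>();
        large.put("t-0", 20_100L);    // delta 20_000: above the threshold

        System.out.println(checkpointNeeded(false, null, small));  // false: registration incomplete
        System.out.println(checkpointNeeded(false, old, small));   // false: small delta, not enforced
        System.out.println(checkpointNeeded(true, old, small));    // true: enforced writes on any change
        System.out.println(checkpointNeeded(false, old, large));   // true: delta crosses the threshold
    }
}
```

This illustrates the trade-off the reviewer raises: the threshold only suppresses checkpoints on small deltas when `enforceCheckpoint` is false, which is why enforcing it at suspension versus only at closure / recycling changes how often the file is rewritten.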