ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r461229173



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

Review comment:
       Something like that. We know RocksDB marks the memtable immutable 
once it reaches the configured memtable size; after that, it flushes once the 
number of immutable memtables reaches the configured value. It probably makes 
sense to align our checkpointing/flushing with the configured RocksDB flushing.
   
   It would be cool if we could piggy-back on the RocksDB options and avoid a 
new config in Streams altogether, but obviously not everyone uses RocksDB.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
       So you're saying we'd still need to flush the suppression buffer, but not 
the cache, once we decouple caching from emitting? Or that we can remove this 
`flushCache` method altogether once that is done? Or that it will still do some 
flushing, but will not resemble the current `flushCache` method at all?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
       Is there a ticket for that?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       I'm still not following -- I thought we no longer commit during close at 
all and just commit when suspending? For active tasks we call pre/postCommit 
and suspend in `handleRevocation`, and then just close them in 
`handleAssignment`. For standbys we do all those things in `handleAssignment`, 
but still only once.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       Oh, I see, we now do the checkpoint in `handleAssignment` as well. Why? 
It seems like we're just bringing back the problem of potentially 
double-checkpointing that had been solved?
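
   For reference, here is a minimal standalone sketch of the `checkpointNeeded` 
logic quoted above, to make the double-checkpointing question concrete. It is 
not the PR's code: `String` keys stand in for `TopicPartition`, and the class 
name `CheckpointSketch` and the `changelog-0` key are hypothetical. It assumes 
the caller replaces its stored old snapshot with the new one after writing a 
checkpoint, which the quoted hunk does not show.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone mirror of the quoted checkpointNeeded logic.
public class CheckpointSketch {
    // Same value as the constant added in the quoted diff.
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // no old snapshot: store registration has not completed, do not overwrite
        if (oldOffsetSnapshot == null) {
            return false;
        }
        // first non-empty snapshot: always checkpoint
        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) {
            return true;
        }
        // sum of absolute per-partition offset differences
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    public static void main(final String[] args) {
        final Map<String, Long> lastWritten = new HashMap<>();
        lastWritten.put("changelog-0", 500L);

        // A second enforced call with an unchanged snapshot has delta 0.
        final Map<String, Long> unchanged = new HashMap<>(lastWritten);
        System.out.println(checkpointNeeded(true, lastWritten, unchanged));  // false

        // Any progress at all triggers an enforced checkpoint ...
        final Map<String, Long> advanced = new HashMap<>();
        advanced.put("changelog-0", 501L);
        System.out.println(checkpointNeeded(true, lastWritten, advanced));   // true

        // ... but a non-enforced one waits until the delta exceeds the threshold.
        System.out.println(checkpointNeeded(false, lastWritten, advanced));  // false
    }
}
```

   Under that assumption, an enforced checkpoint in `handleAssignment` that 
follows one in `handleRevocation` would see a delta of 0 and skip the write, 
though the snapshot comparison itself would still run.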

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       Can you elaborate on why we don't enforce the checkpoint during 
suspension in `handleRevocation` (for the tasks to be closed)? It seems like we 
just re-introduce the problem of potentially double-checkpointing, and I'm not 
sure I understand why we need to checkpoint only in `handleAssignment`. Is this 
an optimization, a correctness issue, or a code cleanliness thing?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

