guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r464683179



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -461,6 +463,42 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to 
changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {
+                        store.flush();
+                    } else if (store instanceof CachedStateStore) {
+                        ((CachedStateStore) store).flushCache();
+                    }

Review comment:
       For stores that's not time-ordered or cached, we should not flush them 
indeed. In fact moving forward I think we would not flush cache store anyways 
since they will be removed. I.e. generally speaking we should not `flush cache` 
always. In that sense the log4j entry looks reasonable to me?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to