guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r464683179
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ########## @@ -461,6 +463,42 @@ public void flush() { } } + public void flushCache() { + RuntimeException firstException = null; + // attempting to flush the stores + if (!stores.isEmpty()) { + log.debug("Flushing all store caches registered in the state manager: {}", stores); + for (final StateStoreMetadata metadata : stores.values()) { + final StateStore store = metadata.stateStore; + + try { + // buffer should be flushed to send all records to changelog + if (store instanceof TimeOrderedKeyValueBuffer) { + store.flush(); + } else if (store instanceof CachedStateStore) { + ((CachedStateStore) store).flushCache(); + } Review comment: For stores that's not time-ordered or cached, we should not flush them indeed. In fact moving forward I think we would not flush cache store anyways since they will be removed. I.e. generally speaking we should not `flush cache` always. In that sense the log4j entry looks reasonable to me? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org