vvcephei commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766223374
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
This is actually important. Previously, we did not set the record context before passing the cache-evicted record down to the lower store layers. Any store that relied on the record context (via the old ProcessorContext) was therefore getting the wrong metadata during that operation. Apparently, this work is the first time we added a feature in Streams that actually relies on that metadata: we now use it to set the Position in the lower store, and if it's not set, we get an error.
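For anyone who wants to see the ordering issue in isolation, here is a minimal, self-contained sketch. It is not the Streams code: `RecordMeta`, `Context`, and `LowerStore` are hypothetical stand-ins for the record context, the processor context, and a lower store layer that reads the context on every write. Wrapping the write in `try`/`finally` is a choice made for the sketch only.

```java
import java.util.ArrayList;
import java.util.List;

public class ContextOrderingSketch {

    /** Hypothetical stand-in for record metadata (topic, partition, offset). */
    static final class RecordMeta {
        final String topic;
        final int partition;
        final long offset;

        RecordMeta(final String topic, final int partition, final long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for the processor context that holds the "current" record metadata. */
    static final class Context {
        private RecordMeta current;

        RecordMeta recordContext() {
            return current;
        }

        void setRecordContext(final RecordMeta meta) {
            current = meta;
        }
    }

    /** Hypothetical lower store that reads the context on every write, e.g. to track a position. */
    static final class LowerStore {
        private final Context context;
        final List<RecordMeta> seenOnPut = new ArrayList<>();

        LowerStore(final Context context) {
            this.context = context;
        }

        void put(final String key, final String value) {
            final RecordMeta meta = context.recordContext();
            if (meta == null) {
                throw new IllegalStateException("record context not set");
            }
            seenOnPut.add(meta);
        }
    }

    /** Old order: write to the lower store first, then swap in the evicted entry's context. */
    static void putThenSetContext(final Context context, final LowerStore store,
                                  final String key, final String value, final RecordMeta entryMeta) {
        store.put(key, value); // sees whatever context happened to be current
        final RecordMeta current = context.recordContext();
        context.setRecordContext(entryMeta);
        try {
            // forwarding downstream would happen here
        } finally {
            context.setRecordContext(current);
        }
    }

    /** New order: swap in the evicted entry's context, then write to the lower store. */
    static void setContextThenPut(final Context context, final LowerStore store,
                                  final String key, final String value, final RecordMeta entryMeta) {
        final RecordMeta current = context.recordContext();
        context.setRecordContext(entryMeta);
        try {
            store.put(key, value); // sees the metadata of the record that wrote the cached entry
            // forwarding downstream would happen here
        } finally {
            context.setRecordContext(current);
        }
    }

    public static void main(final String[] args) {
        final Context context = new Context();
        final LowerStore store = new LowerStore(context);
        final RecordMeta inFlight = new RecordMeta("input", 0, 42L); // record being processed when eviction fires
        final RecordMeta evicted = new RecordMeta("input", 0, 7L);   // record that originally wrote the cached entry
        context.setRecordContext(inFlight);

        putThenSetContext(context, store, "k", "v", evicted);
        setContextThenPut(context, store, "k", "v", evicted);

        System.out.println("old order saw offset " + store.seenOnPut.get(0).offset); // 42
        System.out.println("new order saw offset " + store.seenOnPut.get(1).offset); // 7
    }
}
```

With the old order, the store stamps the write with the in-flight record's metadata (or throws if no context is set at all); with the new order, it sees the evicted entry's own metadata, which is what a position-tracking store needs.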