vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766223374



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       This is actually important: we were not setting the record context before passing the cache-evicted record down to the lower store layers, so any store that relied on the record context (via the old ProcessorContext) saw the wrong metadata.
   
   Apparently, this work is the first Streams feature that actually relies on that metadata: we now use it to set the Position in the lower store, and if the context is not set, we get an error.
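   The ordering the patch enforces can be illustrated with a minimal, self-contained sketch. The names here (`RecordContext`, `wrappedPut`, `putAndMaybeForward`) are simplified stand-ins for the real Streams types, not the actual API; the point is only the save/set/restore pattern around the write to the wrapped store:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the fix's ordering: the processor context must
// carry the evicted entry's record metadata while the write is forwarded
// to the wrapped store, and the caller's context is restored afterwards.
public class CacheFlushSketch {

    // Stand-in for ProcessorRecordContext: just the offset the lower
    // store would read to update its Position.
    record RecordContext(long offset) {}

    static RecordContext currentContext = new RecordContext(-1L);

    // Stand-in for the wrapped store; it records the context it observed
    // at the moment of the put, like a Position-tracking store would.
    static final Deque<RecordContext> observed = new ArrayDeque<>();

    static void wrappedPut(final String key, final String value) {
        observed.add(currentContext); // lower layer reads the context here
    }

    // Mirrors the patched ordering: set the entry's context *before*
    // putting to the wrapped store, then restore the previous context.
    static void putAndMaybeForward(final String key,
                                   final String value,
                                   final RecordContext entryContext) {
        final RecordContext previous = currentContext;
        currentContext = entryContext;   // set before the write
        try {
            wrappedPut(key, value);      // lower store sees entryContext
        } finally {
            currentContext = previous;   // always restore the caller's context
        }
    }

    public static void main(final String[] args) {
        putAndMaybeForward("k", "v", new RecordContext(42L));
        System.out.println(observed.peek().offset()); // prints 42
    }
}
```

   With the pre-patch ordering (put before `setRecordContext`), the wrapped store would observe the stale caller context instead of the evicted entry's metadata, which is exactly the bug described above.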



