ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r432190091



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -263,27 +259,28 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
 
-        collector.send(
-            changelogTopic,
-            key,
-            buffer.array(),
-            V_2_CHANGELOG_HEADERS,
-            partition,
-            null,
-            KEY_SERIALIZER,
-            VALUE_SERIALIZER
+        ((RecordCollector.Supplier) context).recordCollector().send(

Review comment:
   Well, only active tasks have a topology, and we don't initialize the 
topology until everything has been cleanly recycled. So by the time `init` is 
called and the context is used, it should be fully up-to-date with the active 
task references.
   
   Of course, that only covers the Processor/Transformer half of the 
question. With StateStores we're obviously still calling `init` for standby 
tasks, and more. But nothing in the public `ProcessorContext` interface gets 
recycled. Only the cache, record collector, and StreamTask have to be updated, 
so this should all be totally transparent (unless they're doing something they 
shouldn't be).
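
   To illustrate why the diff resolves the record collector through the 
context at send time instead of keeping a field, here is a minimal sketch. 
Every type in it (`Collector`, `Context`, `CachingStore`, `LookupStore`) is a 
hypothetical stand-in written for this example, not the real Kafka Streams 
classes, and it assumes recycling simply swaps the collector reference held by 
the context.

```java
import java.util.ArrayList;
import java.util.List;

public class RecycleSketch {
    /** Hypothetical stand-in for a record collector; it only remembers what was sent. */
    static final class Collector {
        final List<String> sent = new ArrayList<>();
        void send(String record) { sent.add(record); }
    }

    /** Hypothetical stand-in for the internal context; recycling swaps the collector (assumption). */
    static final class Context {
        private Collector collector;
        Context(Collector collector) { this.collector = collector; }
        Collector recordCollector() { return collector; }
        void recycle(Collector replacement) { this.collector = replacement; }
    }

    /** Caches the collector at init time, so it keeps using the old one after a recycle. */
    static final class CachingStore {
        private Collector collector;
        void init(Context context) { this.collector = context.recordCollector(); }
        void log(String record) { collector.send(record); }
    }

    /** Looks the collector up through the context on every call, mirroring the diff. */
    static final class LookupStore {
        private Context context;
        void init(Context context) { this.context = context; }
        void log(String record) { context.recordCollector().send(record); }
    }

    public static void main(String[] args) {
        Collector before = new Collector();
        Collector after = new Collector();
        Context context = new Context(before);

        CachingStore caching = new CachingStore();
        LookupStore lookup = new LookupStore();
        caching.init(context);
        lookup.init(context);

        // Simulate the task being recycled after the stores were initialized.
        context.recycle(after);

        caching.log("from-caching-store");
        lookup.log("from-lookup-store");

        System.out.println("old collector received: " + before.sent);
        System.out.println("new collector received: " + after.sent);
    }
}
```

   In this toy model only the store that looks the collector up on each call 
reaches the replacement collector; the one that cached the reference in `init` 
keeps writing to the old one.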



