ableegoldman commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r432190091
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -263,27 +259,28 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());

-        collector.send(
-            changelogTopic,
-            key,
-            buffer.array(),
-            V_2_CHANGELOG_HEADERS,
-            partition,
-            null,
-            KEY_SERIALIZER,
-            VALUE_SERIALIZER
+        ((RecordCollector.Supplier) context).recordCollector().send(

Review comment:
Well, only active tasks have a topology, and we don't initialize the topology until everything has been cleanly recycled. So by the time `init` is being called and the context is being used, it should be all up-to-date with the active task references.

Of course that only applies to the Processor/Transformer half of the question. With StateStores we're obviously still calling `init` for standby tasks, and more. But nothing in the public `ProcessorContext` interface gets recycled. Only the cache, record collector, and StreamTask have to be updated, so this should all be totally transparent (unless they're doing something they shouldn't be)
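
To illustrate the point about recycled references, here is a minimal sketch of why fetching the collector through the context at send time stays current after a swap, while a reference cached at `init` does not. This is not Kafka Streams code: `Collector`, `NamedCollector`, `Context`, `recycle`, `CachingStore` and `LookupStore` are all hypothetical stand-ins invented for the example, and the real recycling logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class LateLookupSketch {
    // Hypothetical stand-in for a record collector; NOT the Kafka Streams interface.
    interface Collector {
        void send(String topic, String value);
    }

    // Collector that simply remembers what was sent to it.
    static final class NamedCollector implements Collector {
        final String name;
        final List<String> sent = new ArrayList<>();

        NamedCollector(String name) {
            this.name = name;
        }

        @Override
        public void send(String topic, String value) {
            sent.add(topic + ":" + value);
        }
    }

    // Hypothetical context whose collector is replaced when a task is recycled.
    static final class Context {
        private Collector collector;

        Context(Collector collector) {
            this.collector = collector;
        }

        Collector recordCollector() {
            return collector;
        }

        void recycle(Collector replacement) {
            this.collector = replacement;
        }
    }

    // Caches the collector once, so it keeps the old reference after a recycle.
    static final class CachingStore {
        private final Collector collector;

        CachingStore(Context context) {
            this.collector = context.recordCollector();
        }

        void log(String value) {
            collector.send("changelog", value);
        }
    }

    // Looks the collector up on every call, so it follows the context.
    static final class LookupStore {
        private final Context context;

        LookupStore(Context context) {
            this.context = context;
        }

        void log(String value) {
            context.recordCollector().send("changelog", value);
        }
    }

    public static void main(String[] args) {
        NamedCollector standby = new NamedCollector("standby");
        NamedCollector active = new NamedCollector("active");
        Context context = new Context(standby);

        CachingStore caching = new CachingStore(context);
        LookupStore lookup = new LookupStore(context);

        context.recycle(active); // swap the collector after both stores were created

        caching.log("a");
        lookup.log("b");

        System.out.println(standby.name + " received " + standby.sent);
        System.out.println(active.name + " received " + active.sent);
    }
}
```

In this sketch the caching store still writes to the collector it captured before the swap, while the lookup store reaches the replacement, which is the behaviour the one-line change in the diff relies on.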