ableegoldman commented on code in PR #17929: URL: https://github.com/apache/kafka/pull/17929#discussion_r1867412493
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java: ########## @@ -184,7 +184,7 @@ private KTable<K, Long> doCount(final Named named, final Materialized<K, Long, K final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return doAggregate( - new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), + new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), Review Comment: ah sorry I forgot to respond to this earlier -- basically you're right that we ultimately need to remove it from `StateProcessorNode` but we need to convert a bunch of other stuff to implement `#stores` before we can do this. I put `StatefulProcessorNode` on our TODO list with a note to say it should be picked up last. In the meantime, it doesn't hurt to add the store twice. The InternalTopologyBuilder has checks for this (see `StoreFactory#isCompatibleWith`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org