guozhangwang commented on code in PR #17929:
URL: https://github.com/apache/kafka/pull/17929#discussion_r1866737144


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -41,19 +49,35 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupp
     private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
     private boolean sendOldValues = false;
-    KStreamAggregate(final String storeName,
+    KStreamAggregate(final MaterializedInternal<KIn, VAgg, KeyValueStore<Bytes, byte[]>> materialized,
+                     final Initializer<VAgg> initializer,
+                     final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+        this.storeName = materialized.storeName();
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    KStreamAggregate(final StoreFactory storeFactory,
                      final Initializer<VAgg> initializer,
                      final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
-        this.storeName = storeName;
+        this.storeFactory = storeFactory;
+        this.storeName = storeFactory.name();
         this.initializer = initializer;
         this.aggregator = aggregator;
     }
+    @Override
+    public Set<StoreBuilder<?>> stores() {

Review Comment:
   I'm assuming these are the main changes here, so I focused mostly on them :)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -184,7 +184,7 @@ private KTable<K, Long> doCount(final Named named, final Materialized<K, Long, K
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),

Review Comment:
   I think it's okay, since we also changed https://github.com/apache/kafka/pull/17929/files#diff-593acf162f2804512ad5e034f6646d6d6b61dc7f8e6904ee4e711bcb346d2fa0L94 and are hence calling another constructor, and this one (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L49) does not set the store factory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org