guozhangwang commented on code in PR #17929:
URL: https://github.com/apache/kafka/pull/17929#discussion_r1866737144


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -41,19 +49,35 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamAggregate.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
 
     private boolean sendOldValues = false;
 
-    KStreamAggregate(final String storeName,
+    KStreamAggregate(final MaterializedInternal<KIn, VAgg, 
KeyValueStore<Bytes, byte[]>> materialized,
+                     final Initializer<VAgg> initializer,
+                     final Aggregator<? super KIn, ? super VIn, VAgg> 
aggregator) {
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+        this.storeName = materialized.storeName();
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    KStreamAggregate(final StoreFactory storeFactory,
                      final Initializer<VAgg> initializer,
                      final Aggregator<? super KIn, ? super VIn, VAgg> 
aggregator) {
-        this.storeName = storeName;
+        this.storeFactory = storeFactory;
+        this.storeName = storeFactory.name();
         this.initializer = initializer;
         this.aggregator = aggregator;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {

Review Comment:
   I'm assuming these are the main changes here and put most focus on them only 
:) 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -184,7 +184,7 @@ private KTable<K, Long> doCount(final Named named, final 
Materialized<K, Long, K
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(materializedInternal, 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),

Review Comment:
   I think it's okay since we also changed 
https://github.com/apache/kafka/pull/17929/files#diff-593acf162f2804512ad5e034f6646d6d6b61dc7f8e6904ee4e711bcb346d2fa0L94
 hence calling another constructor, and this one 
(https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L49)
 does not set store factory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to