ableegoldman commented on code in PR #17929:
URL: https://github.com/apache/kafka/pull/17929#discussion_r1867413674


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -41,19 +49,35 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupp
     private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
 
     private boolean sendOldValues = false;
 
-    KStreamAggregate(final String storeName,
+    KStreamAggregate(final MaterializedInternal<KIn, VAgg, KeyValueStore<Bytes, byte[]>> materialized,
+                     final Initializer<VAgg> initializer,
+                     final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+        this.storeName = materialized.storeName();
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    KStreamAggregate(final StoreFactory storeFactory,
                      final Initializer<VAgg> initializer,
                      final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
-        this.storeName = storeName;
+        this.storeFactory = storeFactory;
+        this.storeName = storeFactory.name();
         this.initializer = initializer;
         this.aggregator = aggregator;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {

Review Comment:
   yep!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to