vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485732127



##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                           final String sourceName,
+                                                           final TimestampExtractor timestampExtractor,
+                                                           final Deserializer<KIn> keyDeserializer,
+                                                           final Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String processorName,
+                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       Yeah, this probably would have been a better design. I'm a little 
hesitant to make this change to the KIP right now, though. Subjectively, it 
seems more lightweight for users if they don't have to change much of their 
code to switch over to the new API. Also, maybe I have a little bit of 
emotional resistance to increasing the scope of this KIP because it's been 
taking so long to actually make progress on it.
   
   I've filed https://issues.apache.org/jira/browse/KAFKA-10472 to capture the 
thought, though.
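
   For anyone reading this without the full diff open, here is a rough sketch of how the two overloads in the hunk relate: the shorter one forwards `null` for the timestamp extractor, and (per the Javadoc) a missing extractor means the configured default is used. This is plain Java with hypothetical stand-in types (`GlobalStoreOverloadSketch`, `Extractor`, `InternalBuilder`, `SketchTopology`), not Kafka's real classes, and the null fallback and the duplicate-name check are assumptions made only to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins only: none of these types are Kafka's real classes.
final class GlobalStoreOverloadSketch {

    /** Stand-in for a timestamp extractor. */
    interface Extractor {
        long extract(long recordTimestamp);
    }

    /** Stand-in for the internal builder that both overloads delegate to. */
    static final class InternalBuilder {
        private final Extractor configDefault;
        private final Map<String, Extractor> extractorsBySource = new HashMap<>();

        InternalBuilder(final Extractor configDefault) {
            this.configDefault = configDefault;
        }

        void addGlobalStore(final String sourceName, final Extractor extractor, final String topic) {
            if (extractorsBySource.containsKey(sourceName)) {
                // Assumption: stands in for the TopologyException mentioned in the Javadoc.
                throw new IllegalStateException("source already registered: " + sourceName);
            }
            // Assumption: a null extractor falls back to the configured default.
            extractorsBySource.put(sourceName, extractor != null ? extractor : configDefault);
        }

        Extractor extractorFor(final String sourceName) {
            return extractorsBySource.get(sourceName);
        }
    }

    /** Stand-in for the public class that exposes the two overloads. */
    static final class SketchTopology {
        private final InternalBuilder internal;

        SketchTopology(final Extractor configDefault) {
            this.internal = new InternalBuilder(configDefault);
        }

        // Overload without an extractor: forwards null, as the first method in the hunk does.
        SketchTopology addGlobalStore(final String sourceName, final String topic) {
            internal.addGlobalStore(sourceName, null, topic);
            return this;
        }

        // Overload with an explicit extractor.
        SketchTopology addGlobalStore(final String sourceName, final Extractor extractor, final String topic) {
            internal.addGlobalStore(sourceName, extractor, topic);
            return this;
        }

        Extractor extractorFor(final String sourceName) {
            return internal.extractorFor(sourceName);
        }
    }
}
```

   Since both overloads keep the existing parameter order and only differ by the extra extractor argument, a caller moving between them changes a single argument.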





