abbccdda commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r468967992



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -528,13 +529,56 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                              final String topic,
                                                              final Consumed<K, V> consumed,
                                                              final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}} will be used to create an
+     * {@link org.apache.kafka.streams.processor.api.Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link org.apache.kafka.streams.processor.api.Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link org.apache.kafka.streams.processor.api.Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link org.apache.kafka.streams.processor.api.Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}

Review comment:
       Yeah, that's probably easier when we eventually clean up the deprecated stuff 👍
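
For context on the hunk above: the deprecated overload keeps working by wrapping the old-style supplier in a lambda, `() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())`. Here is a minimal, self-contained sketch of that delegation pattern. The types `OldProcessor`, `NewProcessor`, and `Record` are hypothetical stand-ins rather than the real Kafka Streams interfaces, and this `adapt` only illustrates the idea; it is not a description of what `ProcessorAdapter` does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class AdapterSketch {
    // Hypothetical stand-in for the old, deprecated processor interface.
    interface OldProcessor<K, V> {
        void process(K key, V value);
    }

    // Hypothetical stand-in for the new processor interface, which takes a record object.
    interface NewProcessor<K, V> {
        void process(Record<K, V> record);
    }

    // Hypothetical record holder used by the new-style interface.
    static final class Record<K, V> {
        final K key;
        final V value;

        Record(final K key, final V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Wraps an old-style processor so it can be used where a new-style one is expected.
    static <K, V> NewProcessor<K, V> adapt(final OldProcessor<K, V> delegate) {
        return record -> delegate.process(record.key, record.value);
    }

    // Mirrors the lambda in the diff: adapt lazily, once per supplier.get() call.
    static <K, V> Supplier<NewProcessor<K, V>> adaptSupplier(final Supplier<OldProcessor<K, V>> oldSupplier) {
        return () -> adapt(oldSupplier.get());
    }

    // Feeds two records through an adapted old-style processor and returns what it saw.
    static List<String> demo() {
        final List<String> seen = new ArrayList<>();
        final Supplier<OldProcessor<String, Integer>> oldSupplier =
            () -> (key, value) -> seen.add(key + "=" + value);
        final NewProcessor<String, Integer> processor = adaptSupplier(oldSupplier).get();
        processor.process(new Record<>("a", 1));
        processor.process(new Record<>("b", 2));
        return seen;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Because the adaptation happens inside the supplier lambda, each `get()` call still produces a fresh processor instance, and removing the deprecated overload later should only require deleting the wrapper.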




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

