vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r476953793



##########
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         assertThat(outputTopic.isEmpty(), is(true));
     }
 
-    public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier<String, Long> {
+    public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier<String, Long, String, Long> {
         @Override
-        public Processor<String, Long> get() {
+        public Processor<String, Long, String, Long> get() {
             return new CustomMaxAggregator();
         }
     }
 
-    public static class CustomMaxAggregator implements Processor<String, Long> 
{
-        ProcessorContext context;
+    public static class CustomMaxAggregator implements Processor<String, Long, 
String, Long> {
+        ProcessorContext<String, Long> context;
         private KeyValueStore<String, Long> store;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
             context.schedule(Duration.ofSeconds(60), 
PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
             context.schedule(Duration.ofSeconds(10), 
PunctuationType.STREAM_TIME, time -> flushStore());
-            store = (KeyValueStore<String, Long>) 
context.getStateStore("aggStore");
+            store = context.getStateStore("aggStore");

Review comment:
       This is a small improvement I noticed; I'll mention it on the KIP 
discussion if you like it. I've changed the ProcessorContext getStateStore 
method so that callers no longer have to cast the store type. The method's 
generic type parameter takes care of the cast now.
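
To illustrate the idea, here is a minimal, self-contained sketch. `StateStore`, `InMemoryKeyValueStore`, `SketchContext`, and `GetStateStoreSketch` are stand-in types invented for this example, not the real Kafka Streams classes; only the shape of the generic `getStateStore` signature is taken from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in types for illustration only; NOT the real Kafka Streams interfaces.
interface StateStore {
    String name();
}

class InMemoryKeyValueStore implements StateStore {
    private final String name;
    private final Map<String, Long> data = new HashMap<>();

    InMemoryKeyValueStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    void put(final String key, final Long value) {
        data.put(key, value);
    }

    Long get(final String key) {
        return data.get(key);
    }
}

class SketchContext {
    private final Map<String, StateStore> stores = new HashMap<>();

    void register(final StateStore store) {
        stores.put(store.name(), store);
    }

    // The unchecked cast lives here, once, instead of at every call site.
    @SuppressWarnings("unchecked")
    <S extends StateStore> S getStateStore(final String name) {
        return (S) stores.get(name);
    }
}

class GetStateStoreSketch {
    static Long roundTrip() {
        final SketchContext context = new SketchContext();
        context.register(new InMemoryKeyValueStore("aggStore"));
        // No cast needed: S is inferred from the assignment target.
        final InMemoryKeyValueStore store = context.getStateStore("aggStore");
        store.put("a", 42L);
        return store.get("a");
    }
}
```

The trade-off in this sketch is that the cast is still unchecked: asking for the wrong store type compiles and only fails with a `ClassCastException` at the call site at runtime, just as the explicit cast did.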

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -658,8 +658,42 @@ public synchronized Topology addSink(final String name,
      */
     @SuppressWarnings("rawtypes")
     public synchronized Topology addProcessor(final String name,
-                                              final ProcessorSupplier supplier,
+                                              final 
org.apache.kafka.streams.processor.ProcessorSupplier supplier,
                                               final String... parentNames) {
+        return addProcessor(
+            name,
+            new ProcessorSupplier<Object, Object, Object, Object>() {
+                @Override
+                public Set<StoreBuilder<?>> stores() {
+                    return supplier.stores();
+                }
+
+                @Override
+                public 
org.apache.kafka.streams.processor.api.Processor<Object, Object, Object, 
Object> get() {
+                    return ProcessorAdapter.adaptRaw(supplier.get());
+                }
+            },
+            parentNames
+        );
+    }

Review comment:
       
   
   As in previous changes, this delegates the old API to the new one.
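
The delegation pattern can be sketched in isolation as follows. All types here (`OldProcessor`, `OldSupplier`, `NewProcessor`, `NewSupplier`, `DelegationSketch`) are hypothetical stand-ins rather than the Kafka classes, and `adapt` only loosely mirrors what `ProcessorAdapter` does in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for an old two-parameter API and a new four-parameter API.
interface OldProcessor<K, V> {
    void process(K key, V value);
}

interface OldSupplier<K, V> {
    OldProcessor<K, V> get();
}

interface NewProcessor<KIn, VIn, KOut, VOut> {
    void process(KIn key, VIn value);
}

interface NewSupplier<KIn, VIn, KOut, VOut> {
    NewProcessor<KIn, VIn, KOut, VOut> get();
}

class DelegationSketch {
    final List<String> registered = new ArrayList<>();

    // New API: the single real implementation.
    <KIn, VIn, KOut, VOut> DelegationSketch addProcessor(final String name,
                                                         final NewSupplier<KIn, VIn, KOut, VOut> supplier) {
        registered.add(name);
        return this;
    }

    // Old API: wraps the old supplier and delegates to the new overload.
    <K, V> DelegationSketch addProcessor(final String name,
                                         final OldSupplier<K, V> supplier) {
        final NewSupplier<K, V, Object, Object> adapted = () -> adapt(supplier.get());
        return addProcessor(name, adapted);
    }

    // Presents an old-style processor through the new interface.
    static <K, V> NewProcessor<K, V, Object, Object> adapt(final OldProcessor<K, V> delegate) {
        return delegate::process;
    }

    static String demo() {
        final StringBuilder seen = new StringBuilder();
        final OldSupplier<String, Long> oldSupplier =
            () -> (key, value) -> seen.append(key).append('=').append(value);
        final DelegationSketch topology = new DelegationSketch().addProcessor("max", oldSupplier);
        // The adapted processor forwards calls to the old implementation.
        adapt(oldSupplier.get()).process("a", 1L);
        return topology.registered + ":" + seen;
    }
}
```

Keeping one real implementation behind both overloads means the old entry point cannot drift out of sync with the new one while both exist.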
   

##########
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         assertThat(outputTopic.isEmpty(), is(true));
     }
 
-    public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier<String, Long> {
+    public static class CustomMaxAggregatorSupplier implements 
ProcessorSupplier<String, Long, String, Long> {

Review comment:
       Since the new public API change is small, I also converted almost all of 
the usages of the old API to the new one.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final 
StoreBuilder<?> storeBuilder,
+                                                           final String 
sourceName,
+                                                           final 
Deserializer<KIn> keyDeserializer,
+                                                           final 
Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String 
processorName,
+                                                           final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       new API

##########
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -180,8 +180,5 @@ private void flushStore() {
                 context.forward(next.key, next.value);
             }
         }
-
-        @Override
-        public void close() {}

Review comment:
       `close()` has a default implementation in the new API, so this empty 
override is no longer needed.
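
As a small illustration of why the override can go away, here is a sketch with a hypothetical `ClosableProcessor` interface (not the real Kafka `Processor`) whose `close()` is a default no-op.

```java
// Hypothetical stand-in for a processor interface with a default close().
interface ClosableProcessor<K, V> {
    void process(K key, V value);

    // Default no-op: implementations override it only when they hold resources.
    default void close() {
    }
}

class DefaultCloseSketch {
    static int demo() {
        final int[] count = {0};
        // No close() override is required; only process() has to be implemented.
        final ClosableProcessor<String, Long> processor = (key, value) -> count[0]++;
        processor.process("a", 1L);
        processor.close(); // inherited no-op
        return count[0];
    }
}
```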

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction 
helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the 
old API to simplify
+    // the performance of casts that we still need to perform. This will 
eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, 
VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final 
org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> 
ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
 
+    public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> 
processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
+    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource<KIn, VIn> kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted 
yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource<KIn, VIn>) oldProcessorSupplier;
+    }

Review comment:
       This replaces a type check that was previously done elsewhere.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -482,9 +482,9 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
         nodeGroups = null;
     }
 
-    public final void addProcessor(final String name,
-                                   final 
org.apache.kafka.streams.processor.ProcessorSupplier<?, ?> supplier,
-                                   final String... predecessorNames) {
+    public final <KIn, VIn, KOut, VOut> void addProcessor(final String name,
+                                                          final 
ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
+                                                          final String... 
predecessorNames) {

Review comment:
       Just converting the internal method, since we've introduced the new API 
in the public Topology.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -713,7 +747,7 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
                                                        final Deserializer<V> 
valueDeserializer,
                                                        final String topic,
                                                        final String 
processorName,
-                                                       final 
ProcessorSupplier<K, V> stateUpdateSupplier) {
+                                                       final 
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) 
{

Review comment:
       As in the other PRs, I inverted the imports, so the old API is fully 
qualified now, and the new API is imported.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
##########
@@ -82,14 +81,18 @@
     }
 
     public String queryableStoreName() {
-        return ((KTableKTableJoinMerger<K, VR>) 
mergeProcessorParameters().processorSupplier()).getQueryableName();
+        return 
mergeProcessorParameters().kTableKTableJoinMergerProcessorSupplier().getQueryableName();
     }
 
     /**
      * The supplier which provides processor with KTable-KTable join merge 
functionality.
      */
+    @SuppressWarnings("unchecked")
     public KTableKTableJoinMerger<K, VR> joinMerger() {
-        return (KTableKTableJoinMerger<K, VR>) 
mergeProcessorParameters().processorSupplier();
+        final KTableKTableJoinMerger<K, Change<VR>> merger =
+            
mergeProcessorParameters().kTableKTableJoinMergerProcessorSupplier();
+        // this incorrect cast should be corrected by the end of the KIP-478 
implementation

Review comment:
       Specifically, it'll get fixed when `KTableKTableJoinMerger` is converted.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
##########
@@ -70,7 +70,7 @@ private static long findAndVerifyWindowGrace(final 
StreamsGraphNode streamsGraph
 
     private static Long extractGracePeriod(final StreamsGraphNode node) {
         if (node instanceof StatefulProcessorNode) {
-            final ProcessorSupplier processorSupplier = 
((StatefulProcessorNode) node).processorParameters().processorSupplier();
+            final ProcessorSupplier processorSupplier = 
((StatefulProcessorNode) node).processorParameters().oldProcessorSupplier();

Review comment:
       You'll see in the ProcessorParameters class that I've captured both the 
old and new APIs and also internalized a few casts. This is all just an effort 
to limit the scope of this PR. It'll all come out in the wash once the KIP is 
completely implemented.

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -87,12 +88,12 @@ public void shouldNotAllowZeroTopicsWhenAddingSource() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        topology.addProcessor(null, () -> new MockProcessorSupplier<>().get());
+        topology.addProcessor(null, () -> new 
MockApiProcessorSupplier<>().get());
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
-        topology.addProcessor("name", null);
+        topology.addProcessor("name", (ProcessorSupplier<Object, Object, 
Object, Object>) null);

Review comment:
       There are a few places where we have to cast `null` in order to resolve 
the right overload.
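
For readers unfamiliar with the problem, this self-contained sketch shows why the cast is needed. `LegacySupplier`, `ApiSupplier`, and `NullOverloadSketch` are hypothetical names standing in for the old and new supplier types.

```java
// Two unrelated parameter types, standing in for the old and new supplier interfaces.
interface LegacySupplier {
    Object get();
}

interface ApiSupplier {
    Object get();
}

class NullOverloadSketch {
    static String add(final String name, final LegacySupplier supplier) {
        return "legacy";
    }

    static String add(final String name, final ApiSupplier supplier) {
        return "api";
    }

    static String demo() {
        // add("name", null) would not compile: both overloads accept null and
        // neither parameter type is more specific than the other.
        return add("name", (ApiSupplier) null);
    }
}
```

The cast has no runtime effect; it only tells the compiler which overload to bind.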

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -853,15 +918,56 @@ public void process(final String key, final String value) 
{
         }
     }
 
-    private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> 
processor) {
+    /**
+     * A processor that stores each key-value pair in an in-memory key-value 
store registered with the context.
+     */
+    protected static class StatefulProcessor implements Processor<String, 
String, Void, Void> {
+        private KeyValueStore<String, String> store;
+        private final String storeName;
+
+        StatefulProcessor(final String storeName) {
+            this.storeName = storeName;
+        }
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            store.put(key, value);
+        }
+    }
+
+    private <K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K, V> 
define(final org.apache.kafka.streams.processor.Processor<K, V> processor) {

Review comment:
       These are just convenience functions for defining ProcessorSuppliers 
(with and without stores attached). I've duplicated them for the old and new 
APIs.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction 
helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the 
old API to simplify
+    // the performance of casts that we still need to perform. This will 
eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, 
VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final 
org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> 
ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }

Review comment:
       The constructor for the old API is still present. When you call it, we 
save a direct reference to the old supplier and also adapt it to the new API. 
Saving the direct reference dramatically simplifies the casts we've 
internalized below. Once everything is converted, we'll go back to saving just 
one reference.
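
A rough sketch of why the direct reference helps, using hypothetical stand-in types (`LegacySup`, `ModernSup`, `TableSourceSup`, `ParamsSketch`) rather than the real classes: once the old supplier is wrapped in an adapter, the wrapper can no longer be downcast to the concrete type, so the type check has to run against the original object.

```java
// Hypothetical stand-ins for the old and new supplier interfaces.
interface LegacySup {
    String get();
}

interface ModernSup {
    String get();
}

// A concrete old-API supplier that callers sometimes need to recognize.
class TableSourceSup implements LegacySup {
    @Override
    public String get() {
        return "table-source";
    }

    boolean materialized() {
        return true;
    }
}

class ParamsSketch {
    private final LegacySup oldSupplier; // null when built through the new API
    private final ModernSup supplier;

    ParamsSketch(final LegacySup oldSupplier) {
        this.oldSupplier = oldSupplier;
        this.supplier = oldSupplier::get; // adapted view; its concrete type is lost
    }

    ParamsSketch(final ModernSup supplier) {
        this.oldSupplier = null;
        this.supplier = supplier;
    }

    // The type check and cast are internalized here instead of at each call site.
    TableSourceSup tableSource() {
        return oldSupplier instanceof TableSourceSup ? (TableSourceSup) oldSupplier : null;
    }

    static String demo() {
        final ParamsSketch fromOld = new ParamsSketch(new TableSourceSup());
        final ModernSup modern = () -> "modern";
        final ParamsSketch fromNew = new ParamsSketch(modern);
        return (fromOld.tableSource() != null) + "," + (fromNew.tableSource() != null)
            + "," + fromOld.supplier.get();
    }
}
```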

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -851,12 +851,12 @@ boolean sendingOldValueEnabled() {
     }
 
     /**
-     * We conflate V with Change<V> in many places. It might be nice to fix 
that eventually.
+     * We conflate V with Change<V> in many places. This will get fixed in the 
implementation of KIP-478.

Review comment:
       Prototyping this KIP was actually how I discovered this conflation to 
begin with. Once we start converting Processors to the new API, the compiler 
will make this conflation impossible, and this method will eventually be unused.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -735,58 +766,92 @@ private Topology createAddHeaderTopology() {
     /**
      * A processor that simply forwards all messages to all children.
      */
-    protected static class ForwardingProcessor extends 
AbstractProcessor<String, String> {
+    protected static class ForwardingProcessor implements Processor<String, 
String, String, String> {

Review comment:
       Converted to the new API.

##########
File path: 
streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -390,7 +389,20 @@ private Topology setupGlobalStoreTopology(final String... 
sourceTopicNames) {
                 null,
                 sourceTopicName,
                 sourceTopicName + "-processor",
-                new MockProcessorSupplier()
+                () -> new Processor<Object, Object, Void, Void>() {
+                    KeyValueStore<Object, Object> store;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext<Void, Void> 
context) {
+                        store = context.getStateStore(sourceTopicName + 
"-globalStore");
+                    }
+
+                    @Override
+                    public void process(final Object key, final Object value) {
+                        store.put(key, value);
+                    }
+                }

Review comment:
       Shockingly, the old test wasn't testing what it was supposed to test.
   
   It's supposed to verify that the processor populates the global store, but 
it was actually just checking whether the mock captured the processed records. 
I changed the processor to actually write to the store, and then changed the 
assertion below to check what this test is really supposed to be checking.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -658,8 +658,42 @@ public synchronized Topology addSink(final String name,
      */
     @SuppressWarnings("rawtypes")
     public synchronized Topology addProcessor(final String name,
-                                              final ProcessorSupplier supplier,
+                                              final 
org.apache.kafka.streams.processor.ProcessorSupplier supplier,
                                               final String... parentNames) {
+        return addProcessor(
+            name,
+            new ProcessorSupplier<Object, Object, Object, Object>() {
+                @Override
+                public Set<StoreBuilder<?>> stores() {
+                    return supplier.stores();
+                }
+
+                @Override
+                public 
org.apache.kafka.streams.processor.api.Processor<Object, Object, Object, 
Object> get() {
+                    return ProcessorAdapter.adaptRaw(supplier.get());
+                }
+            },
+            parentNames
+        );
+    }
+
+    /**
+     * Add a new processor node that receives and processes records output by 
one or more parent source or processor
+     * node.
+     * Any new record output by this processor will be forwarded to its child 
processor or sink nodes.
+     * If {@code supplier} provides stores via {@link 
ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
+     * will be added to the topology and connected to this processor 
automatically.
+     *
+     * @param name the unique name of the processor node
+     * @param supplier the supplier used to obtain this node's {@link 
Processor} instance
+     * @param parentNames the name of one or more source or processor nodes 
whose output records this processor should receive
+     * and process
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
+     */
+    public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final 
String name,
+                                                                     final 
ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
+                                                                     final 
String... parentNames) {

Review comment:
       new API

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -127,7 +127,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder 
internalTopologyBuil
             .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);
 
         final KTableSource<K, V> tableSource = new 
KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
-        final ProcessorParameters<K, V> processorParameters = new 
ProcessorParameters<>(tableSource, tableSourceName);
+        final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(tableSource, tableSourceName);

Review comment:
       I've also converted ProcessorParameters to the new API, so you'll see a 
lot of changes like this. Many of them will eventually go away as individual 
processors are converted (such as, in this case, `KTableSource`).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented

Review comment:
       This is kind of horrible, but it'll be gone soon enough.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
##########
@@ -82,14 +81,18 @@
     }
 
     public String queryableStoreName() {
-        return ((KTableKTableJoinMerger<K, VR>) 
mergeProcessorParameters().processorSupplier()).getQueryableName();
+        return 
mergeProcessorParameters().kTableKTableJoinMergerProcessorSupplier().getQueryableName();

Review comment:
       An example of an internalized cast.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final 
StoreBuilder<?> storeBuilder,
+                                                           final String 
sourceName,
+                                                           final 
Deserializer<KIn> keyDeserializer,
+                                                           final 
Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String 
processorName,
+                                                           final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
+     *                              if not specified the default extractor 
defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final 
StoreBuilder<?> storeBuilder,
+                                                           final String 
sourceName,
+                                                           final 
TimestampExtractor timestampExtractor,
+                                                           final 
Deserializer<KIn> keyDeserializer,
+                                                           final 
Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String 
processorName,
+                                                           final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       new API

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
##########
@@ -37,8 +38,8 @@
     /**
      * A connected sub-graph of a {@link Topology}.
      * <p>
-     * Nodes of a {@code Subtopology} are connected {@link 
Topology#addProcessor(String,
-     * org.apache.kafka.streams.processor.ProcessorSupplier, String...) 
directly} or indirectly via
+     * Nodes of a {@code Subtopology} are connected
+     * {@link Topology#addProcessor(String, ProcessorSupplier, String...) 
directly} or indirectly via

Review comment:
       This now references the new API.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
##########
@@ -31,6 +31,15 @@
         }
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> 
adaptRaw(final org.apache.kafka.streams.processor.Processor delegate) {

Review comment:
       A minor convenience adapter to avoid a `rawtypes` warning at the call 
site.
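
The general technique looks roughly like this. `RawOld`, `TypedNew`, and `AdaptRawSketch` are hypothetical names for this sketch, and the body of `adaptRaw` here is an assumption about the approach, not a copy of the method in `ProcessorAdapter`.

```java
// Hypothetical stand-ins for the old and new processor interfaces.
interface RawOld<K, V> {
    void process(K key, V value);
}

interface TypedNew<KIn, VIn, KOut, VOut> {
    void process(KIn key, VIn value);
}

class AdaptRawSketch {
    // The raw parameter type and the unchecked call are confined to this one
    // method, so callers that hold a raw reference need no suppression of their own
    // for the adaptation step.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static <KIn, VIn, KOut, VOut> TypedNew<KIn, VIn, KOut, VOut> adaptRaw(final RawOld delegate) {
        return (key, value) -> delegate.process(key, value);
    }

    @SuppressWarnings("rawtypes") // only because this demo declares a raw variable itself
    static String demo() {
        final StringBuilder seen = new StringBuilder();
        final RawOld raw = (key, value) -> seen.append(key).append(value);
        final TypedNew<String, Long, Void, Void> typed = adaptRaw(raw);
        typed.process("a", 1L);
        return seen.toString();
    }
}
```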

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction 
helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {

Review comment:
       Converted to the new API.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
##########
@@ -67,8 +66,8 @@ public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) {
             topologyBuilder.connectProcessorAndStateStores(processorName, 
storeNames);
         }
 
-        if (processorParameters.processorSupplier() instanceof KTableSource) {
-            if (((KTableSource<?, ?>) 
processorParameters.processorSupplier()).materialized()) {
+        if (processorParameters.kTableSourceSupplier() != null) {
+            if (processorParameters.kTableSourceSupplier().materialized()) {

Review comment:
       This is the type check that I internalized. Note, it becomes more 
complicated now that we also have to check whether or not the supplier is an 
"old API" supplier.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -159,10 +159,11 @@ Cancellable schedule(final long intervalMs,
      * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link 
PunctuationType#WALL_CLOCK_TIME}
      * @param callback a function consuming timestamps representing the 
current stream or system time
      * @return a handle allowing cancellation of the punctuation schedule 
established by this method
+     * @throws IllegalArgumentException if the interval is not representable 
in milliseconds
      */
     Cancellable schedule(final Duration interval,
                          final PunctuationType type,
-                         final Punctuator callback) throws 
IllegalArgumentException;
+                         final Punctuator callback);

Review comment:
       This was a minor API design error. The intent was to document that we 
could throw the exception, but declaring an unchecked exception in a method's 
`throws` clause has no effect: callers are not required to handle it. The right 
way to document it is in the javadoc.
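
As a small illustration of why the `throws` clause adds nothing for unchecked exceptions, here is a self-contained sketch. `Scheduler`, `scheduleDeclared`, `scheduleDocumented`, and `MillisScheduler` are hypothetical names for this example only and are not part of Kafka Streams.

```java
import java.time.Duration;

class UncheckedThrowsSketch {

    // Hypothetical stand-in for a scheduling interface; not the Kafka ProcessorContext.
    interface Scheduler {
        // Compiles, but the throws clause imposes nothing on callers because
        // IllegalArgumentException is an unchecked exception.
        long scheduleDeclared(Duration interval) throws IllegalArgumentException;

        /**
         * @throws IllegalArgumentException if the interval is not representable in milliseconds
         */
        long scheduleDocumented(Duration interval);
    }

    static final class MillisScheduler implements Scheduler {
        @Override
        public long scheduleDeclared(final Duration interval) {
            return toMillisOrThrow(interval);
        }

        @Override
        public long scheduleDocumented(final Duration interval) {
            return toMillisOrThrow(interval);
        }

        private static long toMillisOrThrow(final Duration interval) {
            try {
                return interval.toMillis();
            } catch (final ArithmeticException e) {
                throw new IllegalArgumentException("interval is not representable in milliseconds", e);
            }
        }
    }

    // Neither call needs a try/catch or a throws clause on this method.
    static long callBoth(final Scheduler scheduler, final Duration interval) {
        return scheduler.scheduleDeclared(interval) + scheduler.scheduleDocumented(interval);
    }

    // Reports whether the scheduler rejects the given interval.
    static boolean rejects(final Scheduler scheduler, final Duration interval) {
        try {
            scheduler.scheduleDocumented(interval);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Scheduler scheduler = new MillisScheduler();
        System.out.println(callBoth(scheduler, Duration.ofSeconds(10)));
        System.out.println(rejects(scheduler, Duration.ofSeconds(Long.MAX_VALUE)));
    }
}
```

Both variants behave identically for callers and for the compiler; only the javadoc tag shows up in generated documentation.
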

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -102,7 +102,7 @@ public void shouldAddGlobalStore() {
                 @SuppressWarnings("unchecked")
                 @Override
                 public void init(final ProcessorContext<Void, Void> context) {
-                    store = (KeyValueStore<String, String>) 
context.getStateStore("store");
+                    store = context.getStateStore("store");

Review comment:
       Another example of how we don't need to cast anymore.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -102,7 +102,7 @@ void register(final StateStore store,
      * @param name The store name
      * @return The state store instance
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);

Review comment:
       This is the small extension to the KIP that I mentioned, which means 
callers no longer have to cast the result to the store interface that they need 
(e.g., `KeyValueStore`).
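
A minimal sketch of how a generic return type moves the cast out of the call site. `Store`, `CounterStore`, and `Registry` are hypothetical stand-ins, not the Kafka interfaces. The cast still happens inside the method and is unchecked, so assigning the result to the wrong store type would fail at runtime with a `ClassCastException` rather than at compile time.

```java
import java.util.HashMap;
import java.util.Map;

class TypedStoreLookupSketch {

    // Hypothetical stand-in for a state store interface.
    interface Store {
        String name();
    }

    static final class CounterStore implements Store {
        private final String name;
        private long count;

        CounterStore(final String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        void increment() {
            count++;
        }

        long count() {
            return count;
        }
    }

    static final class Registry {
        private final Map<String, Store> stores = new HashMap<>();

        void register(final Store store) {
            stores.put(store.name(), store);
        }

        // Old shape: the caller must cast the result.
        Store getStoreUntyped(final String name) {
            return stores.get(name);
        }

        // New shape: the target type at the call site determines S.
        @SuppressWarnings("unchecked")
        <S extends Store> S getStore(final String name) {
            return (S) stores.get(name);
        }
    }

    static long demo() {
        final Registry registry = new Registry();
        registry.register(new CounterStore("aggStore"));

        // Before: explicit cast at the call site.
        final CounterStore viaCast = (CounterStore) registry.getStoreUntyped("aggStore");
        viaCast.increment();

        // After: S is inferred as CounterStore from the assignment.
        final CounterStore inferred = registry.getStore("aggStore");
        inferred.increment();

        return inferred.count();
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```
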

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -87,12 +88,12 @@ public void shouldNotAllowZeroTopicsWhenAddingSource() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        topology.addProcessor(null, () -> new MockProcessorSupplier<>().get());
+        topology.addProcessor(null, () -> new 
MockApiProcessorSupplier<>().get());

Review comment:
       I've converted most of the tests to the new API, and just left behind a 
couple to make sure the delegation works properly.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -856,11 +857,18 @@ public void 
statelessTopologyShouldNotCreateStateDirectory() throws Exception {
         final String outputTopic = safeTestName + "-output";
         final Topology topology = new Topology();
         topology.addSource("source", Serdes.String().deserializer(), 
Serdes.String().deserializer(), inputTopic)
-                .addProcessor("process", () -> new AbstractProcessor<String, 
String>() {
+                .addProcessor("process", () -> new Processor<String, String, 
String, String>() {

Review comment:
       Converting to the new API. I didn't want to convert over the 
AbstractProcessor, since that would drag in more changes. I also didn't 
introduce a new abstract class, since the only thing it does is capture the 
context.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -293,6 +293,37 @@ public void testDrivingConnectedStateStoreTopology() {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void 
testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {

Review comment:
       Here's an example of a test I left in place to exercise the delegation 
mechanism (I renamed the "old API" version of the test).

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -1196,33 +1195,25 @@ public void shouldReinitializeRevivedTasksInAnyState() {
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
         final AtomicBoolean shouldThrow = new AtomicBoolean(false);
         final AtomicBoolean processed = new AtomicBoolean(false);
-        internalTopologyBuilder.addProcessor("proc", new 
ProcessorSupplier<Object, Object>() {
-            @Override
-            public Processor<Object, Object> get() {
-                return new Processor<Object, Object>() {
-                    private ProcessorContext context;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        this.context = context;
-                    }
+        internalTopologyBuilder.addProcessor(
+            "proc",
+            () -> new Processor<Object, Object, Object, Object>() {

Review comment:
       Converted to the new API and cleaned up.

##########
File path: 
streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -996,7 +1003,7 @@ public void shouldPunctuateOnWallClockTime() {
     @Test
     public void shouldReturnAllStores() {
         final Topology topology = setupSourceSinkTopology();
-        topology.addProcessor("processor", () -> null, "source");
+        topology.addProcessor("processor", (ProcessorSupplier<Object, Object, 
Object, Object>) () -> null, "source");

Review comment:
       Just casting the `() -> null` lambda to resolve the right overload.
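
For context, a sketch of why the cast is needed once a method is overloaded on two functional interfaces. All types below are hypothetical stand-ins for the old and new supplier interfaces. The assumption is that an uncast `() -> null` is compatible with both overloads and neither is more specific, so the compiler reports an ambiguous call.

```java
class OverloadCastSketch {

    // Hypothetical stand-ins for the old and new processor types.
    interface OldProcessor<K, V> { }
    interface NewProcessor<KIn, VIn, KOut, VOut> { }

    // Two functional interfaces with unrelated return types.
    interface OldSupplier<K, V> {
        OldProcessor<K, V> get();
    }

    interface NewSupplier<KIn, VIn, KOut, VOut> {
        NewProcessor<KIn, VIn, KOut, VOut> get();
    }

    static <K, V> String addProcessor(final String name, final OldSupplier<K, V> supplier) {
        return "old";
    }

    static <KIn, VIn, KOut, VOut> String addProcessor(final String name,
                                                      final NewSupplier<KIn, VIn, KOut, VOut> supplier) {
        return "new";
    }

    static String viaNewCast() {
        // addProcessor("processor", () -> null);  // expected to be rejected as ambiguous
        return addProcessor("processor", (NewSupplier<Object, Object, Object, Object>) () -> null);
    }

    static String viaOldCast() {
        return addProcessor("processor", (OldSupplier<Object, Object>) () -> null);
    }

    public static void main(final String[] args) {
        System.out.println(viaNewCast());
        System.out.println(viaOldCast());
    }
}
```

The cast gives the lambda a single target type, so only one overload remains applicable.
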



