abbccdda commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r469028867



##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

Review comment:
       I'm wondering whether it makes more sense to let `MockProcessor` encapsulate a delegate `MockApiProcessor`, so that we could also use the existing tests to verify the correctness of the migration.
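       A rough sketch of that delegate idea (class shape, constructor, and method names like `processed()` are assumed here, not taken from the PR): the old-API mock keeps its current contract but forwards the bookkeeping to a `MockApiProcessor`, so assertions in existing tests exercise the new implementation.

```java
import java.util.ArrayList;

import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.PunctuationType;

public class MockProcessor<K, V> extends AbstractProcessor<K, V> {

    // Assumed: the new-API mock owns the bookkeeping (captured records, punctuations).
    // Constructor argument order mirrors the MockApiProcessor usage shown in this PR.
    private final MockApiProcessor<K, V, Object, Object> delegate;

    public MockProcessor(final PunctuationType punctuationType, final long scheduleInterval) {
        // init/context adaptation for the delegate is elided in this sketch.
        delegate = new MockApiProcessor<>(punctuationType, scheduleInterval);
    }

    @Override
    public void process(final K key, final V value) {
        // Assumed shape of MockApiProcessor#process; adjust to whatever signature the PR settles on.
        delegate.process(key, value);
    }

    // Existing tests keep calling processed() on the old mock; the data now comes from the delegate.
    public ArrayList<KeyValueTimestamp<K, V>> processed() {
        return delegate.processed();
    }
}
```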

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -667,7 +674,7 @@ public void validateCopartition() {
     private void validateGlobalStoreArguments(final String sourceName,
                                               final String topic,
                                               final String processorName,
-                                              final ProcessorSupplier<?, ?> stateUpdateSupplier,
+                                              final ProcessorSupplier<?, ?, ?, ?> stateUpdateSupplier,

Review comment:
       Could we use `Void, Void` instead?
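       A minimal sketch of what that could look like (remaining parameters and the body are elided and assumed unchanged): since a global-store update processor never forwards records downstream, the output types can be pinned to `Void` rather than left as wildcards.

```java
// Hypothetical signature fragment, not the full method from InternalTopologyBuilder.
private void validateGlobalStoreArguments(final String sourceName,
                                          final String topic,
                                          final String processorName,
                                          final ProcessorSupplier<?, ?, Void, Void> stateUpdateSupplier) {
    // ... existing validation unchanged ...
}
```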

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -512,27 +513,70 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
      * of the input topic.
      * <p>
-     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
      * records forwarded from the {@link SourceNode}. NOTE: you should not use the {@code Processor} to insert transformed records into
      * the global state store. This store uses the source topic as changelog and during restore will insert records directly
      * from the source.
      * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * <p>
-     * It is not required to connect a global store to {@link Processor Processors}, {@link Transformer Transformers},
+     * It is not required to connect a global store to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
      * or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
      *
      * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
      * @param topic                 the topic to source the data from
      * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                               final String topic,
                                                               final Consumed<K, V> consumed,
-                                                             final ProcessorSupplier<K, V> stateUpdateSupplier) {
+                                                             final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier}} will be used to create an
+     * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       Do we want to add test coverage for this function?
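       For example, a test along these lines could exercise the new overload (assuming it accepts a `ProcessorSupplier` keyed on the consumed types so that `MockApiProcessorSupplier` from this PR can serve as the state-update supplier; the test and store names are hypothetical):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class StreamsBuilderGlobalStoreTest {

    @Test
    public void shouldAddGlobalStoreWithNewProcessorSupplier() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("global-store"),
                Serdes.String(),
                Serdes.String())
                .withLoggingDisabled(), // global stores must not have change-logging enabled
            "global-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            new MockApiProcessorSupplier<>());

        // The described topology should expose exactly one global store.
        assertThat(builder.build().describe().globalStores().size(), is(1));
    }
}
```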

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+    private final long scheduleInterval;
+    private final PunctuationType punctuationType;
+    private final List<MockApiProcessor<KIn, VIn, KOut, VOut>> processors = new ArrayList<>();
+
+    public MockApiProcessorSupplier() {
+        this(-1L);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval) {
+        this(scheduleInterval, PunctuationType.STREAM_TIME);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval, final PunctuationType punctuationType) {
+        this.scheduleInterval = scheduleInterval;
+        this.punctuationType = punctuationType;
+    }
+
+    @Override
+    public Processor<KIn, VIn, KOut, VOut> get() {
+        final MockApiProcessor<KIn, VIn, KOut, VOut> processor = new MockApiProcessor<>(punctuationType, scheduleInterval);
+        processors.add(processor);
+        return processor;
+    }
+
+    // get the captured processor assuming that only one processor gets returned from this supplier
+    public MockApiProcessor<KIn, VIn, KOut, VOut> theCapturedProcessor() {
+        return capturedProcessors(1).get(0);
+    }
+
+    public int capturedProcessorsCount() {
+        return processors.size();
+    }
+
+        // get the captured processors with the expected number

Review comment:
       Nit: comment format (indentation).

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+    private final long scheduleInterval;
+    private final PunctuationType punctuationType;
+    private final List<MockApiProcessor<KIn, VIn, KOut, VOut>> processors = new ArrayList<>();
+
+    public MockApiProcessorSupplier() {
+        this(-1L);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval) {
+        this(scheduleInterval, PunctuationType.STREAM_TIME);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval, final PunctuationType punctuationType) {
+        this.scheduleInterval = scheduleInterval;
+        this.punctuationType = punctuationType;
+    }
+
+    @Override
+    public Processor<KIn, VIn, KOut, VOut> get() {
+        final MockApiProcessor<KIn, VIn, KOut, VOut> processor = new MockApiProcessor<>(punctuationType, scheduleInterval);
+        processors.add(processor);
+        return processor;
+    }
+
+    // get the captured processor assuming that only one processor gets returned from this supplier
+    public MockApiProcessor<KIn, VIn, KOut, VOut> theCapturedProcessor() {
+        return capturedProcessors(1).get(0);
+    }
+
+    public int capturedProcessorsCount() {

Review comment:
       We could port this function when it is actually needed.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

Review comment:
       The same question about using a delegate applies to this `ProcessorSupplier`, but it's minor to me.
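       One possible shape (hypothetical, building on the `MockProcessor` sketch above rather than holding a `MockApiProcessorSupplier` delegate directly), so that existing supplier-based tests still drive the new code path:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.PunctuationType;

public class MockProcessorSupplier<K, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {

    private final List<MockProcessor<K, V>> processors = new ArrayList<>();
    private final PunctuationType punctuationType;
    private final long scheduleInterval;

    public MockProcessorSupplier(final long scheduleInterval, final PunctuationType punctuationType) {
        this.scheduleInterval = scheduleInterval;
        this.punctuationType = punctuationType;
    }

    @Override
    public org.apache.kafka.streams.processor.Processor<K, V> get() {
        // Each old-API MockProcessor internally delegates to a MockApiProcessor (see the earlier sketch),
        // so capturing processors here still verifies the migrated implementation.
        final MockProcessor<K, V> processor = new MockProcessor<>(punctuationType, scheduleInterval);
        processors.add(processor);
        return processor;
    }
}
```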




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

