vvcephei commented on a change in pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#discussion_r461271276



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
         return globalGroups;
     }
 
+    @SuppressWarnings("unchecked")
     private ProcessorTopology build(final Set<String> nodeGroup) {
         Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
-        final Map<String, ProcessorNode<?, ?>> processorMap = new 
LinkedHashMap<>();
-        final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
-        final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
+        final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new 
LinkedHashMap<>();
+        final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new 
HashMap<>();
+        final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>();
         final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
         final Set<String> repartitionTopics = new HashSet<>();
 
         // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
         // also make sure the state store map values following the insertion 
ordering
-        for (final NodeFactory<?, ?> factory : nodeFactories.values()) {
+        for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                final ProcessorNode<?, ?> node = factory.build();
+                final ProcessorNode<?, ?, ?, ?> node = factory.build();
                 processorMap.put(node.name(), node);
 
                 if (factory instanceof ProcessorNodeFactory) {
                     buildProcessorNode(processorMap,
                                        stateStoreMap,
-                                       (ProcessorNodeFactory<?, ?>) factory,
-                                       node);
+                                       (ProcessorNodeFactory<?, ?, ?, ?>) 
factory,
+                                       (ProcessorNode<Object, Object, Object, 
Object>) node);
 
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
                                     repartitionTopics,
-                                    (SourceNodeFactory<?, ?>) factory,
-                                    (SourceNode<?, ?>) node);
+                                    (SourceNodeFactory<?, ?, ?, ?>) factory,
+                                    (SourceNode<?, ?, ?, ?>) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
                     buildSinkNode(processorMap,
                                   topicSinkMap,
                                   repartitionTopics,
-                                  (SinkNodeFactory<?, ?>) factory,
-                                  (SinkNode<?, ?>) node);
+                                  (SinkNodeFactory<?, ?, ?, ?>) factory,
+                                  (SinkNode<Object, Object, Object, Object>) 
node);

Review comment:
       They have subtly different meanings, which I'm not 100% clear on all the 
time. I'm not sure if I had to change this one, of if it was an accident. I'll 
give it a closer look.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+public final class ProcessorShim<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn, KOut, VOut> {

Review comment:
       I think "adapter" is the standard design pattern name for this type of 
thing. Not sure why I thought "shim" was a good choice in the heat of the 
moment. Maybe because I'm kind of slipping these classes in the middle to make 
everything line up? I can change them to "adapter".

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
         return globalGroups;
     }
 
+    @SuppressWarnings("unchecked")
     private ProcessorTopology build(final Set<String> nodeGroup) {
         Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
-        final Map<String, ProcessorNode<?, ?>> processorMap = new 
LinkedHashMap<>();
-        final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
-        final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
+        final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new 
LinkedHashMap<>();
+        final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new 
HashMap<>();
+        final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>();
         final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
         final Set<String> repartitionTopics = new HashSet<>();
 
         // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
         // also make sure the state store map values following the insertion 
ordering
-        for (final NodeFactory<?, ?> factory : nodeFactories.values()) {
+        for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                final ProcessorNode<?, ?> node = factory.build();
+                final ProcessorNode<?, ?, ?, ?> node = factory.build();
                 processorMap.put(node.name(), node);
 
                 if (factory instanceof ProcessorNodeFactory) {
                     buildProcessorNode(processorMap,
                                        stateStoreMap,
-                                       (ProcessorNodeFactory<?, ?>) factory,
-                                       node);
+                                       (ProcessorNodeFactory<?, ?, ?, ?>) 
factory,
+                                       (ProcessorNode<Object, Object, Object, 
Object>) node);
 
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
                                     repartitionTopics,
-                                    (SourceNodeFactory<?, ?>) factory,
-                                    (SourceNode<?, ?>) node);
+                                    (SourceNodeFactory<?, ?, ?, ?>) factory,
+                                    (SourceNode<?, ?, ?, ?>) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
                     buildSinkNode(processorMap,
                                   topicSinkMap,
                                   repartitionTopics,
-                                  (SinkNodeFactory<?, ?>) factory,
-                                  (SinkNode<?, ?>) node);
+                                  (SinkNodeFactory<?, ?, ?, ?>) factory,
+                                  (SinkNode<Object, Object, Object, Object>) 
node);

Review comment:
       They have subtly different meanings, which I'm not 100% clear on all the 
time. Usually, the reason I switched to Object because the wildcard makes the 
type system want to bind the type to something unfortunate, and it can't prove 
that the usage is actually ok.
   
   I'm not sure if I had to change this one, of if it was an accident. I'll 
give it a closer look.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseShim.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class ProcessorContextReverseShim implements 
InternalProcessorContext {

Review comment:
       Thanks. Maybe it's not in the place you were looking for it. I see it on 
line 133.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
         return globalGroups;
     }
 
+    @SuppressWarnings("unchecked")
     private ProcessorTopology build(final Set<String> nodeGroup) {
         Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
-        final Map<String, ProcessorNode<?, ?>> processorMap = new 
LinkedHashMap<>();
-        final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
-        final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
+        final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new 
LinkedHashMap<>();
+        final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new 
HashMap<>();
+        final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>();
         final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
         final Set<String> repartitionTopics = new HashSet<>();
 
         // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
         // also make sure the state store map values following the insertion 
ordering
-        for (final NodeFactory<?, ?> factory : nodeFactories.values()) {
+        for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                final ProcessorNode<?, ?> node = factory.build();
+                final ProcessorNode<?, ?, ?, ?> node = factory.build();
                 processorMap.put(node.name(), node);
 
                 if (factory instanceof ProcessorNodeFactory) {
                     buildProcessorNode(processorMap,
                                        stateStoreMap,
-                                       (ProcessorNodeFactory<?, ?>) factory,
-                                       node);
+                                       (ProcessorNodeFactory<?, ?, ?, ?>) 
factory,
+                                       (ProcessorNode<Object, Object, Object, 
Object>) node);
 
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
                                     repartitionTopics,
-                                    (SourceNodeFactory<?, ?>) factory,
-                                    (SourceNode<?, ?>) node);
+                                    (SourceNodeFactory<?, ?, ?, ?>) factory,
+                                    (SourceNode<?, ?, ?, ?>) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
                     buildSinkNode(processorMap,
                                   topicSinkMap,
                                   repartitionTopics,
-                                  (SinkNodeFactory<?, ?>) factory,
-                                  (SinkNode<?, ?>) node);
+                                  (SinkNodeFactory<?, ?, ?, ?>) factory,
+                                  (SinkNode<Object, Object, Object, Object>) 
node);

Review comment:
       Ah, yes, indeed. It's because inside buildSinkNode, we are calling 
`getProcessor(...).addChild(node)`, and the type system is unable to prove that 
the forward types of the parent match the input types of the child, because in 
this internal layer, we have already lost all the type information of the nodes.
   
   It doesn't really matter in the internals anyway (note that this class 
always just had wildcards/Object as the generic parameters. The real benefit of 
KIP-478 is for users of the PAPI (including the DSL implementation), not this 
internal plumbing logic.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
##########
@@ -82,20 +79,12 @@ public void setup() {
             null,
             null);
 
-        final ProcessorNode<?, ?> processorNode = mock(ProcessorNode.class);
-        globalContext.setCurrentNode(processorNode);
+        final ProcessorNode<Object, Object, Object, Object> processorNode = 
new ProcessorNode<>("testNode");
 
         child = mock(ProcessorNode.class);
+        processorNode.addChild(child);
 
-        expect(processorNode.children())

Review comment:
       Ah, it's because I swapped out the mock of the ProcessorNode for a real 
processor node (on line 82). Note that these were not checks, they were just 
dummy returns, since we never verified them, and it was a nice mock.
   
   Now, all the tests behave the same, and we don't need as much boilerplace 
test setup.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.RecordContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+
+/**
+ * For internal use so we can update the {@link RecordContext} and current
+ * {@link ProcessorNode} when we are forwarding items that have been evicted 
or flushed from
+ * {@link ThreadCache}
+ */
+public interface InternalApiProcessorContext<KForward, VForward> extends 
ProcessorContext<KForward, VForward> {
+    BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
+    ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
+
+    @Override
+    StreamsMetricsImpl metrics();
+
+    /**
+     * @param timeMs current wall-clock system timestamp in milliseconds
+     */
+    void setSystemTimeMs(long timeMs);
+
+    /**
+     * @retun the current wall-clock system timestamp in milliseconds

Review comment:
       Good eye. I'll also fix it in InternalProcessorContext.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.RecordContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+
+/**
+ * For internal use so we can update the {@link RecordContext} and current
+ * {@link ProcessorNode} when we are forwarding items that have been evicted 
or flushed from
+ * {@link ThreadCache}
+ */
+public interface InternalApiProcessorContext<KForward, VForward> extends 
ProcessorContext<KForward, VForward> {
+    BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();

Review comment:
       Good eye :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to