vvcephei commented on a change in pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#discussion_r461271276
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##########

@@ -880,40 +880,41 @@ public synchronized ProcessorTopology buildGlobalStateTopology() { return globalGroups; } + @SuppressWarnings("unchecked") private ProcessorTopology build(final Set<String> nodeGroup) { Objects.requireNonNull(applicationId, "topology has not completed optimization"); - final Map<String, ProcessorNode<?, ?>> processorMap = new LinkedHashMap<>(); - final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>(); - final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>(); + final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>(); + final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new HashMap<>(); + final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>(); final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); final Set<String> repartitionTopics = new HashSet<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) // also make sure the state store map values following the insertion ordering - for (final NodeFactory<?, ?> factory : nodeFactories.values()) { + for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { - final ProcessorNode<?, ?> node = factory.build(); + final ProcessorNode<?, ?, ?, ?> node = factory.build(); processorMap.put(node.name(), node); if (factory instanceof ProcessorNodeFactory) { buildProcessorNode(processorMap, stateStoreMap, - (ProcessorNodeFactory<?, ?>) factory, - node); + (ProcessorNodeFactory<?, ?, ?, ?>) factory, + (ProcessorNode<Object, Object, Object, Object>) node); } else if (factory instanceof SourceNodeFactory) { buildSourceNode(topicSourceMap, repartitionTopics, - (SourceNodeFactory<?, ?>) factory, - (SourceNode<?, ?>) node); + (SourceNodeFactory<?, ?, ?, ?>) factory, + (SourceNode<?, ?, ?, ?>) node); } else if (factory instanceof SinkNodeFactory) { buildSinkNode(processorMap, topicSinkMap, repartitionTopics, - (SinkNodeFactory<?, ?>) factory, - (SinkNode<?, ?>) node); + (SinkNodeFactory<?, ?, ?, ?>) factory, + (SinkNode<Object, Object, Object, Object>) node);

Review comment:
They have subtly different meanings, which I'm not 100% clear on all the time. I'm not sure if I had to change this one, or if it was an accident. I'll give it a closer look.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java ##########

@@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.processor.internals; + + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; + +public final class ProcessorShim<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

Review comment:
I think "adapter" is the standard design pattern name for this type of thing. Not sure why I thought "shim" was a good choice in the heat of the moment. Maybe because I'm kind of slipping these classes in the middle to make everything line up? I can change them to "adapter".

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##########

@@ -880,40 +880,41 @@ public synchronized ProcessorTopology buildGlobalStateTopology() { return globalGroups; } + @SuppressWarnings("unchecked") private ProcessorTopology build(final Set<String> nodeGroup) { Objects.requireNonNull(applicationId, "topology has not completed optimization"); - final Map<String, ProcessorNode<?, ?>> processorMap = new LinkedHashMap<>(); - final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>(); - final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>(); + final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>(); + final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new HashMap<>(); + final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>(); final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); final Set<String> repartitionTopics = new HashSet<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) // also make sure the state store map values following the insertion ordering - for (final NodeFactory<?, ?> factory : nodeFactories.values()) { + for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { - final ProcessorNode<?, ?> node = factory.build(); + final ProcessorNode<?, ?, ?, ?> node = factory.build(); processorMap.put(node.name(), node); if (factory instanceof ProcessorNodeFactory) { buildProcessorNode(processorMap, stateStoreMap, - (ProcessorNodeFactory<?, ?>) factory, - node); + (ProcessorNodeFactory<?, ?, ?, ?>) factory, + (ProcessorNode<Object, Object, Object, Object>) node); } else if (factory instanceof SourceNodeFactory) { buildSourceNode(topicSourceMap, repartitionTopics, - (SourceNodeFactory<?, ?>) factory, - (SourceNode<?, ?>) node); + (SourceNodeFactory<?, ?, ?, ?>) factory, + (SourceNode<?, ?, ?, ?>) node); } else if (factory instanceof SinkNodeFactory) { buildSinkNode(processorMap, topicSinkMap, repartitionTopics, - (SinkNodeFactory<?, ?>) factory, - (SinkNode<?, ?>) node); + (SinkNodeFactory<?, ?, ?, ?>) factory, + (SinkNode<Object, Object, Object, Object>) node);

Review comment:
They have subtly different meanings, which I'm not 100% clear on all the time. Usually, the reason I switched to Object is that the wildcard makes the type system want to bind the type to something unfortunate, and it can't prove that the usage is actually ok. I'm not sure if I had to change this one, or if it was an accident. I'll give it a closer look.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseShim.java ##########

@@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.ThreadCache; + +import java.io.File; +import java.time.Duration; +import java.util.Map; + +public final class ProcessorContextReverseShim implements InternalProcessorContext {

Review comment:
Thanks. Maybe it's not in the place you were looking for it. I see it on line 133.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##########

@@ -880,40 +880,41 @@ public synchronized ProcessorTopology buildGlobalStateTopology() { return globalGroups; } + @SuppressWarnings("unchecked") private ProcessorTopology build(final Set<String> nodeGroup) { Objects.requireNonNull(applicationId, "topology has not completed optimization"); - final Map<String, ProcessorNode<?, ?>> processorMap = new LinkedHashMap<>(); - final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>(); - final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>(); + final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>(); + final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new HashMap<>(); + final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>(); final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); final Set<String> repartitionTopics = new HashSet<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) // also make sure the state store map values following the insertion ordering - for (final NodeFactory<?, ?> factory : nodeFactories.values()) { + for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { - final ProcessorNode<?, ?> node = factory.build(); + final ProcessorNode<?, ?, ?, ?> node = factory.build(); processorMap.put(node.name(), node); if (factory instanceof ProcessorNodeFactory) { buildProcessorNode(processorMap, stateStoreMap, - (ProcessorNodeFactory<?, ?>) factory, - node); + (ProcessorNodeFactory<?, ?, ?, ?>) factory, + (ProcessorNode<Object, Object, Object, Object>) node); } else if (factory instanceof
SourceNodeFactory) { buildSourceNode(topicSourceMap, repartitionTopics, - (SourceNodeFactory<?, ?>) factory, - (SourceNode<?, ?>) node); + (SourceNodeFactory<?, ?, ?, ?>) factory, + (SourceNode<?, ?, ?, ?>) node); } else if (factory instanceof SinkNodeFactory) { buildSinkNode(processorMap, topicSinkMap, repartitionTopics, - (SinkNodeFactory<?, ?>) factory, - (SinkNode<?, ?>) node); + (SinkNodeFactory<?, ?, ?, ?>) factory, + (SinkNode<Object, Object, Object, Object>) node);

Review comment:
Ah, yes, indeed. It's because inside buildSinkNode, we are calling `getProcessor(...).addChild(node)`, and the type system is unable to prove that the forward types of the parent match the input types of the child, because in this internal layer, we have already lost all the type information of the nodes. It doesn't really matter in the internals anyway (note that this class always just had wildcards/Object as the generic parameters). The real benefit of KIP-478 is for users of the PAPI (including the DSL implementation), not this internal plumbing logic.

########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java ##########

@@ -82,20 +79,12 @@ public void setup() { null, null); - final ProcessorNode<?, ?> processorNode = mock(ProcessorNode.class); - globalContext.setCurrentNode(processorNode); + final ProcessorNode<Object, Object, Object, Object> processorNode = new ProcessorNode<>("testNode"); child = mock(ProcessorNode.class); + processorNode.addChild(child); - expect(processorNode.children())

Review comment:
Ah, it's because I swapped out the mock of the ProcessorNode for a real processor node (on line 82). Note that these were not checks, they were just dummy returns, since we never verified them, and it was a nice mock. Now, all the tests behave the same, and we don't need as much boilerplate test setup.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java ##########

@@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.RecordContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.Task.TaskType; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; + +/** + * For internal use so we can update the {@link RecordContext} and current + * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from + * {@link ThreadCache} + */ +public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> { + BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer(); + ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer(); + + @Override + StreamsMetricsImpl metrics(); + + /** + * @param timeMs current wall-clock system timestamp in milliseconds + */ + void setSystemTimeMs(long timeMs); + + /** + * @retun the current wall-clock system timestamp in milliseconds

Review comment:
Good eye. I'll also fix it in InternalProcessorContext.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java ##########

@@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.RecordContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.Task.TaskType; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; + +/** + * For internal use so we can update the {@link RecordContext} and current + * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from + * {@link ThreadCache} + */ +public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> { + BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();

Review comment:
Good eye :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
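
As an aside on the "shim" vs. "adapter" naming discussed in the ProcessorShim comment above: the shape in question is the classic adapter pattern, where a class implementing the new interface simply wraps and delegates to an implementation of the old one. The sketch below is a simplified, hypothetical illustration only; `OldProcessor`, `NewProcessor`, and `OldToNewProcessorAdapter` are stand-in names, not the actual KIP-478 interfaces or the real ProcessorShim code.

```java
// Hypothetical stand-ins for the old and new processor interfaces; these are
// NOT the real org.apache.kafka.streams.processor types.
interface OldProcessor<K, V> {
    void process(K key, V value);
}

interface NewProcessor<KIn, VIn, KOut, VOut> {
    void process(KIn key, VIn value);
}

// The adapter presents the new interface while delegating to an existing old
// implementation, "slipping in the middle" so the surrounding plumbing lines up.
final class OldToNewProcessorAdapter<KIn, VIn, KOut, VOut>
        implements NewProcessor<KIn, VIn, KOut, VOut> {

    private final OldProcessor<KIn, VIn> delegate;

    OldToNewProcessorAdapter(final OldProcessor<KIn, VIn> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void process(final KIn key, final VIn value) {
        // Forward the call unchanged; the extra KOut/VOut parameters exist only
        // so the adapter satisfies the new, more strongly typed shape.
        delegate.process(key, value);
    }
}

final class AdapterDemo {
    public static void main(final String[] args) {
        final OldProcessor<String, Long> legacy =
            (key, value) -> System.out.println(key + " -> " + value);
        final NewProcessor<String, Long, Void, Void> adapted =
            new OldToNewProcessorAdapter<>(legacy);
        adapted.process("hello", 42L);
    }
}
```

The point of the name is that the class adds no behavior of its own; it only translates one interface into the other.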
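
To make the wildcard-vs-Object discussion around buildSinkNode concrete, here is a small, self-contained sketch; it is not the real ProcessorNode API (`Node`, `addChild`, and `wire` are hypothetical stand-ins). It shows why the compiler rejects wiring two wildcard-typed references together even when the caller knows they match, and why an unchecked cast to Object-parameterized types is the usual escape hatch:

```java
// Simplified stand-ins for a generic parent/child node graph.
import java.util.ArrayList;
import java.util.List;

final class Node<K, V> {
    final String name;
    final List<Node<K, V>> children = new ArrayList<>();

    Node(final String name) {
        this.name = name;
    }

    // A parent only accepts children parameterized exactly like itself.
    void addChild(final Node<K, V> child) {
        children.add(child);
    }
}

final class WildcardDemo {
    @SuppressWarnings("unchecked")
    static void wire(final Node<?, ?> parent, final Node<?, ?> child) {
        // parent.addChild(child);
        // ^ does not compile: each wildcard is captured as a distinct type
        //   variable, so the compiler cannot prove the two nodes line up,
        //   even when the caller knows they do.

        // Once only wildcard-typed references are left (as in the internal
        // topology-building code), the practical escape hatch is an unchecked
        // cast that pins both sides to Object:
        ((Node<Object, Object>) parent).addChild((Node<Object, Object>) child);
    }

    public static void main(final String[] args) {
        final Node<String, Long> parent = new Node<>("source");
        final Node<String, Long> child = new Node<>("sink");
        wire(parent, child);
        System.out.println(parent.children.get(0).name); // prints "sink"
    }
}
```

As the comment above notes, this is tolerable only because the internal layer never relies on the nodes' type parameters; the stronger typing from KIP-478 is aimed at PAPI users and the DSL implementation rather than this plumbing.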