vvcephei commented on a change in pull request #9361: URL: https://github.com/apache/kafka/pull/9361#discussion_r498914034
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) { } final StateStore store = stateManager.getStore(name); - return getReadWriteStore(store); + return (S) getReadWriteStore(store); } @Override public <K, V> void forward(final K key, final V value) { - throwUnsupportedOperationExceptionIfStandby("forward"); - forward(key, value, SEND_TO_ALL); + final Record<K, V> toForward = new Record<>( + key, + value, + timestamp(), + headers() + ); + forward(toForward); } @Override @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { - throwUnsupportedOperationExceptionIfStandby("forward"); - forward( + final Record<K, V> toForward = new Record<>( key, value, - To.child((currentNode().children()).get(childIndex).name())); + timestamp(), + headers() + ); + forward(toForward, (currentNode().children()).get(childIndex).name()); } @Override @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { - throwUnsupportedOperationExceptionIfStandby("forward"); - forward(key, value, To.child(childName)); + final Record<K, V> toForward = new Record<>( + key, + value, + timestamp(), + headers() + ); + forward(toForward, childName); } - @SuppressWarnings("unchecked") @Override public <K, V> void forward(final K key, final V value, final To to) { + final ToInternal toInternal = new ToInternal(to); + final Record<K, V> toForward = new Record<>( + key, + value, + toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(), + headers() + ); + forward(toForward, toInternal.child()); + } + + @Override + public <K, V> void forward(final Record<K, V> record) { + forward(record, null); + } + + @SuppressWarnings("unchecked") Review comment: I see. So, _within_ the `Processor<KIn, Vin, KOut, VOut>`, the `ProcessorContext` is bounded to `<KOut, VOut>`, and therefore `forward` will only accept a `Record<KOut, VOut>`. The compiler will check the source code of the processor and enforce this at compile time. It sounds like this is one part of what you are thinking about. But inside the `ProcessorContextImpl`, we're _outside_ of the `Processor`, and in the "plumbing" of Streams. In this context, we cannot have compile-time type safety, since we can't bound all of KafkaStreams to accept only one kind of record and processor. However, what we can check at compile time is that we only attach _compatible_ children to parents. This isn't a way to do this in `Topology` right now, and I didn't want to expand this KIP to that extent. My plan for getting the full benefit of this KIP in the DSL internals is to actually add an internal utility method for registering pairs of processors, like this: ```java void addGraphNode(final StreamsGraphNode<KIn, VIn, KIntermediate, VIntermediate> parent, final StreamsGraphNode<KIntermediate, VIntermediate, KOut, VOut> child); ``` Then, in addition to a compile-time guarantee that a processor can only forward its declared output type, we also get a guarantee that the builder can only attach compatible parent-child pairs. Assuming you don't do any casting in "user space", we don't need any further compiler checking to render cast exceptions impossible. The "plumbing code" like in this `ProcessorContextImpl` is akin to the JVM runtime after type erasure... the "compiler" has already done its job on the "source code" (the user-facing side of the PAPI), so we don't need the types anymore at "run time". ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org