vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498914034



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       I see.
   
   So, _within_ the `Processor<KIn, VIn, KOut, VOut>`, the `ProcessorContext` 
is bound to `<KOut, VOut>`, and therefore `forward` will only accept a 
`Record<KOut, VOut>`. The compiler will check the source code of the processor 
and enforce this at compile time. It sounds like this is one part of what you 
are thinking about.
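
   To make that first point concrete, here is a minimal, self-contained sketch. 
The `Record`, `ProcessorContext`, and `Processor` types below are simplified, 
hypothetical stand-ins written for this example; they are not the real Kafka 
Streams classes, and `LengthProcessor` and `run` are made-up names.
   ```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the typed Processor API (not the real Kafka classes).
public class TypedForwardSketch {
    static final class Record<K, V> {
        final K key;
        final V value;

        Record(final K key, final V value) {
            this.key = key;
            this.value = value;
        }
    }

    // The context is parameterized by the processor's declared output types only.
    interface ProcessorContext<KOut, VOut> {
        void forward(Record<KOut, VOut> record);
    }

    interface Processor<KIn, VIn, KOut, VOut> {
        void init(ProcessorContext<KOut, VOut> context);

        void process(Record<KIn, VIn> record);
    }

    // Declares <String, Integer> as its output, so it can only forward Record<String, Integer>.
    static final class LengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(final ProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            context.forward(new Record<>(record.key, record.value.length()));
            // context.forward(new Record<>(record.key, record.value)); // rejected: String is not Integer
        }
    }

    // Runs the processor over some values and collects what it forwards.
    static List<Integer> run(final String... values) {
        final List<Integer> forwarded = new ArrayList<>();
        final LengthProcessor processor = new LengthProcessor();
        processor.init(record -> forwarded.add(record.value));
        for (final String value : values) {
            processor.process(new Record<>("k", value));
        }
        return forwarded;
    }
}
   ```
   The commented-out `forward` call is the kind of mistake the compiler 
rejects inside the processor's own source.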
   
   But inside the `ProcessorContextImpl`, we're _outside_ of the `Processor`, 
and in the "plumbing" of Streams. In this context, we cannot have compile-time 
type safety, since we can't bound all of KafkaStreams to accept only one kind 
of record and processor.
   
   However, what we can check at compile time is that we only attach 
_compatible_ children to parents. There isn't a way to do this in `Topology` 
right now, and I didn't want to expand this KIP to that extent. My plan for 
getting the full benefit of this KIP in the DSL internals is to actually add an 
internal utility method for registering pairs of processors, like this:
   ```java
       <KIn, VIn, KIntermediate, VIntermediate, KOut, VOut> void addGraphNode(
           final StreamsGraphNode<KIn, VIn, KIntermediate, VIntermediate> parent,
           final StreamsGraphNode<KIntermediate, VIntermediate, KOut, VOut> child);
   ```
   Then, in addition to a compile-time guarantee that a processor can only 
forward its declared output type, we also get a guarantee that the builder can 
only attach compatible parent-child pairs. Assuming you don't do any casting in 
"user space", we don't need any further compiler checking to render cast 
exceptions impossible. The "plumbing code" like in this `ProcessorContextImpl` 
is akin to the JVM runtime after type erasure... the "compiler" has already 
done its job on the "source code" (the user-facing side of the PAPI), so we 
don't need the types anymore at "run time".
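
   As a rough illustration of the pairing idea, here is a self-contained 
sketch. `GraphNode`, `GraphBuilder`, and `addPair` are hypothetical names 
invented for this example (keys are omitted to keep it short); this is not 
existing Streams code, only one way the check could look.
   ```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: GraphNode and GraphBuilder are illustrative names, not Kafka Streams classes.
public class PairingSketch {
    // A node that turns an input value into an output value.
    interface GraphNode<VIn, VOut> {
        VOut apply(VIn value);
    }

    static final class GraphBuilder {
        // The "plumbing": once a pair has been type-checked, only erased references are kept.
        private final List<GraphNode<Object, Object>> chain = new ArrayList<>();

        // The shared VMid parameter is the compile-time compatibility check:
        // the parent's output type must equal the child's input type.
        @SuppressWarnings("unchecked")
        <VIn, VMid, VOut> void addPair(final GraphNode<VIn, VMid> parent,
                                       final GraphNode<VMid, VOut> child) {
            chain.add((GraphNode<Object, Object>) (GraphNode<?, ?>) parent);
            chain.add((GraphNode<Object, Object>) (GraphNode<?, ?>) child);
        }

        // Untyped execution, analogous to running after type erasure.
        Object run(final Object input) {
            Object current = input;
            for (final GraphNode<Object, Object> node : chain) {
                current = node.apply(current);
            }
            return current;
        }
    }

    static Object demo() {
        final GraphNode<String, Integer> length = String::length;
        final GraphNode<Integer, String> describe = n -> "len=" + n;
        final GraphBuilder builder = new GraphBuilder();
        builder.addPair(length, describe);
        // builder.addPair(describe, describe); // rejected: parent emits String, child expects Integer
        return builder.run("kafka");
    }
}
   ```
   The unchecked casts live only inside the builder, which mirrors where the 
`@SuppressWarnings("unchecked")` sits in this diff: the typed entry point has 
already ruled out incompatible pairs before anything reaches the untyped chain.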





