vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498606066
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
}
final StateStore store = stateManager.getStore(name);
- return getReadWriteStore(store);
+ return (S) getReadWriteStore(store);
}
@Override
public <K, V> void forward(final K key,
final V value) {
- throwUnsupportedOperationExceptionIfStandby("forward");
- forward(key, value, SEND_TO_ALL);
+ final Record<K, V> toForward = new Record<>(
+ key,
+ value,
+ timestamp(),
+ headers()
+ );
+ forward(toForward);
}
@Override
@Deprecated
public <K, V> void forward(final K key,
final V value,
final int childIndex) {
- throwUnsupportedOperationExceptionIfStandby("forward");
- forward(
+ final Record<K, V> toForward = new Record<>(
key,
value,
- To.child((currentNode().children()).get(childIndex).name()));
+ timestamp(),
+ headers()
+ );
+ forward(toForward, (currentNode().children()).get(childIndex).name());
}
@Override
@Deprecated
public <K, V> void forward(final K key,
final V value,
final String childName) {
- throwUnsupportedOperationExceptionIfStandby("forward");
- forward(key, value, To.child(childName));
+ final Record<K, V> toForward = new Record<>(
+ key,
+ value,
+ timestamp(),
+ headers()
+ );
+ forward(toForward, childName);
}
- @SuppressWarnings("unchecked")
@Override
public <K, V> void forward(final K key,
final V value,
final To to) {
+ final ToInternal toInternal = new ToInternal(to);
+ final Record<K, V> toForward = new Record<>(
+ key,
+ value,
+ toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+ headers()
+ );
+ forward(toForward, toInternal.child());
+ }
+
+ @Override
+ public <K, V> void forward(final Record<K, V> record) {
+ forward(record, null);
+ }
+
+ @SuppressWarnings("unchecked")
Review comment:
I'll have to think about this, but offhand, it doesn't seem like that
would work.
In general, the type safety we're providing is on the user side of the PAPI
(of course, the DSL internals is a user of the PAPI). On the internals side of
the PAPI implementation, we generally can't get any more safety. For example,
the ProcessorContextImpl is instantiated for the whole Task, which would handle
a number of different record types and ProcessorNode types, so the internal
implementation of the ProcessorContextImpl would have to be agnostic wrt the
actual types of the nodes.
I think that by the time I'm done with the full implementation of this KIP,
you'll see most of the unchecked suppressions and casts gone from the DSL
implementation, but there will still be some in classes like this one.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]