wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1366083940
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##########
@@ -51,8 +51,8 @@ public void init(final ProcessorContext<K, V> context) {
public void process(final Record<K, V> record) {
// if the key is null, we do not need to put the record into window store
// since it will never be considered for join operations
+ context().forward(record);
if (record.key() != null) {
- context().forward(record);
// Every record basically starts a new window. We're using a window store mostly for the retention.
window.put(record.key(), record.value(), record.timestamp());
Review Comment:
That's likely true of the window store implementations. Something to think
about for later, then. We should probably clarify the semantics of null-key
matching in the docs somewhere.
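For illustration, the behavior in the first hunk (forward every record, but only put non-null keys into the window store) can be sketched in plain Java. This is a minimal, hypothetical model: `SimpleRecord`, `SimpleWindowStore`, and `JoinWindowProcessor` are stand-ins invented for this sketch, not Kafka Streams classes, and a `Consumer` takes the place of `context().forward(record)`. It does not model how the real window store implementations treat null keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical model of the changed KStreamJoinWindow#process: every record is
 * forwarded downstream, but only records with a non-null key are stored.
 * All types here are illustrative stand-ins, not Kafka Streams classes.
 */
public class JoinWindowSketch {

    /** Stand-in for a Kafka Streams Record: key, value, timestamp. */
    static final class SimpleRecord<K, V> {
        final K key;
        final V value;
        final long timestamp;

        SimpleRecord(final K key, final V value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for a window store; it simply keeps every entry it is given. */
    static final class SimpleWindowStore<K, V> {
        final List<SimpleRecord<K, V>> entries = new ArrayList<>();

        void put(final K key, final V value, final long timestamp) {
            entries.add(new SimpleRecord<>(key, value, timestamp));
        }
    }

    /** Mirrors the control flow of the diff: forward first, then store non-null keys. */
    static final class JoinWindowProcessor<K, V> {
        private final SimpleWindowStore<K, V> window;
        private final Consumer<SimpleRecord<K, V>> forward; // replaces context().forward(...)

        JoinWindowProcessor(final SimpleWindowStore<K, V> window,
                            final Consumer<SimpleRecord<K, V>> forward) {
            this.window = window;
            this.forward = forward;
        }

        void process(final SimpleRecord<K, V> record) {
            // Forward unconditionally so null-key records still reach the join processor.
            forward.accept(record);
            // A null key can never be matched by the join, so it is not put into the store.
            if (record.key != null) {
                window.put(record.key, record.value, record.timestamp);
            }
        }
    }

    public static void main(final String[] args) {
        final SimpleWindowStore<String, String> store = new SimpleWindowStore<>();
        final List<SimpleRecord<String, String>> forwarded = new ArrayList<>();
        final JoinWindowProcessor<String, String> processor =
            new JoinWindowProcessor<>(store, forwarded::add);

        processor.process(new SimpleRecord<>("a", "v1", 10L));
        processor.process(new SimpleRecord<>(null, "v2", 20L));

        // prints "forwarded=2 stored=1"
        System.out.println("forwarded=" + forwarded.size() + " stored=" + store.entries.size());
    }
}
```

Moving the forward call out of the `if` is the whole change: the store contents stay the same as before, while null-key records now reach the downstream join instead of being silently swallowed.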
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
LOG.debug("Optimizing the Kafka Streams graph for self-joins");
rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
}
+ LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+ rewriteRepartitionNodes();
}
+ private void rewriteRepartitionNodes() {
Review Comment:
Hmm, I would think they would like to opt out. That would require an update
to the KIP, maybe even a revote. I'm not sure how likely it is that someone
repartitioning manually would need null keys to propagate, but it's probably
higher than you would think, since users who repartition manually are
typically power users.
I don't think we need to make it optional, since today we filter all null
keys and this change only lets some of them propagate. Maybe we should just
file a ticket and come back to it. Making the optimization toggleable should
be pretty simple.
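One way such a toggle could look is a flag read from the `Properties` that `optimizeTopology` already receives. The sketch below assumes the rewrite boils down to choosing, per repartition node, whether null-key records are filtered out before the repartition topic. The property name `example.null.key.repartition.opt.out` and the `repartitionFilter` helper are hypothetical; neither exists in Kafka Streams.

```java
import java.util.Properties;
import java.util.function.BiPredicate;

/**
 * Hypothetical sketch of an opt-out toggle for the null-key repartition rewrite.
 * The property name and the filter model are assumptions, not Kafka Streams API.
 */
public class RepartitionToggleSketch {

    // Hypothetical property name; Kafka Streams does not define this config.
    static final String OPT_OUT_CONFIG = "example.null.key.repartition.opt.out";

    /** Reads the hypothetical opt-out flag; absent means the optimization stays on. */
    static boolean optedOut(final Properties props) {
        return Boolean.parseBoolean(props.getProperty(OPT_OUT_CONFIG, "false"));
    }

    /**
     * Chooses the filter applied before a repartition topic.
     * Assumption: letting null keys through only makes sense when the
     * downstream operator can handle them.
     */
    static BiPredicate<Object, Object> repartitionFilter(final boolean downstreamHandlesNullKeys,
                                                         final Properties props) {
        if (downstreamHandlesNullKeys && !optedOut(props)) {
            return (key, value) -> true;      // let null-key records propagate
        }
        return (key, value) -> key != null;   // previous behavior: drop null keys
    }

    public static void main(final String[] args) {
        final Properties defaults = new Properties();
        final Properties optOut = new Properties();
        optOut.setProperty(OPT_OUT_CONFIG, "true");

        System.out.println(repartitionFilter(true, defaults).test(null, "v"));  // true
        System.out.println(repartitionFilter(true, optOut).test(null, "v"));    // false
        System.out.println(repartitionFilter(false, defaults).test(null, "v")); // false
    }
}
```

Defaulting the flag to `false` leaves the new behavior in place unless a user explicitly opts out.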
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -124,17 +124,20 @@ public void init(final ProcessorContext<K, VOut> context) {
@SuppressWarnings("unchecked")
@Override
public void process(final Record<K, V1> record) {
- if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
- return;
- }
- boolean needOuterJoin = outer;
-
final long inputRecordTimestamp = record.timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+ if (outer && record.key() == null && record.value() != null) {
Review Comment:
Ah, nvm, ignore this.
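The new branch in `KStreamKStreamJoin#process` checks `outer && record.key() == null && record.value() != null`. The hunk is cut off before the branch body, so the sketch below assumes that such a record, which can never match anything from the other stream, is emitted immediately as an unmatched outer-join result, while other null-key records are dropped. It also assumes `outer` means this side's unmatched records must still be emitted (as in a left or outer join). `OuterJoinNullKeySketch`, `Action`, and this `process` signature are hypothetical and only model the control flow, not the real processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/**
 * Hypothetical model of the null-key branch in a stream-stream join.
 * Not the Kafka Streams implementation; names and types are illustrative only.
 */
public class OuterJoinNullKeySketch {

    /** What the sketch does with an incoming record from this side. */
    enum Action { EMIT_UNMATCHED, DROP, PROBE_OTHER_WINDOW }

    static <V1, V2, R> Action process(final boolean outer,
                                      final Object key,
                                      final V1 value,
                                      final BiFunction<V1, V2, R> joiner,
                                      final List<R> output) {
        if (key == null) {
            if (outer && value != null) {
                // A null key can never match a record from the other stream,
                // so the unmatched result joiner(value, null) is emitted right away.
                output.add(joiner.apply(value, null));
                return Action.EMIT_UNMATCHED;
            }
            // Assumed: no result can be produced here, so the record is dropped.
            return Action.DROP;
        }
        // Non-null keys take the normal path: probe the other side's window store
        // over [timeFrom, timeTo] computed from the record timestamp (not modeled here).
        return Action.PROBE_OTHER_WINDOW;
    }

    public static void main(final String[] args) {
        final List<String> out = new ArrayList<>();
        final BiFunction<String, String, String> joiner = (l, r) -> l + "+" + r;

        System.out.println(process(true, null, "v", joiner, out));  // EMIT_UNMATCHED
        System.out.println(process(false, null, "v", joiner, out)); // DROP
        System.out.println(process(true, "k", "v", joiner, out));   // PROBE_OTHER_WINDOW
        System.out.println(out);                                     // [v+null]
    }
}
```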
--
This is an automated message from the Apache Git Service.