wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1366083940


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##########
@@ -51,8 +51,8 @@ public void init(final ProcessorContext<K, V> context) {
         public void process(final Record<K, V> record) {
             // if the key is null, we do not need to put the record into 
window store
             // since it will never be considered for join operations
+            context().forward(record);
             if (record.key() != null) {
-                context().forward(record);
                 // Every record basically starts a new window. We're using a 
window store mostly for the retention.
                 window.put(record.key(), record.value(), record.timestamp());

Review Comment:
   That's likely true about the windowstore implemntations. Something to think 
about for later then. We should maybe clarify the semantics about null matching 
in the docs somewhere.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
             LOG.debug("Optimizing the Kafka Streams graph for self-joins");
             rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
         }
+        LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+        rewriteRepartitionNodes();
     }
 
+    private void rewriteRepartitionNodes() {

Review Comment:
   Hmm, I would think they would like to opt out. That would require an update 
to the kip. Maybe even a revote. I'm not sure what the odds are that someone 
manually repartitioning would be needing the null-keys to propagate. But its 
probably higher than you would think as manual repartitioner as are typically 
power-users.
   
   I don't think we need to make it optional as we already filter all null  
keys and now we let some propagate. Maybe we should just make a ticket and we 
can come back to it. Being able to toggle the optimization should be pretty 
simple.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -124,17 +124,20 @@ public void init(final ProcessorContext<K, VOut> context) 
{
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
-            if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
-                return;
-            }
-            boolean needOuterJoin = outer;
-
             final long inputRecordTimestamp = record.timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
-
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+            if (outer && record.key() == null && record.value() != null) {

Review Comment:
   ah nvn, ignore this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to