VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1377564878
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords( sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed - if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { + if (sharedTimeTracker.minTime + windowsAfterIntervalMs + joinGraceMs >= sharedTimeTracker.streamTime) { Review Comment: I think you are right. It depends on the side of the outerJoin-value whether we should check with _windowsAfterMs_ or _windowsBeforeMs_. Since the outerJoin-store can contain both left-sided and right-sided outerJoin-values, we should check with _windowsAfterMs_ when outerJoin-value is left-sided and we should check _windowsBeforeMs_ when outerJoin-value is right-sided. Also, we can not break the emitNonJoinedOuterRecords-while-loop until we are completely sure that there are no more left-sided and right-sided outerJoin-values available to emit. For example, if we find out that we can skip a left-sided outerJoin-value, since the window for this value has not yet been closed, there can still be a right-sided outerJoin-value that must be emitted. In the future maybe some leftSide/rightSide-flags could be introduced that indicate whether we have put left-sided or right-sided outerJoin-values in the outerJoin-store. So that with a leftJoin() we could break the emitNonJoinedOuterRecords-while-loop earlier in order to gain time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org