mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r609954693
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -133,34 +141,37 @@ public void process(final K key, final V1 value) { return; } - // maxObservedStreamTime is updated and shared between left and right sides, so we can - // process a non-join record immediately if it is late - final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp())); - boolean needOuterJoin = outer; + boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + // maxObservedStreamTime is updated and shared between left and right join sides + maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp)); + try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; + joinFound = true; final KeyValue<Long, V2> otherRecord = iter.next(); final long otherRecordTimestamp = otherRecord.key; + + // Emit expired records before the joined record to keep time ordering Review comment: I guess that's possible, but _if_ the join result is large, we could run into memory issue buffering all join results? Also, sorting could be expensive and we can actually avoid it, and still guarantee that results are emitted in timestamp order: - we know that left/outer join result would have the smallest timestamps and thus we can emit those first (given that we use timestamped-sorted store anyway, we just scan the store from old to new and emit - for the inner join result, we get the output sorted by timestamp, too, because for the join key, data is sorted in timestamp order in the store, too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org