lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478442739
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, final long timestamp) {
}
}
}
-
//create right window for previous record
if (latestLeftTypeWindow != null) {
- final long leftWindowEnd = latestLeftTypeWindow.key.window().end();
- final long rightWinStart = leftWindowEnd == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : leftWindowEnd + 1;
- if (!windowStartTimes.contains(rightWinStart)) {
+ final long previousRecord = latestLeftTypeWindow.key.window().end();
+ final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1;
Review comment:
If `previousRecord` equals `timeDifferenceMs`, then the "left type window"
we found is the combined window [0, timeDifferenceMs]. Because
`windowEnd == timeDifferenceMs` identifies that window, the right window we
create starts just after the max record in the combined window. If there
happens to be a record exactly at the end point of the window, that record is
the max record, so the result is the same.

I see that `previousRecord` is misleading, so I renamed it to
`previousWindowEnd`. The value here was really just the `windowEnd`: there
might not be a _record_ at the end of the combined window, which is the case
this branch covers. It applies specifically to records that arrive outside the
combined window but still need to create a right window for something _in_ the
combined window, in which case the right window won't already exist.
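
To make the two branches concrete, here is a minimal standalone sketch of the right-window start calculation from the diff above. The class name, method name, and parameter names are hypothetical and not part of the PR; `maxRecordTimestamp` stands in for `latestLeftTypeWindow.value.timestamp()`, and the sample timestamps are made up for illustration.

```java
// Hypothetical helper, not the Kafka Streams implementation: it only mirrors
// the ternary expression shown in the diff so the two cases can be compared.
final class RightWindowStartSketch {

    // previousWindowEnd:  end of the latest left-type window that was found
    // timeDifferenceMs:   the sliding window size
    // maxRecordTimestamp: largest record timestamp inside that window
    static long rightWindowStart(final long previousWindowEnd,
                                 final long timeDifferenceMs,
                                 final long maxRecordTimestamp) {
        // A window ending exactly at timeDifferenceMs is the combined window
        // [0, timeDifferenceMs]; its end need not coincide with a record, so
        // the right window starts just after the max record instead.
        return previousWindowEnd == timeDifferenceMs
            ? maxRecordTimestamp + 1
            : previousWindowEnd + 1;
    }

    public static void main(final String[] args) {
        // Combined window [0, 10] whose max record is at 7: starts at 8.
        System.out.println(rightWindowStart(10L, 10L, 7L));
        // Ordinary left window ending at 25 (the record's own timestamp): starts at 26.
        System.out.println(rightWindowStart(25L, 10L, 25L));
        // Combined window with a record exactly at its end point: starts at 11.
        System.out.println(rightWindowStart(10L, 10L, 10L));
    }
}
```

The third call shows why the check on `windowEnd == timeDifferenceMs` alone is enough: when a record does sit at the end of the window, it is also the max record, so both readings give the same start.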