guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872636671
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##########
@@ -467,13 +459,35 @@ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<VAgg> rightWinAgg,
return rightWinAgg != null && rightWinAgg.timestamp() > inputRecordTimestamp;
}
+ @Override
+ protected void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) {
+ if (!shouldEmitFinal(windowCloseTime)) {
+ return;
+ }
+
+ final long emitRangeUpperBoundExclusive = windowCloseTime - windows.timeDifferenceMs();
+
+ if (emitRangeUpperBoundExclusive <= 0) {
+ // Sliding window's start and end timestamps are inclusive, so
+ // the window is not closed if emitRangeUpperBoundExclusive is 0,
+ // and we shouldn't emit
+ return;
+ }
+
+ final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ?
+ 0L : lastEmitWindowCloseTime - windows.timeDifferenceMs();
Review Comment:
So far it seems `lastEmitWindowCloseTime` should never be smaller than the
window size for either sliding or time windows, but if there is a bug, the
value read from the processor metadata could be smaller than that. I will
update it accordingly.
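
A minimal sketch of one possible guard, assuming the update clamps the lower bound at 0 so that an unexpectedly small stored value cannot produce a negative bound. The class `EmitRangeSketch` and its method are hypothetical and are not taken from the PR; `NO_TIMESTAMP` mirrors `ConsumerRecord.NO_TIMESTAMP` (-1) so the example has no Kafka dependency.

```java
public class EmitRangeSketch {
    // Stand-in for ConsumerRecord.NO_TIMESTAMP, which is -1L.
    static final long NO_TIMESTAMP = -1L;

    // Hypothetical helper: computes the inclusive lower bound of the emit range.
    // timeDifferenceMs plays the role of windows.timeDifferenceMs() in the diff.
    static long emitRangeLowerBoundInclusive(final long lastEmitWindowCloseTime,
                                             final long timeDifferenceMs) {
        if (lastEmitWindowCloseTime == NO_TIMESTAMP) {
            // Nothing has been emitted yet, so start from the beginning.
            return 0L;
        }
        // Clamp at 0 in case the value read back from the processor metadata
        // is smaller than the window size (which would indicate a bug elsewhere).
        return Math.max(0L, lastEmitWindowCloseTime - timeDifferenceMs);
    }

    public static void main(final String[] args) {
        System.out.println(emitRangeLowerBoundInclusive(NO_TIMESTAMP, 10L));
        System.out.println(emitRangeLowerBoundInclusive(25L, 10L));
        System.out.println(emitRangeLowerBoundInclusive(3L, 10L));
    }
}
```

Clamping keeps the range valid without hiding the anomaly entirely; whether to also log or throw when the stored value is too small is a separate design choice.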