guozhangwang commented on code in PR #12135:
URL: https://github.com/apache/kafka/pull/12135#discussion_r872901603


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java:
##########
@@ -467,13 +459,33 @@ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<VAgg> rightWinAgg,
             return rightWinAgg != null && rightWinAgg.timestamp() > inputRecordTimestamp;
         }
 
+        @Override
+        protected long emitRangeLowerBound(final long windowCloseTime) {
+            return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ?
+                0L : Math.max(0L, lastEmitWindowCloseTime - windows.timeDifferenceMs());
+        }
+
+        @Override
+        protected long emitRangeUpperBound(final long windowCloseTime) {
+            // Sliding window's start and end timestamps are inclusive, so
+            // we should minus 1 for the inclusive closed window-end upper bound
+            return windowCloseTime - windows.timeDifferenceMs() - 1;

Review Comment:
   It can actually be negative, and we would skip the range fetching in that
case.
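
   To illustrate the point, here is a minimal, self-contained sketch of the
arithmetic. It is not the PR's code: `EmitRangeSketch`, `shouldFetchRange`,
and the `timeDifferenceMs` parameter (standing in for
`windows.timeDifferenceMs()`) are hypothetical, and the "skip when the upper
bound is below the lower bound" guard is an assumption about how the caller
handles the range.

```java
final class EmitRangeSketch {
    // Mirrors ConsumerRecord.NO_TIMESTAMP, which is -1L.
    static final long NO_TIMESTAMP = -1L;

    // Same arithmetic as emitRangeLowerBound in the diff; never below 0.
    static long emitRangeLowerBound(final long lastEmitWindowCloseTime, final long timeDifferenceMs) {
        return lastEmitWindowCloseTime == NO_TIMESTAMP
            ? 0L
            : Math.max(0L, lastEmitWindowCloseTime - timeDifferenceMs);
    }

    // Same arithmetic as emitRangeUpperBound in the diff; not clamped, so it
    // goes negative whenever windowCloseTime <= timeDifferenceMs.
    static long emitRangeUpperBound(final long windowCloseTime, final long timeDifferenceMs) {
        return windowCloseTime - timeDifferenceMs - 1;
    }

    // Assumed guard (hypothetical): fetch only when the range is non-empty.
    static boolean shouldFetchRange(final long lowerBound, final long upperBound) {
        return upperBound >= lowerBound;
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10L;

        // Early in the stream: the upper bound is negative, so the fetch is skipped.
        final long earlyLower = emitRangeLowerBound(NO_TIMESTAMP, timeDifferenceMs);
        final long earlyUpper = emitRangeUpperBound(5L, timeDifferenceMs);
        System.out.println(earlyLower + " " + earlyUpper + " " + shouldFetchRange(earlyLower, earlyUpper));

        // Later: both bounds are non-negative and the range is fetched.
        final long laterLower = emitRangeLowerBound(50L, timeDifferenceMs);
        final long laterUpper = emitRangeUpperBound(100L, timeDifferenceMs);
        System.out.println(laterLower + " " + laterUpper + " " + shouldFetchRange(laterLower, laterUpper));
    }
}
```

   Because the lower bound is clamped at 0 while the upper bound is not, a
negative upper bound always yields an empty range under this assumed guard.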


