jeqo commented on a change in pull request #11481:
URL: https://github.com/apache/kafka/pull/11481#discussion_r754432136



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -467,27 +464,26 @@ private boolean rightWindowIsNotEmpty(final 
ValueAndTimestamp<VAgg> rightWinAgg,
 
         private void updateWindowAndForward(final Window window,
                                             final ValueAndTimestamp<VAgg> 
valueAndTime,
-                                            final KIn key,
-                                            final VIn value,
+                                            final Record<KIn, VIn> record,
                                             final long closeTime,
                                             final long inputRecordTimestamp) {
             final long windowStart = window.start();
             final long windowEnd = window.end();
             if (windowEnd > closeTime) {
                 //get aggregate from existing window
                 final VAgg oldAgg = getValueOrNull(valueAndTime);
-                final VAgg newAgg = aggregator.apply(key, value, oldAgg);
+                final VAgg newAgg = aggregator.apply(record.key(), 
record.value(), oldAgg);
 
                 final long newTimestamp = oldAgg == null ? 
inputRecordTimestamp : Math.max(inputRecordTimestamp, valueAndTime.timestamp());
                 windowStore.put(
-                    key,
+                    record.key(),
                     ValueAndTimestamp.make(newAgg, newTimestamp),
                     windowStart);
                 tupleForwarder.maybeForward(
-                    new Windowed<>(key, window),
-                    newAgg,
-                    sendOldValues ? oldAgg : null,
-                    newTimestamp);
+                    new Record<>(
+                        new Windowed<>(record.key(), window),
+                        new Change<>(newAgg, sendOldValues ? oldAgg : null),
+                        newTimestamp));

Review comment:
       for sure! fixing it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to