jeqo commented on a change in pull request #11481: URL: https://github.com/apache/kafka/pull/11481#discussion_r754432136
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -467,27 +464,26 @@ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<VAgg> rightWinAgg, private void updateWindowAndForward(final Window window, final ValueAndTimestamp<VAgg> valueAndTime, - final KIn key, - final VIn value, + final Record<KIn, VIn> record, final long closeTime, final long inputRecordTimestamp) { final long windowStart = window.start(); final long windowEnd = window.end(); if (windowEnd > closeTime) { //get aggregate from existing window final VAgg oldAgg = getValueOrNull(valueAndTime); - final VAgg newAgg = aggregator.apply(key, value, oldAgg); + final VAgg newAgg = aggregator.apply(record.key(), record.value(), oldAgg); final long newTimestamp = oldAgg == null ? inputRecordTimestamp : Math.max(inputRecordTimestamp, valueAndTime.timestamp()); windowStore.put( - key, + record.key(), ValueAndTimestamp.make(newAgg, newTimestamp), windowStart); tupleForwarder.maybeForward( - new Windowed<>(key, window), - newAgg, - sendOldValues ? oldAgg : null, - newTimestamp); + new Record<>( + new Windowed<>(record.key(), window), + new Change<>(newAgg, sendOldValues ? oldAgg : null), + newTimestamp)); Review comment: For sure! Fixing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org