vvcephei commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r478588602



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Duration;
+import java.util.Objects;
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, the window size 
based on the given maximum time difference (inclusive) between
+ * records in the same window, and the given window grace period. While the 
window is sliding over the input data stream, a new window is
+ * created each time a record enters the sliding window or a record drops out 
of the sliding window.
+ * <p>
+ * Records that come after set grace period will be ignored, i.e., a window is 
closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data 
arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * <ul>
+ *     <li>window {@code [3000;8000]} contains [1] (created when first record enters the window)</li>
+ *     <li>window {@code [4200;9200]} contains [1,2] (created when second record enters the window)</li>
+ *     <li>window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)</li>
+ *     <li>window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)</li>
+ *     <li>window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)</li>

Review comment:
       ```suggestion
    *     <li>window {@code [7400;12400]} contains [1,2,3] (created when third record enters the window)</li>
    *     <li>window {@code [8001;13001]} contains [2,3] (created when the first record drops out of the window)</li>
    *     <li>window {@code [9201;14201]} contains [3] (created when the second record drops out of the window)</li>
   ```
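
       The corrected bounds can be double-checked with a small standalone
       sketch. It is not the PR's implementation: the class and method names
       are hypothetical, and it assumes only what the Javadoc states, namely
       that each record gets a window ending at its own timestamp and that a
       window starting 1 ms after a record exists only if another record
       falls inside it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper for checking the Javadoc example; not part of Kafka Streams.
public class SlidingWindowExample {

    /** Returns the inclusive [start, end] bounds of every non-empty window, sorted by start. */
    public static List<long[]> windowsFor(final long[] timestamps, final long timeDifferenceMs) {
        final TreeSet<Long> starts = new TreeSet<>();
        for (final long t : timestamps) {
            // Window that ends exactly at the record's timestamp.
            starts.add(t - timeDifferenceMs);
            // Window that starts just after the record; keep it only if
            // some other record falls inside it (assumption from the Javadoc).
            final long rightStart = t + 1;
            for (final long other : timestamps) {
                if (other >= rightStart && other <= rightStart + timeDifferenceMs) {
                    starts.add(rightStart);
                    break;
                }
            }
        }
        final List<long[]> windows = new ArrayList<>();
        for (final long start : starts) {
            windows.add(new long[] {start, start + timeDifferenceMs});
        }
        return windows;
    }

    public static void main(final String[] args) {
        // Records from the Javadoc table, with a time difference of 5000 ms.
        for (final long[] w : windowsFor(new long[] {8000L, 9200L, 12400L}, 5000L)) {
            System.out.println("[" + w[0] + ";" + w[1] + "]");
        }
    }
}
```

       For the three records in the table this yields five windows whose
       bounds match the suggestion above: the last three end at 12400, 13001
       and 14201.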

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -0,0 +1,140 @@
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Duration;
+import java.util.Objects;
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding Windows are defined based on a record's timestamp, the window size 
based on the given maximum time difference (inclusive) between
+ * records in the same window, and the given window grace period. While the 
window is sliding over the input data stream, a new window is
+ * created each time a record enters the sliding window or a record drops out 
of the sliding window.
+ * <p>
+ * Records that come after set grace period will be ignored, i.e., a window is 
closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data 
arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * <ul>
+ *     <li>window {@code [3000;8000]} contains [1] (created when first record enters the window)</li>
+ *     <li>window {@code [4200;9200]} contains [1,2] (created when second record enters the window)</li>
+ *     <li>window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)</li>
+ *     <li>window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)</li>
+ *     <li>window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)</li>
+ * </ul>
+ *<p>
+ * Note that while SlidingWindows are of a fixed size, as are {@link 
TimeWindows}, the start and end points of the window
+ * depend on when events occur in the stream (i.e., event timestamps), similar 
to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(SlidingWindows)
+ * @see CogroupedKStream#windowedBy(SlidingWindows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+    /** The size of the windows in milliseconds, defined by the max time 
difference between records. */
+    private final long timeDifferenceMs;
+
+    /** The grace period in milliseconds. */
+    private final long graceMs;
+
+    private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
+        this.timeDifferenceMs = timeDifferenceMs;
+        this.graceMs = graceMs;
+    }
+
+    /**
+     * Return a window definition with the window size based on the given 
maximum time difference (inclusive) between
+     * records in the same window and given window grace period. Reject 
out-of-order events that arrive after {@code grace}.
+     * A window is closed when {@code stream-time > window-end + grace-period}.
+     *
+     * @param timeDifference the max time difference (inclusive) between two 
records in a window
+     * @param grace the grace period to admit out-of-order events to a window
+     * @return a new window definition
+     * @throws IllegalArgumentException if the specified window size is < 0 or 
grace < 0, or either can't be represented as {@code long milliseconds}
+     */
+    public static SlidingWindows withTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration grace) throws IllegalArgumentException {
+        final String msgPrefixSize = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        final long timeDifferenceMs = 
ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+        if (timeDifferenceMs < 0) {
+            throw new IllegalArgumentException("Window time difference must 
not be negative.");
+        }
+        final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "afterWindowEnd");

Review comment:
       ```suggestion
           final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace");
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -459,6 +460,89 @@ public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
         }
     }
 
+    @Test
+    public void shouldSupportFinalResultsForSlidingWindows() {

Review comment:
       Aside from join, forgetting to test new operators in front of Suppress 
has also been an issue. It's great to see this test here!

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,303 @@
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super 
V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                    "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            //don't process records that don't fall within a full sliding 
window
+            if (timestamp < windows.timeDifferenceMs()) {
+                log.warn(
+                    "Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+            processInOrder(key, value, timestamp);
+        }
+
+        public void processInOrder(final K key, final V value, final long 
timestamp) {
+
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            //store start times of windows we find
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            // keep the left type window closest to the record
+            Window latestLeftTypeWindow = null;
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    timestamp - 2 * windows.timeDifferenceMs(),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+
+                    if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        if (isLeftWindow(next)) {
+                            latestLeftTypeWindow = next.key.window();
+                        }
+                    } else if (endTime == timestamp) {
+                        leftWinAlreadyCreated = true;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (latestLeftTypeWindow != null) {
+                final long rightWinStart = latestLeftTypeWindow.end() + 1;
+                if (!windowStartTimes.contains(rightWinStart)) {
+                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                }

Review comment:
       Is it already guaranteed that this window actually contains the current 
record? It doesn't look like we're checking that `endTime >= timestamp` 
anywhere, and it seems like the start of the range (`timestamp - 2 * 
windows.timeDifferenceMs()`) could give back a window that starts and ends 
before the current record's timestamp.
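
       One way to probe this is to check the arithmetic in isolation. The
       sketch below is hypothetical and does not touch the store. It assumes
       that the fetch only returns windows whose start lies in the inclusive
       range beginning at `timestamp - 2 * timeDifferenceMs`, that every
       window ends at `start + timeDifferenceMs`, and that the right window
       starts at `latestLeftTypeWindow.end() + 1`, as in the diff. Under
       those assumptions it brute-forces every candidate with
       `endTime < timestamp` and tests whether the derived right window
       covers the current record.

```java
// Hypothetical standalone check; names are illustrative, not from the PR.
public class RightWindowBoundCheck {

    /** True if the right window derived from a window ending at leftWindowEnd covers timestamp. */
    public static boolean rightWindowContains(final long leftWindowEnd,
                                              final long timestamp,
                                              final long timeDifferenceMs) {
        final long rightWinStart = leftWindowEnd + 1;
        final long rightWinEnd = rightWinStart + timeDifferenceMs;
        return rightWinStart <= timestamp && timestamp <= rightWinEnd;
    }

    /** Checks every window start the assumed fetch range can return with endTime < timestamp. */
    public static boolean holdsForAllFetchedStarts(final long timestamp, final long timeDifferenceMs) {
        final long fetchFrom = timestamp - 2 * timeDifferenceMs;
        for (long start = fetchFrom; start + timeDifferenceMs < timestamp; start++) {
            if (!rightWindowContains(start + timeDifferenceMs, timestamp, timeDifferenceMs)) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // Every start inside the assumed fetch range.
        System.out.println(holdsForAllFetchedStarts(100L, 10L));
        // A window ending at 88 would start before the assumed fetch range.
        System.out.println(rightWindowContains(88L, 100L, 10L));
    }
}
```

       If those assumptions hold, the earliest fetched window ends at
       `timestamp - timeDifferenceMs`, so its right window still ends at
       `timestamp + 1`. A window ending two or more milliseconds earlier
       would not cover the record, which is why the lower bound of the fetch
       matters. An explicit `endTime >= timestamp` check or a comment
       stating the invariant would make this easier to verify.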

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -0,0 +1,466 @@
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamSlidingWindowAggregateTest {
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final String threadId = Thread.currentThread().getName();
+
+    @Test
+    public void testAggBasic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic1, Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), 
new StringSerializer());
+            inputTopic1.pipeInput("A", "1", 10L);
+            inputTopic1.pipeInput("A", "1", 20L);
+            inputTopic1.pipeInput("A", "1", 22L);
+            inputTopic1.pipeInput("A", "3", 15L);
+
+            inputTopic1.pipeInput("B", "2", 12L);
+            inputTopic1.pipeInput("B", "2", 13L);
+            inputTopic1.pipeInput("B", "2", 18L);
+            inputTopic1.pipeInput("B", "1", 19L);
+            inputTopic1.pipeInput("B", "2", 25L);
+            inputTopic1.pipeInput("B", "3", 14L);
+
+            inputTopic1.pipeInput("C", "3", 11L);
+            inputTopic1.pipeInput("C", "4", 15L);
+            inputTopic1.pipeInput("C", "1", 16L);
+            inputTopic1.pipeInput("C", "1", 21);
+            inputTopic1.pipeInput("C", "1", 23L);
+
+            inputTopic1.pipeInput("D", "4", 11L);
+            inputTopic1.pipeInput("D", "2", 12L);
+            inputTopic1.pipeInput("D", "3", 29L);
+            inputTopic1.pipeInput("D", "5", 16L);
+        }
+
+        assertEquals(
+                asList(
+                        // A@10 left window created when A@10 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)), "0+1", 10),
+                        // A@10 right window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)), "0+1", 20),
+                        // A@20 left window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10, 20)), "0+1+1", 20),
+                        // A@20 right window created when A@22 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(21, 31)), "0+1", 22),
+                        // A@22 left window created when A@22 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(12, 22)), "0+1+1", 22),
+                        // A@20 left window updated when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10, 20)), "0+1+1+3", 20),
+                        // A@10 right window updated when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)), "0+1+3", 20),
+                        // A@22 left window updated when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(12, 22)), "0+1+1+3", 22),
+                        // A@15 left window created when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)), "0+1+3", 15),
+                        // A@15 right window created when A@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(16, 26)), "0+1+1", 22),
+
+                        // B@12 left window created when B@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(2, 12)), "0+2", 12),
+                        // B@12 right window created when B@13 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2", 13),
+                        // B@13 left window created when B@13 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(3, 13)), "0+2+2", 13),
+                        // B@12 right window updated when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+2", 18),
+                        // B@13 right window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+2", 18),
+                        // B@18 left window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(8, 18)), "0+2+2+2", 18),
+                        // B@12 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+2+1", 19),
+                        // B@13 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+2+1", 19),
+                        // B@18 right window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(19, 29)), "0+1", 19),
+                        // B@19 left window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(9, 19)), "0+2+2+2+1", 19),
+                        // B@18 right window updated when B@25 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(19, 29)), "0+1+2", 25),
+                        // B@19 right window updated when B@25 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(20, 30)), "0+2", 25),
+                        // B@25 left window created when B@25 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(15, 25)), "0+2+1+2", 25),
+                        // B@18 left window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(8, 18)), "0+2+2+2+3", 18),
+                        // B@19 left window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(9, 19)), "0+2+2+2+1+3", 19),
+                        // B@12 right window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+2+1+3", 19),
+                        // B@13 right window updated when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+2+1+3", 19),
+                        // B@14 left window created when B@14 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(4, 14)), "0+2+2+3", 14),
+
+                        // C@11 left window created when C@11 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(1, 11)), "0+3", 11),
+                        // C@11 right window created when C@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+4", 15),
+                        // C@15 left window created when C@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(5, 15)), "0+3+4", 15),
+                        // C@11 right window updated when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+4+1", 16),
+                        // C@15 right window created when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+1", 16),
+                        // C@16 left window created when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(6, 16)), "0+3+4+1", 16),
+                        // C@11 right window updated when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+4+1+1", 21),
+                        // C@15 right window updated when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+1+1", 21),
+                        // C@16 right window created when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(17, 27)), "0+1", 21),
+                        // C@21 left window created when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(11, 21)), "0+3+4+1+1", 21),
+                        // C@15 right window updated when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+1+1+1", 23),
+                        // C@16 right window updated when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(17, 27)), "0+1+1", 23),
+                        // C@21 right window created when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(22, 32)), "0+1", 23),
+                        // C@23 left window created when C@23 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)), "0+4+1+1+1", 23),
+
+                        // D@11 left window created when D@11 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(1, 11)), "0+4", 11),
+                        // D@11 right window created when D@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(12, 22)), "0+2", 12),
+                        // D@12 left window created when D@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(2, 12)), "0+4+2", 12),
+                        // D@29 left window created when D@29 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(19, 29)), "0+3", 29),
+                        // D@11 right window updated when D@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(12, 22)), "0+2+5", 16),
+                        // D@12 right window created when D@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(13, 23)), "0+5", 16),
+                        // D@16 left window created when D@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(6, 16)), "0+4+2+5", 16)
+                        ),
+                supplier.theCapturedProcessor().processed
+        );
+    }
+
+    @Test
+    public void testJoin() {

Review comment:
       Historically, the Achilles heel of new KTable features has been that we 
forgot to test them in a context that requires the ValueGetter to work 
properly, and Join is a notable example. I'd actually say every KTable operator 
should be required to have a test where it's the source of a Join. For 
stateless operators, we should test both with and without a Materialized 
argument on the operator.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
##########
@@ -275,6 +275,15 @@
      */
     <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final 
Windows<W> windows);
 
+    /**
+     * Create a new {@link TimeWindowedCogroupedKStream} instance that can be 
used to perform sliding
+     * windowed aggregations.
+     *
+     * @param windows the specification of the aggregation {@link 
SlidingWindows}
+     * @return an instance of {@link TimeWindowedCogroupedKStream}
+     */
+    TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows 
windows);

Review comment:
       Coming back to this after completing the review, I'd say the biggest 
piece of advice I'd share is to avoid whitespace changes and side cleanups 
when the PR is already this long. In fact, for my own super-complex PRs, I tend 
to go back over the whole diff and back out anything that's not critically 
important, just to lighten the load on the reviewers.
   
   Cleanups are nice to have, but it's better to keep them in their own PRs or 
in more trivial ones.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super 
V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                    "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            //don't process records that don't fall within a full sliding 
window
+            if (timestamp < windows.timeDifferenceMs()) {
+                log.warn(
+                    "Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+            processInOrder(key, value, timestamp);
+        }
+
+        public void processInOrder(final K key, final V value, final long 
timestamp) {
+
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            //store start times of windows we find
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            // keep the left type window closest to the record
+            Window latestLeftTypeWindow = null;
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    timestamp - 2 * windows.timeDifferenceMs(),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+
+                    if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        if (isLeftWindow(next)) {
+                            latestLeftTypeWindow = next.key.window();
+                        }
+                    } else if (endTime == timestamp) {
+                        leftWinAlreadyCreated = true;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (latestLeftTypeWindow != null) {
+                final long rightWinStart = latestLeftTypeWindow.end() + 1;
+                if (!windowStartTimes.contains(rightWinStart)) {
+                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                //there's a right window that the new record could create --> 
new record's left window is not empty
+                if (latestLeftTypeWindow != null) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
+                } else {
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                }
+                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+            //create right window for new record
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+        }
+
+        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
+            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        }
+
+        private boolean isLeftWindow(final KeyValue<Windowed<K>, 
ValueAndTimestamp<Agg>> window) {
+            return window.key.window().end() == window.value.timestamp();
+        }
+
+        private void putAndForward(final Window window,
+                                   final ValueAndTimestamp<Agg> valueAndTime,
+                                   final K key,
+                                   final V value,
+                                   final long closeTime,
+                                   final long timestamp) {

Review comment:
       Might not be a bad idea to have an assertion here that the timestamp is 
actually in the window boundaries.
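
       A rough sketch of what such a check might look like, written as a 
standalone helper so it can be read in isolation. `WindowBounds`, `contains`, 
and `requireInWindow` are hypothetical names, not part of the PR, and the 
sketch assumes both window bounds are inclusive, as the SlidingWindows Javadoc 
describes the time difference.

```java
// Hypothetical helper, not part of the PR: checks that a record timestamp
// falls inside a window whose start and end are both treated as inclusive.
final class WindowBounds {
    private WindowBounds() { }

    // Returns true when timestamp lies in [windowStart, windowEnd].
    static boolean contains(final long windowStart, final long windowEnd, final long timestamp) {
        return timestamp >= windowStart && timestamp <= windowEnd;
    }

    // Throws IllegalStateException when the timestamp is outside the window.
    static void requireInWindow(final long windowStart, final long windowEnd, final long timestamp) {
        if (!contains(windowStart, windowEnd, timestamp)) {
            throw new IllegalStateException(
                "Timestamp " + timestamp + " is outside window ["
                    + windowStart + ", " + windowEnd + "]");
        }
    }

    public static void main(final String[] args) {
        // A record at 15 sits on the inclusive end of window [5, 15].
        requireInWindow(5L, 15L, 15L);
        try {
            // A record at 15 is not inside window [16, 26].
            requireInWindow(16L, 26L, 15L);
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

       In the processor, the equivalent call would presumably go at the top of 
putAndForward, using window.start() and window.end() for the bounds.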

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super 
V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                    "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            //don't process records that don't fall within a full sliding 
window
+            if (timestamp < windows.timeDifferenceMs()) {
+                log.warn(
+                    "Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+            processInOrder(key, value, timestamp);
+        }
+
+        public void processInOrder(final K key, final V value, final long 
timestamp) {
+
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            //store start times of windows we find
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            // keep the left type window closest to the record
+            Window latestLeftTypeWindow = null;
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    timestamp - 2 * windows.timeDifferenceMs(),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;

Review comment:
       minor: this could be declared `final` at the assignment on line 161
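
       To illustrate the pattern, here is a minimal sketch that uses a plain 
java.util.Iterator as a stand-in for the KeyValueIterator. The class and method 
names are hypothetical. The variable is declared `final` inside the loop body, 
at the point where it is assigned, instead of being declared once outside the 
loop and reassigned.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the store iteration loop in the processor.
final class FinalLoopVariable {
    private FinalLoopVariable() { }

    static long sum(final List<Long> values) {
        long total = 0L;
        final Iterator<Long> iterator = values.iterator();
        while (iterator.hasNext()) {
            // Declared final where it is assigned; each iteration gets a
            // fresh binding, so nothing is ever reassigned.
            final long next = iterator.next();
            total += next;
        }
        return total;
    }

    public static void main(final String[] args) {
        System.out.println(sum(Arrays.asList(1L, 2L, 3L)));
    }
}
```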

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamSlidingWindowAggregateTest {
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final String threadId = Thread.currentThread().getName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeInput("A", "1", 10L);
+            inputTopic.pipeInput("A", "2", 15L);
+            inputTopic.pipeInput("A", "3", 20L);
+            inputTopic.pipeInput("A", "4", 22L);
+            inputTopic.pipeInput("A", "5", 30L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Object, Object> entry : 
supplier.theCapturedProcessor().processed) {
+            final Windowed<String> window = (Windowed<String>) entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp valueAndTimestamp = 
ValueAndTimestamp.make((String) entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1", 10L));
+        expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L));
+        expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L));
+        expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L));
+        expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L));
+        expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L));
+        expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L));
+        expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L));
+        expected.put(23L, ValueAndTimestamp.make("0+5", 30L));
+
+        assertEquals(expected, actual);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReduceSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+            .reduce(
+                MockReducer.STRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeInput("A", "1", 10L);
+            inputTopic.pipeInput("A", "2", 14L);
+            inputTopic.pipeInput("A", "3", 15L);
+            inputTopic.pipeInput("A", "4", 22L);
+            inputTopic.pipeInput("A", "5", 26L);
+            inputTopic.pipeInput("A", "6", 30L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Object, Object> entry : 
supplier.theCapturedProcessor().processed) {
+            final Windowed<String> window = (Windowed<String>) entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp valueAndTimestamp = 
ValueAndTimestamp.make((String) entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("1", 10L));
+        expected.put(4L, ValueAndTimestamp.make("1+2", 14L));
+        expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L));
+        expected.put(11L, ValueAndTimestamp.make("2+3", 15L));
+        expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L));
+        expected.put(15L, ValueAndTimestamp.make("3+4", 22L));
+        expected.put(16L, ValueAndTimestamp.make("4+5", 26L));
+        expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L));
+        expected.put(23L, ValueAndTimestamp.make("5+6", 30L));
+        expected.put(27L, ValueAndTimestamp.make("6", 30L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testAggregateLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic1, Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), 
new StringSerializer());
+            inputTopic1.pipeInput("A", "1", 10L);
+            inputTopic1.pipeInput("A", "2", 20L);
+            inputTopic1.pipeInput("A", "3", 22L);
+            inputTopic1.pipeInput("A", "4", 15L);
+
+            inputTopic1.pipeInput("B", "1", 12L);
+            inputTopic1.pipeInput("B", "2", 13L);
+            inputTopic1.pipeInput("B", "3", 18L);
+            inputTopic1.pipeInput("B", "4", 19L);
+            inputTopic1.pipeInput("B", "5", 25L);
+            inputTopic1.pipeInput("B", "6", 14L);
+
+            inputTopic1.pipeInput("C", "1", 11L);
+            inputTopic1.pipeInput("C", "2", 15L);
+            inputTopic1.pipeInput("C", "3", 16L);
+            inputTopic1.pipeInput("C", "4", 21);
+            inputTopic1.pipeInput("C", "5", 23L);
+
+            inputTopic1.pipeInput("D", "4", 11L);
+            inputTopic1.pipeInput("D", "2", 12L);
+            inputTopic1.pipeInput("D", "3", 29L);
+            inputTopic1.pipeInput("D", "5", 16L);
+        }
+
+        assertEquals(
+                asList(
+                        // FINAL WINDOW: A@10 left window created when A@10 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)), "0+1", 10),
+                        // A@10 right window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)), "0+2", 20),
+                        // A@20 left window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10, 20)), "0+1+2", 20),
+                        // FINAL WINDOW: A@20 right window created when A@22 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(21, 31)), "0+3", 22),
+                        // A@22 left window created when A@22 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(12, 22)), "0+2+3", 22),
+                        // FINAL WINDOW: A@20 left window updated when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10, 20)), "0+1+2+4", 20),
+                        // FINAL WINDOW: A@10 right window updated when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)), "0+2+4", 20),
+                        // FINAL WINDOW: A@22 left window updated when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(12, 22)), "0+2+3+4", 22),
+                        // FINAL WINDOW: A@15 left window created when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)), "0+1+4", 15),
+                        // FINAL WINDOW: A@15 right window created when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(16, 26)), "0+2+3", 22),
+
+                        // FINAL WINDOW: B@12 left window created when B@12 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(2, 12)), "0+1", 12),
+                        // B@12 right window created when B@13 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2", 13),
+                        // FINAL WINDOW: B@13 left window created when B@13 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(3, 13)), "0+1+2", 13),
+                        // B@12 right window updated when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+3", 18),
+                        // B@13 right window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+3", 18),
+                        // B@18 left window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(8, 18)), "0+1+2+3", 18),
+                        // B@12 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+3+4", 19),
+                        // B@13 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+3+4", 19),
+                        // B@18 right window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(19, 29)), "0+4", 19),
+                        // B@19 left window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(9, 19)), "0+1+2+3+4", 19),
+                        // FINAL WINDOW: B@18 right window updated when B@25 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(19, 29)), "0+4+5", 25),
+                        // FINAL WINDOW: B@19 right window updated when B@25 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(20, 30)), "0+5", 25),
+                        // FINAL WINDOW: B@25 left window created when B@25 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(15, 25)), "0+3+4+5", 25),
+                        // FINAL WINDOW: B@18 left window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(8, 18)), "0+1+2+3+6", 18),
+                        // FINAL WINDOW: B@19 left window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
+                        // FINAL WINDOW: B@12 right window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+3+4+6", 19),
+                        // FINAL WINDOW: B@13 right window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+3+4+6", 19),
+                        // FINAL WINDOW: B@14 left window created when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(4, 14)), "0+1+2+6", 14),
+
+                        // FINAL WINDOW: C@11 left window created when C@11 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(1, 11)), "0+1", 11),
+                        // C@11 right window created when C@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+2", 15),
+                        // FINAL WINDOW: C@15 left window created when C@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(5, 15)), "0+1+2", 15),
+                        // C@11 right window updated when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+2+3", 16),
+                        // C@15 right window created when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+3", 16),
+                        // FINAL WINDOW: C@16 left window created when C@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(6, 16)), "0+1+2+3", 16),
+                        // FINAL WINDOW: C@11 right window updated when C@21 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+2+3+4", 21),
+                        // C@15 right window updated when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+3+4", 21),
+                        // C@16 right window created when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(17, 27)), "0+4", 21),
+                        // FINAL WINDOW: C@21 left window created when C@21 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(11, 21)), "0+1+2+3+4", 21),
+                        // FINAL WINDOW: C@15 right window updated when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+3+4+5", 23),
+                        // FINAL WINDOW: C@16 right window updated when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(17, 27)), "0+4+5", 23),
+                        // FINAL WINDOW: C@21 right window created when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(22, 32)), "0+5", 23),
+                        // FINAL WINDOW: C@23 left window created when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)), "0+2+3+4+5", 23),
+
+                        // FINAL WINDOW: D@11 left window created when D@11 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(1, 11)), "0+4", 11),
+                        // D@11 right window created when D@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(12, 22)), "0+2", 12),
+                        // FINAL WINDOW: D@12 left window created when D@12 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(2, 12)), "0+4+2", 12),
+                        // FINAL WINDOW: D@29 left window created when D@29 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(19, 29)), "0+3", 29),
+                        // FINAL WINDOW: D@11 right window updated when D@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(12, 22)), "0+2+5", 16),
+                        // FINAL WINDOW: D@12 right window created when D@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(13, 23)), "0+5", 16),
+                        // FINAL WINDOW: D@16 left window created when D@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(6, 16)), "0+4+2+5", 16)
+                        ),
+                supplier.theCapturedProcessor().processed
+        );
+    }
+
+    @Test
+    public void testJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
+
+        final KTable<Windowed<String>, String> table1 = builder
+                .stream(topic1, Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(100)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                );
+
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table1.toStream().process(supplier);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic2, Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(100)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String())
+                );
+        table2.toStream().process(supplier);
+
+        table1.join(table2, (p1, p2) -> p1 + "%" + 
p2).toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), 
new StringSerializer());
+            final TestInputTopic<String, String> inputTopic2 =
+                    driver.createInputTopic(topic2, new StringSerializer(), 
new StringSerializer());
+            inputTopic1.pipeInput("A", "1", 10L);
+            inputTopic1.pipeInput("B", "2", 11L);
+            inputTopic1.pipeInput("C", "3", 12L);
+
+            final List<MockProcessor<Windowed<String>, String>> processors = 
supplier.capturedProcessors(3);
+
+            processors.get(0).checkAndClearProcessResult(
+                    // left windows created by the first set of records to 
table 1
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)),  "0+1",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(1, 11)),  "0+2",  11),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(2, 12)),  "0+3",  12)
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            inputTopic1.pipeInput("A", "1", 15L);
+            inputTopic1.pipeInput("B", "2", 16L);
+            inputTopic1.pipeInput("C", "3", 19L);
+
+            processors.get(0).checkAndClearProcessResult(
+                    // right windows from previous records are created, and 
left windows from new records to table 1
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)),  "0+1",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)),  "0+1+1",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(12, 22)),  "0+2",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(6, 16)),  "0+2+2",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)),  "0+3",  19),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(9, 19)),  "0+3+3",  19)
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            inputTopic2.pipeInput("A", "a", 10L);
+            inputTopic2.pipeInput("B", "b", 30L);
+            inputTopic2.pipeInput("C", "c", 12L);
+            inputTopic2.pipeInput("C", "c", 35L);
+
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    // left windows from first set of records sent to table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)),  "0+a",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(20, 30)),  "0+b",  30),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(2, 12)),  "0+c",  12),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(25, 35)),  "0+c",  35)
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    // set of join windows from windows created by table 1 and 
table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)),  "0+1%0+a",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(2, 12)),  "0+3%0+c",  12)
+            );
+
+            inputTopic2.pipeInput("A", "a", 15L);
+            inputTopic2.pipeInput("B", "b", 16L);
+            inputTopic2.pipeInput("C", "c", 17L);
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    // right windows from previous records are created (where 
applicable), and left windows from new records to table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)),  "0+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)),  "0+a+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(6, 16)),  "0+b",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)),  "0+c",  17),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(7, 17)),  "0+c+c",  17)
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    // set of join windows from windows created by table 1 and 
table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)),  "0+1%0+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)),  "0+1+1%0+a+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(6, 16)),  "0+2+2%0+b",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)),  "0+3%0+c",  19)
+            );
+        }
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullKey() {

Review comment:
       I'd normally say we should also have a test to verify that we log 
properly on early records, but you've already opened the PR that adds early 
record handling, so we're good.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
##########
@@ -61,7 +62,7 @@
         Objects.requireNonNull(groupedStream, "groupedStream can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         groupPatterns.put((KGroupedStreamImpl<K, ?>) groupedStream,
-                          (Aggregator<? super K, ? super Object, VOut>) 
aggregator);
+            (Aggregator<? super K, ? super Object, VOut>) aggregator);

Review comment:
       ```suggestion
                             (Aggregator<? super K, ? super Object, VOut>) 
aggregator);
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
##########
@@ -275,6 +275,15 @@
      */
     <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final 
Windows<W> windows);
 
+    /**
+     * Create a new {@link TimeWindowedCogroupedKStream} instance that can be 
used to perform sliding
+     * windowed aggregations.
+     *
+     * @param windows the specification of the aggregation {@link 
SlidingWindows}
+     * @return an instance of {@link TimeWindowedCogroupedKStream}
+     */
+    TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows 
windows);

Review comment:
       I'm reviewing this whole PR as-is, so there's no need to do anything 
now, but @mjsax's specific suggestion is beside the point. The general 
feedback is that this PR is too large, which it is. We shoot for under 1K 
lines, and it's the PR author's responsibility to figure out the best way to 
break it up.
   
   This policy isn't just "reviewers complaining"; it's an important component 
of ensuring AK's quality. Long PRs overwhelm any reviewer's cognitive capacity 
to pay attention to every detail, so oversights are more likely to slip through 
into the codebase, and once they're there, you're really at the mercy of the 
testing layers to catch them. When the oversights are very subtle, they wind up 
getting released and then surface as user-reported bugs. Reviewers can't 
guarantee that they'll notice every problem, but our capacity to notice 
problems is inversely proportional to the length of the PR.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super 
V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                        "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+            //store start times of windows we find
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            try (
+                    final KeyValueIterator<Windowed<K>, 
ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+                //if we've already seen the window with the closest end time 
to the record
+                boolean foundLeftWinAgg = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if current record's right window exists, will 
only be true at most once, on the first pass
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                        continue;
+                    } else {
+                        if (!foundLeftWinAgg) {
+                            leftWinAgg = next.value;
+                            foundLeftWinAgg = true;
+                        }
+                        //If it's a left window, there is a record at this 
window's end time who may need a corresponding right window
+                        if (isLeftWindow(next)) {
+                            final long rightWinStart = next.key.window().end() 
+ 1;
+                            if (!windowStartTimes.contains(rightWinStart)) {
+                                final TimeWindow window = new 
TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+                                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, 
value, closeTime, timestamp);
+                            }
+                            break;
+                        }
+                    }
+                }
+            }
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                //confirms that the left window contains more than the current 
record
+                if (leftWinAgg.timestamp() < timestamp && 
leftWinAgg.timestamp() > timestamp - windows.timeDifferenceMs()) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
+                } else {
+                    //left window just contains the current record
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                }
+                final TimeWindow window = new TimeWindow(Math.max(0, timestamp 
- windows.timeDifferenceMs()), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWinAgg != null && 
rightWinAgg.timestamp() > timestamp) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+        }
+
+        public void processInOrder(final K key, final V value) {
+
+            final long timestamp = context().timestamp();
+            //don't process records that don't fall within a full sliding window
+            if (timestamp < windows.timeDifferenceMs()) {

Review comment:
       Is this condition supposed to check whether a record is "early" relative 
to the current stream time? If so, it looks like it should be:
   ```suggestion
               if (timestamp < (observedStreamTime - 
windows.timeDifferenceMs())) {
   ```
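
   To make the question concrete, here is a minimal sketch of how the two 
conditions differ. The class and method names are hypothetical and not part 
of this PR; `observedStreamTime` is assumed to be the highest timestamp seen 
so far, as in the suggestion above. I'm not asserting which one is intended, 
only that they select different records:

   ```java
   // Hypothetical helper, not from the PR: compares the two candidate predicates.
   public class EarlyRecordCheck {

       // Condition as written in the PR: true when the record's left window
       // [timestamp - timeDifferenceMs, timestamp] would start before time 0.
       static boolean startsBeforeZero(final long timestamp, final long timeDifferenceMs) {
           return timestamp < timeDifferenceMs;
       }

       // Condition from the suggestion: true when the record is more than one
       // window length behind the observed stream time (an assumption about intent).
       static boolean behindStreamTime(final long timestamp,
                                       final long observedStreamTime,
                                       final long timeDifferenceMs) {
           return timestamp < observedStreamTime - timeDifferenceMs;
       }

       public static void main(final String[] args) {
           final long timeDifferenceMs = 10L;

           // Record at 15 while stream time is 200: only the suggested check fires.
           System.out.println(startsBeforeZero(15L, timeDifferenceMs));        // false
           System.out.println(behindStreamTime(15L, 200L, timeDifferenceMs));  // true

           // Record at 5 while stream time is also 5: only the original check fires.
           System.out.println(startsBeforeZero(5L, timeDifferenceMs));         // true
           System.out.println(behindStreamTime(5L, 5L, timeDifferenceMs));     // false
       }
   }
   ```

   If the intent is only to special-case windows that would start before 0, 
the original condition does that and a clarifying comment would be enough.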

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamSlidingWindowAggregateTest {
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final String threadId = Thread.currentThread().getName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeInput("A", "1", 10L);
+            inputTopic.pipeInput("A", "2", 15L);
+            inputTopic.pipeInput("A", "3", 20L);
+            inputTopic.pipeInput("A", "4", 22L);
+            inputTopic.pipeInput("A", "5", 30L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Object, Object> entry : 
supplier.theCapturedProcessor().processed) {
+            final Windowed<String> window = (Windowed<String>) entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp valueAndTimestamp = 
ValueAndTimestamp.make((String) entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1", 10L));
+        expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L));
+        expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L));
+        expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L));
+        expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L));
+        expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L));
+        expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L));
+        expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L));
+        expected.put(23L, ValueAndTimestamp.make("0+5", 30L));
+
+        assertEquals(expected, actual);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReduceSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+            .reduce(
+                MockReducer.STRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeInput("A", "1", 10L);
+            inputTopic.pipeInput("A", "2", 14L);
+            inputTopic.pipeInput("A", "3", 15L);
+            inputTopic.pipeInput("A", "4", 22L);
+            inputTopic.pipeInput("A", "5", 26L);
+            inputTopic.pipeInput("A", "6", 30L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Object, Object> entry : 
supplier.theCapturedProcessor().processed) {
+            final Windowed<String> window = (Windowed<String>) entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp valueAndTimestamp = 
ValueAndTimestamp.make((String) entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("1", 10L));
+        expected.put(4L, ValueAndTimestamp.make("1+2", 14L));
+        expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L));
+        expected.put(11L, ValueAndTimestamp.make("2+3", 15L));
+        expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L));
+        expected.put(15L, ValueAndTimestamp.make("3+4", 22L));
+        expected.put(16L, ValueAndTimestamp.make("4+5", 26L));
+        expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L));
+        expected.put(23L, ValueAndTimestamp.make("5+6", 30L));
+        expected.put(27L, ValueAndTimestamp.make("6", 30L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testAggregateLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic1, Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), 
new StringSerializer());
+            inputTopic1.pipeInput("A", "1", 10L);
+            inputTopic1.pipeInput("A", "2", 20L);
+            inputTopic1.pipeInput("A", "3", 22L);
+            inputTopic1.pipeInput("A", "4", 15L);
+
+            inputTopic1.pipeInput("B", "1", 12L);
+            inputTopic1.pipeInput("B", "2", 13L);
+            inputTopic1.pipeInput("B", "3", 18L);
+            inputTopic1.pipeInput("B", "4", 19L);
+            inputTopic1.pipeInput("B", "5", 25L);
+            inputTopic1.pipeInput("B", "6", 14L);
+
+            inputTopic1.pipeInput("C", "1", 11L);
+            inputTopic1.pipeInput("C", "2", 15L);
+            inputTopic1.pipeInput("C", "3", 16L);
+            inputTopic1.pipeInput("C", "4", 21L);
+            inputTopic1.pipeInput("C", "5", 23L);
+
+            inputTopic1.pipeInput("D", "4", 11L);
+            inputTopic1.pipeInput("D", "2", 12L);
+            inputTopic1.pipeInput("D", "3", 29L);
+            inputTopic1.pipeInput("D", "5", 16L);
+        }
+
+        assertEquals(
+                asList(
+                        // FINAL WINDOW: A@10 left window created when A@10 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)), "0+1", 10),
+                        // A@10 right window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)), "0+2", 20),
+                        // A@20 left window created when A@20 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10, 20)), "0+1+2", 20),
+                        // FINAL WINDOW: A@20 right window created when A@22 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(21, 31)), "0+3", 22),
+                        // A@22 left window created when A@22 processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(12, 22)), "0+2+3", 22),
+                        // FINAL WINDOW: A@20 left window updated when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10, 20)), "0+1+2+4", 20),
+                        // FINAL WINDOW: A@10 right window updated when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)), "0+2+4", 20),
+                        // FINAL WINDOW: A@22 left window updated when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(12, 22)), "0+2+3+4", 22),
+                        // FINAL WINDOW: A@15 left window created when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)), "0+1+4", 15),
+                        // FINAL WINDOW: A@15 right window created when A@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(16, 26)), "0+2+3", 22),
+
+                        // FINAL WINDOW: B@12 left window created when B@12 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(2, 12)), "0+1", 12),
+                        // B@12 right window created when B@13 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2", 13),
+                        // FINAL WINDOW: B@13 left window created when B@13 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(3, 13)), "0+1+2", 13),
+                        // B@12 right window updated when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+3", 18),
+                        // B@13 right window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+3", 18),
+                        // B@18 left window created when B@18 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(8, 18)), "0+1+2+3", 18),
+                        // B@12 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+3+4", 19),
+                        // B@13 right window updated when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+3+4", 19),
+                        // B@18 right window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(19, 29)), "0+4", 19),
+                        // B@19 left window created when B@19 processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(9, 19)), "0+1+2+3+4", 19),
+                        // FINAL WINDOW: B@18 right window updated when B@25 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(19, 29)), "0+4+5", 25),
+                        // FINAL WINDOW: B@19 right window updated when B@25 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(20, 30)), "0+5", 25),
+                        // FINAL WINDOW: B@25 left window created when B@25 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(15, 25)), "0+3+4+5", 25),
+                        // FINAL WINDOW: B@18 left window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(8, 18)), "0+1+2+3+6", 18),
+                        // FINAL WINDOW: B@19 left window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
+                        // FINAL WINDOW: B@12 right window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(13, 23)), "0+2+3+4+6", 19),
+                        // FINAL WINDOW: B@13 right window updated when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(14, 24)), "0+3+4+6", 19),
+                        // FINAL WINDOW: B@14 left window created when B@14 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(4, 14)), "0+1+2+6", 14),
+
+                        // FINAL WINDOW: C@11 left window created when C@11 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(1, 11)), "0+1", 11),
+                        // C@11 right window created when C@15 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+2", 15),
+                        // FINAL WINDOW: C@15 left window created when C@15 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(5, 15)), "0+1+2", 15),
+                        // C@11 right window updated when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+2+3", 16),
+                        // C@15 right window created when C@16 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+3", 16),
+                        // FINAL WINDOW: C@16 left window created when C@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(6, 16)), "0+1+2+3", 16),
+                        // FINAL WINDOW: C@11 right window updated when C@21 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(12, 22)), "0+2+3+4", 21),
+                        // C@15 right window updated when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+3+4", 21),
+                        // C@16 right window created when C@21 processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(17, 27)), "0+4", 21),
+                        // FINAL WINDOW: C@21 left window created when C@21 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(11, 21)), "0+1+2+3+4", 21),
+                        // FINAL WINDOW: C@15 right window updated when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(16, 26)), "0+3+4+5", 23),
+                        // FINAL WINDOW: C@16 right window updated when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(17, 27)), "0+4+5", 23),
+                        // FINAL WINDOW: C@21 right window created when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(22, 32)), "0+5", 23),
+                        // FINAL WINDOW: C@23 left window created when C@23 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)), "0+2+3+4+5", 23),
+
+                        // FINAL WINDOW: D@11 left window created when D@11 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(1, 11)), "0+4", 11),
+                        // D@11 right window created when D@12 processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(12, 22)), "0+2", 12),
+                        // FINAL WINDOW: D@12 left window created when D@12 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(2, 12)), "0+4+2", 12),
+                        // FINAL WINDOW: D@29 left window created when D@29 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(19, 29)), "0+3", 29),
+                        // FINAL WINDOW: D@11 right window updated when D@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(12, 22)), "0+2+5", 16),
+                        // FINAL WINDOW: D@12 right window created when D@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(13, 23)), "0+5", 16),
+                        // FINAL WINDOW: D@16 left window created when D@16 
processed
+                        new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(6, 16)), "0+4+2+5", 16)
+                        ),
+                supplier.theCapturedProcessor().processed
+        );
+    }
+
+    @Test
+    public void testJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+        final String topic2 = "topic2";
+
+        final KTable<Windowed<String>, String> table1 = builder
+                .stream(topic1, Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(100)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                );
+
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table1.toStream().process(supplier);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic2, Consumed.with(Serdes.String(), 
Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(100)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String())
+                );
+        table2.toStream().process(supplier);
+
+        table1.join(table2, (p1, p2) -> p1 + "%" + 
p2).toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new StringSerializer(), 
new StringSerializer());
+            final TestInputTopic<String, String> inputTopic2 =
+                    driver.createInputTopic(topic2, new StringSerializer(), 
new StringSerializer());
+            inputTopic1.pipeInput("A", "1", 10L);
+            inputTopic1.pipeInput("B", "2", 11L);
+            inputTopic1.pipeInput("C", "3", 12L);
+
+            final List<MockProcessor<Windowed<String>, String>> processors = 
supplier.capturedProcessors(3);
+
+            processors.get(0).checkAndClearProcessResult(
+                    // left windows created by the first set of records to 
table 1
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)),  "0+1",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(1, 11)),  "0+2",  11),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(2, 12)),  "0+3",  12)
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            inputTopic1.pipeInput("A", "1", 15L);
+            inputTopic1.pipeInput("B", "2", 16L);
+            inputTopic1.pipeInput("C", "3", 19L);
+
+            processors.get(0).checkAndClearProcessResult(
+                    // right windows from previous records are created, and 
left windows from new records to table 1
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)),  "0+1",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)),  "0+1+1",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(12, 22)),  "0+2",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(6, 16)),  "0+2+2",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)),  "0+3",  19),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(9, 19)),  "0+3+3",  19)
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            inputTopic2.pipeInput("A", "a", 10L);
+            inputTopic2.pipeInput("B", "b", 30L);
+            inputTopic2.pipeInput("C", "c", 12L);
+            inputTopic2.pipeInput("C", "c", 35L);
+
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    // left windows from first set of records sent to table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)),  "0+a",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(20, 30)),  "0+b",  30),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(2, 12)),  "0+c",  12),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(25, 35)),  "0+c",  35)
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    // set of join windows from windows created by table 1 and 
table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(0, 10)),  "0+1%0+a",  10),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(2, 12)),  "0+3%0+c",  12)
+            );
+
+            inputTopic2.pipeInput("A", "a", 15L);
+            inputTopic2.pipeInput("B", "b", 16L);
+            inputTopic2.pipeInput("C", "c", 17L);
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    // right windows from previous records are created (where 
applicable), and left windows from new records to table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)),  "0+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)),  "0+a+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(6, 16)),  "0+b",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)),  "0+c",  17),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(7, 17)),  "0+c+c",  17)
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    // set of join windows from windows created by table 1 and 
table 2
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(11, 21)),  "0+1%0+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)),  "0+1+1%0+a+a",  15),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(6, 16)),  "0+2+2%0+b",  16),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(13, 23)),  "0+3%0+c",  19)
+            );
+        }
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullKey() {
+        final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(100)))
+                .aggregate(MockInitializer.STRING_INIT, 
MockAggregator.toStringInstance("+"), Materialized.<String, String, 
WindowStore<Bytes, 
byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()));
+
+        props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, 
builtInMetricsVersion);
+
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
+             final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeInput(null, "1");
+            assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key or value. value=[1] topic=[topic] partition=[0] offset=[0]"));
+        }
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
+        final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KStream<String, String> stream1 = builder.stream(topic, 
Consumed.with(Serdes.String(), Serdes.String()));
+        stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(90L)))
+                .aggregate(
+                        () -> "",
+                        MockAggregator.toStringInstance("+"),
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+                )
+                .toStream()
+                .map((key, value) -> new KeyValue<>(key.toString(), value))
+                .to("output");
+
+        props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, 
builtInMetricsVersion);
+
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class);
+             final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeInput("k", "100", 200L);
+            inputTopic.pipeInput("k", "0", 100L);
+            inputTopic.pipeInput("k", "1", 101L);
+            inputTopic.pipeInput("k", "2", 102L);
+            inputTopic.pipeInput("k", "3", 103L);
+            inputTopic.pipeInput("k", "4", 104L);
+            inputTopic.pipeInput("k", "5", 105L);
+            inputTopic.pipeInput("k", "6", 15L);
+
+            assertLatenessMetrics(driver, is(7.0), is(185.0), is(96.25));
+
+            assertThat(appender.getMessages(), hasItems(
+                    // left window for k@100
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]",
+                    // left window for k@101
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]",
+                    // left window for k@102
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]",
+                    // left window for k@103
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]",
+                    // left window for k@104
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]",
+                    // left window for k@105
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]",
+                    // left window for k@15
+                    "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]"
+            ));
+            final TestOutputTopic<String, String> outputTopic =
+                    driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+            assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>("[k@190/200]", "+100", null, 200L)));
+            assertTrue(outputTopic.isEmpty());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateRandomInput() {

Review comment:
       Awesome test. Thanks!
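
For anyone tracing the expected values in shouldLogAndMeterWhenSkippingExpiredWindowByGrace, here is a rough standalone sketch of the arithmetic. It is not the Kafka Streams implementation. The class and method names are hypothetical. It assumes that lateness is stream time minus the record timestamp, that lateness is recorded for every record (including the on-time one), and that a record is skipped when the end of its left window falls before stream time minus grace. Those assumptions are inferred from the asserted numbers and the logged expiration=[110]; they are not confirmed against the aggregator code.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Kafka Streams: reproduces the numbers the test asserts.
public class LatenessSketch {
    // Grace period used in the test: withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90L)).
    static final long GRACE_MS = 90L;

    // Returns {droppedCount, maxLateness, avgLateness} for timestamps in arrival order.
    static double[] summarize(final long[] timestamps) {
        long streamTime = Long.MIN_VALUE;
        long dropped = 0L;
        long maxLateness = 0L;
        long totalLateness = 0L;
        for (final long ts : timestamps) {
            // Stream time only moves forward (assumption: max timestamp seen so far).
            streamTime = Math.max(streamTime, ts);
            // Assumption: lateness is recorded for every record, 0 when on time.
            final long lateness = streamTime - ts;
            totalLateness += lateness;
            maxLateness = Math.max(maxLateness, lateness);
            // The left window of a record ends at its own timestamp.
            final long windowEnd = ts;
            // Matches expiration=[110] in the log lines: 200 - 90.
            final long closeTime = streamTime - GRACE_MS;
            if (windowEnd < closeTime) {
                dropped++;
            }
        }
        return new double[] {dropped, maxLateness, (double) totalLateness / timestamps.length};
    }

    public static void main(final String[] args) {
        // Timestamps piped into the input topic, in order.
        final long[] timestamps = {200L, 100L, 101L, 102L, 103L, 104L, 105L, 15L};
        System.out.println(Arrays.toString(summarize(timestamps))); // prints [7.0, 185.0, 96.25]
    }
}
```

Under these assumptions the seven late records contribute 100 + 99 + 98 + 97 + 96 + 95 + 185 = 770, and dividing by all eight records gives 96.25, which lines up with assertLatenessMetrics(driver, is(7.0), is(185.0), is(96.25)).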




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

