vvcephei commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r478588602
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.internals.ApiUtils; +import org.apache.kafka.streams.processor.TimestampExtractor; +import java.time.Duration; +import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + +/** + * A sliding window used for aggregating events. + * <p> + * Sliding Windows are defined based on a record's timestamp, the window size based on the given maximum time difference (inclusive) between + * records in the same window, and the given window grace period. While the window is sliding over the input data stream, a new window is + * created each time a record enters the sliding window or a record drops out of the sliding window. + * <p> + * Records that come after set grace period will be ignored, i.e., a window is closed when + * {@code stream-time > window-end + grace-period}. 
+ * <p> + * For example, if we have a time difference of 5000ms and the following data arrives: + * <pre> + * +--------------------------------------+ + * | key | value | time | + * +-----------+-------------+------------+ + * | A | 1 | 8000 | + * +-----------+-------------+------------+ + * | A | 2 | 9200 | + * +-----------+-------------+------------+ + * | A | 3 | 12400 | + * +-----------+-------------+------------+ + * </pre> + * We'd have the following 5 windows: + * <ul> + * <li>window {@code [3000;8000]} contains [1] (created when first record enters the window)</li> + * <li>window {@code [4200;9200]} contains [1,2] (created when second record enters the window)</li> + * <li>window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)</li> + * <li>window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)</li> + * <li>window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)</li> Review comment: ```suggestion * <li>window {@code [7400;12400]} contains [1,2,3] (created when third record enters the window)</li> * <li>window {@code [8001;13001]} contains [2,3] (created when the first record drops out of the window)</li> * <li>window {@code [9201;14201]} contains [3] (created when the second record drops out of the window)</li> ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.internals.ApiUtils; +import org.apache.kafka.streams.processor.TimestampExtractor; +import java.time.Duration; +import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + +/** + * A sliding window used for aggregating events. + * <p> + * Sliding Windows are defined based on a record's timestamp, the window size based on the given maximum time difference (inclusive) between + * records in the same window, and the given window grace period. While the window is sliding over the input data stream, a new window is + * created each time a record enters the sliding window or a record drops out of the sliding window. + * <p> + * Records that come after set grace period will be ignored, i.e., a window is closed when + * {@code stream-time > window-end + grace-period}. 
+ * <p> + * For example, if we have a time difference of 5000ms and the following data arrives: + * <pre> + * +--------------------------------------+ + * | key | value | time | + * +-----------+-------------+------------+ + * | A | 1 | 8000 | + * +-----------+-------------+------------+ + * | A | 2 | 9200 | + * +-----------+-------------+------------+ + * | A | 3 | 12400 | + * +-----------+-------------+------------+ + * </pre> + * We'd have the following 5 windows: + * <ul> + * <li>window {@code [3000;8000]} contains [1] (created when first record enters the window)</li> + * <li>window {@code [4200;9200]} contains [1,2] (created when second record enters the window)</li> + * <li>window {@code [7400;124000]} contains [1,2,3] (created when third record enters the window)</li> + * <li>window {@code [8001;130001]} contains [2,3] (created when the first record drops out of the window)</li> + * <li>window {@code [9201;142001]} contains [3] (created when the second record drops out of the window)</li> + * </ul> + *<p> + * Note that while SlidingWindows are of a fixed size, as are {@link TimeWindows}, the start and end points of the window + * depend on when events occur in the stream (i.e., event timestamps), similar to {@link SessionWindows}. + * <p> + * For time semantics, see {@link TimestampExtractor}. + * + * @see TimeWindows + * @see SessionWindows + * @see UnlimitedWindows + * @see JoinWindows + * @see KGroupedStream#windowedBy(SlidingWindows) + * @see CogroupedKStream#windowedBy(SlidingWindows) + * @see TimestampExtractor + */ + +public final class SlidingWindows { + + /** The size of the windows in milliseconds, defined by the max time difference between records. */ + private final long timeDifferenceMs; + + /** The grace period in milliseconds. 
*/ + private final long graceMs; + + private SlidingWindows(final long timeDifferenceMs, final long graceMs) { + this.timeDifferenceMs = timeDifferenceMs; + this.graceMs = graceMs; + } + + /** + * Return a window definition with the window size based on the given maximum time difference (inclusive) between + * records in the same window and given window grace period. Reject out-of-order events that arrive after {@code grace}. + * A window is closed when {@code stream-time > window-end + grace-period}. + * + * @param timeDifference the max time difference (inclusive) between two records in a window + * @param grace the grace period to admit out-of-order events to a window + * @return a new window definition + * @throws IllegalArgumentException if the specified window size is < 0 or grace < 0, or either can't be represented as {@code long milliseconds} + */ + public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException { + final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); + final long timeDifferenceMs = ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize); + if (timeDifferenceMs < 0) { + throw new IllegalArgumentException("Window time difference must not be negative."); + } + final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "afterWindowEnd"); Review comment: ```suggestion final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace"); ``` ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java ########## @@ -459,6 +460,89 @@ public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() { } } + @Test + public void shouldSupportFinalResultsForSlidingWindows() { Review comment: Aside from join, forgetting to test new operators in front of Suppress has also been an issue. It's great to see this test here! 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final String storeName; + private final SlidingWindows windows; + private final Initializer<Agg> initializer; + private final Aggregator<? super K, ? super V, Agg> aggregator; + + private boolean sendOldValues = false; + + public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { + this.windows = windows; + this.storeName = storeName; + this.initializer = initializer; + this.aggregator = aggregator; + } + + @Override + public Processor<K, V> get() { + return new KStreamSlidingWindowAggregateProcessor(); + } + + public SlidingWindows windows() { + return windows; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedWindowStore<K, Agg> windowStore; + private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private StreamsMetricsImpl metrics; + private InternalProcessorContext internalProcessorContext; + private Sensor lateRecordDropSensor; + private Sensor droppedRecordsSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + super.init(context); + 
internalProcessorContext = (InternalProcessorContext) context; + metrics = internalProcessorContext.metrics(); + final String threadId = Thread.currentThread().getName(); + lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( + threadId, + context.taskId().toString(), + internalProcessorContext.currentNode().name(), + metrics + ); + droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); + } + + @Override + public void process(final K key, final V value) { + if (key == null || value == null) { + log.warn( + "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + + final long timestamp = context().timestamp(); + //don't process records that don't fall within a full sliding window + if (timestamp < windows.timeDifferenceMs()) { + log.warn( + "Skipping record due to early arrival. 
value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + processInOrder(key, value, timestamp); + } + + public void processInOrder(final K key, final V value, final long timestamp) { + + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); + + //store start times of windows we find + final Set<Long> windowStartTimes = new HashSet<>(); + + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + // keep the left type window closest to the record + Window latestLeftTypeWindow = null; + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime < timestamp) { + leftWinAgg = next.value; + if (isLeftWindow(next)) { + latestLeftTypeWindow = next.key.window(); + } + } else if (endTime == timestamp) { + leftWinAlreadyCreated = true; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; 
+ } + } + } + + //create right window for previous record + if (latestLeftTypeWindow != null) { + final long rightWinStart = latestLeftTypeWindow.end() + 1; + if (!windowStartTimes.contains(rightWinStart)) { + final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } Review comment: Is it already guaranteed that this window actually contains the current record? It doesn't look like we're checking that `endTime >= timestamp` anywhere, and it seems like the start of the range (`timestamp - 2 * windows.timeDifferenceMs()`) could give back a window that starts and ends before the current record's timestamp. ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.test.TestRecord; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessor; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.StreamsTestUtils; +import org.hamcrest.Matcher; +import org.junit.Test; + +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static 
org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class KStreamSlidingWindowAggregateTest { + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); + private final String threadId = Thread.currentThread().getName(); + + @Test + public void testAggBasic() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic1 = "topic1"; + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic1 = + driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer()); + inputTopic1.pipeInput("A", "1", 10L); + inputTopic1.pipeInput("A", "1", 20L); + inputTopic1.pipeInput("A", "1", 22L); + inputTopic1.pipeInput("A", "3", 15L); + + inputTopic1.pipeInput("B", "2", 12L); + inputTopic1.pipeInput("B", "2", 13L); + inputTopic1.pipeInput("B", "2", 18L); + inputTopic1.pipeInput("B", "1", 19L); + inputTopic1.pipeInput("B", "2", 25L); + inputTopic1.pipeInput("B", "3", 14L); + + inputTopic1.pipeInput("C", "3", 11L); + inputTopic1.pipeInput("C", "4", 15L); + inputTopic1.pipeInput("C", "1", 16L); + inputTopic1.pipeInput("C", "1", 21); + inputTopic1.pipeInput("C", "1", 23L); + + inputTopic1.pipeInput("D", 
"4", 11L); + inputTopic1.pipeInput("D", "2", 12L); + inputTopic1.pipeInput("D", "3", 29L); + inputTopic1.pipeInput("D", "5", 16L); + } + + assertEquals( + asList( + // A@10 left window created when A@10 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10), + // A@10 right window created when A@20 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1", 20), + // A@20 left window created when A@20 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+1", 20), + // A@20 right window created when A@22 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+1", 22), + // A@22 left window created when A@22 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+1+1", 22), + // A@20 left window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+1+3", 20), + // A@10 right window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1+3", 20), + // A@22 left window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+1+1+3", 22), + // A@15 left window created when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+3", 15), + // A@15 right window created when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+1+1", 22), + + // B@12 left window created when B@12 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+2", 12), + // B@12 right window created when B@13 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13), + // B@13 left window created when B@13 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+2+2", 13), + // B@12 right window updated when B@18 processed + new KeyValueTimestamp<>(new 
Windowed<>("B", new TimeWindow(13, 23)), "0+2+2", 18), + // B@13 right window created when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+2", 18), + // B@18 left window created when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+2+2+2", 18), + // B@12 right window updated when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+2+1", 19), + // B@13 right window updated when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+2+1", 19), + // B@18 right window created when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+1", 19), + // B@19 left window created when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+2+2+2+1", 19), + // B@18 right window updated when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+1+2", 25), + // B@19 right window updated when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+2", 25), + // B@25 left window created when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+2+1+2", 25), + // B@18 left window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+2+2+2+3", 18), + // B@19 left window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+2+2+2+1+3", 19), + // B@12 right window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+2+1+3", 19), + // B@13 right window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+2+1+3", 19), + // B@14 left window created when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+2+2+3", 14), + + // C@11 left window 
created when C@11 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+3", 11), + // C@11 right window created when C@15 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+4", 15), + // C@15 left window created when C@15 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3+4", 15), + // C@11 right window updated when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+4+1", 16), + // C@15 right window created when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+1", 16), + // C@16 left window created when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+3+4+1", 16), + // C@11 right window updated when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+4+1+1", 21), + // C@15 right window updated when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+1+1", 21), + // C@16 right window created when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+1", 21), + // C@21 left window created when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+3+4+1+1", 21), + // C@15 right window updated when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+1+1+1", 23), + // C@16 right window updated when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+1+1", 23), + // C@21 right window created when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+1", 23), + // C@23 left window created when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+4+1+1+1", 23), + + // D@11 left window created when D@11 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), 
"0+4", 11), + // D@11 right window created when D@12 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12), + // D@12 left window created when D@12 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12), + // D@29 left window created when D@29 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29), + // D@11 right window updated when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16), + // D@12 right window created when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16), + // D@16 left window created when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16) + ), + supplier.theCapturedProcessor().processed + ); + } + + @Test + public void testJoin() { Review comment: The Achilles heel of implementing new KTable features has historically been that we forgot to test them in a context that required the ValueGetter to work properly, of which Join is a notable use case. I'd actually say it should be required for every KTable operator to have a test where it's the source of a Join. For stateless operators, we should test both with and without a Materialized argument on the operator. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java ########## @@ -275,6 +275,15 @@ */ <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows); + /** + * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding + * windowed aggregations. 
+ * + * @param windows the specification of the aggregation {@link SlidingWindows} + * @return an instance of {@link TimeWindowedCogroupedKStream} + */ + TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows windows); Review comment: Coming back to this after completing the review, I'd say the biggest advice I'd share is to avoid whitespace changes and cleanups on the side when the PR is so long already. In fact, for my own super-complex PRs, I tend to go back over the whole diff and back out anything that's not critically important, just to lighten the load on the reviewers. Cleanups are nice to have, but it's better to keep them in their own PRs or in more trivial ones. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final String storeName; + private final SlidingWindows windows; + private final Initializer<Agg> initializer; + private final Aggregator<? super K, ? 
super V, Agg> aggregator; + + private boolean sendOldValues = false; + + public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { + this.windows = windows; + this.storeName = storeName; + this.initializer = initializer; + this.aggregator = aggregator; + } + + @Override + public Processor<K, V> get() { + return new KStreamSlidingWindowAggregateProcessor(); + } + + public SlidingWindows windows() { + return windows; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedWindowStore<K, Agg> windowStore; + private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private StreamsMetricsImpl metrics; + private InternalProcessorContext internalProcessorContext; + private Sensor lateRecordDropSensor; + private Sensor droppedRecordsSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + super.init(context); + internalProcessorContext = (InternalProcessorContext) context; + metrics = internalProcessorContext.metrics(); + final String threadId = Thread.currentThread().getName(); + lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( + threadId, + context.taskId().toString(), + internalProcessorContext.currentNode().name(), + metrics + ); + droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); + } + + @Override + public void process(final K key, final V value) { + if (key == null 
|| value == null) { + log.warn( + "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + + final long timestamp = context().timestamp(); + //don't process records that don't fall within a full sliding window + if (timestamp < windows.timeDifferenceMs()) { + log.warn( + "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + processInOrder(key, value, timestamp); + } + + public void processInOrder(final K key, final V value, final long timestamp) { + + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); + + //store start times of windows we find + final Set<Long> windowStartTimes = new HashSet<>(); + + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + // keep the left type window closest to the record + Window latestLeftTypeWindow = null; + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime < timestamp) 
{ + leftWinAgg = next.value; + if (isLeftWindow(next)) { + latestLeftTypeWindow = next.key.window(); + } + } else if (endTime == timestamp) { + leftWinAlreadyCreated = true; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; + } + } + } + + //create right window for previous record + if (latestLeftTypeWindow != null) { + final long rightWinStart = latestLeftTypeWindow.end() + 1; + if (!windowStartTimes.contains(rightWinStart)) { + final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + //create left window for new record + if (!leftWinAlreadyCreated) { + final ValueAndTimestamp<Agg> valueAndTime; + //there's a right window that the new record could create --> new record's left window is not empty + if (latestLeftTypeWindow != null) { + valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); + } else { + valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + } + final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //create right window for new record + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + 
} + } + + private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) { + return rightWinAgg != null && rightWinAgg.timestamp() > timestamp; + } + + private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) { + return window.key.window().end() == window.value.timestamp(); + } + + private void putAndForward(final Window window, + final ValueAndTimestamp<Agg> valueAndTime, + final K key, + final V value, + final long closeTime, + final long timestamp) { Review comment: Might not be a bad idea to have an assertion here that the timestamp is actually in the window boundaries. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final String storeName; + private final SlidingWindows windows; + private final Initializer<Agg> initializer; + private final Aggregator<? super K, ? 
super V, Agg> aggregator; + + private boolean sendOldValues = false; + + public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { + this.windows = windows; + this.storeName = storeName; + this.initializer = initializer; + this.aggregator = aggregator; + } + + @Override + public Processor<K, V> get() { + return new KStreamSlidingWindowAggregateProcessor(); + } + + public SlidingWindows windows() { + return windows; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedWindowStore<K, Agg> windowStore; + private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private StreamsMetricsImpl metrics; + private InternalProcessorContext internalProcessorContext; + private Sensor lateRecordDropSensor; + private Sensor droppedRecordsSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + super.init(context); + internalProcessorContext = (InternalProcessorContext) context; + metrics = internalProcessorContext.metrics(); + final String threadId = Thread.currentThread().getName(); + lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( + threadId, + context.taskId().toString(), + internalProcessorContext.currentNode().name(), + metrics + ); + droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); + } + + @Override + public void process(final K key, final V value) { + if (key == null 
|| value == null) { + log.warn( + "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + + final long timestamp = context().timestamp(); + //don't process records that don't fall within a full sliding window + if (timestamp < windows.timeDifferenceMs()) { + log.warn( + "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + processInOrder(key, value, timestamp); + } + + public void processInOrder(final K key, final V value, final long timestamp) { + + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); + + //store start times of windows we find + final Set<Long> windowStartTimes = new HashSet<>(); + + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + // keep the left type window closest to the record + Window latestLeftTypeWindow = null; + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; Review comment: minor: this could be declared `final` at the assignment on line 161 ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -0,0 +1,692 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import 
org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.test.TestRecord; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessor; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.hamcrest.Matcher; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class KStreamSlidingWindowAggregateTest { + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); + private final String threadId = Thread.currentThread().getName(); + + @SuppressWarnings("unchecked") + @Test + public void testAggregateSmallInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + + final KTable<Windowed<String>, String> table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table.toStream().process(supplier); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + inputTopic.pipeInput("A", "1", 10L); + inputTopic.pipeInput("A", "2", 15L); + inputTopic.pipeInput("A", "3", 20L); + inputTopic.pipeInput("A", "4", 22L); + inputTopic.pipeInput("A", "5", 30L); + } + + final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); + for (final KeyValueTimestamp<Object, Object> entry : supplier.theCapturedProcessor().processed) { + final Windowed<String> window = (Windowed<String>) entry.key(); + final Long start = window.window().start(); + final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make((String) entry.value(), entry.timestamp()); + if (actual.putIfAbsent(start, valueAndTimestamp) != null) { + actual.replace(start, valueAndTimestamp); + } + } + + final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); + expected.put(0L, ValueAndTimestamp.make("0+1", 10L)); + expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L)); + expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L)); + expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L)); + expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L)); + expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L)); + expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L)); + expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L)); + expected.put(23L, ValueAndTimestamp.make("0+5", 30L)); + + assertEquals(expected, actual); + } + + @SuppressWarnings("unchecked") + @Test + public void testReduceSmallInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + + final 
KTable<Windowed<String>, String> table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .reduce( + MockReducer.STRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table.toStream().process(supplier); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + inputTopic.pipeInput("A", "1", 10L); + inputTopic.pipeInput("A", "2", 14L); + inputTopic.pipeInput("A", "3", 15L); + inputTopic.pipeInput("A", "4", 22L); + inputTopic.pipeInput("A", "5", 26L); + inputTopic.pipeInput("A", "6", 30L); + } + + final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); + for (final KeyValueTimestamp<Object, Object> entry : supplier.theCapturedProcessor().processed) { + final Windowed<String> window = (Windowed<String>) entry.key(); + final Long start = window.window().start(); + final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make((String) entry.value(), entry.timestamp()); + if (actual.putIfAbsent(start, valueAndTimestamp) != null) { + actual.replace(start, valueAndTimestamp); + } + } + + final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); + expected.put(0L, ValueAndTimestamp.make("1", 10L)); + expected.put(4L, ValueAndTimestamp.make("1+2", 14L)); + expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L)); + expected.put(11L, ValueAndTimestamp.make("2+3", 15L)); + expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L)); + expected.put(15L, ValueAndTimestamp.make("3+4", 22L)); + expected.put(16L, ValueAndTimestamp.make("4+5", 
26L)); + expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L)); + expected.put(23L, ValueAndTimestamp.make("5+6", 30L)); + expected.put(27L, ValueAndTimestamp.make("6", 30L)); + + assertEquals(expected, actual); + } + + @Test + public void testAggregateLargeInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic1 = "topic1"; + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic1 = + driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer()); + inputTopic1.pipeInput("A", "1", 10L); + inputTopic1.pipeInput("A", "2", 20L); + inputTopic1.pipeInput("A", "3", 22L); + inputTopic1.pipeInput("A", "4", 15L); + + inputTopic1.pipeInput("B", "1", 12L); + inputTopic1.pipeInput("B", "2", 13L); + inputTopic1.pipeInput("B", "3", 18L); + inputTopic1.pipeInput("B", "4", 19L); + inputTopic1.pipeInput("B", "5", 25L); + inputTopic1.pipeInput("B", "6", 14L); + + inputTopic1.pipeInput("C", "1", 11L); + inputTopic1.pipeInput("C", "2", 15L); + inputTopic1.pipeInput("C", "3", 16L); + inputTopic1.pipeInput("C", "4", 21); + inputTopic1.pipeInput("C", "5", 23L); + + inputTopic1.pipeInput("D", "4", 11L); + inputTopic1.pipeInput("D", "2", 12L); + inputTopic1.pipeInput("D", "3", 29L); + inputTopic1.pipeInput("D", "5", 16L); + } + + assertEquals( + asList( + // 
FINAL WINDOW: A@10 left window created when A@10 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10), + // A@10 right window created when A@20 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20), + // A@20 left window created when A@20 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20), + // FINAL WINDOW: A@20 right window created when A@22 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22), + // A@22 left window created when A@22 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22), + // FINAL WINDOW: A@20 left window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20), + // FINAL WINDOW: A@10 right window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20), + // FINAL WINDOW: A@22 left window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22), + // FINAL WINDOW: A@15 left window created when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15), + // FINAL WINDOW: A@15 right window created when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22), + + // FINAL WINDOW: B@12 left window created when B@12 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12), + // B@12 right window created when B@13 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13), + // FINAL WINDOW: B@13 left window created when B@13 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13), + // B@12 right window updated when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), 
"0+2+3", 18), + // B@13 right window created when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18), + // B@18 left window created when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18), + // B@12 right window updated when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19), + // B@13 right window updated when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19), + // B@18 right window created when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19), + // B@19 left window created when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19), + // FINAL WINDOW: B@18 right window updated when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25), + // FINAL WINDOW: B@19 right window updated when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25), + // FINAL WINDOW: B@25 left window created when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25), + // FINAL WINDOW: B@18 left window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18), + // FINAL WINDOW: B@19 left window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19), + // FINAL WINDOW: B@12 right window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19), + // FINAL WINDOW: B@13 right window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19), + // FINAL WINDOW: B@14 left window created when B@14 processed + new KeyValueTimestamp<>(new 
Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14), + + // FINAL WINDOW: C@11 left window created when C@11 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11), + // C@11 right window created when C@15 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15), + // FINAL WINDOW: C@15 left window created when C@15 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15), + // C@11 right window updated when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16), + // C@15 right window created when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16), + // FINAL WINDOW: C@16 left window created when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16), + // FINAL WINDOW: C@11 right window updated when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21), + // C@15 right window updated when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21), + // C@16 right window created when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21), + // FINAL WINDOW: C@21 left window created when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21), + // FINAL WINDOW: C@15 right window updated when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23), + // FINAL WINDOW: C@16 right window updated when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23), + // FINAL WINDOW: C@21 right window created when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23), + // FINAL WINDOW: C@23 left window created when C@23 processed + 
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23), + + // FINAL WINDOW: D@11 left window created when D@11 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11), + // D@11 right window created when D@12 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12), + // FINAL WINDOW: D@12 left window created when D@12 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12), + // FINAL WINDOW: D@29 left window created when D@29 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29), + // FINAL WINDOW: D@11 right window updated when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16), + // FINAL WINDOW: D@12 right window created when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16), + // FINAL WINDOW: D@16 left window created when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16) + ), + supplier.theCapturedProcessor().processed + ); + } + + @Test + public void testJoin() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic1 = "topic1"; + final String topic2 = "topic2"; + + final KTable<Windowed<String>, String> table1 = builder + .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) + ); + + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table1.toStream().process(supplier); + + final KTable<Windowed<String>, String> table2 = builder 
+ .stream(topic2, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()) + ); + table2.toStream().process(supplier); + + table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic1 = + driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer()); + final TestInputTopic<String, String> inputTopic2 = + driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer()); + inputTopic1.pipeInput("A", "1", 10L); + inputTopic1.pipeInput("B", "2", 11L); + inputTopic1.pipeInput("C", "3", 12L); + + final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3); + + processors.get(0).checkAndClearProcessResult( + // left windows created by the first set of records to table 1 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1, 11)), "0+2", 11), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+3", 12) + ); + processors.get(1).checkAndClearProcessResult(); + processors.get(2).checkAndClearProcessResult(); + + inputTopic1.pipeInput("A", "1", 15L); + inputTopic1.pipeInput("B", "2", 16L); + inputTopic1.pipeInput("C", "3", 19L); + + processors.get(0).checkAndClearProcessResult( + // right windows from previous records are created, and left windows from new records to table 1 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1", 15), + new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(5, 15)), "0+1+1", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12, 22)), "0+2", 16), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+2+2", 16), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+3", 19), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(9, 19)), "0+3+3", 19) + ); + processors.get(1).checkAndClearProcessResult(); + processors.get(2).checkAndClearProcessResult(); + + inputTopic2.pipeInput("A", "a", 10L); + inputTopic2.pipeInput("B", "b", 30L); + inputTopic2.pipeInput("C", "c", 12L); + inputTopic2.pipeInput("C", "c", 35L); + + + processors.get(0).checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult( + // left windows from first set of records sent to table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+b", 30), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+c", 12), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25, 35)), "0+c", 35) + ); + processors.get(2).checkAndClearProcessResult( + // set of join windows from windows created by table 1 and table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1%0+a", 10), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+3%0+c", 12) + ); + + inputTopic2.pipeInput("A", "a", 15L); + inputTopic2.pipeInput("B", "b", 16L); + inputTopic2.pipeInput("C", "c", 17L); + + processors.get(0).checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult( + // right windows from previous records are created (where applicable), and left windows from new records to table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+a", 15), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a+a", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), 
"0+b", 16), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+c", 17), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(7, 17)), "0+c+c", 17) + ); + processors.get(2).checkAndClearProcessResult( + // set of join windows from windows created by table 1 and table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1%0+a", 15), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1%0+a+a", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+2+2%0+b", 16), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+3%0+c", 19) + ); + } + } + + @Test + public void shouldLogAndMeterWhenSkippingNullKey() { Review comment: I'd normally say we should have a test also to verify we log properly on early records, but you already opened the PR to add early record handling, so we're good. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java ########## @@ -61,7 +62,7 @@ Objects.requireNonNull(groupedStream, "groupedStream can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); groupPatterns.put((KGroupedStreamImpl<K, ?>) groupedStream, - (Aggregator<? super K, ? super Object, VOut>) aggregator); + (Aggregator<? super K, ? super Object, VOut>) aggregator); Review comment: ```suggestion (Aggregator<? super K, ? super Object, VOut>) aggregator); ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java ########## @@ -275,6 +275,15 @@ */ <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows); + /** + * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding + * windowed aggregations. 
+ * + * @param windows the specification of the aggregation {@link SlidingWindows} + * @return an instance of {@link TimeWindowedCogroupedKStream} + */ + TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows windows); Review comment: I'm reviewing this whole PR as-is, so there's no need to do anything now, but @mjsax's specific suggestion is beside the point. The general feedback is that this PR is too large, which it is. We shoot for under 1K, and it's the PR author's responsibility to figure out the best way to break it up. This policy isn't just "reviewers complaining"; it's an important component of ensuring AK's quality. Long PRs overwhelm any reviewer's cognitive capacity to pay attention to every detail, so oversights are more likely to slip through into the codebase, and once they're there, you're really at the mercy of the testing layers to catch them. When the oversights are very subtle, they wind up getting released and then surface as user-reported bugs. Reviewers can't guarantee to notice every problem, but our capacity to notice problems is inversely proportional to the length of the PR. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final String storeName; + private final SlidingWindows windows; + private final Initializer<Agg> initializer; + private final Aggregator<? super K, ? 
super V, Agg> aggregator; + + private boolean sendOldValues = false; + + public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { + this.windows = windows; + this.storeName = storeName; + this.initializer = initializer; + this.aggregator = aggregator; + } + + @Override + public Processor<K, V> get() { + return new KStreamSlidingWindowAggregateProcessor(); + } + + public SlidingWindows windows() { + return windows; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedWindowStore<K, Agg> windowStore; + private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private StreamsMetricsImpl metrics; + private InternalProcessorContext internalProcessorContext; + private Sensor lateRecordDropSensor; + private Sensor droppedRecordsSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + private boolean reverseIteratorImplemented = false; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + super.init(context); + internalProcessorContext = (InternalProcessorContext) context; + metrics = internalProcessorContext.metrics(); + final String threadId = Thread.currentThread().getName(); + lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( + threadId, + context.taskId().toString(), + internalProcessorContext.currentNode().name(), + metrics + ); + //catch unsupported operation error + droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + 
sendOldValues); + } + + @Override + public void process(final K key, final V value) { + if (key == null || value == null) { + log.warn( + "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + if (reverseIteratorImplemented) { + processReverse(key, value); + } else { + processInOrder(key, value); + } + } + + public void processReverse(final K key, final V value) { + + final long timestamp = context().timestamp(); + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); + //store start times of windows we find + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + //if we've already seen the window with the closest start time to the record + boolean foundRightWinAgg = false; + //if we've already seen the window with the closest end time to the record + boolean foundLeftWinAgg = false; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + + //determine if current record's right window exists, will only be true at most once, on the first pass + if (next.key.window().start() == timestamp + 1) { + rightWinAlreadyCreated = true; + continue; + } else if (next.key.window().end() > timestamp) { + if (!foundRightWinAgg) { + 
foundRightWinAgg = true; + rightWinAgg = next.value; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + continue; + } else if (next.key.window().end() == timestamp) { + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + leftWinAlreadyCreated = true; + continue; + } else { + if (!foundLeftWinAgg) { + leftWinAgg = next.value; + foundLeftWinAgg = true; + } + //If it's a left window, there is a record at this window's end time who may need a corresponding right window + if (isLeftWindow(next)) { + final long rightWinStart = next.key.window().end() + 1; + if (!windowStartTimes.contains(rightWinStart)) { + final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + break; + } + } + } + } + //create the left window of the current record if it's not created + if (!leftWinAlreadyCreated) { + final ValueAndTimestamp<Agg> valueAndTime; + //confirms that the left window contains more than the current record + if (leftWinAgg.timestamp() < timestamp && leftWinAgg.timestamp() > timestamp - windows.timeDifferenceMs()) { + valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); + } else { + //left window just contains the current record + valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + } + final TimeWindow window = new TimeWindow(Math.max(0, timestamp - windows.timeDifferenceMs()), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //create the right window for the current record, if need be + if (!rightWinAlreadyCreated && rightWinAgg != null && rightWinAgg.timestamp() > timestamp) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> 
valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + public void processInOrder(final K key, final V value) { + + final long timestamp = context().timestamp(); + //don't process records that don't fall within a full sliding window + if (timestamp < windows.timeDifferenceMs()) { Review comment: Is this condition supposed to be checking whether records are "early" with respect to now? It looks like it should be: ```suggestion if (timestamp < (observedStreamTime - windows.timeDifferenceMs())) { ``` ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -0,0 +1,692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.test.TestRecord; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessor; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.hamcrest.Matcher; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static 
org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class KStreamSlidingWindowAggregateTest { + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); + private final String threadId = Thread.currentThread().getName(); + + @SuppressWarnings("unchecked") + @Test + public void testAggregateSmallInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + + final KTable<Windowed<String>, String> table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table.toStream().process(supplier); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + inputTopic.pipeInput("A", "1", 10L); + inputTopic.pipeInput("A", "2", 15L); + inputTopic.pipeInput("A", "3", 20L); + inputTopic.pipeInput("A", "4", 22L); + inputTopic.pipeInput("A", "5", 30L); + } + + final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); + for (final KeyValueTimestamp<Object, Object> entry : 
supplier.theCapturedProcessor().processed) { + final Windowed<String> window = (Windowed<String>) entry.key(); + final Long start = window.window().start(); + final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make((String) entry.value(), entry.timestamp()); + if (actual.putIfAbsent(start, valueAndTimestamp) != null) { + actual.replace(start, valueAndTimestamp); + } + } + + final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); + expected.put(0L, ValueAndTimestamp.make("0+1", 10L)); + expected.put(5L, ValueAndTimestamp.make("0+1+2", 15L)); + expected.put(10L, ValueAndTimestamp.make("0+1+2+3", 20L)); + expected.put(11L, ValueAndTimestamp.make("0+2+3", 20L)); + expected.put(12L, ValueAndTimestamp.make("0+2+3+4", 22L)); + expected.put(16L, ValueAndTimestamp.make("0+3+4", 22L)); + expected.put(20L, ValueAndTimestamp.make("0+3+4+5", 30L)); + expected.put(21L, ValueAndTimestamp.make("0+4+5", 30L)); + expected.put(23L, ValueAndTimestamp.make("0+5", 30L)); + + assertEquals(expected, actual); + } + + @SuppressWarnings("unchecked") + @Test + public void testReduceSmallInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + + final KTable<Windowed<String>, String> table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .reduce( + MockReducer.STRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table.toStream().process(supplier); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + 
inputTopic.pipeInput("A", "1", 10L); + inputTopic.pipeInput("A", "2", 14L); + inputTopic.pipeInput("A", "3", 15L); + inputTopic.pipeInput("A", "4", 22L); + inputTopic.pipeInput("A", "5", 26L); + inputTopic.pipeInput("A", "6", 30L); + } + + final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); + for (final KeyValueTimestamp<Object, Object> entry : supplier.theCapturedProcessor().processed) { + final Windowed<String> window = (Windowed<String>) entry.key(); + final Long start = window.window().start(); + final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make((String) entry.value(), entry.timestamp()); + if (actual.putIfAbsent(start, valueAndTimestamp) != null) { + actual.replace(start, valueAndTimestamp); + } + } + + final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); + expected.put(0L, ValueAndTimestamp.make("1", 10L)); + expected.put(4L, ValueAndTimestamp.make("1+2", 14L)); + expected.put(5L, ValueAndTimestamp.make("1+2+3", 15L)); + expected.put(11L, ValueAndTimestamp.make("2+3", 15L)); + expected.put(12L, ValueAndTimestamp.make("2+3+4", 22L)); + expected.put(15L, ValueAndTimestamp.make("3+4", 22L)); + expected.put(16L, ValueAndTimestamp.make("4+5", 26L)); + expected.put(20L, ValueAndTimestamp.make("4+5+6", 30L)); + expected.put(23L, ValueAndTimestamp.make("5+6", 30L)); + expected.put(27L, ValueAndTimestamp.make("6", 30L)); + + assertEquals(expected, actual); + } + + @Test + public void testAggregateLargeInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic1 = "topic1"; + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic1 = + driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer()); + inputTopic1.pipeInput("A", "1", 10L); + inputTopic1.pipeInput("A", "2", 20L); + inputTopic1.pipeInput("A", "3", 22L); + inputTopic1.pipeInput("A", "4", 15L); + + inputTopic1.pipeInput("B", "1", 12L); + inputTopic1.pipeInput("B", "2", 13L); + inputTopic1.pipeInput("B", "3", 18L); + inputTopic1.pipeInput("B", "4", 19L); + inputTopic1.pipeInput("B", "5", 25L); + inputTopic1.pipeInput("B", "6", 14L); + + inputTopic1.pipeInput("C", "1", 11L); + inputTopic1.pipeInput("C", "2", 15L); + inputTopic1.pipeInput("C", "3", 16L); + inputTopic1.pipeInput("C", "4", 21); + inputTopic1.pipeInput("C", "5", 23L); + + inputTopic1.pipeInput("D", "4", 11L); + inputTopic1.pipeInput("D", "2", 12L); + inputTopic1.pipeInput("D", "3", 29L); + inputTopic1.pipeInput("D", "5", 16L); + } + + assertEquals( + asList( + // FINAL WINDOW: A@10 left window created when A@10 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10), + // A@10 right window created when A@20 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20), + // A@20 left window created when A@20 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20), + // FINAL WINDOW: A@20 right window created when A@22 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22), + // A@22 left window created when A@22 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22), + // FINAL WINDOW: A@20 left window updated when A@15 processed + new 
KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20), + // FINAL WINDOW: A@10 right window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20), + // FINAL WINDOW: A@22 left window updated when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22), + // FINAL WINDOW: A@15 left window created when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15), + // FINAL WINDOW: A@15 right window created when A@15 processed + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22), + + // FINAL WINDOW: B@12 left window created when B@12 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12), + // B@12 right window created when B@13 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13), + // FINAL WINDOW: B@13 left window created when B@13 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13), + // B@12 right window updated when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3", 18), + // B@13 right window created when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18), + // B@18 left window created when B@18 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18), + // B@12 right window updated when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19), + // B@13 right window updated when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19), + // B@18 right window created when B@19 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19), + // B@19 left window created when B@19 processed + new 
KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19), + // FINAL WINDOW: B@18 right window updated when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25), + // FINAL WINDOW: B@19 right window updated when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25), + // FINAL WINDOW: B@25 left window created when B@25 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25), + // FINAL WINDOW: B@18 left window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18), + // FINAL WINDOW: B@19 left window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19), + // FINAL WINDOW: B@12 right window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19), + // FINAL WINDOW: B@13 right window updated when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19), + // FINAL WINDOW: B@14 left window created when B@14 processed + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14), + + // FINAL WINDOW: C@11 left window created when C@11 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11), + // C@11 right window created when C@15 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15), + // FINAL WINDOW: C@15 left window created when C@15 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15), + // C@11 right window updated when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16), + // C@15 right window created when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16), + // 
FINAL WINDOW: C@16 left window created when C@16 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16), + // FINAL WINDOW: C@11 right window updated when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21), + // C@15 right window updated when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21), + // C@16 right window created when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21), + // FINAL WINDOW: C@21 left window created when C@21 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21), + // FINAL WINDOW: C@15 right window updated when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23), + // FINAL WINDOW: C@16 right window updated when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23), + // FINAL WINDOW: C@21 right window created when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23), + // FINAL WINDOW: C@23 left window created when C@23 processed + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23), + + // FINAL WINDOW: D@11 left window created when D@11 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11), + // D@11 right window created when D@12 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12), + // FINAL WINDOW: D@12 left window created when D@12 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12), + // FINAL WINDOW: D@29 left window created when D@29 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29), + // FINAL WINDOW: D@11 right window updated when D@16 processed + new KeyValueTimestamp<>(new 
Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16), + // FINAL WINDOW: D@12 right window created when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16), + // FINAL WINDOW: D@16 left window created when D@16 processed + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16) + ), + supplier.theCapturedProcessor().processed + ); + } + + @Test + public void testJoin() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic1 = "topic1"; + final String topic2 = "topic2"; + + final KTable<Windowed<String>, String> table1 = builder + .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()) + ); + + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table1.toStream().process(supplier); + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic2, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()) + ); + table2.toStream().process(supplier); + + table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic1 = + driver.createInputTopic(topic1, new StringSerializer(), new 
StringSerializer()); + final TestInputTopic<String, String> inputTopic2 = + driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer()); + inputTopic1.pipeInput("A", "1", 10L); + inputTopic1.pipeInput("B", "2", 11L); + inputTopic1.pipeInput("C", "3", 12L); + + final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3); + + processors.get(0).checkAndClearProcessResult( + // left windows created by the first set of records to table 1 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(1, 11)), "0+2", 11), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+3", 12) + ); + processors.get(1).checkAndClearProcessResult(); + processors.get(2).checkAndClearProcessResult(); + + inputTopic1.pipeInput("A", "1", 15L); + inputTopic1.pipeInput("B", "2", 16L); + inputTopic1.pipeInput("C", "3", 19L); + + processors.get(0).checkAndClearProcessResult( + // right windows from previous records are created, and left windows from new records to table 1 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1", 15), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(12, 22)), "0+2", 16), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+2+2", 16), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+3", 19), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(9, 19)), "0+3+3", 19) + ); + processors.get(1).checkAndClearProcessResult(); + processors.get(2).checkAndClearProcessResult(); + + inputTopic2.pipeInput("A", "a", 10L); + inputTopic2.pipeInput("B", "b", 30L); + inputTopic2.pipeInput("C", "c", 12L); + inputTopic2.pipeInput("C", "c", 35L); + + + processors.get(0).checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult( + // left 
windows from first set of records sent to table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+b", 30), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+c", 12), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25, 35)), "0+c", 35) + ); + processors.get(2).checkAndClearProcessResult( + // set of join windows from windows created by table 1 and table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1%0+a", 10), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(2, 12)), "0+3%0+c", 12) + ); + + inputTopic2.pipeInput("A", "a", 15L); + inputTopic2.pipeInput("B", "b", 16L); + inputTopic2.pipeInput("C", "c", 17L); + + processors.get(0).checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult( + // right windows from previous records are created (where applicable), and left windows from new records to table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+a", 15), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a+a", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+b", 16), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+c", 17), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(7, 17)), "0+c+c", 17) + ); + processors.get(2).checkAndClearProcessResult( + // set of join windows from windows created by table 1 and table 2 + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+1%0+a", 15), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1%0+a+a", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(6, 16)), "0+2+2%0+b", 16), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+3%0+c", 19) + ); + } + } + + @Test + public void shouldLogAndMeterWhenSkippingNullKey() { + final String 
builtInMetricsVersion = StreamsConfig.METRICS_LATEST; + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100))) + .aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String())); + + props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class); + final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + inputTopic.pipeInput(null, "1"); + assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. 
value=[1] topic=[topic] partition=[0] offset=[0]")); + } + } + + @Test + public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() { + final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST; + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + + final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); + stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90L))) + .aggregate( + () -> "", + MockAggregator.toStringInstance("+"), + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() + ) + .toStream() + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("output"); + + props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSlidingWindowAggregate.class); + final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + inputTopic.pipeInput("k", "100", 200L); + inputTopic.pipeInput("k", "0", 100L); + inputTopic.pipeInput("k", "1", 101L); + inputTopic.pipeInput("k", "2", 102L); + inputTopic.pipeInput("k", "3", 103L); + inputTopic.pipeInput("k", "4", 104L); + inputTopic.pipeInput("k", "5", 105L); + inputTopic.pipeInput("k", "6", 15L); + + assertLatenessMetrics(driver, is(7.0), is(185.0), is(96.25)); + + assertThat(appender.getMessages(), hasItems( + // left window for k@100 + "Skipping record for expired window. 
key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[90,100] expiration=[110] streamTime=[200]", + // left window for k@101 + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[91,101] expiration=[110] streamTime=[200]", + // left window for k@102 + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[92,102] expiration=[110] streamTime=[200]", + // left window for k@103 + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[93,103] expiration=[110] streamTime=[200]", + // left window for k@104 + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[94,104] expiration=[110] streamTime=[200]", + // left window for k@105 + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[95,105] expiration=[110] streamTime=[200]", + // left window for k@15 + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]" + )); + final TestOutputTopic<String, String> outputTopic = + driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer()); + assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>("[k@190/200]", "+100", null, 200L))); + assertTrue(outputTopic.isEmpty()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testAggregateRandomInput() { Review comment: Awesome test. Thanks! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org