mjsax commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r689950309
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -100,17 +99,78 @@ private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final String threadId = Thread.currentThread().getName(); + private final String topic = "topic"; + private final String defaultInOrderName = "InOrder"; + private final String defaultReverseName = "Reverse"; + private final long defaultWindowSize = 10L; + private final long defaultRetentionPeriod = 5000L; + + private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator, + final String inOrderName, + final String reverseName, + final long windowSize) { + return inOrderIterator + ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false) + : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false); + } + + @SuppressWarnings("unchecked") + @Test + public void testAggregateSmallInputWithZeroTimeDifference() { + final StreamsBuilder builder = new StreamsBuilder(); + + // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here + final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L); Review comment: Size should be `0L` ? ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -100,17 +99,78 @@ private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final String threadId = Thread.currentThread().getName(); + private final String topic = "topic"; + private final String defaultInOrderName = "InOrder"; + private final String defaultReverseName = "Reverse"; + private final long defaultWindowSize = 10L; + private final long defaultRetentionPeriod = 5000L; + + private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator, + final String inOrderName, + final String reverseName, + final long windowSize) { + return inOrderIterator + ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false) + : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false); + } + + @SuppressWarnings("unchecked") + @Test + public void testAggregateSmallInputWithZeroTimeDifference() { + final StreamsBuilder builder = new StreamsBuilder(); + + // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here Review comment: Seems this comment needs to be updated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org