showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679596497



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       > Are you saying the CachingWindowStore internally uses a TimeWindow? 
   
   Yes, `CachingWindowStore` internally uses `TimeWindow` (i.e. 
`WindowKeySchema`). And looks like we use `TimeWinow` for `WindowStore`, and 
use `SessionWindow` for `SessionStore`.
   
   > doesn't this mean there's still a hole in the API since you can't use a 
custom WindowStore for a sliding windowed aggregation with the windowSize set 
to 0?
   
   I think so
   
   > If the WindowStore is going to represent different kinds of constant-size 
windows, it should probably be agnostic to the specific type of constant-sized 
window.
   
   Do you mean we should use `SessionWindow` (i.e. [start, end] inclusive time 
window) to represent the window? I'm not sure if this is the original design or 
just a miss. What do you think?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to