mjsax commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r690013977
##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
Hmmm... Seems to be an issue... The actual final return type is `KTable<Windowed<K>, V>` and thus is window-type agnostic. So we already have such a "container". -- However, `windowedBy(SlidingWindows)` returns a `TimeWindowedKStream`... Return types are not easy to change... And I don't think we can just switch from `TimeWindow` to `SlidingWindow` as the concrete type either for the sliding window case... Maybe we are stuck and cannot fix the bug without a breaking change?
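To make the type problem above concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams classes: `HalfOpenWindow`, `ClosedWindow`, and this `Windowed` are hypothetical stand-ins, and the assumption that the half-open type rejects a zero-size window is taken only from the test comment in the diff ("the window size must be greater than 0"). It shows that a result key built on an abstract window is agnostic to the concrete window type, while a fixed half-open window type cannot represent a zero time difference.

```java
// Hypothetical stand-ins only: none of these classes are the Kafka Streams types.
class WindowTypeSketch {

    /** Abstract window; only the bounds are modeled. */
    static abstract class Window {
        final long startMs;
        final long endMs;

        Window(final long startMs, final long endMs) {
            if (startMs < 0 || endMs < startMs) {
                throw new IllegalArgumentException("invalid window bounds");
            }
            this.startMs = startMs;
            this.endMs = endMs;
        }

        long sizeMs() {
            return endMs - startMs;
        }
    }

    /** Half-open [start, end): an empty interval is rejected (assumed behavior, see lead-in). */
    static final class HalfOpenWindow extends Window {
        HalfOpenWindow(final long startMs, final long endMs) {
            super(startMs, endMs);
            if (startMs == endMs) {
                throw new IllegalArgumentException("window size must be greater than 0");
            }
        }
    }

    /** Closed [start, end]: a zero time difference is a legal, single-instant window. */
    static final class ClosedWindow extends Window {
        ClosedWindow(final long startMs, final long endMs) {
            super(startMs, endMs);
        }
    }

    /** Result key: refers only to the abstract Window, so it is window-type agnostic. */
    static final class Windowed<K> {
        final K key;
        final Window window;

        Windowed(final K key, final Window window) {
            this.key = key;
            this.window = window;
        }
    }

    /** Returns true if the half-open stand-in refuses a zero-size window. */
    static boolean halfOpenRejectsZeroSize() {
        try {
            new HalfOpenWindow(5L, 5L);
            return false;
        } catch (final IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Windowed<String> result = new Windowed<>("k", new ClosedWindow(5L, 5L));
        System.out.println("closed window size: " + result.window.sizeMs());
        System.out.println("half-open rejects zero size: " + halfOpenRejectsZeroSize());
    }
}
```

In this sketch the container type needs no change to hold either window kind; the constraint comes only from which concrete window class the intermediate step is tied to.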
For this case, we would indeed need to carry on with the KIP (but we could only do it in 4.0...), but I am wondering if it's worth fixing given the impact?

Also: we have a few issues with the current DSL that we cannot fix easily (e.g., KIP-300). Thus, a long-term solution could be to leave the current API as-is and build a new DSL in parallel (we did this in the past when we introduced `StreamsBuilder`). This way, we can change the API in any way, but it would be a long-term solution only.

It might also help with regard to the new PAPI that uses `Record` instead of `<K,V>` types, and that is not easily adopted for `transform()` (and siblings). We could change the whole DSL to `Record` (i.e., `KStream<Record<K,V>>` -- of course we don't need `Record` in the generic type -- it's just for illustrative purposes). It would also cover the "add headers" KIP, fix KIP-300, let us introduce a `PartitionedKStream` (cf. the current KIP-759 discussion), and address a few other minor issues (like renaming `KGroupedStream` to `GroupedKStream`), all at once... And we could clean up the topology optimization step and the operator naming rules (it is a big mess to understand which `Named` object overwrites others...). We could also get rid of the wrappers from `KeyValueStore` to `TimestampedKeyValueStore` and change the interface from `Materialized<XxxStore>` to `Materialized<TimestampedXxxStore>`.

In the past it was never worth starting a new DSL, but it seems we have collected enough individual cases to maybe justify this investment now?

The only thing that we should consider is our investment into "versioned state stores / versioned KTables". If we build a new DSL, it should be compatible with it -- if we cannot guarantee that, we might want to wait until we understand what API we need for versioned KTables in the DSL and make the cut afterwards?

\cc @ableegoldman @guozhangwang @vvcephei @bbejeck @cadonna (also @inponomarev @jeqo @vcrfxia)

--
This is an automated message from the Apache Git Service.