mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r690013977
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
+ private final String topic = "topic";
+ private final String defaultInOrderName = "InOrder";
+ private final String defaultReverseName = "Reverse";
+ private final long defaultWindowSize = 10L;
+ private final long defaultRetentionPeriod = 5000L;
+
+ private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+ final String inOrderName,
+ final String reverseName,
+ final long windowSize) {
+ return inOrderIterator
+ ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+ : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAggregateSmallInputWithZeroTimeDifference() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+ final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);
Review comment:
Hmmm... This seems to be an issue... The actual final return type is
`KTable<Windowed<K>, V>` and thus is window-type agnostic. So we already have
such a "container". -- However, `windowedBy(SlidingWindows)` returns a
`TimeWindowedKStream`... Return types are not easy to change... And I don't
think we can just switch from `TimeWindow` to `SlidingWindow` as the concrete
type for the sliding window case either...
Maybe we are stuck and cannot fix the bug without a breaking change? In
that case, we would indeed need to carry on with the KIP (but we could only do
it in 4.0...), but I am wondering if it's worth fixing given the impact?
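To make the type problem concrete, here is a minimal sketch. It assumes (based on the code comment in the diff) that the window type used internally has an exclusive end and therefore cannot have size 0, while a sliding window with zero time difference would need `start == end`. All class names below are hypothetical stand-ins, not the actual Kafka Streams classes:

```java
// Hypothetical, simplified model; these are NOT the Kafka Streams classes.
final class WindowTypeSketch {

    /** Window-type-agnostic base class: it only stores and sanity-checks the bounds. */
    static abstract class Window {
        final long startMs;
        final long endMs;

        Window(final long startMs, final long endMs) {
            if (startMs < 0 || endMs < startMs) {
                throw new IllegalArgumentException("invalid window bounds");
            }
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    /** Models a window with an exclusive end, [start, end): it can never be empty. */
    static final class ExclusiveEndWindow extends Window {
        ExclusiveEndWindow(final long startMs, final long endMs) {
            super(startMs, endMs);
            if (startMs == endMs) {
                throw new IllegalArgumentException("end must be greater than start");
            }
        }
    }

    /** Models a window with an inclusive end, [start, end]: start == end is valid. */
    static final class InclusiveEndWindow extends Window {
        InclusiveEndWindow(final long startMs, final long endMs) {
            super(startMs, endMs);
        }
    }

    /** Window-type-agnostic key wrapper, playing the role of the "container". */
    static final class Windowed<K> {
        final K key;
        final Window window;

        Windowed(final K key, final Window window) {
            this.key = key;
            this.window = window;
        }
    }

    /** Returns whether a size-0 window [5, 5] can be wrapped for the chosen window kind. */
    static boolean canRepresentZeroSize(final boolean inclusiveEnd) {
        try {
            final Window window = inclusiveEnd
                ? new InclusiveEndWindow(5L, 5L)
                : new ExclusiveEndWindow(5L, 5L);
            return new Windowed<>("k", window).window.endMs == 5L;
        } catch (final IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println("exclusive end, size 0: " + canRepresentZeroSize(false));
        System.out.println("inclusive end, size 0: " + canRepresentZeroSize(true));
    }
}
```

In this model the container accepts any window kind, so the restriction comes only from which concrete window class the operator instantiates; that is the part that is hard to change behind an existing return type.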
Also: we have a few issues with the current DSL that we cannot fix easily
(e.g., KIP-300). Thus, a long-term solution could be to leave the current API
as-is and build a new DSL in parallel (we did this in the past when we
introduced `StreamsBuilder`). This way, we can change the API in any way, but
it would be a long-term solution only.
It might also help with regard to the new PAPI that uses `Record` instead of
the `<K,V>` types, and that is not easily adopted for `transform()` (and
siblings). We could change the whole DSL to `Record` (i.e.,
`KStream<Record<K,V>>` -- of course we don't need `Record` in the generic
type -- it's just for illustrative purposes). It would also cover the "add
headers" KIP and fix KIP-300, and we could introduce a `PartitionedKStream`
(cf. the current KIP-759 discussion) and fix a few other minor issues (like
renaming `KGroupedStream` to `GroupedKStream`) all at once... And we could
clean up the topology optimization step and the operator naming rules (it is a
big mess to understand which `Named` object overwrites others...) -- We could
also get rid of the wrappers from `KeyValueStore` to
`TimestampedKeyValueStore` and change the interface from
`Materialized<XxxStore>` to `Materialized<TimestampedXxxStore>` -- In the past
it was never worth starting a new DSL, but it seems we have collected enough
individual cases to maybe justify this investment now?
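A rough sketch of what a `Record`-based operator could look like, only to illustrate the idea. `Rec`, `apply`, and `demo` are hypothetical names invented for this example; `Rec` loosely mirrors the shape of the PAPI `Record` but is not that class, and nothing here is an existing or proposed DSL API:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; Rec is NOT org.apache.kafka.streams.processor.api.Record.
final class RecordDslSketch {

    /** Immutable record carrying key, value, timestamp, and headers together. */
    static final class Rec<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final Map<String, String> headers;

        Rec(final K key, final V value, final long timestamp, final Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            // Defensive copy so that callers cannot mutate the headers afterwards.
            this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        }

        /** Returns a copy with a new value; key, timestamp, and headers are preserved. */
        <VR> Rec<K, VR> withValue(final VR newValue) {
            return new Rec<>(key, newValue, timestamp, headers);
        }

        /** Returns a copy with one additional (or overwritten) header. */
        Rec<K, V> withHeader(final String name, final String headerValue) {
            final Map<String, String> copy = new LinkedHashMap<>(headers);
            copy.put(name, headerValue);
            return new Rec<>(key, value, timestamp, copy);
        }
    }

    /** A record-to-record operator: one signature covers value and header changes. */
    static <K, V, KR, VR> Rec<KR, VR> apply(final Rec<K, V> in,
                                            final Function<Rec<K, V>, Rec<KR, VR>> mapper) {
        return mapper.apply(in);
    }

    /** Maps the value to its length and attaches a header in a single step. */
    static Rec<String, Integer> demo() {
        final Rec<String, String> in = new Rec<>("k", "hello", 42L, new LinkedHashMap<>());
        return apply(in, r -> r.withValue(r.value.length()).withHeader("source", "sketch"));
    }

    public static void main(final String[] args) {
        final Rec<String, Integer> out = demo();
        System.out.println(out.key + " " + out.value + " " + out.timestamp + " " + out.headers);
    }
}
```

The point of the sketch is that a single mapper over the whole record can touch value and headers (and, by extension, key or timestamp) without a separate operator variant for each combination.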
The only thing that we should consider is our investment into "versioned
state stores / versioned KTables". If we build a new DSL, it should be
compatible with them -- if we cannot guarantee that, we might want to wait
until we understand what API we need for versioned KTables in the DSL and make
the cut afterwards?
cc @ableegoldman @guozhangwang @vvcephei @bbejeck @cadonna (also
@inponomarev @jeqo @vcrfxia)