A window store contains key-value pairs, with the key being the window id (key + window-start timestamp) and the value being the current aggregate. It does not store a list of input values.
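To make this concrete, here is a minimal sketch in plain Java that *models* a tumbling window store rather than using the Kafka Streams API. Each map entry is keyed by the window id (record key plus window-start timestamp) and holds only one running aggregate, never the list of input values. `WindowAggregateSketch`, `WindowId`, `process` and `fetch` are hypothetical names, not Kafka classes. A sum is assumed as the aggregation, and timestamps are assumed to be non-negative. In a real PAPI processor, the map would roughly correspond to the `TimestampedWindowStore`, with the read-modify-write going through the store's `fetch`/`put`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory model of a tumbling-window store (NOT the Kafka
 * Streams API): one entry per (key, window start), holding only the
 * current aggregate.
 */
class WindowAggregateSketch {

    // Window id = record key + window-start timestamp (records get equals/hashCode for free).
    record WindowId(String key, long windowStart) {}

    private final long windowSizeMs;
    private final Map<WindowId, Long> store = new HashMap<>();

    WindowAggregateSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Folds one input value into its window's aggregate (assumed here: a sum). */
    void process(String key, long value, long timestampMs) {
        // Align the timestamp to the start of its tumbling window (assumes timestampMs >= 0).
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        // Read the old aggregate (if any), add the new value, write the result back.
        store.merge(new WindowId(key, windowStart), value, Long::sum);
    }

    /** Returns the aggregate for one window, or null if nothing was stored for it. */
    Long fetch(String key, long windowStart) {
        return store.get(new WindowId(key, windowStart));
    }

    /** Number of stored windows (not the number of input records). */
    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        WindowAggregateSketch s = new WindowAggregateSketch(2000); // 2-second tumbling windows
        s.process("a", 5, 100);
        s.process("a", 7, 1999);
        s.process("a", 1, 2000);
        s.process("b", 3, 500);
        System.out.println(s.fetch("a", 0));    // 12
        System.out.println(s.fetch("a", 2000)); // 1
        System.out.println(s.fetch("b", 0));    // 3
    }
}
```

Four input records end up as three stored entries, one aggregate per window, which is also why the changelog message size stays bounded no matter how many records fall into a window.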
If you want to store a list of input values, you would need to make the value type a `List` (or similar) of values. However, this is not recommended, and for large windows it might not work at all: each key-value pair is stored in the corresponding changelog topic as a single message, so you might hit the `max.message.bytes` limit (you could, of course, increase it). Hence, it's recommended to update the aggregate incrementally for each input record, if possible (I am not sure what aggregation you want to do and whether that is possible). That is also what `KStreamWindowAggregateProcessor` does:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L105

Hope that helps.

-Matthias

On 10/15/19 12:56 AM, Navneeth Krishnan wrote:
> Hi All,
>
> I'm trying to create a tumbling time window of two seconds using PAPI. I
> have a TimestampWindowStore with both retention and window size as 2
> seconds and retainDuplicates as false.
>
> Stores.timestampedWindowStoreBuilder(Stores.persistentTimestampedWindowStore("window-store-1",
> Duration.ofMillis(2000), Duration.ofMillis(2000), false)
>
>
> In my process function I keep adding data to the state store and I have a
> scheduled task initiated which runs every seconds to collect the data
> inside the window. The way I thought the window store iterator will return
> is Key and List of records for that window but it's returning just a window
> object and a record. How can I aggregate this and collectively send to
> downstream?
>
> I looked at KGroupedStreamImpl for reference but it seems quite confusing.
> Any advice on how this can be done?
>
> Thanks
>