Thanks a lot, Matthias. I looked at the aggregation, and I'm fine with aggregating data before forwarding it downstream, but KStreamWindowAggregateProcessor::process uses the key to determine whether the data has to be aggregated and forwarded or not. My worry is this: if I have a 2-second tumbling window and only one record arrives, how can I evict that record and forward it downstream when no more data arrives for the same key?
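The eviction question above can be modelled with the scheduled task the original poster already mentions: a periodic callback that closes windows by the clock instead of waiting for another record with the same key. The sketch below is dependency-free and purely illustrative. `WallClockWindowFlush`, `process` and `punctuate` are hypothetical names (not Kafka APIs), the aggregate is assumed to be a simple count, timestamps are assumed to be non-negative milliseconds, and a window [start, start + 2000) is treated as closed once `nowMs >= start + 2000`. In the Processor API the corresponding hook would be a `Punctuator` registered via `ProcessorContext#schedule(Duration, PunctuationType.WALL_CLOCK_TIME, ...)`; wall-clock punctuation fires even when no records arrive, whereas `STREAM_TIME` punctuation only fires as stream time advances, which requires new records (for any key in the task, not just the same key). Inside the callback you would iterate the window store (for example with `fetchAll(Instant, Instant)`) and forward the closed windows; please check these signatures against the Javadoc for your Kafka version.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free model (not a Kafka API) of 2-second tumbling
// windows that are flushed by a periodic wall-clock callback, not by new input.
class WallClockWindowFlush {
    static final long WINDOW_SIZE_MS = 2000L;

    // window start -> (record key -> count); outer map is sorted by window start
    private final TreeMap<Long, TreeMap<String, Long>> windows = new TreeMap<>();

    // Called per input record: incrementally update the count for its window.
    // Assumes non-negative millisecond timestamps.
    void process(String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        windows.computeIfAbsent(windowStart, s -> new TreeMap<>())
               .merge(key, 1L, Long::sum);
    }

    // Called on a timer: emits and evicts every window whose end <= nowMs,
    // regardless of whether any new record arrived for its key.
    List<String> punctuate(long nowMs) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> window = it.next();
            long windowEnd = window.getKey() + WINDOW_SIZE_MS;
            if (windowEnd > nowMs) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, Long> agg : window.getValue().entrySet()) {
                emitted.add(agg.getKey() + "@" + window.getKey() + "=" + agg.getValue());
            }
            it.remove(); // drop the closed window so it is emitted only once
        }
        return emitted;
    }
}
```

With this model, a single record for key "a" at t=500 is emitted by the first `punctuate` call at or after t=2000, even if "a" never appears again.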
In short, is there a way to evict data from a window even when no more data arrives for the key?

Thanks

On Tue, Oct 15, 2019 at 1:40 AM Matthias J. Sax <matth...@confluent.io> wrote:
> A window store contains key-value pairs, with the key being the window id
> (key + window-start-timestamp) and the value being the current
> aggregate. It does not store a list of input values.
>
> If you want to store a list of input values, you would need to make the
> value type a `List` (or similar) of values. However, this is not
> recommended and for large windows might not work at all, because each
> key-value pair will be stored in the corresponding changelog topic as a
> single message (hence, you might hit the `max.message.bytes` limit --
> you could of course increase it).
>
> Hence, it's recommended to re-compute the aggregation for each input
> record incrementally if possible (not sure what aggregation you want to
> do and if that is possible). That is also what
> `KStreamWindowAggregateProcessor` does:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L105
>
> Hope that helps.
>
>
> -Matthias
>
> On 10/15/19 12:56 AM, Navneeth Krishnan wrote:
> > Hi All,
> >
> > I'm trying to create a tumbling time window of two seconds using PAPI. I
> > have a TimestampedWindowStore with both retention and window size set to 2
> > seconds and retainDuplicates set to false.
> >
> > Stores.timestampedWindowStoreBuilder(Stores.persistentTimestampedWindowStore("window-store-1",
> >     Duration.ofMillis(2000), Duration.ofMillis(2000), false)
> >
> > In my process function I keep adding data to the state store, and I have a
> > scheduled task that runs every second to collect the data inside the
> > window. I expected the window store iterator to return the key and a list
> > of records for that window, but it returns just a window object and a
> > single record.
> > How can I aggregate these records and send them downstream together?
> >
> > I looked at KGroupedStreamImpl for reference, but it seems quite confusing.
> > Any advice on how this can be done?
> >
> > Thanks
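Matthias's point that a window store holds one aggregate per window id (key + window start), not a list of inputs, can be sketched without any Kafka dependency. `IncrementalWindowAggregate`, `WindowId` and `Agg` are hypothetical stand-ins rather than Kafka classes, and the count/sum/max aggregate is an assumed example, since the thread does not say which aggregation is needed. Each input record triggers a read-modify-write of a fixed-size value, similar in spirit to what the `KStreamWindowAggregateProcessor` linked above does, so the stored value does not grow with the number of records and the `max.message.bytes` concern does not arise.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch (not a Kafka API): one entry per (key, window start),
// holding a running aggregate instead of the list of raw input values.
class IncrementalWindowAggregate {
    static final long WINDOW_SIZE_MS = 2000L;

    // Window id = record key + window start timestamp.
    static final class WindowId {
        final String key;
        final long start;

        WindowId(String key, long start) {
            this.key = key;
            this.start = start;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof WindowId)) return false;
            WindowId other = (WindowId) o;
            return start == other.start && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, start);
        }
    }

    // Fixed-size aggregate; count and sum are enough to derive an average later.
    static final class Agg {
        static final Agg EMPTY = new Agg(0L, 0L, Long.MIN_VALUE);
        final long count;
        final long sum;
        final long max;

        Agg(long count, long sum, long max) {
            this.count = count;
            this.sum = sum;
            this.max = max;
        }

        Agg add(long value) {
            return new Agg(count + 1, sum + value, Math.max(max, value));
        }
    }

    // Stand-in for the window store: window id -> current aggregate.
    final Map<WindowId, Agg> store = new LinkedHashMap<>();

    // Per input record: fetch the current aggregate, update it, write it back.
    // Assumes non-negative millisecond timestamps.
    void process(String key, long timestampMs, long value) {
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        WindowId id = new WindowId(key, windowStart);
        store.put(id, store.getOrDefault(id, Agg.EMPTY).add(value));
    }
}
```

Iterating `store` yields exactly one (window id, aggregate) pair per window, which is the same shape as the "window object and a single record" that the window store iterator returned; forwarding those pairs (for example from the scheduled task) sends each window's result downstream as one message.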