Thanks a lot, Matthias. I looked at the aggregation, and I'm fine with
aggregating data before forwarding it downstream. However,
KStreamWindowAggregateProcessor::process uses the key to determine whether
the data has to be aggregated and forwarded. My worry is this: if I have a
2-second tumbling window and only one record arrives, how can I evict that
record and forward it downstream when there is no further incoming data for
the same key?

In short, is there a way to evict data from a window even when there is
no incoming data for the key?

Thanks

On Tue, Oct 15, 2019 at 1:40 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> A window store contains key-value pairs, with the key being the window id
> (key + window-start-timestamp) and the value being the current
> aggregate. It does not store a list of input values.
>
> If you want to store a list of input values, you would need to make the
> value type a `List` (or similar) of values. However, this is not
> recommended and for large windows might not work at all, because each
> key-value pair will be stored in the corresponding changelog topic as a
> single message (hence, you might hit the `max.message.bytes` limit --
> you could of course increase it).
>
> Hence, it's recommended to re-compute the aggregation for each input
> record incrementally if possible (not sure what aggregation you want to
> do and if that is possible). That is also what
> `KStreamWindowAggregateProcessor` does:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L105
>
> Hope that helps.
>
>
> -Matthias
>
> On 10/15/19 12:56 AM, Navneeth Krishnan wrote:
> > Hi All,
> >
> > I'm trying to create a tumbling time window of two seconds using PAPI. I
> > have a TimestampedWindowStore with both retention and window size set to
> > 2 seconds and retainDuplicates set to false.
> >
> >
> > Stores.timestampedWindowStoreBuilder(Stores.persistentTimestampedWindowStore("window-store-1",
> >         Duration.ofMillis(2000), Duration.ofMillis(2000), false)
> >
> >
> > In my process function I keep adding data to the state store, and I have
> > a scheduled task that runs every second to collect the data inside the
> > window. I expected the window store iterator to return a key and a list
> > of records for that window, but it returns just a window object and a
> > single record. How can I aggregate these and send them downstream
> > together?
> >
> > I looked at KGroupedStreamImpl for reference, but it seems quite
> > confusing. Any advice on how this can be done?
> >
> > Thanks
> >
>
>
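
For illustration, the store layout and incremental aggregation described in
the quoted reply can be sketched in plain Java. This is a minimal model under
stated assumptions, not Kafka Streams code: the class `WindowedCountSketch`,
its methods, and the `key@start=count` output format are hypothetical names
made up for this example, a simple count stands in for whatever aggregate is
actually needed, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a tumbling-window count; NOT the Kafka Streams API. */
class WindowedCountSketch {
    private final long windowSizeMs;

    // key -> (window start timestamp -> current aggregate). This mirrors the
    // "window id = key + window-start-timestamp" layout from the thread:
    // one aggregate per window, never a list of input values.
    private final Map<String, TreeMap<Long, Long>> store = new HashMap<>();

    WindowedCountSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the tumbling window containing the (non-negative) timestamp. */
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Updates the window's aggregate incrementally and returns the new value. */
    long add(String key, long timestampMs) {
        TreeMap<Long, Long> windows = store.computeIfAbsent(key, k -> new TreeMap<>());
        return windows.merge(windowStart(timestampMs), 1L, Long::sum);
    }

    /** Current aggregate for one window, or null if no such window is stored. */
    Long get(String key, long windowStartMs) {
        TreeMap<Long, Long> windows = store.get(key);
        return windows == null ? null : windows.get(windowStartMs);
    }

    /**
     * Assumption for this sketch: a periodic task calls this method. It removes
     * and returns every window whose end (start + size) is at or before nowMs.
     */
    List<String> drainClosed(long nowMs) {
        List<String> closed = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Long>> perKey : store.entrySet()) {
            Iterator<Map.Entry<Long, Long>> it = perKey.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (window.getKey() + windowSizeMs > nowMs) {
                    break; // windows are sorted by start, so the rest are still open
                }
                closed.add(perKey.getKey() + "@" + window.getKey() + "=" + window.getValue());
                it.remove();
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        WindowedCountSketch counts = new WindowedCountSketch(2000L);
        counts.add("a", 500L);   // falls into window [0, 2000)
        counts.add("a", 1500L);  // same window, aggregate becomes 2
        counts.add("a", 2100L);  // falls into window [2000, 4000)
        System.out.println(counts.drainClosed(2000L)); // prints [a@0=2]
    }
}
```

In this model, calling `drainClosed` from a periodic task, such as the
scheduled task mentioned in the original question, returns a closed window
even when no further record arrives for its key. Whether the same approach
fits a real topology depends on details not covered in this thread.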
