I guess you have 3 options:

1) if there is data for a different key, check if there is data in the store that you want to flush

2) register an event-time punctuation (maybe for each key? or for a range of keys?) and check on a regular basis if there is anything that you want to forward

3) similar to (2), however use wall-clock time
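The three options differ only in *when* the store is checked. Here is a minimal in-memory model that illustrates this. It is plain Java with no Kafka dependency, and `TumblingWindowBuffer`, `process` and `punctuate` are hypothetical names, not Kafka Streams classes. The model keeps one running count per (key, window start) and evicts every closed window, whichever key triggered the check. In a real processor, the `punctuate` call would come from a `Punctuator` registered via `ProcessorContext#schedule(Duration, PunctuationType, Punctuator)`, using `PunctuationType.STREAM_TIME` (option 2) or `PunctuationType.WALL_CLOCK_TIME` (option 3). The map would be a window store.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, in-memory model of a tumbling-window aggregation.
// It is NOT the Kafka Streams API; it only illustrates when windows get flushed.
final class TumblingWindowBuffer {

    // Emitted result: record key, window start, and the window's running count.
    static final class Result {
        final String key;
        final long windowStart;
        final long count;

        Result(String key, long windowStart, long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.count = count;
        }

        @Override
        public String toString() {
            return key + "@" + windowStart + "=" + count;
        }
    }

    private final long windowSizeMs;
    // window start -> (record key -> running count); sorted so closed windows form a prefix.
    private final TreeMap<Long, TreeMap<String, Long>> windows = new TreeMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    TumblingWindowBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Option 1: aggregate incrementally, then flush whatever the (possibly advanced)
    // stream time has closed, even if the closed windows belong to other keys.
    List<Result> process(String key, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        windows.computeIfAbsent(windowStart, ws -> new TreeMap<>()).merge(key, 1L, Long::sum);
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        return punctuate(streamTimeMs);
    }

    // Options 2 and 3: call this from a punctuation, passing stream time (option 2)
    // or wall-clock time (option 3). Emits and evicts every window with end <= nowMs.
    List<Result> punctuate(long nowMs) {
        List<Result> out = new ArrayList<>();
        // A window [start, start + size) is closed once start + size <= nowMs.
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it =
                windows.headMap(nowMs - windowSizeMs, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> window = it.next();
            for (Map.Entry<String, Long> agg : window.getValue().entrySet()) {
                out.add(new Result(agg.getKey(), window.getKey(), agg.getValue()));
            }
            it.remove(); // evicts the window from the backing map
        }
        return out;
    }
}
```

Stream time only advances when new records arrive in the same task. Option 2 therefore flushes a silent key as soon as some other key produces data. Only option 3 (wall-clock time) flushes when there is no input at all.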

-Matthias

On 10/16/19 1:50 PM, Navneeth Krishnan wrote:
> Thanks a lot Matthias. I looked at the aggregation and I'm fine with
> aggregating data before forwarding it downstream, but
> KStreamWindowAggregateProcessor::process
> uses the key to determine whether the data has to be aggregated and
> forwarded or not. My worry is: if I have a tumbling window of 2 seconds and
> there is just one record, how can I evict this record and forward it
> downstream when there is no further incoming data for the same key?
>
> In short, is there a way to evict the data from a window even when there is
> no incoming data for the key?
>
> Thanks
>
> On Tue, Oct 15, 2019 at 1:40 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> A window store contains key-value pairs, with the key being the window id
>> (key + window-start-timestamp) and the value being the current
>> aggregate. It does not store a list of input values.
>>
>> If you want to store a list of input values, you would need to make the
>> value type a `List` (or similar) of values. However, this is not
>> recommended, and for large windows it might not work at all, because each
>> key-value pair will be stored in the corresponding changelog topic as a
>> single message (hence, you might hit the `max.message.bytes` limit; you
>> could of course increase it).
>>
>> Hence, it's recommended to re-compute the aggregation incrementally for
>> each input record if possible (not sure what aggregation you want to do
>> and if that is possible). That is also what
>> `KStreamWindowAggregateProcessor` does:
>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L105
>>
>> Hope that helps.
>>
>>
>> -Matthias
>>
>> On 10/15/19 12:56 AM, Navneeth Krishnan wrote:
>>> Hi All,
>>>
>>> I'm trying to create a tumbling time window of two seconds using PAPI. I
>>> have a TimestampedWindowStore with both retention and window size set to 2
>>> seconds and retainDuplicates set to false.
>>>
>>> Stores.timestampedWindowStoreBuilder(Stores.persistentTimestampedWindowStore("window-store-1",
>>> Duration.ofMillis(2000), Duration.ofMillis(2000), false)
>>>
>>>
>>> In my process function I keep adding data to the state store, and I have a
>>> scheduled task that runs every second to collect the data inside the
>>> window. I expected the window store iterator to return a key and a list
>>> of records for that window, but it returns just a window object and a
>>> single record. How can I aggregate these and send them downstream
>>> together?
>>>
>>> I looked at KGroupedStreamImpl for reference, but it seems quite
>>> confusing. Any advice on how this can be done?
>>>
>>> Thanks
>>>
>>
>>
>
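The original question asks how to combine the per-record entries returned by the window-store iterator (one windowed key plus one value each) into a single result per window. Here is a minimal sketch. It assumes the values are numbers to be summed and that each entry carries the record timestamp it was stored under. `WindowedEntry` and `WindowGrouping` are hypothetical names that only mimic the shape of Kafka's `KeyValue<Windowed<K>, V>`. The grouping uses the window id described in the thread (key + window start). As the thread recommends, updating the aggregate incrementally in `process()` avoids this extra pass entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for one window-store iterator element; it only mimics
// the shape of Kafka's KeyValue<Windowed<K>, V> and is not that class.
final class WindowedEntry {
    final String key;
    final long timestampMs; // assumed: the record timestamp used when the entry was put
    final long value;

    WindowedEntry(String key, long timestampMs, long value) {
        this.key = key;
        this.timestampMs = timestampMs;
        this.value = value;
    }
}

final class WindowGrouping {
    // Window id = record key + tumbling-window start, as described in the thread.
    static String windowId(String key, long windowStartMs) {
        return key + "@" + windowStartMs;
    }

    // Fold per-record entries into one sum per window id, preserving first-seen order.
    static Map<String, Long> sumPerWindow(Iterable<WindowedEntry> entries, long windowSizeMs) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (WindowedEntry e : entries) {
            long windowStartMs = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            sums.merge(windowId(e.key, windowStartMs), e.value, Long::sum);
        }
        return sums;
    }
}
```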