I guess you have 3 options:

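1) if a record for a different key arrives, check whether there is data
in the store that you want to flush
2) register an event-time punctuation (`PunctuationType.STREAM_TIME`;
maybe for each key? or for a range of keys?) and check on a regular
basis if there is anything that you want to forward; note that stream
time only advances when new records arrive
3) similar to (2), however use wall-clock time
(`PunctuationType.WALL_CLOCK_TIME`), which fires even if no new records
arrive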
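A minimal, stdlib-only Java model of these options, offered as a sketch and not as the Kafka Streams API. A `TreeMap` keyed by window start stands in for the window store, a `forwarded` list stands in for `context.forward(...)`, and the names `TumblingWindowBuffer` and `flushClosedWindows` are hypothetical. Option 1 corresponds to calling the flush from `process()` with the current record's timestamp; options 2 and 3 correspond to calling the same method from a punctuator with stream time or wall-clock time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only model of evicting tumbling windows; NOT the Kafka Streams API.
class TumblingWindowBuffer {
    private final long windowSizeMs;
    // Stand-in for the window store: windowStart -> (key -> running sum).
    private final TreeMap<Long, TreeMap<String, Long>> windows = new TreeMap<>();
    // Stand-in for context.forward(...): "key@windowStart=aggregate" strings.
    final List<String> forwarded = new ArrayList<>();

    TumblingWindowBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Option 1: aggregate incrementally, then use this record's timestamp
    // (whatever its key) to flush windows of other keys that are now closed.
    // Late/out-of-order records are not handled in this sketch.
    void process(String key, long value, long timestampMs) {
        // Tumbling window start; assumes non-negative timestamps.
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        windows.computeIfAbsent(windowStart, ws -> new TreeMap<>())
               .merge(key, value, Long::sum);
        flushClosedWindows(timestampMs);
    }

    // Options 2 and 3: call this from a punctuator with stream time or wall-clock time.
    void flushClosedWindows(long nowMs) {
        // A window [start, start + size) is closed once nowMs >= start + size,
        // i.e. start <= nowMs - size.
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it =
            windows.headMap(nowMs - windowSizeMs, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> window = it.next();
            for (Map.Entry<String, Long> e : window.getValue().entrySet()) {
                forwarded.add(e.getKey() + "@" + window.getKey() + "=" + e.getValue());
            }
            it.remove(); // evicts the window from the backing map
        }
    }
}
```

In a real processor, options 2 and 3 would register the callback via `context.schedule(Duration, PunctuationType, Punctuator)` and scan the store inside it, forwarding and deleting whatever windows have closed.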


-Matthias

On 10/16/19 1:50 PM, Navneeth Krishnan wrote:
> Thanks a lot Matthias. I looked at the aggregation, and I'm fine with
> aggregating data before forwarding it downstream, but
> KStreamWindowAggregateProcessor::process
> uses the key to determine whether the data has to be aggregated and
> forwarded or not. My worry is: if I have a tumbling window of 2 seconds and
> there is just one record, how can I evict it and forward it
> downstream when there is no more incoming data for the same key?
>
> In short, is there a way to evict data from a window even when there is
> no incoming data for the key?
> 
> Thanks
> 
> On Tue, Oct 15, 2019 at 1:40 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> A window store contains key-value pairs, with the key being the window id
>> (key + window-start timestamp) and the value being the current
>> aggregate. It does not store a list of input values.
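To make that layout concrete, here is a stdlib-only Java model; it is not the `WindowStore` API, and `WindowId` and `WindowStoreModel` are hypothetical names. Each entry is keyed by (record key, window start) and holds a single aggregate, so a second write to the same window replaces the value rather than appending to a list. Iterating it yields one (window id, aggregate) pair per window, which mirrors the `KeyValue<Windowed<K>, V>` pairs returned by a window store's `all()`/`fetchAll()` iterators.

```java
import java.util.Objects;
import java.util.TreeMap;

// Stdlib-only model of a window store's layout; NOT the Kafka Streams API.
final class WindowId implements Comparable<WindowId> {
    final String key;
    final long windowStart;

    WindowId(String key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }

    @Override
    public int compareTo(WindowId other) {
        int byKey = key.compareTo(other.key);
        return byKey != 0 ? byKey : Long.compare(windowStart, other.windowStart);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof WindowId && compareTo((WindowId) o) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStart);
    }
}

final class WindowStoreModel {
    // Exactly one value per window id: the current aggregate, not a list of inputs.
    final TreeMap<WindowId, Long> entries = new TreeMap<>();

    // A second put for the same (key, windowStart) overwrites the previous aggregate.
    void put(String key, long aggregate, long windowStart) {
        entries.put(new WindowId(key, windowStart), aggregate);
    }
}
```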
>>
>> If you want to store a list of input values, you would need to make the
>> value type a `List` (or similar) of values. However, this is not
>> recommended and for large windows might not work at all, because each
>> key-value pair will be stored in the corresponding changelog topic as a
>> single message (hence, you might hit the `max.message.bytes` limit --
>> you could of course increase it).
>>
>> Hence, it's recommended to re-compute the aggregation for each input
>> record incrementally if possible (not sure what aggregation you want to
>> do and if that is possible). That is also what
>> `KStreamWindowAggregateProcessor` does:
>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L105
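As an illustration of computing an aggregation incrementally, here is a stdlib-only Java sketch (hypothetical names, not the code in the linked file) that keeps a running average per window as a (sum, count) pair. Each record follows a fetch, update, put pattern, so the stored value stays constant-size however many records land in the window; a `HashMap` keyed by `key@windowStart` stands in for the window store.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only sketch of incremental windowed aggregation; NOT Kafka Streams code.
final class SumCount {
    final long sum;
    final long count;

    SumCount(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Fold one input into the aggregate; the input itself is not retained.
    SumCount add(long value) {
        return new SumCount(sum + value, count + 1);
    }

    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}

final class IncrementalAverage {
    private final long windowSizeMs;
    // Stand-in for the window store: "key@windowStart" -> running (sum, count).
    final Map<String, SumCount> store = new HashMap<>();

    IncrementalAverage(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    void process(String key, long value, long timestampMs) {
        // Tumbling window start; assumes non-negative timestamps.
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        String windowId = key + "@" + windowStart;
        // Fetch the current aggregate (or an empty one), update it, put it back.
        SumCount current = store.getOrDefault(windowId, new SumCount(0, 0));
        store.put(windowId, current.add(value));
    }
}
```

An average cannot be updated from the previous average alone, which is why the sketch stores (sum, count); aggregations with no such constant-size state (an exact median, for example) are the cases where a list-valued store becomes tempting.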
>>
>> Hope that helps.
>>
>>
>> -Matthias
>>
>> On 10/15/19 12:56 AM, Navneeth Krishnan wrote:
>>> Hi All,
>>>
>>> I'm trying to create a tumbling time window of two seconds using PAPI. I
>>> have a TimestampedWindowStore with both retention and window size set to 2
>>> seconds and retainDuplicates set to false.
>>>
>>>
>>>     Stores.timestampedWindowStoreBuilder(
>>>         Stores.persistentTimestampedWindowStore("window-store-1",
>>>             Duration.ofMillis(2000), Duration.ofMillis(2000), false),
>>>         keySerde, valueSerde)  // hypothetical serdes, not shown in the original
>>>
>>> In my process function I keep adding data to the state store, and I have a
>>> scheduled task that runs every second to collect the data inside the
>>> window. I expected the window store iterator to return a key and a list
>>> of records for that window, but it returns just a window object and a
>>> record. How can I aggregate this and send it downstream as a whole?
>>>
>>> I looked at KGroupedStreamImpl for reference, but it seems quite
>>> confusing. Any advice on how this can be done?
>>>
>>> Thanks
>>>
>>
>>
> 
