Sachin,

There is no reason to have an .until() AND a .retain() -- just increase
the value of .until()

If you have a window of, let's say, 1h size and you set .until() to 1h as
well, you obviously cannot process any late-arriving data. If you set
.until() to 2h in this example, you can process data that is up to 1h
delayed.

So basically, the retention time should always be larger than your window
size.
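To make this concrete, here is a minimal, self-contained sketch (a
hypothetical class, not the actual Kafka Streams implementation) of the
rule: a record is dropped once stream-time has moved past its window end
plus the retention configured via .until():

```java
// Hypothetical simulation of stream-time-driven window retention.
// Assumes 1h tumbling windows with .until(2h); not real Kafka Streams code.
public class RetentionSketch {
    static final long WINDOW_SIZE_MS = 3_600_000L; // 1h tumbling windows
    static final long RETENTION_MS   = 7_200_000L; // .until(2h)

    static long streamTime = 0L; // internally tracked, only moves forward

    // Returns true if the record is still accepted into its window.
    static boolean process(long recordTs) {
        streamTime = Math.max(streamTime, recordTs);
        long windowStart = (recordTs / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
        long windowEnd   = windowStart + WINDOW_SIZE_MS;
        // Window is maintained while stream-time < window end + retention.
        return streamTime < windowEnd + RETENTION_MS;
    }

    public static void main(String[] args) {
        process(3 * WINDOW_SIZE_MS);              // advances stream-time to 3h
        // Record for window [0, 1h): 3h >= 1h + 2h retention -> dropped
        System.out.println(process(30 * 60_000L));
        // Record only 1h behind stream-time still fits window [2h, 3h):
        System.out.println(process(2 * WINDOW_SIZE_MS + 1));
    }
}
```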

The parameter
> windowstore.changelog.additional.retention.ms

applies to the changelog topics that back up window state stores. Those
changelog topics are compacted. However, the key used encodes a window ID,
and thus older data could never be cleaned up by compaction alone.
Therefore, an additional retention time is applied to those topics, too.
Thus, if an old window is not updated for this amount of time, it will
eventually get deleted, preventing the topic from growing infinitely.

The value will be determined by until(), i.e., whatever you specify in
.until() will be used to set this parameter.
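As a back-of-the-envelope check (assuming the documented default of 1 day
for the additional retention, and that the two values simply add up), the
effective changelog topic retention works out as:

```java
// Rough arithmetic sketch, not an API call: changelog retention.ms is
// (approximately) the maintain time from .until() plus the additional value.
public class ChangelogRetention {
    public static void main(String[] args) {
        long untilMs      = 2L * 3_600_000L;  // .until(2h)
        long additionalMs = 24L * 3_600_000L; // windowstore.changelog.additional.retention.ms (assumed default: 1 day)
        System.out.println(untilMs + additionalMs); // 93600000
    }
}
```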


-Matthias

On 12/12/16 1:07 AM, Sachin Mittal wrote:
> Hi,
> We are facing the exact problem as described by Matthias above.
> We are keeping default until which is 1 day.
> 
> Our record's timestamp extractor reads a field that increases with time.
> However, for short periods we cannot guarantee that the timestamp always
> increases. So at the boundary, i.e. after 24 hrs, we can get records that
> are beyond that window's retention period.
> 
> Then it happens like it is mentioned above and our aggregation fails.
> 
> So just to sum up: when we get a record at
> 24h + 1 sec, it deletes the older window and, since the new record belongs
> to the new window, a new window gets created.
> Now when we get the next record at 24h - 1 sec, since the older window has
> been dropped, it does not get aggregated into that bucket.
> 
> I suggest we have another setting next to until, called retain, which
> retains the older windows into the next window.
> 
> I think at the stream window boundary level it should use the concept of a
> sliding window. So we could define a window like
> 
> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 * 1000l).until(7
> * 24 * 3600 * 1000l).retain(900 * 1000l)
> 
> So after 7 days it retains the data covered by the windows of the last 15
> minutes, rolling the data in them over into the next window. This way
> streams work continuously.
> 
> Please let us know your thoughts on this.
> 
> On another side question on this there is a setting:
> 
> windowstore.changelog.additional.retention.ms
> It is not clear what it does. Is this the default for until?
> 
> Thanks
> Sachin
> 
> 
> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Windows are created on demand, i.e., each time a new record arrives and
>> there is no window for it yet, a new window will get created.
>>
>> Windows accept data until their retention time (which you can configure
>> via .until()) has passed. Thus, you will have many windows open in
>> parallel.
>>
>> If you read older data, it will just be put into the corresponding
>> windows (as long as the window retention time has not passed). If a
>> window was already discarded, a new window with this single (late
>> arriving) record will get created, the computation will be triggered,
>> you get a result, and afterwards the window is deleted again (as its
>> retention time has already passed).
>>
>> The retention time is driven by "stream-time", an internally tracked
>> time that only progresses in the forward direction. It gets its value
>> from the timestamps provided by the TimestampExtractor -- thus, by
>> default it will be event-time.
>>
>> -Matthias
>>
>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>> I've read this and still have more questions than answers. If my data
>>> skips about (timewise), what determines when a given window will start or
>>> stop accepting new data? What if I'm reading data from some time ago?
>>>
>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Please have a look here:
>>>>
>>>> http://docs.confluent.io/current/streams/developer-
>>>> guide.html#windowing-a-stream
>>>>
>>>> If you have further question, just follow up :)
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>> I've added the 'until()' clause to some aggregation steps and it's
>>>>> working wonders for keeping the size of the state store in useful
>>>>> boundaries... But I'm not 100% clear on how it works.
>>>>>
>>>>> What is implied by the '.until()' clause? What determines when to stop
>>>>> receiving further data -- is it clock time (since the window was
>>>>> created)? It seems problematic for it to refer to EventTime as this
>>>>> may bounce all over the place. For non-overlapping windows a given
>>>>> record can only fall into a single aggregation period -- so when would
>>>>> a value get discarded?
>>>>>
>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
>>>>> 1000L).until(10 * 1000L))' -- but what is this accomplishing?
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
