First, windows are only created if there is actual data for them. So you
get windows [0,50), [25,75), [50,100) only if there are records falling
into each window (btw: window start time is inclusive while window end
time is exclusive). If you have only two records with, let's say, ts=20
and ts=90, you will never have an open window [25,75). Each window is
physically created when the first record for it is processed.
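To make the assignment rule concrete, here is a minimal sketch in plain
Java. The helper name is made up for illustration, but the arithmetic
mirrors how hopping windows are assigned (start times are aligned to
multiples of the advance):

    import java.util.ArrayList;
    import java.util.List;

    public class WindowAssignment {

        // Start times of all hopping windows [start, start + size) that
        // contain the given timestamp; start inclusive, end exclusive.
        static List<Long> windowStartsFor(long ts, long size, long advance) {
            List<Long> starts = new ArrayList<>();
            long lastStart = ts - (ts % advance); // latest aligned start <= ts
            for (long s = lastStart; s > ts - size && s >= 0; s -= advance) {
                starts.add(s);
            }
            return starts;
        }

        public static void main(String[] args) {
            // size=50, advance=25: ts=20 only touches window [0,50);
            // ts=90 touches [50,100) and [75,125); [25,75) is never created.
            System.out.println(windowStartsFor(20, 50, 25)); // [0]
            System.out.println(windowStartsFor(90, 50, 25)); // [75, 50]
        }
    }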
If you have the windows above and a record with ts=101 arrives, a new
window [100,150) will be created (window start times are aligned to
multiples of the advance, so ts=101 falls into [75,125) and [100,150)).
Window [0,50) will not be deleted yet: because retention is 100, Streams
guarantees that all records with ts >= 1 (= 101 - 100) are still
processed correctly, and such records could fall into window [0,50).
Thus, window [0,50) can be dropped once stream-time advances to ts = 150,
but not before that.

-Matthias

On 12/13/16 12:06 AM, Sachin Mittal wrote:
> Hi,
> So is until for the future or the past?
> Say I get the first record at t = 0, until is 100, and my window size
> is 50, advancing by 25.
> I understand it will create windows (0, 50), (25, 75), (50, 100).
> Now at t = 101 it will drop (0, 50), (25, 75), (50, 100) and create
> (101, 150), (125, 175), (150, 200).
>
> Please confirm if this understanding is correct. It is not clear how
> it will handle overlapping windows (75, 125) and (175, 225) and so on.
>
> The case that is still not clear: say at t = 102 I get some message
> with timestamp 99. What happens then?
> Will the result be added to the previous aggregation of (50, 100) or
> (75, 125), like it should be?
>
> Or will it recreate the old window (50, 100), aggregate the value
> there, and then drop it? This would result in a wrong aggregated
> value, as it does not consider the previously aggregated values.
>
> So this is the pressing case I am not able to understand. Maybe I am
> wrong in some basic understanding.
>
> Next, for the parameter
>> windowstore.changelog.additional.retention.ms
> how does this relate to the retention.ms param of the topic config?
> I create the internal topic manually using, say, retention.ms=3600000.
> In the next release (post kafka_2.10-0.10.0.1), since we support
> delete for the internal changelog topic as well, I want it to be
> retained for just 1 hour.
> So how does the above parameter interact with this topic-level
> setting? Or do I now just need to set the above config to 3600000 and
> not add retention.ms=3600000 while creating the internal topic?
> This is just another doubt remaining here.
>
> Thanks
> Sachin
>
> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Sachin,
>>
>> There is no reason to have an .until() AND a .retain() -- just
>> increase the value of .until().
>>
>> If you have a window of, let's say, 1h size and you set .until() also
>> to 1h, you can obviously not process any late-arriving data. If you
>> set .until() to 2h in this example, you can process data that is up
>> to 1h delayed.
>>
>> So basically, the retention should always be larger than your window
>> size.
>>
>> The parameter
>>> windowstore.changelog.additional.retention.ms
>> applies to changelog topics that back up window state stores. Those
>> changelog topics are compacted. However, the key used encodes a
>> window ID, so older data can never be cleaned up by compaction alone.
>> Therefore, an additional retention time is applied to those topics,
>> too. Thus, if an old window is not updated for this amount of time,
>> it will eventually get deleted, preventing the topic from growing
>> infinitely.
>>
>> The value is determined by until(), i.e., whatever you specify in
>> .until() will be used to set this parameter.
>>
>> -Matthias
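Concretely, that rule of thumb translates into a windows specification
like the following minimal sketch (assuming the 0.10.1-era API, where
TimeWindows.of() takes only the window size; in 0.10.0 it also took a
store name). The fragment goes wherever the topology is built:

    // 1-hour windows hopping every 30 minutes, retained for 2 hours of
    // stream-time -- so records arriving up to 1 hour late still reach
    // their window. Per the above, until() also determines the extra
    // retention applied to the window store's changelog topic.
    TimeWindows.of(60 * 60 * 1000L)           // window size: 1 hour
               .advanceBy(30 * 60 * 1000L)    // advance: 30 minutes
               .until(2 * 60 * 60 * 1000L);   // retention: 2 hours > size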
>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>> Hi,
>>> We are facing the exact problem described by Matthias above.
>>> We are keeping the default until, which is 1 day.
>>>
>>> Our record's timestamp extractor has a field which increases with
>>> time. However, for a short time we cannot guarantee that the
>>> timestamp always increases. So at the boundary, i.e. after 24 hrs,
>>> we can get records which are beyond that window's retention period.
>>>
>>> Then it happens as described above, and our aggregation fails.
>>>
>>> So just to sum up: when we get a record at 24h + 1 sec, it deletes
>>> the older window, and since the new record belongs to the new
>>> window, that window gets created.
>>> Now when we get the next record at 24h - 1 sec, since the older
>>> window was dropped, it does not get aggregated into that bucket.
>>>
>>> I suggest we have another setting next to until, called retain,
>>> which retains the older windows into the next window.
>>>
>>> I think at the stream window boundary level it should use a concept
>>> of sliding windows. So we could define a window like
>>>
>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 *
>>> 1000L).until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
>>>
>>> So after 7 days it retains the data covered by the windows of the
>>> last 15 minutes, which rolls the data in them over to the next
>>> window. This way streams work continuously.
>>>
>>> Please let us know your thoughts on this.
>>>
>>> On another side question, there is a setting:
>>>
>>> windowstore.changelog.additional.retention.ms
>>>
>>> It is not clear what it does. Is this the default for until?
>>>
>>> Thanks
>>> Sachin
>>>
>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax
>>> <matth...@confluent.io> wrote:
>>>
>>>> Windows are created on demand, i.e., each time a new record arrives
>>>> and there is no window for it yet, a new window will get created.
>>>>
>>>> Windows accept data until their retention time (which you can
>>>> configure via .until()) has passed. Thus, you will have many
>>>> windows open in parallel.
>>>>
>>>> If you read older data, it will just be put into the corresponding
>>>> windows (as long as the window retention time has not passed). If a
>>>> window was discarded already, a new window with this single
>>>> (late-arriving) record will get created, the computation will be
>>>> triggered, you get a result, and afterwards the window is deleted
>>>> again (as its retention time has passed already).
>>>>
>>>> The retention time is driven by "stream-time", an internally
>>>> tracked time that only progresses in the forward direction. It gets
>>>> its value from the timestamps provided by the TimestampExtractor --
>>>> thus, per default it will be event-time.
>>>>
>>>> -Matthias
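Since stream-time is whatever the TimestampExtractor returns, a custom
extractor is usually where out-of-order timestamps enter the picture. A
minimal sketch, assuming the 0.10.x single-argument extract() signature
(MyEvent and getEventTimeMs() are hypothetical stand-ins for the
payload type):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            // Stream-time -- and thus window retention -- advances with
            // the values returned here.
            MyEvent event = (MyEvent) record.value();
            return event.getEventTimeMs();
        }
    }

    // Hypothetical payload type carrying an event-time field.
    class MyEvent {
        private final long eventTimeMs;
        MyEvent(long eventTimeMs) { this.eventTimeMs = eventTimeMs; }
        long getEventTimeMs() { return eventTimeMs; }
    }

Such an extractor is plugged in via StreamsConfig's timestamp.extractor
setting; if the extracted values ever jump backwards by more than the
retention, you get exactly the dropped-window behavior discussed in
this thread.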
>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>> I've read this and still have more questions than answers. If my
>>>>> data skips about (timewise), what determines when a given window
>>>>> will start / stop accepting new data? What if I'm reading data
>>>>> from some time ago?
>>>>>
>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax
>>>>> <matth...@confluent.io> wrote:
>>>>>
>>>>>> Please have a look here:
>>>>>>
>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
>>>>>>
>>>>>> If you have further questions, just follow up :)
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>> I've added the 'until()' clause to some aggregation steps and
>>>>>>> it's working wonders for keeping the size of the state store
>>>>>>> within useful boundaries... But I'm not 100% clear on how it
>>>>>>> works.
>>>>>>>
>>>>>>> What is implied by the '.until()' clause? What determines when
>>>>>>> to stop receiving further data -- is it clock time (since the
>>>>>>> window was created)? It seems problematic for it to refer to
>>>>>>> event-time, as this may bounce all over the place. For
>>>>>>> non-overlapping windows a given record can only fall into a
>>>>>>> single aggregation period -- so when would a value get
>>>>>>> discarded?
>>>>>>>
>>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
>>>>>>> 1000L).until(10 * 1000L))' -- but what is this accomplishing?
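Tying this back to the answers above: in Jon's snippet, until(10 *
1000L) asks for 10 seconds of retention on 60-second windows, i.e. less
than the window size. Following the rule of thumb that retention should
exceed the window size, a corrected sketch (same 0.10.1-era API as
above) would be:

    // 1-minute windows retained for 10 minutes of stream-time, so
    // records up to 9 minutes late can still reach their window.
    TimeWindows.of(60 * 1000L)
               .until(10 * 60 * 1000L);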