Hi, I have created a pull request for this: https://github.com/apache/flink/pull/2736
Regards, Vishnu On Tue, Oct 18, 2016 at 3:34 AM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > yes, I think it's fine if we keep it in the same package as the Evictor. > StreamRecord is more of an internal class that should not really be user > facing, that's my motivation for replacing it. > > Cheers, > Aljoscha > > On Mon, 17 Oct 2016 at 19:23 Vishnu Viswanath < > vishnu.viswanat...@gmail.com> > wrote: > > > Hi Aljoscha, > > > > Thanks for the response. > > > > I did think about creating a new class similar to TimestampedValue as you > > suggested, but that class looked almost same as the current > StreamRecord<T> > > class. (Both have a timestamp field and a value field). > > > > Do you think it is fine to have another class for holding > (timestamp,value) > > tuple? > > > > Regards, > > Vishnu > > > > On Mon, Oct 17, 2016 at 4:19 AM, Aljoscha Krettek <aljos...@apache.org> > > wrote: > > > > > Hi Vishnu, > > > what you suggested is spot on! Please go forward with it like this. > > > > > > One small suggestion would be to change Tuple2<Long, T> to something > like > > > TimestampedValue<T> to not rely on tuples because they can be confusing > > for > > > people who write Scala code because they are not Scala tuples. That's > not > > > strictly necessary, though, you can spin it however you like. > > > > > > Cheers, > > > Aljoscha > > > > > > On Fri, 7 Oct 2016 at 18:46 Vishnu Viswanath < > > vishnu.viswanat...@gmail.com > > > > > > > wrote: > > > > > > > Hi Radu, > > > > > > > > Yes we can remove elements randomly using iterator.remove() > > > > > > > > Regards, > > > > Vishnu > > > > > > > > On Fri, Oct 7, 2016 at 2:57 AM, Radu Tudoran < > radu.tudo...@huawei.com> > > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > I must apologies that I missed some of the email exchanges on this > > > thread > > > > > and thus my remark/question might have been already settled. > > > > > > > > > > Does this interface you propose enable to remove also elements out > of > > > > > order e.g., assuming I have elements 1,2,3,4,5 in the window buffer > > to > > > be > > > > > able to evict 2 and 4? > > > > > We discussed about this some email exchanges ago but as I said I am > > not > > > > > sure if this functionality is captured in this interface. > Basically, > > > will > > > > > the typical remove() method from Iterators be available? > > > > > > > > > > Best regards, > > > > > > > > > > > > > > > -----Original Message----- > > > > > From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com] > > > > > Sent: Friday, October 07, 2016 8:29 AM > > > > > To: Dev > > > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink > > > > > > > > > > Hi Aljoscha, > > > > > > > > > > To pass the time information to Evictor at the same to not expose > the > > > > > StreamRecord, I suppose we can change the signature of evictBefore > > and > > > > > evictAfter to take Iterable<Tuple2<Long, T>> instead > > > > > Iterable<StreamRecord<T>> > > > > > > > > > > void evictBefore(Iterable<Tuple2<Long, T>> elements, int size, W > > > window, > > > > > EvictorContext evictorContext); > > > > > > > > > > The fire() method of EvictingWindowOperator can transform the > > > > > Iterable<StreamRecord<IN>> to FluentIterable<Tuple2<Long, IN>> and > > pass > > > > it > > > > > on to the evictor(where f0 will be the timestamp and f1 will the > > > value). > > > > > That way the TimeEvictor will work for EventTime or IngestionTime > as > > > long > > > > > as timestamp is set in the StreamRecord. In case timestamp is not > > set, > > > > > TimeEvictor can capture this by checking the Tuple2.f0 (which will > be > > > > > Long.MIN_VALUE) and ignore the eviction. > > > > > > > > > > If you think this is fine, I will make the changes and also edit > the > > > > FLIP. > > > > > > > > > > Regards, > > > > > Vishnu > > > > > > > > > > > > > > > On Wed, Oct 5, 2016 at 9:49 PM, Vishnu Viswanath < > > > > > vishnu.viswanat...@gmail.com> wrote: > > > > > > > > > > > Thank you Aljoscha, > > > > > > > > > > > > Yes, I agree we don't need ProcessingTimeEvcitor. > > > > > > I will change the current TimeEvictors to use EventTimeEvictor as > > > > > > suggested. > > > > > > > > > > > > Also, figure out a way to pass timestamp to Evictor interface so > > that > > > > we > > > > > > can avoid exposing StreamRecrods. > > > > > > > > > > > > Regards, > > > > > > Vishnu > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 20, 2016 at 4:33 AM, Aljoscha Krettek < > > > aljos...@apache.org > > > > > > > > > > > wrote: > > > > > > > > > > > >> Hi, > > > > > >> now you again see what I mentioned a while back: eviction based > on > > > > > >> processing time is not really well defined. I think we can > > > completely > > > > > get > > > > > >> rid of "processing time eviction" because it can be replaced by > > > > > something > > > > > >> like this: > > > > > >> > > > > > >> DataStream input = ... > > > > > >> DataStream withTimestamps = > > input.assignTimestampsAndWatermarks(new > > > > > >> IngestionTimeExtractor()) // this will assign the current > > processing > > > > > time > > > > > >> as timestamp > > > > > >> withTimestamps > > > > > >> .keyBy(...) > > > > > >> .window(...) > > > > > >> .evictor(new EventTimeEvictor()) > > > > > >> .apply(...) > > > > > >> > > > > > >> With this, we would just have to find a good way of passing the > > > > > timestamps > > > > > >> in the Evictor interface and a good way of implementing the > > > > > >> EvictingWindowOperator. > > > > > >> > > > > > >> Cheers, > > > > > >> Aljoscha > > > > > >> > > > > > >> > > > > > >> On Sun, 18 Sep 2016 at 18:14 Vishnu Viswanath < > > > > > >> vishnu.viswanat...@gmail.com> > > > > > >> wrote: > > > > > >> > > > > > >> > Hi Aljoscha, > > > > > >> > > > > > > >> > A) > > > > > >> > I tried the approach where we set the ProcessingTime > explicitly > > by > > > > > >> > converting DataStream<T> input to DataStream<Tuple2<Long, T>> > > > using > > > > > map > > > > > >> > function and below are my observations: > > > > > >> > 1. All the current code which uses TimeEvictor (which will be > by > > > > > default > > > > > >> > changed to ProcessingTimeEvictor) will be forced to implement > a > > > > > mapping > > > > > >> > Function to agree with the new method signature. > > > > > >> > 2. Even after doing the above mapping function, the timestamp > > > field > > > > of > > > > > >> the > > > > > >> > StreamRecord will not be changed. Which might be confusing > since > > > now > > > > > we > > > > > >> > have two timestamps for the record, one set by the mapping > > > function, > > > > > >> other > > > > > >> > in the StreamRecord. > > > > > >> > 3. Having a Stream of Tuple2<Long, T> makes it confusing to do > > the > > > > > keyBy > > > > > >> > and also the now the WindowFunction has to process > > Tuple2<Long,T> > > > > > >> instead > > > > > >> > of T. > > > > > >> > 4. Users might get confused on how to set the ProcessingTime > > since > > > > > >> > ProcessingTime is the time at which the records are processed > > and > > > > > users > > > > > >> > might expect that to be a responsibility of Flink > > > > > >> > > > > > > >> > Ideally, ProcessingTime should be the time at which a > > StreamRecord > > > > is > > > > > >> > processed. And if a record is Processed multiple times, e.g., > in > > > the > > > > > >> case > > > > > >> > when an element was not evicted from the window, hence > processed > > > > again > > > > > >> > during the next trigger the ProcessingTime should be the time > at > > > > which > > > > > >> the > > > > > >> > record was seen/processed the first time. "If my understanding > > of > > > > > >> > ProcessingTime is correct", I am thinking I can iterate > through > > > the > > > > > >> records > > > > > >> > and set the current timestamp as the ProcessingTime if absent. > > > > (before > > > > > >> > doing the eviction) > > > > > >> > > > > > > >> > Something like: > > > > > >> > for(StreamRecord<Object> element: elements) { > > > > > >> > if (!element.hasTimestamp()) { > > > > > >> > element.setTimestamp(System.currentTimeMillis()); > > > > > >> > } > > > > > >> > } > > > > > >> > > > > > > >> > B) Regarding not exposing StreamRecord<IN> in the Evictor. If > > > > Evictor > > > > > is > > > > > >> > given Iterable<IN> then we cannot retrieve time information of > > the > > > > > >> records > > > > > >> > in the EventTimeEvictor do the eviction (but I do see that > > > > > StreamRecord > > > > > >> is > > > > > >> > marked with @Internal) > > > > > >> > > > > > > >> > C) Regarding modifying WindowOperator class to take type > > parameter > > > > <S > > > > > >> > extends AppendingState<IN, ACC>> so that we can remove the > > > duplicate > > > > > >> code > > > > > >> > from EvictingWindowOperator, I would prefer to separate it > from > > > this > > > > > >> FLIP > > > > > >> > and create a JIRA for it, what do you say? > > > > > >> > > > > > > >> > Please let me know your thoughts. > > > > > >> > > > > > > >> > Regards, > > > > > >> > Vishnu > > > > > >> > > > > > > >> > On Sun, Jul 31, 2016 at 12:07 PM, Aljoscha Krettek < > > > > > aljos...@apache.org > > > > > >> > > > > > > >> > wrote: > > > > > >> > > > > > > >> > > Hi, > > > > > >> > > regarding a), b) and c): The WindowOperator can be extended > to > > > > have > > > > > >> this > > > > > >> > > signature: > > > > > >> > > public class WindowOperator<K, IN, ACC, OUT, W extends > > Window, S > > > > > >> extends > > > > > >> > > AppendingState<IN, ACC>> > > > > > >> > > > > > > > >> > > that way the shape of state is generic and > > > EvictingWindowOperator > > > > > can > > > > > >> use > > > > > >> > > ListState<IN> there. > > > > > >> > > > > > > > >> > > regarding 2.: Yes, we can either take the current processing > > > > > >> time/event > > > > > >> > > time or the max timestamp of elements in the window as the > > > > benchmark > > > > > >> > > against which we compare. > > > > > >> > > > > > > > >> > > About ProcessingTimeEvictor: the proposal was to make the > > > > timestamp > > > > > >> > > explicit in the type of elements. Otherwise, how would you > > > access > > > > > the > > > > > >> > > processing time of each element? (As I said, the timestamp > > field > > > > in > > > > > >> > > StreamRecord does not usually contain a processing-time > > > timestamp > > > > > and > > > > > >> I > > > > > >> > > would like to remove the StreamRecord from the type of the > > > > Iterable > > > > > >> that > > > > > >> > is > > > > > >> > > passed to the evictor to avoid code duplication in > > > > > >> > EvictingWindowOperator) > > > > > >> > > I'm open for suggestions there since I didn't come up with a > > > > better > > > > > >> > > solution yet. :-) > > > > > >> > > > > > > > >> > > Cheers, > > > > > >> > > Aljoscha > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > On Sat, 30 Jul 2016 at 05:56 Vishnu Viswanath < > > > > > >> > > vishnu.viswanat...@gmail.com> > > > > > >> > > wrote: > > > > > >> > > > > > > > >> > > > Hi Aljoscha, > > > > > >> > > > > > > > > >> > > > 1. Regarding the Evictor interface taking Iterable<IN> > > instead > > > > of > > > > > >> > > > StreamRecord - > > > > > >> > > > > > > > > >> > > > a) I am not quite sure I understood what you meant by > *"It > > > > could > > > > > >> be a > > > > > >> > > very > > > > > >> > > > thin subclass of WindowOperator"* - Currently, most of the > > > code > > > > > >> > > duplication > > > > > >> > > > in EvictingWindowOperator is due to the > > windowStateDescriptor > > > > > >> > (ListState > > > > > >> > > > instead of AppendingState compared to WindowOperator). Is > > this > > > > > >> > correct?. > > > > > >> > > > > > > > > >> > > > b) Do you hope to keep using AppendingState instead of > > > > ListState > > > > > to > > > > > >> > > avoid > > > > > >> > > > the duplicate code (e.g., processWatermark(), trigger() > > etc). > > > If > > > > > we > > > > > >> use > > > > > >> > > > AppendingState, the get() method returns an state of the > OUT > > > > type > > > > > >> ACC, > > > > > >> > > > which cannot be passed to Evictor. So I am assuming we > will > > > have > > > > > to > > > > > >> > keep > > > > > >> > > > using ListState here. > > > > > >> > > > > > > > > >> > > > c) My not so good idea was to use the FluentIterable to > > > convert > > > > > the > > > > > >> > > > Iterable<StreamRecord<IN>> to Iterable<IN> and pass it on > to > > > > > Evictor > > > > > >> > and > > > > > >> > > > Window function. Evictor can remove the elements from the > > > > > Iterable. > > > > > >> > (Even > > > > > >> > > > Window function can remove elements). Then clear the state > > and > > > > add > > > > > >> > > > elements(after removal) back to the state. But in that > > case, I > > > > > need > > > > > >> to > > > > > >> > > > reconstruct StreamRecord<IN> from IN. Doing so, we will > lose > > > the > > > > > >> > > timestamp > > > > > >> > > > information that might have been previously set on the > > > original > > > > > >> > > > StreamRecord<IN> - is there any other way to recreate > > > > > StreamRecord? > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > 2. Regarding ProcessingTimeEvictor - > > > > > >> > > > > > > > > >> > > > A TimeEvictor has to evict elements from the window which > > are > > > > > older > > > > > >> > than > > > > > >> > > a > > > > > >> > > > given Period from the element with maximum timestamp in > the > > > > > window. > > > > > >> > When > > > > > >> > > > considering ProcessingTimestamp(even if it was explicitly > > > set), > > > > > >> > shouldn't > > > > > >> > > > the timestamp associated with records be strictly > > increasing. > > > > > i.e., > > > > > >> > newer > > > > > >> > > > elements should have higher timestamp than earlier > elements. > > > So > > > > to > > > > > >> get > > > > > >> > > the > > > > > >> > > > max timestamp we could just get the last element. When > using > > > > > >> > > > EventTimeEvictor, the elements might have arrived out of > > order > > > > > >> hence we > > > > > >> > > > can't just take the timestamp of the last element as > maximum > > > > > >> timestamp, > > > > > >> > > but > > > > > >> > > > check each and every element in the window. > > > > > >> > > > > > > > > >> > > > We should have two versions of TimeEvictors - EventTime > and > > > > > >> > > ProcessingTime, > > > > > >> > > > but does ProcessingTimeEvictor need to take a > Tupel2<Long,T> > > > > since > > > > > >> > > anyways > > > > > >> > > > we are going to get the max timestamp by looking at the > last > > > > > >> element in > > > > > >> > > the > > > > > >> > > > window?. > > > > > >> > > > > > > > > >> > > > Thanks, > > > > > >> > > > Vishnu > > > > > >> > > > > > > > > >> > > > On Fri, Jul 29, 2016 at 6:22 AM, Aljoscha Krettek < > > > > > >> aljos...@apache.org > > > > > >> > > > > > > > >> > > > wrote: > > > > > >> > > > > > > > > >> > > > > About processing time and timestamps: > > > > > >> > > > > > > > > > >> > > > > The timestamp is either set in the source of in an > > > > > >> > > > > in-between TimestampAssigner that can be used with > > > > > >> > > > > DataStream.assignTimestampsAndWatermarks(). However, > the > > > > > >> timestamp in > > > > > >> > > the > > > > > >> > > > > element is normally not a "processing-time timestamp". I > > > think > > > > > it > > > > > >> > might > > > > > >> > > > > make sense to split the functionality for the evictors > > into > > > > two > > > > > >> > parts: > > > > > >> > > > one > > > > > >> > > > > that implicitly sets a timestamp and one that uses these > > > > > >> timestamps. > > > > > >> > It > > > > > >> > > > > could look like this: > > > > > >> > > > > > > > > > >> > > > > DataStream<T> input = ... > > > > > >> > > > > // this makes the current processing time explicit in > the > > > > > tuples: > > > > > >> > > > > DataStream<Tuple2<Long, T>> withTimestamps = > input.map(new > > > > > >> > > > > ReifyProcessingTIme<T>()); > > > > > >> > > > > withTimestamps > > > > > >> > > > > .keyBy(...) > > > > > >> > > > > .window(..) > > > > > >> > > > > .evictor(new ProcessingTimeEvictor<T>()) > > > > > >> > > > > .apply(...) > > > > > >> > > > > > > > > > >> > > > > where ProcessingTimeEvictor looks like this: > > > > > >> > > > > > > > > > >> > > > > class ProcessingTimeEvictor<T> extends > > Evictor<Tuple2<Long, > > > > T>> > > > > > { > > > > > >> > > > > void evictBefore(Iterable<Tuple2<Long, T>>, ...); > > > > > >> > > > > void evictAfter ... > > > > > >> > > > > } > > > > > >> > > > > > > > > > >> > > > > This would make everything that is happening explicit in > > the > > > > > type > > > > > >> > > > > signatures and explicit for the user. > > > > > >> > > > > > > > > > >> > > > > Cheers, > > > > > >> > > > > Aljoscha > > > > > >> > > > > > > > > > >> > > > > On Thu, 28 Jul 2016 at 18:32 Aljoscha Krettek < > > > > > >> aljos...@apache.org> > > > > > >> > > > wrote: > > > > > >> > > > > > > > > > >> > > > > > Hi, > > > > > >> > > > > > in fact, changing it to Iterable<IN> would simplify > > things > > > > > >> because > > > > > >> > > then > > > > > >> > > > > we > > > > > >> > > > > > would not have to duplicate code for the > > > > > EvictingWindowOperator > > > > > >> any > > > > > >> > > > more. > > > > > >> > > > > > It could be a very thin subclass of WindowOperator. > > > > > >> > > > > > > > > > > >> > > > > > Cheers, > > > > > >> > > > > > Aljoscha > > > > > >> > > > > > > > > > > >> > > > > > On Wed, 27 Jul 2016 at 03:56 Vishnu Viswanath < > > > > > >> > > > > > vishnu.viswanat...@gmail.com> wrote: > > > > > >> > > > > > > > > > > >> > > > > >> Hi Aljoscha, > > > > > >> > > > > >> > > > > > >> > > > > >> Regarding your concern - to not expose the > > StreamRecord > > > in > > > > > the > > > > > >> > > > Evictor, > > > > > >> > > > > >> were you able to find any alternative? > > > > > >> > > > > >> > > > > > >> > > > > >> I tried to make the methods take Iterable<IN> input > > > similar > > > > > to > > > > > >> the > > > > > >> > > > > >> WindowFunction, but that didn't work since we have to > > > clear > > > > > the > > > > > >> > > state > > > > > >> > > > > and > > > > > >> > > > > >> add the elements back to the state (to fix the bug > > > > mentioned > > > > > in > > > > > >> > the > > > > > >> > > > > >> previous mail) > > > > > >> > > > > >> > > > > > >> > > > > >> If you think the interface that accepts > > > > > >> Iterable<StreamRecord<T>> > > > > > >> > > > > >> elements is > > > > > >> > > > > >> good enough, I have the changes ready. > > > > > >> > > > > >> > > > > > >> > > > > >> Thanks, > > > > > >> > > > > >> Vishnu > > > > > >> > > > > >> > > > > > >> > > > > >> On Mon, Jul 25, 2016 at 7:48 AM, Aljoscha Krettek < > > > > > >> > > > aljos...@apache.org> > > > > > >> > > > > >> wrote: > > > > > >> > > > > >> > > > > > >> > > > > >> > Hi, > > > > > >> > > > > >> > the elements are currently not being removed from > the > > > > > >> buffers. > > > > > >> > > > That's > > > > > >> > > > > a > > > > > >> > > > > >> bug > > > > > >> > > > > >> > that we could fix while adding the new Evictor > > > interface. > > > > > >> > > > > >> > > > > > > >> > > > > >> > Cheers, > > > > > >> > > > > >> > Aljoscha > > > > > >> > > > > >> > > > > > > >> > > > > >> > On Mon, 25 Jul 2016 at 13:00 Radu Tudoran < > > > > > >> > > radu.tudo...@huawei.com> > > > > > >> > > > > >> wrote: > > > > > >> > > > > >> > > > > > > >> > > > > >> > > Hi Aljoscha, > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > Can you point us to the way it is handled now. Is > > > there > > > > > >> > anything > > > > > >> > > > > else > > > > > >> > > > > >> for > > > > > >> > > > > >> > > the removing of elements other than the skip in > > > > > >> > > > > >> EvictingWindowOperator. > > > > > >> > > > > >> > Is > > > > > >> > > > > >> > > there something as it was before version 1.x > where > > > you > > > > > had > > > > > >> an > > > > > >> > > > > explicit > > > > > >> > > > > >> > > remove from window buffers? > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > Dr. Radu Tudoran > > > > > >> > > > > >> > > Research Engineer - Big Data Expert > > > > > >> > > > > >> > > IT R&D Division > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > > > > >> > > > > >> > > European Research Center > > > > > >> > > > > >> > > Riesstrasse 25, 80992 München > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > E-mail: radu.tudo...@huawei.com > > > > > >> > > > > >> > > Mobile: +49 15209084330 <01520%209084330> > > <01520%209084330> > > > > > >> > > > > >> > > Telephone: +49 891588344173 <089%201588344173> > > <089%201588344173> > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > > > > >> > > > > >> > > Hansaallee 205, 40549 Düsseldorf, Germany, > > > > > www.huawei.com > > > > > >> > > > > >> > > Registered Office: Düsseldorf, Register Court > > > > Düsseldorf, > > > > > >> HRB > > > > > >> > > > 56063, > > > > > >> > > > > >> > > Managing Director: Bo PENG, Wanzhou MENG, Lifang > > CHEN > > > > > >> > > > > >> > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht > > > > > Düsseldorf, > > > > > >> HRB > > > > > >> > > > > 56063, > > > > > >> > > > > >> > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang > CHEN > > > > > >> > > > > >> > > This e-mail and its attachments contain > > confidential > > > > > >> > information > > > > > >> > > > > from > > > > > >> > > > > >> > > HUAWEI, which is intended only for the person or > > > entity > > > > > >> whose > > > > > >> > > > > address > > > > > >> > > > > >> is > > > > > >> > > > > >> > > listed above. Any use of the information > contained > > > > herein > > > > > >> in > > > > > >> > any > > > > > >> > > > way > > > > > >> > > > > >> > > (including, but not limited to, total or partial > > > > > >> disclosure, > > > > > >> > > > > >> > reproduction, > > > > > >> > > > > >> > > or dissemination) by persons other than the > > intended > > > > > >> > > recipient(s) > > > > > >> > > > is > > > > > >> > > > > >> > > prohibited. If you receive this e-mail in error, > > > please > > > > > >> notify > > > > > >> > > the > > > > > >> > > > > >> sender > > > > > >> > > > > >> > > by phone or email immediately and delete it! > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > -----Original Message----- > > > > > >> > > > > >> > > From: Aljoscha Krettek [mailto: > aljos...@apache.org > > ] > > > > > >> > > > > >> > > Sent: Monday, July 25, 2016 11:45 AM > > > > > >> > > > > >> > > To: dev@flink.apache.org > > > > > >> > > > > >> > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window > > Evictor > > > > in > > > > > >> Flink > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > Hi, > > > > > >> > > > > >> > > I think there is not yet a clear specification > for > > > how > > > > > the > > > > > >> > > actual > > > > > >> > > > > >> removal > > > > > >> > > > > >> > > of elements from the buffer will work. I think > > > naively > > > > > one > > > > > >> can > > > > > >> > > do: > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > Iterable<E> currentElements = state.get() > > > > > >> > > > > >> > > evictor.evict(currentElements); // this will > remove > > > > some > > > > > >> stuff > > > > > >> > > > from > > > > > >> > > > > >> > there, > > > > > >> > > > > >> > > or mark for removal > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > state.clear() > > > > > >> > > > > >> > > // the Iterable does not loop over the > > removed/marked > > > > > >> elements > > > > > >> > > > > >> > > for (E element : currentElements) { > > > > > >> > > > > >> > > state.add(element) > > > > > >> > > > > >> > > } > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > This is very costly but the only way I see of > doing > > > > this > > > > > >> right > > > > > >> > > now > > > > > >> > > > > >> with > > > > > >> > > > > >> > > every state backend. > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > Cheers, > > > > > >> > > > > >> > > Aljoscha > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > On Mon, 25 Jul 2016 at 09:46 Radu Tudoran < > > > > > >> > > > radu.tudo...@huawei.com> > > > > > >> > > > > >> > wrote: > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > Hi, > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > Thanks for the clarification. Can someone point > > to > > > > > where > > > > > >> the > > > > > >> > > > > events > > > > > >> > > > > >> are > > > > > >> > > > > >> > > > removed from buffers - I am trying to > understand > > > the > > > > > new > > > > > >> > logic > > > > > >> > > > of > > > > > >> > > > > >> > > handling > > > > > >> > > > > >> > > > the eviction in this new API. Thanks > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > -----Original Message----- > > > > > >> > > > > >> > > > From: Vishnu Viswanath [mailto: > > > > vishnu.viswanath25@gma > > > > > >> il.com > > > > > >> > ] > > > > > >> > > > > >> > > > Sent: Saturday, July 23, 2016 3:04 AM > > > > > >> > > > > >> > > > To: Dev > > > > > >> > > > > >> > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window > > > Evictor > > > > > in > > > > > >> > Flink > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > Hi Radu, > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > - Yes we can remove elements from the iterator. > > > > > >> > > > > >> > > > - Right now the EvictingWindowOperator just > skips > > > the > > > > > >> > elements > > > > > >> > > > > from > > > > > >> > > > > >> the > > > > > >> > > > > >> > > > Iterable before passing to the window > > function(Yes > > > > this > > > > > >> has > > > > > >> > to > > > > > >> > > > be > > > > > >> > > > > >> > changed > > > > > >> > > > > >> > > > in the new API) > > > > > >> > > > > >> > > > - Regarding how the last question on how > elements > > > are > > > > > >> being > > > > > >> > > > > removed > > > > > >> > > > > >> > from > > > > > >> > > > > >> > > > the window buffer. I am not sure how it is > > working > > > > > right > > > > > >> > now, > > > > > >> > > > but > > > > > >> > > > > >> when > > > > > >> > > > > >> > > > trying out the new API that I am working on, I > > did > > > > > find a > > > > > >> > bug > > > > > >> > > > > where > > > > > >> > > > > >> the > > > > > >> > > > > >> > > > evicted elements are not actually removed from > > the > > > > > >> State. I > > > > > >> > > have > > > > > >> > > > > >> added > > > > > >> > > > > >> > a > > > > > >> > > > > >> > > > fix for that. (You can see a mail regarding > that > > > in > > > > > this > > > > > >> > mail > > > > > >> > > > > >> chain) > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > Thanks, > > > > > >> > > > > >> > > > Vishnu > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > On Fri, Jul 22, 2016 at 1:03 PM, Radu Tudoran < > > > > > >> > > > > >> radu.tudo...@huawei.com > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > wrote: > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > > Hi, > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > Overall I believe that the interfaces and the > > > > > proposal > > > > > >> is > > > > > >> > > > good. > > > > > >> > > > > I > > > > > >> > > > > >> > have > > > > > >> > > > > >> > > > the > > > > > >> > > > > >> > > > > following question though: can you delete via > > the > > > > > >> iterator > > > > > >> > > > > >> > > > > (Iterable<StreamRecord<T>> elements) the > > > elements? > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > I tried to look over the code where the > > eviction > > > > > >> happens > > > > > >> > (I > > > > > >> > > > did > > > > > >> > > > > >> not > > > > > >> > > > > >> > do > > > > > >> > > > > >> > > > > these since version 0.10...looks very > different > > > now > > > > > :) > > > > > >> > > )...the > > > > > >> > > > > >> only > > > > > >> > > > > >> > > > > reference I found was the > > EvictingWindowOperator > > > > > which > > > > > >> at > > > > > >> > > the > > > > > >> > > > > >> > > > > fireOrContinue has a "skip" based on the > number > > > of > > > > > >> > elements > > > > > >> > > > > >> returned > > > > > >> > > > > >> > > from > > > > > >> > > > > >> > > > > the evictor...and these are not put in the > > > > collection > > > > > >> to > > > > > >> > be > > > > > >> > > > > given > > > > > >> > > > > >> to > > > > > >> > > > > >> > > the > > > > > >> > > > > >> > > > > user function to be applied. I think these > will > > > > also > > > > > >> need > > > > > >> > to > > > > > >> > > > be > > > > > >> > > > > >> > changed > > > > > >> > > > > >> > > > to > > > > > >> > > > > >> > > > > adjust to the "any operator from anywhere in > > the > > > > > window > > > > > >> > > > buffer". > > > > > >> > > > > >> > > > > Also - as we are on this topic - can someone > > > > explain > > > > > >> how > > > > > >> > > these > > > > > >> > > > > >> > elements > > > > > >> > > > > >> > > > > that are not consider anymore for the user > > > function > > > > > are > > > > > >> > > > actually > > > > > >> > > > > >> > > deleted > > > > > >> > > > > >> > > > > from the window buffer?..i did not manage to > > find > > > > > >> this.. > > > > > >> > > some > > > > > >> > > > > >> > reference > > > > > >> > > > > >> > > > to > > > > > >> > > > > >> > > > > classes/code where this happens would be > useful > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > Dr. Radu Tudoran > > > > > >> > > > > >> > > > > Research Engineer - Big Data Expert > > > > > >> > > > > >> > > > > IT R&D Division > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > > > > >> > > > > >> > > > > European Research Center > > > > > >> > > > > >> > > > > Riesstrasse 25, 80992 München > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > E-mail: radu.tudo...@huawei.com > > > > > >> > > > > >> > > > > Mobile: +49 15209084330 <01520%209084330> > > <01520%209084330> > > > > > >> > > > > >> > > > > Telephone: +49 891588344173 > <089%201588344173> > > <089%201588344173> > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > > > > >> > > > > >> > > > > Hansaallee 205, 40549 Düsseldorf, Germany, > > > > > >> www.huawei.com > > > > > >> > > > > >> > > > > Registered Office: Düsseldorf, Register Court > > > > > >> Düsseldorf, > > > > > >> > > HRB > > > > > >> > > > > >> 56063, > > > > > >> > > > > >> > > > > Managing Director: Bo PENG, Wanzhou MENG, > > Lifang > > > > CHEN > > > > > >> > > > > >> > > > > Sitz der Gesellschaft: Düsseldorf, > Amtsgericht > > > > > >> Düsseldorf, > > > > > >> > > HRB > > > > > >> > > > > >> 56063, > > > > > >> > > > > >> > > > > Geschäftsführer: Bo PENG, Wanzhou MENG, > Lifang > > > CHEN > > > > > >> > > > > >> > > > > This e-mail and its attachments contain > > > > confidential > > > > > >> > > > information > > > > > >> > > > > >> from > > > > > >> > > > > >> > > > > HUAWEI, which is intended only for the person > > or > > > > > entity > > > > > >> > > whose > > > > > >> > > > > >> address > > > > > >> > > > > >> > > is > > > > > >> > > > > >> > > > > listed above. Any use of the information > > > contained > > > > > >> herein > > > > > >> > in > > > > > >> > > > any > > > > > >> > > > > >> way > > > > > >> > > > > >> > > > > (including, but not limited to, total or > > partial > > > > > >> > disclosure, > > > > > >> > > > > >> > > > reproduction, > > > > > >> > > > > >> > > > > or dissemination) by persons other than the > > > > intended > > > > > >> > > > > recipient(s) > > > > > >> > > > > >> is > > > > > >> > > > > >> > > > > prohibited. If you receive this e-mail in > > error, > > > > > please > > > > > >> > > notify > > > > > >> > > > > the > > > > > >> > > > > >> > > sender > > > > > >> > > > > >> > > > > by phone or email immediately and delete it! > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > -----Original Message----- > > > > > >> > > > > >> > > > > From: Vishnu Viswanath [mailto: > > > > > >> > vishnu.viswanat...@gmail.com > > > > > >> > > ] > > > > > >> > > > > >> > > > > Sent: Friday, July 22, 2016 12:43 PM > > > > > >> > > > > >> > > > > To: Dev > > > > > >> > > > > >> > > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window > > > > Evictor > > > > > >> in > > > > > >> > > Flink > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > Hi, > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > I have created a FLIP page for this > enhancement > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > > >> > > > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP- > > > > > >> > > 4+%3A+Enhance+Window+Evictor > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > Thanks, > > > > > >> > > > > >> > > > > Vishnu > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > On Thu, Jul 21, 2016 at 6:53 AM, Vishnu > > > Viswanath < > > > > > >> > > > > >> > > > > vishnu.viswanat...@gmail.com> wrote: > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > Thanks Aljoscha. > > > > > >> > > > > >> > > > > > > > > > > >> > > > > >> > > > > > On Thu, Jul 21, 2016 at 4:46 AM, Aljoscha > > > > Krettek < > > > > > >> > > > > >> > > aljos...@apache.org > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > wrote: > > > > > >> > > > > >> > > > > > > > > > > >> > > > > >> > > > > >> Hi, > > > > > >> > > > > >> > > > > >> this, in fact, seems to be a bug. There > > should > > > > be > > > > > >> > > something > > > > > >> > > > > >> like > > > > > >> > > > > >> > > > > >> windowState.clear(); > > > > > >> > > > > >> > > > > >> for (IN element: projectedContents) { > > > > > >> > > > > >> > > > > >> windowState.add(element); > > > > > >> > > > > >> > > > > >> } > > > > > >> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > >> after passing the elements to the window > > > > function. > > > > > >> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > >> This is very inefficient but the only way > I > > > see > > > > of > > > > > >> > doing > > > > > >> > > it > > > > > >> > > > > >> right > > > > > >> > > > > >> > > now. > > > > > >> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > >> Cheers, > > > > > >> > > > > >> > > > > >> Aljoscha > > > > > >> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > >> On Thu, 21 Jul 2016 at 01:32 Vishnu > > Viswanath > > > < > > > > > >> > > > > >> > > > > >> vishnu.viswanat...@gmail.com> > > > > > >> > > > > >> > > > > >> wrote: > > > > > >> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > >> > Hi, > > > > > >> > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > When we use RocksDB as state backend, > how > > > does > > > > > the > > > > > >> > > > backend > > > > > >> > > > > >> state > > > > > >> > > > > >> > > get > > > > > >> > > > > >> > > > > >> > updated after some elements are evicted > > from > > > > the > > > > > >> > > window? > > > > > >> > > > > >> > > > > >> > I don't see any update call being made > to > > > > remove > > > > > >> the > > > > > >> > > > > element > > > > > >> > > > > >> > from > > > > > >> > > > > >> > > > the > > > > > >> > > > > >> > > > > >> state > > > > > >> > > > > >> > > > > >> > stored in RocksDB. > > > > > >> > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > It looks like the RocksDBListState is > only > > > > > having > > > > > >> > get() > > > > > >> > > > and > > > > > >> > > > > >> > add() > > > > > >> > > > > >> > > > > >> methods > > > > > >> > > > > >> > > > > >> > since it is an AppendingState, but that > > > causes > > > > > the > > > > > >> > > > evicted > > > > > >> > > > > >> > > elements > > > > > >> > > > > >> > > > to > > > > > >> > > > > >> > > > > >> come > > > > > >> > > > > >> > > > > >> > back when the trigger is fired next > time. > > > (It > > > > > >> works > > > > > >> > > fine > > > > > >> > > > > >> when I > > > > > >> > > > > >> > > use > > > > > >> > > > > >> > > > > >> > MemoryStateBackend) > > > > > >> > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > Is this expected behavior or am I > missing > > > > > >> something. > > > > > >> > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > Thanks, > > > > > >> > > > > >> > > > > >> > Vishnu > > > > > >> > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > On Mon, Jul 18, 2016 at 7:15 AM, Vishnu > > > > > Viswanath > > > > > >> < > > > > > >> > > > > >> > > > > >> > vishnu.viswanat...@gmail.com> wrote: > > > > > >> > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > > Hi Aljoscha, > > > > > >> > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > >> > > Thanks! Yes, I have the create page > > option > > > > now > > > > > >> in > > > > > >> > > wiki. > > > > > >> > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > >> > > Regards, > > > > > >> > > > > >> > > > > >> > > Vishnu Viswanath, > > > > > >> > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > >> > > On Mon, Jul 18, 2016 at 6:34 AM, > > Aljoscha > > > > > >> Krettek < > > > > > >> > > > > >> > > > > >> aljos...@apache.org> > > > > > >> > > > > >> > > > > >> > > wrote: > > > > > >> > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > >> > >> @Radu, addition of more window types > > and > > > > > >> sorting > > > > > >> > > > should > > > > > >> > > > > be > > > > > >> > > > > >> > part > > > > > >> > > > > >> > > > of > > > > > >> > > > > >> > > > > >> > another > > > > > >> > > > > >> > > > > >> > >> design proposal. This is interesting > > > stuff > > > > > but > > > > > >> I > > > > > >> > > think > > > > > >> > > > > we > > > > > >> > > > > >> > > should > > > > > >> > > > > >> > > > > keep > > > > > >> > > > > >> > > > > >> > >> issues separated because things can > get > > > > > >> > complicated > > > > > >> > > > very > > > > > >> > > > > >> > > quickly. > > > > > >> > > > > >> > > > > >> > >> > > > > > >> > > > > >> > > > > >> > >> On Mon, 18 Jul 2016 at 12:32 Aljoscha > > > > > Krettek < > > > > > >> > > > > >> > > > aljos...@apache.org > > > > > >> > > > > >> > > > > > > > > > > >> > > > > >> > > > > >> > >> wrote: > > > > > >> > > > > >> > > > > >> > >> > > > > > >> > > > > >> > > > > >> > >> > Hi, > > > > > >> > > > > >> > > > > >> > >> > about TimeEvictor, yes, I think > there > > > > > should > > > > > >> be > > > > > >> > > > > specific > > > > > >> > > > > >> > > > evictors > > > > > >> > > > > >> > > > > >> for > > > > > >> > > > > >> > > > > >> > >> > processing time and event time. > Also, > > > the > > > > > >> > current > > > > > >> > > > time > > > > > >> > > > > >> > should > > > > > >> > > > > >> > > > be > > > > > >> > > > > >> > > > > >> > >> > retrievable from the > EvictorContext. > > > > > >> > > > > >> > > > > >> > >> > > > > > > >> > > > > >> > > > > >> > >> > For the wiki you will need > > permissions. > > > > > This > > > > > >> was > > > > > >> > > > > >> recently > > > > > >> > > > > >> > > > changed > > > > > >> > > > > >> > > > > >> > >> because > > > > > >> > > > > >> > > > > >> > >> > there was too much spam. I gave you > > > > > >> permission > > > > > >> > to > > > > > >> > > > add > > > > > >> > > > > >> > pages. > > > > > >> > > > > >> > > > Can > > > > > >> > > > > >> > > > > >> you > > > > > >> > > > > >> > > > > >> > >> please > > > > > >> > > > > >> > > > > >> > >> > try and check if it works? > > > > > >> > > > > >> > > > > >> > >> > > > > > > >> > > > > >> > > > > >> > >> > Cheers, > > > > > >> > > > > >> > > > > >> > >> > Aljoscha > > > > > >> > > > > >> > > > > >> > >> > > > > > > >> > > > > >> > > > > >> > >> > On Fri, 15 Jul 2016 at 13:28 Vishnu > > > > > >> Viswanath < > > > > > >> > > > > >> > > > > >> > >> > vishnu.viswanat...@gmail.com> > wrote: > > > > > >> > > > > >> > > > > >> > >> > > > > > > >> > > > > >> > > > > >> > >> >> Hi all, > > > > > >> > > > > >> > > > > >> > >> >> > > > > > >> > > > > >> > > > > >> > >> >> How do we create a FLIP page, is > > there > > > > any > > > > > >> > > > permission > > > > > >> > > > > >> > setup > > > > > >> > > > > >> > > > > >> > required? I > > > > > >> > > > > >> > > > > >> > >> >> don't see any "Create" page(after > > > > logging > > > > > >> in) > > > > > >> > > > option > > > > > >> > > > > in > > > > > >> > > > > >> > the > > > > > >> > > > > >> > > > > >> header as > > > > > >> > > > > >> > > > > >> > >> >> mentioned in > > > > > >> > > > > >> > > > > >> > >> >> > > > > > >> > > > > >> > > > > >> > >> >> > > > > > >> > > > > >> > > > > >> > >> > > > > > >> > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > > >> > > > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/ > > > > > >> > > Flink+Improvement+Proposals > > > > > >> > > > > >> > > > > >> > >> >> > > > > > >> > > > > >> > > > > >> > >> >> > > > > > >> > > > > >> > > > > >> > >> >> Thanks, > > > > > >> > > > > >> > > > > >> > >> >> Vishnu > > > > > >> > > > > >> > > > > >> > >> >> > > > > > >> > > > > >> > > > > >> > >> >> On Wed, Jul 13, 2016 at 10:22 PM, > > > Vishnu > > > > > >> > > Viswanath > > > > > >> > > > < > > > > > >> > > > > >> > > > > >> > >> >> vishnu.viswanat...@gmail.com> > > wrote: > > > > > >> > > > > >> > > > > >> > >> >> > > > > > >> > > > > >> > > > > >> > >> >> > Hi Aljoscha, > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > I agree, the user will know > > exactly > > > > that > > > > > >> they > > > > > >> > > are > > > > > >> > > > > >> > creating > > > > > >> > > > > >> > > > an > > > > > >> > > > > >> > > > > >> > >> EventTime > > > > > >> > > > > >> > > > > >> > >> >> > based evictor or ProcessingTime > > > based > > > > > >> evictor > > > > > >> > > > > >> looking at > > > > > >> > > > > >> > > the > > > > > >> > > > > >> > > > > >> code. > > > > > >> > > > > >> > > > > >> > >> >> > So do you think it will be ok to > > > have > > > > > >> > multiple > > > > > >> > > > > >> versions > > > > > >> > > > > >> > of > > > > > >> > > > > >> > > > > >> > >> TimeEvictor > > > > > >> > > > > >> > > > > >> > >> >> > (one for event time and one for > > > > > processing > > > > > >> > > time) > > > > > >> > > > > and > > > > > >> > > > > >> > also > > > > > >> > > > > >> > > a > > > > > >> > > > > >> > > > > >> > >> DeltaEvcitor > > > > > >> > > > > >> > > > > >> > >> >> > (again 2 versions- for event > time > > > and > > > > > >> > > processing > > > > > >> > > > > >> time) ? > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > Please note that the existing > > > behavior > > > > > of > > > > > >> > > > > >> > > > > >> TimeEvictor/DeltaEvictor > > > > > >> > > > > >> > > > > >> > >> does > > > > > >> > > > > >> > > > > >> > >> >> > not consider if it is EventTime > or > > > > > >> > > ProcessingTime > > > > > >> > > > > >> > > > > >> > >> >> > e.g., in TimeEvictor the current > > > time > > > > is > > > > > >> > > > considered > > > > > >> > > > > >> as > > > > > >> > > > > >> > the > > > > > >> > > > > >> > > > > >> > timestamp > > > > > >> > > > > >> > > > > >> > >> of > > > > > >> > > > > >> > > > > >> > >> >> > the last element in the window > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > *long currentTime = > > > > > >> > > > > >> > > > > Iterables.getLast(elements).getTimestamp();* > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > not the highest timestamp of all > > > > > elements > > > > > >> > > > > >> > > > > >> > >> >> > what I am trying to achieve is > > > > something > > > > > >> > like: > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > *long currentTime;* > > > > > >> > > > > >> > > > > >> > >> >> > * if (ctx.isEventTime()) {* > > > > > >> > > > > >> > > > > >> > >> >> > * currentTime = > > > > > >> getMaxTimestamp(elements);* > > > > > >> > > > > >> > > > > >> > >> >> > * } else {* > > > > > >> > > > > >> > > > > >> > >> >> > * currentTime = > > > > > >> > > > > >> > > Iterables.getLast(elements).getTimestamp();* > > > > > >> > > > > >> > > > > >> > >> >> > * }* > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > Similarly, in DeltaEvictor the > > > > > >> > *`lastElement`* > > > > > >> > > is > > > > > >> > > > > >> > > > > >> > >> >> > *`Iterables.getLast(elements); > `* > > > and I > > > > > am > > > > > >> > > > thinking > > > > > >> > > > > we > > > > > >> > > > > >> > > should > > > > > >> > > > > >> > > > > >> > consider > > > > > >> > > > > >> > > > > >> > >> >> the > > > > > >> > > > > >> > > > > >> > >> >> > element with max timestamp as > the > > > last > > > > > >> > element > > > > > >> > > > > >> instead > > > > > >> > > > > >> > of > > > > > >> > > > > >> > > > just > > > > > >> > > > > >> > > > > >> > >> getting > > > > > >> > > > > >> > > > > >> > >> >> the > > > > > >> > > > > >> > > > > >> > >> >> > last inserted element as > > > > *`lastElement`* > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > Do you think it is the right > thing > > > to > > > > do > > > > > >> or > > > > > >> > > leave > > > > > >> > > > > the > > > > > >> > > > > >> > > > behavior > > > > > >> > > > > >> > > > > >> > >> Evictors > > > > > >> > > > > >> > > > > >> > >> >> as > > > > > >> > > > > >> > > > > >> > >> >> > is, w.r.t to choosing the last > > > > element? > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > Thanks, > > > > > >> > > > > >> > > > > >> > >> >> > Vishnu > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > On Wed, Jul 13, 2016 at 11:07 > AM, > > > > > Aljoscha > > > > > >> > > > Krettek > > > > > >> > > > > < > > > > > >> > > > > >> > > > > >> > >> aljos...@apache.org > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> > wrote: > > > > > >> > > > > >> > > > > >> > >> >> > > > > > > >> > > > > >> > > > > >> > >> >> >> I still think it should be > > explicit > > > > in > > > > > >> the > > > > > >> > > > class. > > > > > >> > > > > >> For > > > > > >> > > > > >> > > > > example, > > > > > >> > > > > >> > > > > >> if > > > > > >> > > > > >> > > > > >> > >> you > > > > > >> > > > > >> > > > > >> > >> >> have > > > > > >> > > > > >> > > > > >> > >> >> >> this code: > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > >> > > > > >> > > > > >> > >> >> >> input > > > > > >> > > > > >> > > > > >> > >> >> >> .keyBy() > > > > > >> > > > > >> > > > > >> > >> >> >> .window() > > > > > >> > > > > >> > > > > >> > >> >> >> .trigger(EventTimeTrigger.cre > > > ate()) > > > > > >> > > > > >> > > > > >> > >> >> >> > .evictor(TimeTrigger.create()) > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > >> > > > > >> > > > > >> > >> >> >> the time behavior of the > trigger > > is > > > > > >> > explicitly > > > > > >> > > > > >> > specified > > > > > >> > > > > >> > > > > while > > > > > >> > > > > >> > > > > >> the > > > > > >> > > > > >> > > > > >> > >> >> evictor > > > > > >> > > > > >> > > > > >> > >> >> >> would dynamically adapt based > on > > > > > internal > > > > > >> > > > workings > > > > > >> > > > > >> that > > > > > >> > > > > >> > > the > > > > > >> > > > > >> > > > > >> user > > > > > >> > > > > >> > > > > >> > >> might > > > > > >> > > > > >> > > > > >> > >> >> not > > > > > >> > > > > >> > > > > >> > >> >> >> be aware of. Having the > behavior > > > > > >> explicit at > > > > > >> > > the > > > > > >> > > > > >> call > > > > > >> > > > > >> > > site > > > > > >> > > > > >> > > > is > > > > > >> > > > > >> > > > > >> very > > > > > >> > > > > >> > > > > >> > >> >> >> important, in my opinion. > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > >> > > > > >> > > > > >> > >> >> >> On Wed, 13 Jul 2016 at 16:28 > > Vishnu > > > > > >> > Viswanath > > > > > >> > > < > > > > > >> > > > > >> > > > > >> > >> >> >> vishnu.viswanat...@gmail.com> > > > > > >> > > > > >> > > > > >> > >> >> >> wrote: > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > >> > > > > >> > > > > >> > >> >> >> > Hi, > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > >> > > > > >> > > > > >> > >> >> >> > I was hoping to use the > > > isEventTime > > > > > >> method > > > > > >> > > in > > > > > >> > > > > the > > > > > >> > > > > >> > > > > >> WindowAssigner > > > > > >> > > > > >> > > > > >> > >> to > > > > > >> > > > > >> > > > > >> > >> >> set > > > > > >> > > > > >> > > > > >> > >> >> >> > that information in the > > > > > EvictorContext. > > > > > >> > > > > >> > > > > >> > >> >> >> > What do you think?. > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > >> > > > > >> > > > > >> > >> >> >> > Thanks and Regards, > > > > > >> > > > > >> > > > > >> > >> >> >> > Vishnu Viswanath, > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > >> > > > > >> > > > > >> > >> >> >> > On Wed, Jul 13, 2016 at 10:09 > > AM, > > > > > >> Aljoscha > > > > > >> > > > > >> Krettek < > > > > > >> > > > > >> > > > > >> > >> >> aljos...@apache.org > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > >> > > > > >> > > > > >> > >> >> >> > wrote: > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > >> > > > > >> > > > > >> > >> >> >> > > Hi, > > > > > >> > > > > >> > > > > >> > >> >> >> > > I think the way to go here > is > > > to > > > > > add > > > > > >> > both > > > > > >> > > an > > > > > >> > > > > >> > > > > >> EventTimeEvictor > > > > > >> > > > > >> > > > > >> > >> and a > > > > > >> > > > > >> > > > > >> > >> >> >> > > ProcessingTimeEvictor. The > > > > problem > > > > > is > > > > > >> > that > > > > > >> > > > > >> > > > "isEventTime" > > > > > >> > > > > >> > > > > >> > cannot > > > > > >> > > > > >> > > > > >> > >> >> >> really be > > > > > >> > > > > >> > > > > >> > >> >> >> > > determined. That's also the > > > > reason > > > > > >> why > > > > > >> > > there > > > > > >> > > > > is > > > > > >> > > > > >> an > > > > > >> > > > > >> > > > > >> > >> EventTimeTrigger > > > > > >> > > > > >> > > > > >> > >> >> >> and a > > > > > >> > > > > >> > > > > >> > >> >> >> > > ProcessingTimeTrigger. It > was > > > > just > > > > > an > > > > > >> > > > > oversight > > > > > >> > > > > >> > that > > > > > >> > > > > >> > > > the > > > > > >> > > > > >> > > > > >> > >> >> TimeEvictor > > > > > >> > > > > >> > > > > >> > >> >> >> does > > > > > >> > > > > >> > > > > >> > >> >> >> > > not also have these two > > > versions. > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > About > > EvictingWindowOperator, I > > > > > think > > > > > >> > you > > > > > >> > > > can > > > > > >> > > > > >> make > > > > > >> > > > > >> > > the > > > > > >> > > > > >> > > > > two > > > > > >> > > > > >> > > > > >> > >> methods > > > > > >> > > > > >> > > > > >> > >> >> >> > > non-final in > WindowOperator, > > > yes. > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > Cheers, > > > > > >> > > > > >> > > > > >> > >> >> >> > > Aljoscha > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > On Wed, 13 Jul 2016 at > 14:32 > > > > Vishnu > > > > > >> > > > Viswanath > > > > > >> > > > > < > > > > > >> > > > > >> > > > > >> > >> >> >> > > > vishnu.viswanat...@gmail.com > > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > wrote: > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > > Hi Aljoscha, > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > > I am thinking of adding a > > > > method > > > > > >> > boolean > > > > > >> > > > > >> > > > isEventTime(); > > > > > >> > > > > >> > > > > >> in > > > > > >> > > > > >> > > > > >> > the > > > > > >> > > > > >> > > > > >> > >> >> >> > > > EvictorContext apart from > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > > long > > > > getCurrentProcessingTime(); > > > > > >> > > > > >> > > > > >> > >> >> >> > > > MetricGroup > > getMetricGroup(); > > > > > >> > > > > >> > > > > >> > >> >> >> > > > long > getCurrentWatermark(); > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > > This method can be used > to > > > make > > > > > the > > > > > >> > > > Evictor > > > > > >> > > > > >> not > > > > > >> > > > > >> > > > iterate > > > > > >> > > > > >> > > > > >> > >> through > > > > > >> > > > > >> > > > > >> > >> >> all > > > > > >> > > > > >> > > > > >> > >> >> >> the > > > > > >> > > > > >> > > > > >> > >> >> >> > > > elements in TimeEvictor. > > > There > > > > > will > > > > > >> > be a > > > > > >> > > > few > > > > > >> > > > > >> > > changes > > > > > >> > > > > >> > > > in > > > > > >> > > > > >> > > > > >> the > > > > > >> > > > > >> > > > > >> > >> >> existing > > > > > >> > > > > >> > > > > >> > >> >> >> > > > behavior of TimeEvictor > and > > > > > >> > DeltaEvictor > > > > > >> > > > (I > > > > > >> > > > > >> have > > > > > >> > > > > >> > > > > >> mentioned > > > > > >> > > > > >> > > > > >> > >> this > > > > > >> > > > > >> > > > > >> > >> >> in > > > > > >> > > > > >> > > > > >> > >> >> >> the > > > > > >> > > > > >> > > > > >> > >> >> >> > > > design doc) > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > > Also, is there any > specific > > > > > reason > > > > > >> why > > > > > >> > > the > > > > > >> > > > > >> open > > > > > >> > > > > >> > and > > > > > >> > > > > >> > > > > close > > > > > >> > > > > >> > > > > >> > >> method > > > > > >> > > > > >> > > > > >> > >> >> in > > > > > >> > > > > >> > > > > >> > >> >> >> > > > WindowEvictor is made > > final? > > > > > Since > > > > > >> the > > > > > >> > > > > >> > > EvictorContext > > > > > >> > > > > >> > > > > >> will > > > > > >> > > > > >> > > > > >> > be > > > > > >> > > > > >> > > > > >> > >> in > > > > > >> > > > > >> > > > > >> > >> >> the > > > > > >> > > > > >> > > > > >> > >> >> >> > > > EvictingWindowOperator, I > > > need > > > > to > > > > > >> > > override > > > > > >> > > > > the > > > > > >> > > > > >> > open > > > > > >> > > > > >> > > > and > > > > > >> > > > > >> > > > > >> > close > > > > > >> > > > > >> > > > > >> > >> in > > > > > >> > > > > >> > > > > >> > >> >> >> > > > EvitingWindowOperator to > > make > > > > the > > > > > >> > > > reference > > > > > >> > > > > of > > > > > >> > > > > >> > > > > >> > EvictorContext > > > > > >> > > > > >> > > > > >> > >> >> null. > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > > Thanks and Regards, > > > > > >> > > > > >> > > > > >> > >> >> >> > > > Vishnu Viswanath, > > > > > >> > > > > >> > > > > >> > >> >> >> > > > > > > > > >> > > > > >> > > > > >> > >> >> >> > > > On Fri, Ju >