Thanks for writing this up and comparing to the current implementation. It's great to see that your mockup indicates correct/expected behaviour *and* better performance. :-)
Regarding the results for the current mechanism: does this problem affect
all window operators?

– Ufuk

On 25 Jun 2015, at 11:36, Aljoscha Krettek <aljos...@apache.org> wrote:

> I think I'll have to elaborate a bit, so I created a proof-of-concept
> implementation of my ideas and ran some throughput measurements to
> alleviate concerns about performance.
>
> First, though, I want to highlight again why the current approach does
> not work with out-of-order elements (which, again, occur constantly due
> to the distributed nature of the system). This is the example I posted
> earlier: https://gist.github.com/aljoscha/a367012646ab98208d27. The plan
> looks like this:
>
>      +--+
>      |  | Source
>      +--+
>        |
>    +-----+
>    |     |
>    |   +--+
>    |   |  | Identity Map
>    |   +--+
>    |     |
>    +-----+
>        |
>      +--+
>      |  | Window
>      +--+
>        |
>      +--+
>      |  | Sink
>      +--+
>
> So all it does is pass the elements through an identity map and then
> merge them again before the window operator. The source emits ascending
> integers, and the window operator has a custom timestamp extractor that
> uses the integer itself as the timestamp. It should create windows of
> size 4 (that is, elements with timestamps 0-3 are one window, the next
> window holds the elements with timestamps 4-7, and so on). Since the
> topology basically doubles the elements from the source, I would expect
> to get these windows:
>
> Window: 0, 0, 1, 1, 2, 2, 3, 3
> Window: 4, 4, 5, 5, 6, 6, 7, 7
>
> The output is this, however:
>
> Window: 0, 1, 2, 3,
> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> Window: 8, 9, 10, 11,
> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> Window: 16, 17, 18, 19,
> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> Window: 24, 25, 26, 27,
> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>
> The reason is that the elements simply arrive out-of-order. Imagine what
> would happen if the elements actually arrived with some delay from
> different operations.
>
> Now, on to the performance numbers.
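The merge-induced reordering described above can be simulated outside of Flink. The following is a small hypothetical Python sketch (not the code from the gist) in which the identity-mapped branch delivers each batch one batch later than the direct branch; count-based windows over the merged stream then mix timestamps, while grouping by event timestamp still recovers the expected windows:

```python
def merged(stream, batch=4):
    """Simulate the union of the source with its identity-mapped copy:
    the mapped branch delivers each network batch one batch late."""
    out = []
    for i in range(0, len(stream), batch):
        out.extend(stream[i:i + batch])       # direct branch
        if i >= batch:
            out.extend(stream[i - batch:i])   # mapped branch, lagging
    out.extend(stream[-batch:])               # final lagging batch
    return out

events = merged(list(range(16)))

# Count-based windows of 8 elements mix timestamps from different windows:
naive = [events[i:i + 8] for i in range(0, len(events), 8)]

# Grouping by (timestamp // 4) recovers the expected windows regardless
# of arrival order:
by_time = {}
for t in events:
    by_time.setdefault(t // 4, []).append(t)
```

With this arrival order, `naive[1]` contains elements from two different event-time windows, while each `by_time` bucket holds exactly the doubled elements of one window.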
> The proof-of-concept I created is available here:
> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The
> basic idea is that sources assign the current timestamp when emitting
> elements. They also periodically emit watermarks that tell us that no
> elements with an earlier timestamp will be emitted. The watermarks
> propagate through the operators. The window operator looks at the
> timestamp of an element and puts it into the buffer that corresponds to
> that window. When the window operator receives a watermark, it looks at
> the in-flight windows (basically the buffers) and emits those windows
> whose window-end is before the watermark.
>
> For measuring throughput I did the following: the source emits tuples
> of the form ("tuple", 1) in an infinite loop. The window operator sums
> up the tuples, thereby counting how many tuples the window operator can
> handle in a given time window. There are two different implementations
> for the summation: 1) simply summing up the values in a mapWindow(),
> where you get a List of all tuples and simply iterate over it; 2) using
> sum(1), which is implemented as a reduce() (that uses the pre-reducer
> optimisations).
>
> These are the performance numbers (Current is the current
> implementation, Next is my proof-of-concept):
>
> Tumbling (1 sec):
> - Current/Map: 1.6 mio
> - Current/Reduce: 2 mio
> - Next/Map: 2.2 mio
> - Next/Reduce: 4 mio
>
> Sliding (5 sec, slide 1 sec):
> - Current/Map: ca. 3 mio (fluctuates a lot)
> - Current/Reduce: no output
> - Next/Map: ca. 4 mio (fluctuates)
> - Next/Reduce: 10 mio
>
> The Next/Reduce variant can basically scale indefinitely with window
> size because the internal state does not depend on the number of
> elements (it is just the current sum). The pre-reducer for sliding
> windows cannot handle this amount of tuples; it produces no output.
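The mechanism described above — buffering elements per window by timestamp and flushing complete windows when a watermark arrives — can be sketched roughly like this (a hypothetical Python sketch of the idea, not the actual proof-of-concept code):

```python
class TumblingEventTimeWindows:
    """Buffer elements per window by event timestamp; a watermark flushes
    every window whose covered timestamps all lie at or before it."""

    def __init__(self, size):
        self.size = size
        self.buffers = {}   # window start -> buffered elements

    def element(self, value, timestamp):
        start = (timestamp // self.size) * self.size
        self.buffers.setdefault(start, []).append(value)

    def watermark(self, wm):
        # wm promises no element with timestamp <= wm is still coming,
        # so every window covering only timestamps <= wm is complete.
        ready = sorted(s for s in self.buffers if s + self.size - 1 <= wm)
        return [(s, self.buffers.pop(s)) for s in ready]


op = TumblingEventTimeWindows(4)
for t in [2, 0, 5, 1, 3, 4]:   # out-of-order arrival
    op.element(t, t)
emitted = op.watermark(3)       # window [0, 4) is complete, [4, 8) is not
```

Note the out-of-order input still lands in the correct window; only the order within a window's buffer reflects arrival order.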
> For the two Map variants the performance fluctuates because they always
> keep all the elements in an internal buffer before emission; this seems
> to tax the garbage collector a bit and leads to random pauses.
>
> One thing that should be noted is that I had to disable the
> fake-element emission thread; otherwise the Current versions would
> deadlock.
>
> So, I started working on this because I thought that out-of-order
> processing would be necessary for correctness. And it certainly is. But
> the proof-of-concept also shows that performance can be greatly
> improved.
>
> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> I agree, let's separate these topics from each other so we can get
>> faster resolution.
>>
>> There is already a state discussion in the thread we started with
>> Paris.
>>
>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org>
>> wrote:
>>
>>> I agree with supporting out-of-order out of the box :-), even if this
>>> means a major refactoring. This is the right time to refactor the
>>> streaming API before we pull it out of beta. I think that this is
>>> more important than new features in the streaming API, which can be
>>> prioritized once the API is out of beta (meaning that, IMO, this is
>>> the right time to stall PRs until we agree on the design).
>>>
>>> There are three sections in the document: windowing, state, and API.
>>> How convoluted are those with each other? Can we separate the
>>> discussion or do we need to discuss those all together? I think part
>>> of the difficulty is that we are discussing three design choices at
>>> once.
>>>
>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com>
>>> wrote:
>>>
>>>> Out-of-order is ubiquitous in the real world. Typically, what
>>>> happens is that businesses will declare a maximum allowable delay
>>>> for delayed transactions and will commit to results when that delay
>>>> is reached.
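The scaling claim for the Reduce variant can be made concrete: because the per-window state is just the running aggregate, state size is independent of how many elements fall into the window. A minimal sketch of this idea (again hypothetical Python, not the Flink pre-reducer):

```python
class ReducingWindow:
    """Per-window state is only the running aggregate, so memory use does
    not grow with the number of elements per window (unlike the Map
    variants, which buffer every element until emission)."""

    def __init__(self, size, reduce_fn):
        self.size = size
        self.reduce_fn = reduce_fn
        self.state = {}   # window start -> running aggregate

    def element(self, value, timestamp):
        start = (timestamp // self.size) * self.size
        if start in self.state:
            self.state[start] = self.reduce_fn(self.state[start], value)
        else:
            self.state[start] = value

    def watermark(self, wm):
        ready = sorted(s for s in self.state if s + self.size - 1 <= wm)
        return [(s, self.state.pop(s)) for s in ready]


# Counting ("tuple", 1)-style tuples, as in the throughput benchmark:
w = ReducingWindow(4, lambda a, b: a + b)
for ts in range(8):
    w.element(1, ts)
counts = w.watermark(7)   # both windows [0,4) and [4,8) are complete
```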
>>>> Transactions that arrive later than this cutoff are collected
>>>> specially as corrections, which are reported/used when possible.
>>>>
>>>> Clearly, ordering can also be violated during processing, but if the
>>>> data is originally out of order, the situation can't be repaired by
>>>> any protocol fixes that prevent transactions from becoming
>>>> disordered; it has to be handled at the data level.
>>>>
>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek
>>>> <aljos...@apache.org> wrote:
>>>>
>>>>> I also don't like big changes, but sometimes they are necessary.
>>>>> The reason why I'm so adamant about out-of-order processing is that
>>>>> out-of-order elements are not some exception that occurs once in a
>>>>> while; they occur constantly in a distributed system. For example,
>>>>> in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>>> resulting windows are completely bogus because the current
>>>>> windowing system assumes elements to globally arrive in order,
>>>>> which is simply not true. (The example has a source that generates
>>>>> increasing integers. These then pass through a map and are unioned
>>>>> with the original DataStream before a window operator.) This
>>>>> simulates elements arriving from different operators at a windowing
>>>>> operator. The example is also DOP=1; I imagine this gets worse with
>>>>> higher DOP.
>>>>>
>>>>> What do you mean by costly? As I said, I have a proof-of-concept
>>>>> windowing operator that can handle out-of-order elements. This is
>>>>> an example using the current Flink API:
>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>> (It is an infinite source of tuples and a 5-second window operator
>>>>> that counts the tuples.) The first problem is that this code
>>>>> deadlocks because of the thread that emits fake elements.
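The business rule Ted describes — commit results once the maximum allowable delay has passed, and divert anything later into a corrections stream — can be sketched as follows (a hypothetical illustration using the highest timestamp seen so far as the clock; names and structure are this sketch's own, not from any system in the thread):

```python
def route(events, max_delay):
    """Split (value, timestamp) events into committed results and late
    corrections, per a declared maximum allowable delay."""
    committed, corrections = [], []
    high_water = float("-inf")
    for value, timestamp in events:
        high_water = max(high_water, timestamp)
        if timestamp < high_water - max_delay:
            # Past the cutoff: reported separately as a correction
            # instead of mutating already-committed results.
            corrections.append((value, timestamp))
        else:
            committed.append((value, timestamp))
    return committed, corrections


committed, corrections = route([("a", 0), ("b", 10), ("late", 2)], max_delay=5)
```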
>>>>> If I disable the fake-element code it works, but the throughput
>>>>> using my mockup is 4 times higher. The gap widens dramatically as
>>>>> the window size increases.
>>>>>
>>>>> So, it actually increases performance (unless I'm making a mistake
>>>>> in my explorations) and can handle elements that arrive
>>>>> out-of-order (which happens basically always in real-world
>>>>> windowing use cases).
>>>>>
>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> What I like a lot about Aljoscha's proposed design is that we need
>>>>>> no different code for "system time" vs. "event time". It only
>>>>>> differs in where the timestamps are assigned.
>>>>>>
>>>>>> The OOP approach also gives you the semantics of total ordering
>>>>>> without imposing merges on the streams.
>>>>>>
>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>>> I agree that there should be multiple alternatives the user(!)
>>>>>>> can choose from. Partial out-of-order processing works for
>>>>>>> many/most aggregates. However, if you consider
>>>>>>> event-pattern-matching, global ordering is necessary (even if the
>>>>>>> performance penalty might be high).
>>>>>>>
>>>>>>> I would also keep "system-time windows" as an alternative to
>>>>>>> "source-assigned ts-windows".
>>>>>>>
>>>>>>> It might also be interesting to consider the following paper for
>>>>>>> overlapping windows: "Resource sharing in continuous
>>>>>>> sliding-window aggregates"
>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> I think we should not block PRs unnecessarily just because your
>>>>>>>> suggested changes might touch them at some point.
>>>>>>>> Also, I still think we should not put everything in the
>>>>>>>> DataStream because it will be a huge mess.
>>>>>>>>
>>>>>>>> Also, we need to agree on the out-of-order processing, whether
>>>>>>>> we want it the way you proposed it (which is quite costly).
>>>>>>>> Another alternative approach that fits in the current windowing
>>>>>>>> is to filter out out-of-order events and apply a special
>>>>>>>> handling operator on them. This would be fairly lightweight.
>>>>>>>>
>>>>>>>> My point is that we need to consider some alternative solutions.
>>>>>>>> And we should not block contributions along the way.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
>>>>>>>> aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> The reason I posted this now is that we need to think about the
>>>>>>>>> API and windowing before proceeding with the PRs of Gabor
>>>>>>>>> (inverse reduce) and Gyula (removal of "aggregate" functions on
>>>>>>>>> DataStream).
>>>>>>>>>
>>>>>>>>> For the windowing, I think that the current model does not work
>>>>>>>>> for out-of-order processing. Therefore, the whole windowing
>>>>>>>>> infrastructure will basically have to be redone, meaning also
>>>>>>>>> that any work on the pre-aggregators or optimizations that we
>>>>>>>>> do now becomes useless.
>>>>>>>>>
>>>>>>>>> For the API, I proposed to restructure the interactions between
>>>>>>>>> all the different *DataStream classes and grouping/windowing.
>>>>>>>>> (See the API section of the doc I posted.)
>>>>>>>>>
>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>
>>>>>>>>>> Thanks for the nice summary, this is a very good initiative.
>>>>>>>>>> I added some comments to the respective sections (where I
>>>>>>>>>> didn't fully agree :)).
>>>>>>>>>> At some point I think it would be good to have a public
>>>>>>>>>> hangout session on this, which could make for a more dynamic
>>>>>>>>>> discussion.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> On Mon, 22 Jun 2015 at 21:34, Aljoscha Krettek
>>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> with people proposing changes to the streaming part, I also
>>>>>>>>>>> wanted to throw my hat into the ring. :D
>>>>>>>>>>>
>>>>>>>>>>> During the last few months, while I was getting acquainted
>>>>>>>>>>> with the streaming system, I wrote down some thoughts I had
>>>>>>>>>>> about how things could be improved. Hopefully they are in
>>>>>>>>>>> somewhat coherent shape now, so please have a look if you are
>>>>>>>>>>> interested in this:
>>>>>>>>>>>
>>>>>>>>>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>
>>>>>>>>>>> This mostly covers:
>>>>>>>>>>> - Timestamps assigned at sources
>>>>>>>>>>> - Out-of-order processing of elements in window operators
>>>>>>>>>>> - API design
>>>>>>>>>>>
>>>>>>>>>>> Please let me know what you think. Comment in the document or
>>>>>>>>>>> here on the mailing list.
>>>>>>>>>>>
>>>>>>>>>>> I have a PR in the makings that would introduce source
>>>>>>>>>>> timestamps and watermarks for keeping track of them. I also
>>>>>>>>>>> hacked a proof-of-concept of a windowing system that is able
>>>>>>>>>>> to process out-of-order elements using a FlatMap operator.
>>>>>>>>>>> (It uses panes to perform efficient pre-aggregations.)
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Aljoscha
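The pane-based pre-aggregation mentioned at the end of the thread can be sketched for a sliding sum like this (a hypothetical Python illustration of the general technique, assuming the slide divides the window size; it is not the proof-of-concept's code):

```python
class PanedSlidingSum:
    """Aggregate each element exactly once into its pane (pane width =
    slide); a sliding-window result then combines size/slide pane
    aggregates instead of re-reducing every buffered element."""

    def __init__(self, size, slide):
        assert size % slide == 0, "sketch assumes slide divides size"
        self.size, self.slide = size, slide
        self.panes = {}   # pane start -> partial sum

    def element(self, value, timestamp):
        pane = (timestamp // self.slide) * self.slide
        self.panes[pane] = self.panes.get(pane, 0) + value

    def window_result(self, window_start):
        # Combine the size/slide panes covered by [window_start,
        # window_start + size).
        return sum(self.panes.get(window_start + i * self.slide, 0)
                   for i in range(self.size // self.slide))


w = PanedSlidingSum(size=4, slide=2)
for ts in range(6):        # one count per timestamp 0..5
    w.element(1, ts)
```

Overlapping windows share the pane aggregates, which is why the per-element work stays constant as windows grow or overlap more.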