I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?
I also tried to reproduce some of your performance numbers, but I'm getting less than a tenth of yours. For example, in the Tumbling case, Current/Reduce produces only ~100000 for me. Do you have any idea what I could be doing wrong? My code: http://pastebin.com/zbEjmGhk I am running it on a 2 GHz Core i7.

Best regards,
Gabor

2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec) (theoretically there would be 5000 tuples in 5 seconds, but the difference is due to overhead). These are the results for the inverse reduce optimisation:
> (Tuple 0,38)
> (Tuple 0,829)
> (Tuple 0,1625)
> (Tuple 0,2424)
> (Tuple 0,3190)
> (Tuple 0,3198)
> (Tuple 0,-339368)
> (Tuple 0,-1315725)
> (Tuple 0,-2932932)
> (Tuple 0,-5082735)
> (Tuple 0,-7743256)
> (Tuple 0,75701046)
> (Tuple 0,642829470)
> (Tuple 0,2242018381)
> (Tuple 0,5190708618)
> (Tuple 0,10060360311)
> (Tuple 0,-94254951)
> (Tuple 0,-219806321293)
> (Tuple 0,-1258895232699)
> (Tuple 0,-4074432596329)
>
> One line is one emitted window count. This is what happens when I remove the Thread.sleep(1):
> (Tuple 0,660676)
> (Tuple 0,2553733)
> (Tuple 0,3542696)
> (Tuple 0,1)
> (Tuple 0,1107035)
> (Tuple 0,2549491)
> (Tuple 0,4100387)
> (Tuple 0,-8406583360092)
> (Tuple 0,-8406582150743)
> (Tuple 0,-8406580427190)
> (Tuple 0,-8406580427190)
> (Tuple 0,-8406580427190)
> (Tuple 0,6847279255682044995)
> (Tuple 0,6847279255682044995)
> (Tuple 0,-5390528042713628318)
> (Tuple 0,-5390528042711551780)
> (Tuple 0,-5390528042711551780)
>
> So at some point the pre-reducer seems to go haywire and does not recover from it. The good thing is that it does produce results now, whereas the previous Current/Reduce would simply hang and not produce any output.
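[Editor's note: for readers following the thread, the inverse-reduce idea under discussion can be sketched roughly as follows. This is a minimal, hypothetical plain-Java sketch, not the actual code from PR 856: instead of re-aggregating the whole window on every slide, the pre-reducer keeps one running aggregate, applies the reduce function (here: addition) when an element enters the window, and applies the inverse function (subtraction) when an element is evicted.]

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of a sum pre-reducer that uses an inverse function for eviction. */
public class InverseSumPreReducer {
    private final Deque<Long> window = new ArrayDeque<>();
    private long sum = 0;

    /** Called when an element enters the window: apply the reduce function. */
    public void store(long value) {
        window.addLast(value);
        sum += value;
    }

    /** Called when the oldest element leaves the window: apply the inverse
     *  function instead of re-aggregating the remaining elements. */
    public void evict() {
        Long oldest = window.pollFirst();
        if (oldest != null) {
            sum -= oldest;
        }
    }

    /** Current window aggregate in O(1), independent of window size. */
    public long result() {
        return sum;
    }
}
```

[Editor's note, continued: note that the scheme only works if every evict() exactly mirrors an earlier store(). If stores and evictions ever get out of step (for example because elements arrive out of order), the error stays in the running sum forever, which would be consistent with the diverging window counts reported above.]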
> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:
>> Hello,
>>
>> Aljoscha, can you please try the performance test of Current/Reduce with the InversePreReducer in PR 856? (If you just call sum, it will use an InversePreReducer.) It would be an interesting test, because the inverse function optimization really depends on the stream being ordered, and I think it has the potential of being faster than Next/Reduce, especially if the window size is much larger than the slide size.
>>
>> Best regards,
>> Gabor
>>
>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>> > I think I'll have to elaborate a bit, so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to alleviate concerns about performance.
>> >
>> > First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier: https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like this:
>> >
>> >    +--+
>> >    |  | Source
>> >    +--+
>> >      |
>> >  +-------+
>> >  |       |
>> >  |     +--+
>> >  |     |  | Identity Map
>> >  |     +--+
>> >  |       |
>> >  +-------+
>> >      |
>> >    +--+
>> >    |  | Window
>> >    +--+
>> >      |
>> >    +--+
>> >    |  | Sink
>> >    +--+
>> >
>> > So all it does is pass the elements through an identity map and then merge them again before the window operator. The source emits ascending integers, and the window operator has a custom timestamp extractor that uses the integer itself as the timestamp and should create windows of size 4 (that is, elements with timestamps 0-3 are one window, the next window holds the elements with timestamps 4-7, and so on). Since the topology basically doubles the
Since the topology basically doubles the >> > elements form the source I would expect to get these windows: >> > Window: 0, 0, 1, 1, 2, 2, 3, 3 >> > Window: 4, 4, 6, 6, 7, 7, 8, 8 >> > >> > The output is this, however: >> > Window: 0, 1, 2, 3, >> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, >> > Window: 8, 9, 10, 11, >> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, >> > Window: 16, 17, 18, 19, >> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, >> > Window: 24, 25, 26, 27, >> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, >> > >> > The reason is that the elements simply arrive out-of-order. Imagine what >> > would happen if the elements actually arrived with some delay from >> > different operations. >> > >> > Now, on to the performance numbers. The proof-of-concept I created is >> > available here: >> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The >> basic >> > idea is that sources assign the current timestamp when emitting elements. >> > They also periodically emit watermarks that tell us that no elements with >> > an earlier timestamp will be emitted. The watermarks propagate through >> the >> > operators. The window operator looks at the timestamp of an element and >> > puts it into the buffer that corresponds to that window. When the window >> > operator receives a watermark it will look at the in-flight windows >> > (basically the buffers) and emit those windows where the window-end is >> > before the watermark. >> > >> > For measuring throughput I did the following: The source emits tuples of >> > the form ("tuple", 1) in an infinite loop. The window operator sums up >> the >> > tuples, thereby counting how many tuples the window operator can handle >> in >> > a given time window. There are two different implementations for the >> > summation: 1) simply summing up the values in a mapWindow(), there you >> get >> > a List of all tuples and simple iterate over it. 
2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimisations).
>> >
>> > These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):
>> >
>> > Tumbling (1 sec):
>> > - Current/Map: 1.6 mio
>> > - Current/Reduce: 2 mio
>> > - Next/Map: 2.2 mio
>> > - Next/Reduce: 4 mio
>> >
>> > Sliding (5 sec, slide 1 sec):
>> > - Current/Map: ca. 3 mio (fluctuates a lot)
>> > - Current/Reduce: no output
>> > - Next/Map: ca. 4 mio (fluctuates)
>> > - Next/Reduce: 10 mio
>> >
>> > The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not depend on the number of elements (it is just the current sum). The pre-reducer for sliding windows cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.
>> >
>> > One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.
>> >
>> > So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is. But the proof-of-concept also shows that performance can be greatly improved.
>> >
>> > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >> I agree, let's separate these topics from each other so we can get faster resolution.
>> >>
>> >> There is already a state discussion in the thread we started with Paris.
>> >>
>> >> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:
>> >> > I agree with supporting out-of-order out of the box :-), even if this means
This is the right time to refactor the streaming >> > API >> >> > before we pull it out of beta. I think that this is more important >> than >> > new >> >> > features in the streaming API, which can be prioritized once the API >> is >> > out >> >> > of beta (meaning, that IMO this is the right time to stall PRs until >> we >> >> > agree on the design). >> >> > >> >> > There are three sections in the document: windowing, state, and API. >> How >> >> > convoluted are those with each other? Can we separate the discussion >> or >> > do >> >> > we need to discuss those all together? I think part of the difficulty >> is >> >> > that we are discussing three design choices at once. >> >> > >> >> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com> >> >> > wrote: >> >> > >> >> > > Out of order is ubiquitous in the real-world. Typically, what >> > happens is >> >> > > that businesses will declare a maximum allowable delay for delayed >> >> > > transactions and will commit to results when that delay is reached. >> >> > > Transactions that arrive later than this cutoff are collected >> > specially >> >> > as >> >> > > corrections which are reported/used when possible. >> >> > > >> >> > > Clearly, ordering can also be violated during processing, but if the >> > data >> >> > > is originally out of order the situation can't be repaired by any >> >> > protocol >> >> > > fixes that prevent transactions from becoming disordered but has to >> >> > handled >> >> > > at the data level. >> >> > > >> >> > > >> >> > > >> >> > > >> >> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek < >> aljos...@apache.org >> >> >> >> > > wrote: >> >> > > >> >> > > > I also don't like big changes but sometimes they are necessary. >> The >> >> > > reason >> >> > > > why I'm so adamant about out-of-order processing is that >> > out-of-order >> >> > > > elements are not some exception that occurs once in a while; they >> > occur >> >> > > > constantly in a distributed system. 
For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus, because the current windowing system assumes elements to arrive globally in order, which is simply not true. (The example has a source that generates increasing integers. These then pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example also runs with DOP=1; I imagine this gets worse with higher DOP.
>> >> >> >
>> >> >> > What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8. (It is an infinite source of tuples and a 5 second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake-element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically as the window size increases.
>> >> >> >
>> >> >> > So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out of order (which happens basically always in real-world windowing use cases).
>> >> >> >
>> >> >> > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:
>> >> >> >> What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time".
It only differs in where the timestamps are assigned.
>> >> >> >>
>> >> >> >> The OOP approach also gives you the semantics of total ordering without imposing merges on the streams.
>> >> >> >>
>> >> >> >> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>> >> >> >> > I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates. However, if you consider Event-Pattern-Matching, global ordering is necessary (even if the performance penalty might be high).
>> >> >> >> >
>> >> >> >> > I would also keep "system-time windows" as an alternative to "source-assigned ts-windows".
>> >> >> >> >
>> >> >> >> > It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates" https://dl.acm.org/citation.cfm?id=1316720
>> >> >> >> >
>> >> >> >> > -Matthias
>> >> >> >> >
>> >> >> >> > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>> >> >> >> >> Hey,
>> >> >> >> >>
>> >> >> >> >> I think we should not block PRs unnecessarily if your suggested changes might touch them at some point.
>> >> >> >> >>
>> >> >> >> >> Also, I still think we should not put everything in the DataStream because it will be a huge mess.
>> >> >> >> >>
>> >> >> >> >> Also, we need to agree on the out-of-order processing, whether we want it the way you proposed it (which is quite costly).
Another alternative approach, which fits in the current windowing, is to filter out the out-of-order events and apply a special handling operator on them. This would be fairly lightweight.
>> >> >> >> >>
>> >> >> >> >> My point is that we need to consider some alternative solutions. And we should not block contributions along the way.
>> >> >> >> >>
>> >> >> >> >> Cheers,
>> >> >> >> >> Gyula
>> >> >> >> >>
>> >> >> >> >> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >> >> >> >> > The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).
>> >> >> >> >> >
>> >> >> >> >> > For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone, meaning also that any work on the pre-aggregators or optimizations that we do now becomes useless.
>> >> >> >> >> >
>> >> >> >> >> > For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)
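[Editor's note: the watermark-based windowing that Aljoscha describes earlier in the thread (sources assign timestamps and periodically emit watermarks; the window operator buffers each element in the window its timestamp belongs to and emits a window once the watermark passes its end) can be sketched like this. A minimal, hypothetical plain-Java sketch for tumbling windows, not code from the proof-of-concept branch:]

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: tumbling event-time windows triggered by watermarks. */
public class WatermarkWindowOperator {
    private final long windowSize;
    // window start timestamp -> buffered elements for that window
    private final TreeMap<Long, List<Long>> inFlight = new TreeMap<>();
    private final List<List<Long>> emitted = new ArrayList<>();

    public WatermarkWindowOperator(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Buffer the element in the window its timestamp belongs to,
     *  regardless of arrival order. */
    public void processElement(long timestamp, long value) {
        long windowStart = timestamp - (timestamp % windowSize);
        inFlight.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    /** A watermark promises that no element with an earlier timestamp will
     *  arrive: emit every in-flight window whose end is at or before it. */
    public void processWatermark(long watermark) {
        Iterator<Map.Entry<Long, List<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> entry = it.next();
            if (entry.getKey() + windowSize <= watermark) {
                emitted.add(entry.getValue());
                it.remove();
            }
        }
    }

    public List<List<Long>> emittedWindows() {
        return emitted;
    }
}
```

[Editor's note, continued: because elements are routed by timestamp rather than arrival order, an element that arrives "late" but before the watermark still lands in the correct window, which is the property the merged-stream example above violates.]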
>> >> > > > > > >> >> >> > > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra < >> gyula.f...@gmail.com >> >> >> >> > > > wrote: >> >> > > > > > >> >> >> > > > > > >>> Hi Aljoscha, >> >> > > > > > >>> >> >> > > > > > >>> Thanks for the nice summary, this is a very good >> initiative. >> >> > > > > > >>> >> >> > > > > > >>> I added some comments to the respective sections (where I >> > didnt >> >> > > > fully >> >> > > > > > >> agree >> >> > > > > > >>> :).). >> >> > > > > > >>> At some point I think it would be good to have a public >> > hangout >> >> > > > > session >> >> > > > > > >> on >> >> > > > > > >>> this, which could make a more dynamic discussion. >> >> > > > > > >>> >> >> > > > > > >>> Cheers, >> >> > > > > > >>> Gyula >> >> > > > > > >>> >> >> > > > > > >>> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: >> >> > 2015. >> >> > > > jún. >> >> > > > > > >> 22., >> >> > > > > > >>> H, 21:34): >> >> > > > > > >>> >> >> > > > > > >>>> Hi, >> >> > > > > > >>>> with people proposing changes to the streaming part I >> also >> >> > > wanted >> >> > > > to >> >> > > > > > >>> throw >> >> > > > > > >>>> my hat into the ring. :D >> >> > > > > > >>>> >> >> > > > > > >>>> During the last few months, while I was getting >> acquainted >> >> > with >> >> > > > the >> >> > > > > > >>>> streaming system, I wrote down some thoughts I had about >> > how >> >> > > > things >> >> > > > > > >> could >> >> > > > > > >>>> be improved. 
Hopefully, they are in somewhat coherent shape now, so please have a look if you are interested in this:
>> >> >> >> >> >> >
>> >> >> >> >> >> > https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>> >> >> >> >> >> >
>> >> >> >> >> >> > This mostly covers:
>> >> >> >> >> >> > - Timestamps assigned at sources
>> >> >> >> >> >> > - Out-of-order processing of elements in window operators
>> >> >> >> >> >> > - API design
>> >> >> >> >> >> >
>> >> >> >> >> >> > Please let me know what you think. Comment in the document or here on the mailing list.
>> >> >> >> >> >> >
>> >> >> >> >> >> > I have a PR in the makings that would introduce source timestamps and watermarks for keeping track of them. I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)
>> >> >> >> >> >> >
>> >> >> >> >> >> > Cheers,
>> >> >> >> >> >> > Aljoscha
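[Editor's note: the "panes" pre-aggregation mentioned at the end is the idea of slicing a sliding window into slide-sized panes, pre-aggregating each pane once, and assembling a window result from the pane aggregates, so overlapping windows share work. A minimal, hypothetical plain-Java sketch of that idea for a sliding sum, not the proof-of-concept code:]

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: pane-based pre-aggregation for sliding sum windows. */
public class PaneSlidingSum {
    private final long paneSize;        // panes are slide-sized
    private final int panesPerWindow;   // windowSize / slideSize
    private final List<Long> paneSums = new ArrayList<>(); // one partial sum per pane

    public PaneSlidingSum(long windowSize, long slideSize) {
        this.paneSize = slideSize;
        this.panesPerWindow = (int) (windowSize / slideSize);
    }

    /** Each element updates only the pane its timestamp falls into. */
    public void add(long timestamp, long value) {
        int pane = (int) (timestamp / paneSize);
        while (paneSums.size() <= pane) {
            paneSums.add(0L);
        }
        paneSums.set(pane, paneSums.get(pane) + value);
    }

    /** A window result combines the pane sums of the last panesPerWindow
     *  panes ending at lastPane; overlapping windows reuse the same panes. */
    public long windowSum(int lastPane) {
        long sum = 0;
        for (int p = Math.max(0, lastPane - panesPerWindow + 1); p <= lastPane; p++) {
            if (p < paneSums.size()) {
                sum += paneSums.get(p);
            }
        }
        return sum;
    }
}
```

[Editor's note, continued: unlike the inverse-reduce approach, nothing is ever subtracted, so a late or out-of-order element only affects the panes it actually belongs to instead of corrupting a running aggregate.]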