Hi Aljoscha, I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created). There is only the notion of "arrival order" at the operator. From this "arrival order" perspective, the results are correct(!).
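The difference between the two notions of order can be made concrete with a small sketch (plain Python for illustration, not Flink code; the element values and timestamps are invented):

```python
# Plain-Python illustration (not Flink code): the same four elements grouped
# under the two windowing semantics. Values and timestamps are invented.
elements = [(1, 0.5), (2, 1.5), (3, 0.8), (4, 2.1)]  # (value, event time); 3 arrives "late"

def event_time_windows(elems, size):
    """Tumbling windows over the time at which events *occurred*."""
    windows = {}
    for value, ts in elems:
        windows.setdefault(int(ts // size), []).append(value)
    return windows

def arrival_time_windows(elems, size, arrival_times):
    """Tumbling windows over the time at which events were *received*."""
    windows = {}
    for (value, _), at in zip(elems, arrival_times):
        windows.setdefault(int(at // size), []).append(value)
    return windows

# Event time: the late element 3 still lands in window 0.
print(event_time_windows(elements, 1.0))                        # {0: [1, 3], 1: [2], 2: [4]}
# Arrival time: element 3 lands wherever it happened to show up.
print(arrival_time_windows(elements, 1.0, [0.5, 1.5, 1.9, 2.1]))  # {0: [1], 1: [2, 3], 2: [4]}
```

Both groupings are internally consistent; they simply answer different questions about the same stream.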
Windowing in the current approach means for example, "sum up an attribute of all events you *received* in the last 5 seconds". That is a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds". Both queries are valid and Flink should support both IMHO. -Matthias On 06/25/2015 03:03 PM, Aljoscha Krettek wrote: > Yes, now this also processes about 3 mio Elements (Window Size 5 sec, Slide > 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio. > > Performance is not my main concern, however. My concern is that the current > model assumes elements to arrive in order, which is simply not true. > > In your code you have these lines for specifying the window: > .window(Time.of(1l, TimeUnit.SECONDS)) > .every(Time.of(1l, TimeUnit.SECONDS)) > > Although this semantically specifies a tumbling window of size 1 sec I'm > afraid it uses the sliding window logic internally (because of the > .every()). > > In my tests I only have the first line. > > > On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote: > >> I'm very sorry, I had a bug in the InversePreReducer. It should be >> fixed now. Can you please run it again? >> >> I also tried to reproduce some of your performance numbers, but I'm >> getting only less than 1/10th of yours. For example, in the Tumbling >> case, Current/Reduce produces only ~100000 for me. Do you have any >> idea what I could be doing wrong? My code: >> http://pastebin.com/zbEjmGhk >> I am running it on a 2 GHz Core i7. >> >> Best regards, >> Gabor >> >> >> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>: >>> Hi, >>> I also ran the tests on top of PR 856 (inverse reducer) now. The results >>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all >>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec) >>> (Theoretically there would be 5000 tuples in 5 seconds; the shortfall is due to >>> overhead).
These are the results for the inverse reduce optimisation: >>> (Tuple 0,38) >>> (Tuple 0,829) >>> (Tuple 0,1625) >>> (Tuple 0,2424) >>> (Tuple 0,3190) >>> (Tuple 0,3198) >>> (Tuple 0,-339368) >>> (Tuple 0,-1315725) >>> (Tuple 0,-2932932) >>> (Tuple 0,-5082735) >>> (Tuple 0,-7743256) >>> (Tuple 0,75701046) >>> (Tuple 0,642829470) >>> (Tuple 0,2242018381) >>> (Tuple 0,5190708618) >>> (Tuple 0,10060360311) >>> (Tuple 0,-94254951) >>> (Tuple 0,-219806321293) >>> (Tuple 0,-1258895232699) >>> (Tuple 0,-4074432596329) >>> >>> One line is one emitted window count. This is what happens when I remove >>> the Thread.sleep(1): >>> (Tuple 0,660676) >>> (Tuple 0,2553733) >>> (Tuple 0,3542696) >>> (Tuple 0,1) >>> (Tuple 0,1107035) >>> (Tuple 0,2549491) >>> (Tuple 0,4100387) >>> (Tuple 0,-8406583360092) >>> (Tuple 0,-8406582150743) >>> (Tuple 0,-8406580427190) >>> (Tuple 0,-8406580427190) >>> (Tuple 0,-8406580427190) >>> (Tuple 0,6847279255682044995) >>> (Tuple 0,6847279255682044995) >>> (Tuple 0,-5390528042713628318) >>> (Tuple 0,-5390528042711551780) >>> (Tuple 0,-5390528042711551780) >>> >>> So at some point the pre-reducer seems to go haywire and does not recover >>> from it. The good thing is that it does produce results now, where the >>> previous Current/Reduce would simply hang and not produce any output. >>> >>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote: >>> >>>> Hello, >>>> >>>> Aljoscha, can you please try the performance test of Current/Reduce >>>> with the InversePreReducer in PR 856? (If you just call sum, it will >>>> use an InversePreReducer.) It would be an interesting test, because >>>> the inverse function optimization really depends on the stream being >>>> ordered, and I think it has the potential of being faster than >>>> Next/Reduce. Especially if the window size is much larger than the >>>> slide size. 
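For readers following the thread: the inverse-function optimisation Gábor describes can be sketched as follows (a simplified plain-Python illustration, not the PR 856 code). It keeps a running sum, adding arriving values and subtracting evicted ones instead of re-summing the window on every slide; this is only sound if elements are evicted in the same order they entered, i.e. if the stream is ordered:

```python
from collections import deque

class InverseSumPreReducer:
    """Sliding-window sum maintained incrementally: add on arrival, subtract
    (the inverse of the reduce function) on eviction. Assumes elements are
    evicted in arrival order, i.e. an ordered stream."""
    def __init__(self, window_size):
        self.window_size = window_size
        self.buffer = deque()  # (timestamp, value) pairs in arrival order
        self.total = 0

    def add(self, timestamp, value):
        self.buffer.append((timestamp, value))
        self.total += value

    def emit(self, window_end):
        # Evict elements that fell out of the window, applying the inverse
        # (subtraction) instead of recomputing the sum from scratch.
        while self.buffer and self.buffer[0][0] <= window_end - self.window_size:
            _, old = self.buffer.popleft()
            self.total -= old
        return self.total

r = InverseSumPreReducer(window_size=5)
for t in range(10):
    r.add(t, 1)
print(r.emit(10))  # only elements with t in (5, 10] remain -> 4
```

The cost per slide depends on the number of evicted elements, not the window size, which is why it can win when the window is much larger than the slide.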
>>>> >>>> Best regards, >>>> Gabor >>>> >>>> >>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>: >>>>> I think I'll have to elaborate a bit so I created a proof-of-concept >>>>> implementation of my ideas and ran some throughput measurements to >>>>> alleviate concerns about performance. >>>>> >>>>> First, though, I want to highlight again why the current approach does >>>> not >>>>> work with out-of-order elements (which, again, occur constantly due to >>>> the >>>>> distributed nature of the system). This is the example I posted >> earlier: >>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks >>>> like >>>>> this: >>>>> >>>>> +--+ >>>>> | | Source >>>>> +--+ >>>>> | >>>>> +-----+ >>>>> | | >>>>> | +--+ >>>>> | | | Identity Map >>>>> | +--+ >>>>> | | >>>>> +-----+ >>>>> | >>>>> +--+ >>>>> | | Window >>>>> +--+ >>>>> | >>>>> | >>>>> +--+ >>>>> | | Sink >>>>> +--+ >>>>> >>>>> So all it does is pass the elements through an identity map and then >>>> merge >>>>> them again before the window operator. The source emits ascending >>>> integers >>>>> and the window operator has a custom timestamp extractor that uses the >>>>> integer itself as the timestamp and should create windows of size 4 >> (that >>>>> is elements with timestamp 0-3 are one window, the next are the >> elements >>>>> with timestamp 4-7, and so on). 
Since the topology basically doubles >> the >>>>> elements from the source I would expect to get these windows: >>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3 >>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7 >>>>> >>>>> The output is this, however: >>>>> Window: 0, 1, 2, 3, >>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, >>>>> Window: 8, 9, 10, 11, >>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, >>>>> Window: 16, 17, 18, 19, >>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, >>>>> Window: 24, 25, 26, 27, >>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, >>>>> >>>>> The reason is that the elements simply arrive out-of-order. Imagine >> what >>>>> would happen if the elements actually arrived with some delay from >>>>> different operators. >>>>> >>>>> Now, on to the performance numbers. The proof-of-concept I created is >>>>> available here: >>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The >>>> basic >>>>> idea is that sources assign the current timestamp when emitting >> elements. >>>>> They also periodically emit watermarks that tell us that no elements >> with >>>>> an earlier timestamp will be emitted. The watermarks propagate through >>>> the >>>>> operators. The window operator looks at the timestamp of an element >> and >>>>> puts it into the buffer that corresponds to that window. When the >> window >>>>> operator receives a watermark it will look at the in-flight windows >>>>> (basically the buffers) and emit those windows where the window-end is >>>>> before the watermark. >>>>> >>>>> For measuring throughput I did the following: The source emits tuples >> of >>>>> the form ("tuple", 1) in an infinite loop. The window operator sums up >>>> the >>>>> tuples, thereby counting how many tuples the window operator can >> handle >>>> in >>>>> a given time window. 
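The watermark mechanism just described can be sketched as follows (a simplified plain-Python illustration of the idea, not the code in the linked branch). Note how it recovers the expected doubled windows from the earlier example even though elements arrive interleaved:

```python
class WatermarkWindowOperator:
    """Buffers elements by event-time window; a window is emitted only once a
    watermark guarantees no earlier-timestamped element can still arrive."""
    def __init__(self, window_size, emit):
        self.window_size = window_size
        self.emit = emit            # callback invoked with completed windows
        self.in_flight = {}         # window start -> buffered elements

    def process_element(self, timestamp, value):
        start = (timestamp // self.window_size) * self.window_size
        self.in_flight.setdefault(start, []).append(value)

    def process_watermark(self, watermark):
        # Emit every in-flight window whose end is covered by the watermark.
        for start in sorted(self.in_flight):
            if start + self.window_size <= watermark:
                self.emit(start, self.in_flight.pop(start))

results = []
op = WatermarkWindowOperator(4, lambda start, elems: results.append((start, sorted(elems))))
# Elements arrive out of order, as after a union of two streams:
for ts in [0, 1, 0, 2, 1, 3, 2, 4, 3, 5, 4, 5]:
    op.process_element(ts, ts)
op.process_watermark(4)  # no element with timestamp < 4 is still in flight
print(results)  # [(0, [0, 0, 1, 1, 2, 2, 3, 3])]
```

Elements with timestamps 4 and 5 stay buffered until a later watermark covers their window end.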
There are two different implementations for the >>>>> summation: 1) simply summing up the values in a mapWindow(), where you >>>> get >>>>> a List of all tuples and simply iterate over it. 2) using sum(1), >> which >>>> is >>>>> implemented as a reduce() (that uses the pre-reducer optimisations). >>>>> >>>>> These are the performance numbers (Current is the current >> implementation, >>>>> Next is my proof-of-concept): >>>>> >>>>> Tumbling (1 sec): >>>>> - Current/Map: 1.6 mio >>>>> - Current/Reduce: 2 mio >>>>> - Next/Map: 2.2 mio >>>>> - Next/Reduce: 4 mio >>>>> >>>>> Sliding (5 sec, slide 1 sec): >>>>> - Current/Map: ca 3 mio (fluctuates a lot) >>>>> - Current/Reduce: No output >>>>> - Next/Map: ca 4 mio (fluctuates) >>>>> - Next/Reduce: 10 mio >>>>> >>>>> The Next/Reduce variant can basically scale indefinitely with window >> size >>>>> because the internal state does not rely on the number of elements >> (it is >>>>> just the current sum). The pre-reducer for sliding windows cannot >> handle >>>>> the amount of tuples; it produces no output. For the two Map variants >> the >>>>> performance fluctuates because they always keep all the elements in an >>>>> internal buffer before emission; this seems to tax the garbage >> collector >>>> a >>>>> bit and leads to random pauses. >>>>> >>>>> One thing that should be noted is that I had to disable the >> fake-element >>>>> emission thread, otherwise the Current versions would deadlock. >>>>> >>>>> So, I started working on this because I thought that out-of-order >>>>> processing would be necessary for correctness. And it certainly is. >> But >>>> the >>>>> proof-of-concept also shows that performance can be greatly improved. >>>>> >>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote: >>>>>> >>>>>> I agree, let's separate these topics from each other so we can get >> faster >>>>>> resolution. >>>>>> >>>>>> There is already a state discussion in the thread we started with >> Paris. 
>>>>>> >>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> >>>>> wrote: >>>>>> >>>>>>> I agree with supporting out-of-order out of the box :-), even if >> this >>>>> means >>>>>>> a major refactoring. This is the right time to refactor the >> streaming >>>>> API >>>>>>> before we pull it out of beta. I think that this is more important >>>> than >>>>> new >>>>>>> features in the streaming API, which can be prioritized once the >> API >>>> is >>>>> out >>>>>>> of beta (meaning that IMO this is the right time to stall PRs >> until >>>> we >>>>>>> agree on the design). >>>>>>> >>>>>>> There are three sections in the document: windowing, state, and >> API. >>>> How >>>>>>> intertwined are those with each other? Can we separate the >> discussion >>>> or >>>>> do >>>>>>> we need to discuss those all together? I think part of the >> difficulty >>>> is >>>>>>> that we are discussing three design choices at once. >>>>>>> >>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning < >> ted.dunn...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Out-of-order data is ubiquitous in the real world. Typically, what >>>>> happens is >>>>>>>> that businesses will declare a maximum allowable delay for >> delayed >>>>>>>> transactions and will commit to results when that delay is >> reached. >>>>>>>> Transactions that arrive later than this cutoff are collected >>>>> specially >>>>>>> as >>>>>>>> corrections which are reported/used when possible. >>>>>>>> >>>>>>>> Clearly, ordering can also be violated during processing, but if >> the >>>>> data >>>>>>>> is originally out of order the situation can't be repaired by any >>>>>>> protocol >>>>>>>> fixes that prevent transactions from becoming disordered but has >> to be >>>>>>> handled >>>>>>>> at the data level. 
>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek < >>>> aljos...@apache.org >>>>>> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> I also don't like big changes but sometimes they are necessary. >>>> The >>>>>>>> reason >>>>>>>>> why I'm so adamant about out-of-order processing is that >>>>> out-of-order >>>>>>>>> elements are not some exception that occurs once in a while; >> they >>>>> occur >>>>>>>>> constantly in a distributed system. For example, in this: >>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the >>>> resulting >>>>>>>>> windows >>>>>>>>> are completely bogus because the current windowing system >> assumes >>>>>>>> elements >>>>>>>>> to globally arrive in order, which is simply not true. (The >>>> example >>>>>>> has a >>>>>>>>> source that generates increasing integers. Then these pass >>>> through a >>>>>>> map >>>>>>>>> and are unioned with the original DataStream before a window >>>>> operator.) >>>>>>>>> This simulates elements arriving from different operators at a >>>>>>> windowing >>>>>>>>> operator. The example is also DOP=1; I imagine this gets >> worse >>>>> with >>>>>>>>> higher DOP. >>>>>>>>> >>>>>>>>> What do you mean by costly? As I said, I have a >> proof-of-concept >>>>>>>> windowing >>>>>>>>> operator that can handle out-of-order elements. This is an >> example >>>>>>> using >>>>>>>>> the current Flink API: >>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8. >>>>>>>>> (It is an infinite source of tuples and a 5 second window >> operator >>>>> that >>>>>>>>> counts the tuples.) The first problem is that this code >> deadlocks >>>>>>> because >>>>>>>>> of the thread that emits fake elements. If I disable the fake >>>>> element >>>>>>>> code >>>>>>>>> it works, but the throughput using my mockup is 4 times higher. >>>> The >>>>>>> gap >>>>>>>>> widens dramatically if the window size increases. 
>>>>>>>>> >>>>>>>>> So, it actually increases performance (unless I'm making a >> mistake >>>>> in >>>>>>> my >>>>>>>>> explorations) and can handle elements that arrive out-of-order >>>>> (which >>>>>>>>> happens basically always in real-world windowing use-cases). >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> >>>> wrote: >>>>>>>>> >>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we >>>>> need no >>>>>>>>>> different code for "system time" vs. "event time". It only >>>>> differs in >>>>>>>>> where >>>>>>>>>> the timestamps are assigned. >>>>>>>>>> >>>>>>>>>> The OOP approach also gives you the semantics of total >> ordering >>>>>>> without >>>>>>>>>> imposing merges on the streams. >>>>>>>>>> >>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax < >>>>>>>>>> mj...@informatik.hu-berlin.de> wrote: >>>>>>>>>> >>>>>>>>>>> I agree that there should be multiple alternatives the >> user(!) >>>>> can >>>>>>>>>>> choose from. Partial out-of-order processing works for >>>> many/most >>>>>>>>>>> aggregates. However, if you consider >> Event-Pattern-Matching, >>>>> global >>>>>>>>>>> ordering is necessary (even if the performance penalty >> might >>>> be >>>>>>>> high). >>>>>>>>>>> >>>>>>>>>>> I would also keep "system-time windows" as an alternative >> to >>>>>>> "source >>>>>>>>>>> assigned ts-windows". >>>>>>>>>>> >>>>>>>>>>> It might also be interesting to consider the following >> paper >>>> for >>>>>>>>>>> overlapping windows: "Resource sharing in continuous >>>>> sliding-window >>>>>>>>>>> aggregates" >>>>>>>>>>> >>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720 >>>>>>>>>>> >>>>>>>>>>> -Matthias >>>>>>>>>>> >>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote: >>>>>>>>>>>> Hey >>>>>>>>>>>> >>>>>>>>>>>> I think we should not block PRs unnecessarily if your >>>>> suggested >>>>>>>>> changes >>>>>>>>>>>> might touch them at some point. 
>>>>>>>>>>>> >>>>>>>>>>>> Also I still think we should not put everything in the >>>>> DataStream >>>>>>>>>> because >>>>>>>>>>>> it will be a huge mess. >>>>>>>>>>>> >>>>>>>>>>>> Also we need to agree on the out-of-order processing, >>>> whether >>>>> we >>>>>>>> want >>>>>>>>>> it >>>>>>>>>>>> the way you proposed it (which is quite costly). Another >>>>>>> alternative >>>>>>>>>>>> approach there which fits in the current windowing is to >>>>> filter >>>>>>> out >>>>>>>>> out-of-order >>>>>>>>>>>> events and apply a special handling operator on >> them. >>>>> This >>>>>>>>> would >>>>>>>>>> be >>>>>>>>>>>> fairly lightweight. >>>>>>>>>>>> >>>>>>>>>>>> My point is that we need to consider some alternative >>>>> solutions. >>>>>>>> And >>>>>>>>> we >>>>>>>>>>>> should not block contributions along the way. >>>>>>>>>>>> >>>>>>>>>>>> Cheers >>>>>>>>>>>> Gyula >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek < >>>>>>>>> aljos...@apache.org> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> The reason I posted this now is that we need to think >> about >>>>> the >>>>>>>> API >>>>>>>>>> and >>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor >> (inverse >>>>>>> reduce) >>>>>>>>> and >>>>>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream). >>>>>>>>>>>>> >>>>>>>>>>>>> For the windowing, I think that the current model does >> not >>>>> work >>>>>>>> for >>>>>>>>>>>>> out-of-order processing. Therefore, the whole windowing >>>>>>>>> infrastructure >>>>>>>>>>> will >>>>>>>>>>>>> basically have to be redone. Meaning also that any work >> on >>>>> the >>>>>>>>>>>>> pre-aggregators or optimizations that we do now becomes >>>>> useless. >>>>>>>>>>>>> >>>>>>>>>>>>> For the API, I proposed to restructure the interactions >>>>> between >>>>>>>> all >>>>>>>>>> the >>>>>>>>>>>>> different *DataStream classes and grouping/windowing. >> (See >>>>> API >>>>>>>>> section >>>>>>>>>>> of >>>>>>>>>>>>> the doc I posted.) 
>>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra < >>>> gyula.f...@gmail.com >>>>>> >>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Aljoscha, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the nice summary, this is a very good >>>> initiative. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I added some comments to the respective sections >> (where I >>>>> didn't >>>>>>>>> fully >>>>>>>>>>>>> agree >>>>>>>>>>>>>> :).). >>>>>>>>>>>>>> At some point I think it would be good to have a public >>>>> hangout >>>>>>>>>> session >>>>>>>>>>>>> on >>>>>>>>>>>>>> this, which would allow a more dynamic discussion. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Gyula >>>>>>>>>>>>>> >>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote >> (on >>>>>>> Mon, >>>>>>>>> 22 >>>>>>>>>>>>> Jun >>>>>>>>>>>>>> 2015, 21:34): >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>> with people proposing changes to the streaming part I >>>> also >>>>>>>> wanted >>>>>>>>> to >>>>>>>>>>>>>> throw >>>>>>>>>>>>>>> my hat into the ring. :D >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> During the last few months, while I was getting >>>> acquainted >>>>>>> with >>>>>>>>> the >>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had >> about >>>>> how >>>>>>>>> things >>>>>>>>>>>>> could >>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat coherent >>>> shape >>>>>>> now, >>>>>>>>> so >>>>>>>>>>>>>> please >>>>>>>>>>>>>>> have a look if you are interested in this: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>> >>>> >> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This mostly covers: >>>>>>>>>>>>>>> - Timestamps assigned at sources >>>>>>>>>>>>>>> - Out-of-order processing of elements in window >>>> operators >>>>>>>>>>>>>>> - API design >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Please let me know what you think. 
Comment in the >>>> document >>>>> or >>>>>>>> here >>>>>>>>>> in >>>>>>>>>>>>> the >>>>>>>>>>>>>>> mailing list. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I have a PR in the works that would introduce source >>>>>>>> timestamps >>>>>>>>>> and >>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked a >>>>>>>>>> proof-of-concept >>>>>>>>>>>>>> of a >>>>>>>>>>>>>>> windowing system that is able to process out-of-order >>>>> elements >>>>>>>>>> using a >>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient >>>>>>>>>>>>> pre-aggregations.) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>> Aljoscha
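The pane-based pre-aggregation mentioned above can be sketched roughly as follows: a sliding window of size S with slide s is split into S/s disjoint panes; each element is aggregated into exactly one pane, and a window result combines S/s pane aggregates instead of re-scanning the raw elements. A plain-Python sketch with invented names, not the proof-of-concept code:

```python
from collections import defaultdict

def pane_based_sums(elements, size, slide):
    """Sliding-window sums via panes: each (timestamp, value) element is
    aggregated into exactly one pane of width `slide`; each window of width
    `size` then combines size // slide pane aggregates."""
    panes = defaultdict(int)  # pane index -> partial sum
    for timestamp, value in elements:
        panes[timestamp // slide] += value
    panes_per_window = size // slide
    last_pane = max(panes)
    windows = []
    for end_pane in range(panes_per_window - 1, last_pane + 1):
        # Combine the pane aggregates covering this window.
        total = sum(panes[p] for p in range(end_pane - panes_per_window + 1, end_pane + 1))
        windows.append(total)
    return windows

# 10 elements of value 1 at timestamps 0..9, window size 4, slide 2:
print(pane_based_sums([(t, 1) for t in range(10)], size=4, slide=2))  # [4, 4, 4, 4]
```

Each element is touched once for pane aggregation, so the per-element cost is independent of how many overlapping windows contain it.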