Nice starting point.

Comment 1: "Each individual stream partition delivers elements strictly in order." (in 'Parallel Streams, Partitions, Time, and Ordering')

I would say "FIFO" rather than "strictly in order". If data is not emitted in order, the stream partition will not be in order either.

Comment 2: Why do we need KeyedDataStream? Everything can be done with GroupedDataStream, using a tumbling window of size = 1 tuple.

-Matthias

On 06/26/2015 07:42 PM, Stephan Ewen wrote:

Here is a first bit of what I have been writing down. Will add more over the next days:

https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering

On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:

+1 for writing this down

On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> wrote:

+1 go ahead

On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:

Hey!

This thread covers many different topics. Let's break this up into separate discussions.

- Operator State is already driven by Gyula and Paris and happening on the above-mentioned pull request and the follow-up discussions.

- For windowing, this discussion has brought some results that we should sum up and clearly write down. I would like to chime in to do that based on what I learned from the document and this discussion. I also got some input from Marton about what he learned from mapping the Cloud DataFlow constructs to Flink. I'll draft a Wiki page (with the help of Aljoscha and Marton) that sums this up and documents it for users (if we decide to adopt this). Then we run this by Gyula, Matthias Sax, and Kostas for feedback.

- API style discussions should be yet another thread. This will probably be opened as people start to address that.

I'll try to get a draft of the wiki version out tomorrow noon and send the link around.
Greetings,
Stephan

On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Sure. I picked this up. Using the current model for "occurrence time semantics" does not work.

I elaborated on this in the past many times (but nobody cared). It is important to make it clear to the user what semantics are supported. Claiming to support "sliding windows" doesn't mean anything; there are too many different semantics out there. :)

On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:

Yes, I am aware of this requirement and it would also be supported in my proposed model.

The problem is that the "custom timestamp" feature gives the impression that the elements would be windowed according to a user timestamp. The results, however, are wrong because of the assumption that elements arrive in order. (This is what I was trying to show with my fancy ASCII art and result output.)

On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi Aljoscha,

I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created). There is only the notion of "arrival order" at the operator. From this "arrival order" perspective, the results are correct(!).

Windowing in the current approach means, for example, "sum up an attribute of all events you *received* in the last 5 seconds". That is a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds".
Both queries are valid and Flink should support both, IMHO.

-Matthias

On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:

Yes, now this also processes about 3 mio elements (window size 5 sec, slide 1 sec), but it still fluctuates a lot between 1 mio and 5 mio.

Performance is not my main concern, however. My concern is that the current model assumes elements to arrive in order, which is simply not true.

In your code you have these lines for specifying the window:
.window(Time.of(1l, TimeUnit.SECONDS))
.every(Time.of(1l, TimeUnit.SECONDS))

Although this semantically specifies a tumbling window of size 1 sec, I'm afraid it uses the sliding-window logic internally (because of the .every()).

In my tests I only have the first line.

On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:

I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm getting less than 1/10th of yours. For example, in the tumbling case, Current/Reduce produces only ~100000 for me. Do you have any idea what I could be doing wrong? My code: http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor

2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect.
When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (size 5 sec, slide 1 sec). (Theoretically there would be 5000 tuples in 5 seconds, but the difference is due to overhead.) These are the results for the inverse-reduce optimisation:

(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

One line is one emitted window count. This is what happens when I remove the Thread.sleep(1):

(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover from it. The good thing is that it does produce results now, where the previous Current/Reduce would simply hang and not produce any output.
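[Editor's note] For readers unfamiliar with the technique under discussion, here is a minimal standalone sketch of the inverse-reduce idea for a sliding sum. This is my own illustration, not the PR 856 code; class and method names are invented. The point is that instead of re-summing the whole window on every slide, a running sum is kept, arriving elements are added, and elements leaving the window are subtracted ("inverse reduced") — which is why the optimization depends on the stream being ordered.

```java
import java.util.ArrayDeque;

// Illustrative inverse pre-reducer for a count-based sliding sum.
public class InverseSumWindow {
    private final int windowSize;                    // window size in elements
    private final ArrayDeque<Long> buffer = new ArrayDeque<>();
    private long sum = 0;

    public InverseSumWindow(int windowSize) {
        this.windowSize = windowSize;
    }

    // Add one element; evict (inverse-reduce) the oldest element if the
    // window is full. Returns the current window sum.
    public long add(long value) {
        buffer.addLast(value);
        sum += value;                                // reduce step
        if (buffer.size() > windowSize) {
            sum -= buffer.removeFirst();             // inverse-reduce step
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseSumWindow w = new InverseSumWindow(3);
        long last = 0;
        for (long v = 1; v <= 5; v++) {
            last = w.add(v);
        }
        // The window now holds 3, 4, 5.
        System.out.println(last); // 12
    }
}
```

If elements arrive out of order relative to how they expire, the subtraction removes the wrong values and the running sum drifts, which is consistent with the diverging numbers reported above.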
On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:

Hello,

Aljoscha, can you please try the performance test of Current/Reduce with the InversePreReducer in PR 856? (If you just call sum, it will use an InversePreReducer.) It would be an interesting test, because the inverse-function optimization really depends on the stream being ordered, and I think it has the potential of being faster than Next/Reduce, especially if the window size is much larger than the slide size.

Best regards,
Gabor

2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

I think I'll have to elaborate a bit, so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to alleviate concerns about performance.

First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier: https://gist.github.com/aljoscha/a367012646ab98208d27.
The plan looks like this:

  +--+
  |  |  Source
  +--+
    |
 +-----+
 |     |
 |   +--+
 |   |  |  Identity Map
 |   +--+
 |     |
 +-----+
    |
  +--+
  |  |  Window
  +--+
    |
  +--+
  |  |  Sink
  +--+

So all it does is pass the elements through an identity map and then merge them again before the window operator. The source emits ascending integers, and the window operator has a custom timestamp extractor that uses the integer itself as the timestamp and should create windows of size 4 (that is, elements with timestamps 0-3 are one window, the next window holds the elements with timestamps 4-7, and so on). Since the topology basically doubles the elements from the source, I would expect to get these windows:

Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7

The output is this, however:

Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out of order. Imagine what would happen if the elements actually arrived with some delay from different operations.

Now, on to the performance numbers.
The proof-of-concept I created is available here: https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks that tell us that no elements with an earlier timestamp will be emitted. The watermarks propagate through the operators. The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to that window. When the window operator receives a watermark, it looks at the in-flight windows (basically the buffers) and emits those windows where the window end is before the watermark.

For measuring throughput I did the following: the source emits tuples of the form ("tuple", 1) in an infinite loop. The window operator sums up the tuples, thereby counting how many tuples the window operator can handle in a given time window. There are two different implementations for the summation: 1) simply summing up the values in a mapWindow(), where you get a List of all tuples and simply iterate over it; 2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimisations).
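[Editor's note] The buffering-plus-watermark mechanism described in the previous paragraph can be sketched in a few lines. This is an illustrative toy, not Flink's actual operator; all names here are invented. Each element goes into the buffer of the tumbling window its timestamp falls into, and a watermark flushes every window whose end it has passed — which is exactly why out-of-order arrivals are handled correctly.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of watermark-driven tumbling windows.
public class WatermarkWindows {
    private final long windowSize;
    // window start -> buffered elements; TreeMap keeps windows ordered by time
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();

    public WatermarkWindows(long windowSize) {
        this.windowSize = windowSize;
    }

    // Assign an element to the window its timestamp falls into.
    public void element(long timestamp, long value) {
        long windowStart = timestamp - (timestamp % windowSize);
        windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // Emit (here: return) all windows whose end is covered by the watermark.
    public List<List<Long>> watermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSize <= watermark) {
                emitted.add(e.getValue());
                it.remove();
            } else {
                break; // windows are ordered; later ones are not ready yet
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindows op = new WatermarkWindows(4);
        // out-of-order arrival: timestamp 5 arrives before 2, 1, 3
        op.element(0, 0); op.element(5, 5); op.element(2, 2);
        op.element(1, 1); op.element(3, 3);
        System.out.println(op.watermark(4)); // [[0, 2, 1, 3]]
    }
}
```

Note that the element with timestamp 5 is held back in its own buffer until a later watermark, so the window [0, 4) is complete and correct despite the shuffled arrival order.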
These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):

Tumbling (1 sec):
- Current/Map: 1.6 mio
- Current/Reduce: 2 mio
- Next/Map: 2.2 mio
- Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
- Current/Map: ca. 3 mio (fluctuates a lot)
- Current/Reduce: no output
- Next/Map: ca. 4 mio (fluctuates)
- Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not depend on the number of elements (it is just the current sum). The pre-reducer for sliding windows cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.

One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is. But the proof-of-concept also shows that performance can be greatly improved.

On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:

I agree, let's separate these topics from each other so we can get faster resolution.
There is already a state discussion in the thread we started with Paris.

On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:

I agree with supporting out-of-order out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API, before we pull it out of beta. I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that, IMO, this is the right time to stall PRs until we agree on the design).

There are three sections in the document: windowing, state, and API. How entangled are those with each other? Can we separate the discussions, or do we need to discuss them all together? I think part of the difficulty is that we are discussing three design choices at once.

On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com> wrote:

Out-of-order data is ubiquitous in the real world. Typically, what happens is that businesses will declare a maximum allowable delay for delayed transactions and will commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected specially as corrections, which are reported/used when possible.
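[Editor's note] The cutoff-and-corrections pattern described here can be sketched as follows. Everything in this snippet is hypothetical and purely for illustration (the class, fields, and method names are not from any existing API): events that arrive within the maximum allowable delay are committed normally, and later arrivals are routed to a separate corrections collection.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative router for the maximum-allowable-delay pattern.
public class LatenessRouter {
    private final long maxDelay;
    private long clock = 0;  // current processing time, driven externally
    public final List<Long> onTime = new ArrayList<>();
    public final List<Long> corrections = new ArrayList<>();

    public LatenessRouter(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    public void advanceClock(long now) {
        clock = now;
    }

    // Route an event by its occurrence timestamp: events older than
    // (clock - maxDelay) missed their commit and become corrections.
    public void event(long occurrenceTime) {
        if (occurrenceTime >= clock - maxDelay) {
            onTime.add(occurrenceTime);
        } else {
            corrections.add(occurrenceTime);
        }
    }

    public static void main(String[] args) {
        LatenessRouter r = new LatenessRouter(10);
        r.advanceClock(100);
        r.event(95);  // within the allowed delay
        r.event(80);  // past the cutoff: reported as a correction
        System.out.println(r.onTime);       // [95]
        System.out.println(r.corrections);  // [80]
    }
}
```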
Clearly, ordering can also be violated during processing, but if the data is originally out of order, the situation can't be repaired by any protocol fixes that prevent transactions from becoming disordered; it has to be handled at the data level.

On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I also don't like big changes, but sometimes they are necessary. The reason why I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus, because the current windowing system assumes elements to globally arrive in order, which is simply not true. (The example has a source that generates increasing integers. These then pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example also runs with DOP=1; I imagine this gets worse with higher DOP.

What do you mean by costly?
As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8. (It is an infinite source of tuples and a 5-second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake-element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically if the window size increases.

So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out of order (which happens basically always in real-world windowing use cases).

On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:

What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". It only differs in where the timestamps are assigned.

The OOP approach also gives you the semantics of total ordering without imposing merges on the streams.

On Tue, Jun 23, 2015 at 10:43 AM, Matthias J.
Sax <mj...@informatik.hu-berlin.de> wrote:

I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates. However, if you consider Event-Pattern-Matching, global ordering is necessary (even if the performance penalty might be high).

I would also keep "system-time windows" as an alternative to "source-assigned ts-windows".

It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates"
https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:

Hey,

I think we should not block PRs unnecessarily if your suggested changes might touch them at some point.

Also, I still think we should not put everything in the DataStream because it will be a huge mess.

Also, we need to agree on the out-of-order processing: whether we want it the way you proposed it (which is quite costly).
Another alternative approach, which fits in the current windowing, is to filter out out-of-order events and apply a special handling operator on them. This would be fairly lightweight.

My point is that we need to consider some alternative solutions. And we should not block contributions along the way.

Cheers,
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:

The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone, meaning also that any work on the pre-aggregators or optimizations that we do now becomes useless.

For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing.
(See the API section of the doc I posted.)

On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Aljoscha,

Thanks for the nice summary, this is a very good initiative.

I added some comments to the respective sections (where I didn't fully agree :)). At some point I think it would be good to have a public hangout session on this, which could make for a more dynamic discussion.

Cheers,
Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (2015. jún. 22., H, 21:34):

Hi,
With people proposing changes to the streaming part, I also wanted to throw my hat into the ring. :D

During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts I had about how things could be improved.
Hopefully, they are in somewhat coherent shape now, so please have a look if you are interested in this:

https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
- Timestamps assigned at sources
- Out-of-order processing of elements in window operators
- API design

Please let me know what you think. Comment in the document or here on the mailing list.

I have a PR in the making that would introduce source timestamps and watermarks for keeping track of them. I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)

Cheers,
Aljoscha
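[Editor's note] The central point the thread keeps returning to — that windowing by arrival order and windowing by timestamp give different results once ordered streams are merged — can be reproduced with a small standalone simulation. This is my own sketch, not Flink code; it loosely models Aljoscha's union example: an ordered stream interleaved with a delayed copy of itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Standalone illustration: merging two ordered copies of a stream yields
// an out-of-order stream, so arrival-order windows mix timestamps while
// timestamp-based windows stay correct.
public class OrderingDemo {
    // Build a merged stream: values 0..7 interleaved with a copy of
    // themselves that lags by 3 arrivals (mimicking the union of the
    // source and the identity-mapped branch).
    static List<Integer> buildMerged() {
        List<Integer> merged = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            merged.add(i);
            if (i >= 3) merged.add(i - 3); // the delayed copy
        }
        for (int i = 5; i < 8; i++) merged.add(i); // tail of the copy
        return merged;
    }

    // Timestamp-based tumbling windows of size 4: group by value / 4.
    static TreeMap<Integer, List<Integer>> timestampWindows(List<Integer> stream) {
        TreeMap<Integer, List<Integer>> byWindow = new TreeMap<>();
        for (int v : stream) {
            byWindow.computeIfAbsent(v / 4, k -> new ArrayList<>()).add(v);
        }
        return byWindow;
    }

    public static void main(String[] args) {
        List<Integer> merged = buildMerged();
        // An arrival-order window of the first 8 elements mixes timestamps:
        System.out.println("arrival window:    " + merged.subList(0, 8));
        // Timestamp windows hold exactly two copies each of 0-3 and 4-7:
        System.out.println("timestamp windows: " + timestampWindows(merged).values());
    }
}
```

The arrival-order window contains values from both timestamp ranges, mirroring the bogus "Window: 4, 0, 1, 2, 3, ..." output quoted earlier in the thread, while grouping by timestamp recovers the expected windows.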