This was more of a conceptual argument. From an implementation point of view, skipping the window-building step is a good idea if a tumbling window of size one tuple is detected.
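The optimization described here — detecting a tumbling window of size one tuple and bypassing window materialization entirely — can be sketched as follows. This is a minimal illustration, not Flink code; `WindowSpec` and `TumblingOneSketch` are hypothetical names invented for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class TumblingOneSketch {
    // Minimal window spec: tumbling windows measured in tuples.
    record WindowSpec(long sizeInTuples, boolean tumbling) {}

    static <T, R> List<R> apply(List<T> stream, WindowSpec spec, Function<List<T>, R> windowFn) {
        List<R> out = new ArrayList<>();
        if (spec.tumbling() && spec.sizeInTuples() == 1) {
            // Optimized path: no window buffer, each element forms its own "window",
            // so this degenerates to plain per-element processing.
            for (T t : stream) out.add(windowFn.apply(List.of(t)));
        } else {
            // General path: materialize tumbling buffers of the requested size
            // (a trailing partial window is discarded, as tumbling semantics allow).
            List<T> buf = new ArrayList<>();
            for (T t : stream) {
                buf.add(t);
                if (buf.size() == spec.sizeInTuples()) {
                    out.add(windowFn.apply(buf));
                    buf = new ArrayList<>();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> in = List.of(1, 2, 3, 4);
        // With size-1 tumbling windows both paths give the same result.
        System.out.println(apply(in, new WindowSpec(1, true), w -> w.get(0) * 10));
    }
}
```

Semantically a GroupedDataStream with a tumbling 1-tuple window then behaves like a keyed stream, which is the point being made: keep one concept and special-case the implementation internally.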
I prefer to work with a minimum number of concepts (and apply internal optimization if possible) instead of using redundant concepts for special cases. Of course, this is my personal point of view. -Matthias On 06/27/2015 03:47 PM, Aljoscha Krettek wrote: > What do you mean by Comment 2? Using the whole window apparatus if you just > want to have, for example, a simple partitioned filter with partitioned > state seems a bit extravagant. > > On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <mj...@informatik.hu-berlin.de> > wrote: > >> Nice starting point. >> >> Comment 1: >> "Each individual stream partition delivers elements strictly in order." >> (in 'Parallel Streams, Partitions, Time, and Ordering') >> >> I would say "FIFO" and not "strictly in order". If data is not emitted >> in-order, the stream partition will not be in-order either. >> >> Comment 2: >> Why do we need KeyedDataStream. You can get everything done with >> GroupedDataStream (using a tumbling window of size = 1 tuple). >> >> >> -Matthias >> >> On 06/26/2015 07:42 PM, Stephan Ewen wrote: >>> Here is a first bit of what I have been writing down. Will add more over >>> the next days: >>> >>> >>> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows >>> >>> >> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering >>> >>> >>> >>> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote: >>> >>>> +1 for writing this down >>>> >>>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> >> wrote: >>>>> >>>>> +1 go ahead >>>>> >>>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote: >>>>> >>>>>> Hey! >>>>>> >>>>>> This thread covers many different topics. Lets break this up into >>>> separate >>>>>> discussions. >>>>>> >>>>>> - Operator State is already driven by Gyula and Paris and happening on >>>> the >>>>>> above mentioned pull request and the followup discussions. 
>>>>>> >>>>>> - For windowing, this discussion has brought some results that we >> should >>>>>> sum up and clearly write down. >>>>>> I would like to chime in to do that based on what I learned from the >>>>>> document and this discussion. I also got some input from Marton about >>>> what >>>>>> he learned from mapping the Cloud DataFlow constructs to Flink. >>>>>> I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums >>>>>> this up and documents it for users (if we decide to adopt this). >>>>>> Then we run this by Gyula, Matthias Sax and Kostas for feedback. >>>>>> >>>>>> - API style discussions should be yet another thread. This will >> probably >>>>>> be opened as people start to address that. >>>>>> >>>>>> >>>>>> I'll try to get a draft of the wiki version out tomorrow noon and send >>>> the >>>>>> link around. >>>>>> >>>>>> Greetings, >>>>>> Stephan >>>>>> >>>>>> >>>>>> >>>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax < >>>>>> mj...@informatik.hu-berlin.de> wrote: >>>>>> >>>>>>> Sure. I picked this up. Using the current model for "occurrence time >>>>>>> semantics" does not work. >>>>>>> >>>>>>> I elaborated on this in the past many times (but nobody cared). It is >>>>>>> important to make it clear to the user what semantics are supported. >>>>>>> Claiming to support "sliding windows" doesn't mean anything; there >> are >>>>>>> too many different semantics out there. :) >>>>>>> >>>>>>> >>>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote: >>>>>>>> Yes, I am aware of this requirement and it would also be supported >> in >>>>>> my >>>>>>>> proposed model. >>>>>>>> >>>>>>>> The problem is, that the "custom timestamp" feature gives the >>>>>> impression >>>>>>>> that the elements would be windowed according to a user-timestamp. >> The >>>>>>>> results, however, are wrong because of the assumption about elements >>>>>>>> arriving in order. (This is what I was trying to show with my fancy >>>>>> ASCII >>>>>>>> art and result output. 
>>>>>>>> >>>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax < >>>>>>> mj...@informatik.hu-berlin.de> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Aljoscha, >>>>>>>>> >>>>>>>>> I like that you are pushing in this direction. However, IMHO you >>>>>>>>> misinterpreter the current approach. It does not assume that tuples >>>>>>>>> arrive in-order; the current approach has no notion about a >>>>>>>>> pre-defined-order (for example, the order in which the event are >>>>>>>>> created). There is only the notion of "arrival-order" at the >>>> operator. >>>>>>>>> From this "arrival-order" perspective, the result are correct(!). >>>>>>>>> >>>>>>>>> Windowing in the current approach means for example, "sum up an >>>>>>>>> attribute of all events you *received* in the last 5 seconds". That >>>>>> is a >>>>>>>>> different meaning that "sum up an attribute of all event that >>>>>> *occurred* >>>>>>>>> in the last 5 seconds". Both queries are valid and Flink should >>>>>> support >>>>>>>>> both IMHO. >>>>>>>>> >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote: >>>>>>>>>> Yes, now this also processes about 3 mio Elements (Window Size 5 >>>> sec, >>>>>>>>> Slide >>>>>>>>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio. >>>>>>>>>> >>>>>>>>>> Performance is not my main concern, however. My concern is that >> the >>>>>>>>> current >>>>>>>>>> model assumes elements to arrive in order, which is simply not >> true. >>>>>>>>>> >>>>>>>>>> In your code you have these lines for specifying the window: >>>>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS)) >>>>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS)) >>>>>>>>>> >>>>>>>>>> Although this semantically specifies a tumbling window of size 1 >> sec >>>>>>> I'm >>>>>>>>>> afraid it uses the sliding window logic internally (because of the >>>>>>>>>> .every()). >>>>>>>>>> >>>>>>>>>> In my tests I only have the first line. 
>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> >> wrote: >>>>>>>>>> >>>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should >> be >>>>>>>>>>> fixed now. Can you please run it again? >>>>>>>>>>> >>>>>>>>>>> I also tried to reproduce some of your performance numbers, but >> I'm >>>>>>>>>>> getting only less than 1/10th of yours. For example, in the >>>> Tumbling >>>>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have >> any >>>>>>>>>>> idea what I could be doing wrong? My code: >>>>>>>>>>> http://pastebin.com/zbEjmGhk >>>>>>>>>>> I am running it on a 2 GHz Core i7. >>>>>>>>>>> >>>>>>>>>>> Best regards, >>>>>>>>>>> Gabor >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org >>> : >>>>>>>>>>>> Hi, >>>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The >>>>>>>>> results >>>>>>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple >>>>>> source, >>>>>>>>> all >>>>>>>>>>>> the previous tests reported around 3600 tuples (Size 5 sec, >> Slide >>>> 1 >>>>>>>>> sec) >>>>>>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds but this >> is >>>>>>> due >>>>>>>>> to >>>>>>>>>>>> overhead). 
These are the results for the inverse reduce >>>>>> optimisation: >>>>>>>>>>>> (Tuple 0,38) >>>>>>>>>>>> (Tuple 0,829) >>>>>>>>>>>> (Tuple 0,1625) >>>>>>>>>>>> (Tuple 0,2424) >>>>>>>>>>>> (Tuple 0,3190) >>>>>>>>>>>> (Tuple 0,3198) >>>>>>>>>>>> (Tuple 0,-339368) >>>>>>>>>>>> (Tuple 0,-1315725) >>>>>>>>>>>> (Tuple 0,-2932932) >>>>>>>>>>>> (Tuple 0,-5082735) >>>>>>>>>>>> (Tuple 0,-7743256) >>>>>>>>>>>> (Tuple 0,75701046) >>>>>>>>>>>> (Tuple 0,642829470) >>>>>>>>>>>> (Tuple 0,2242018381) >>>>>>>>>>>> (Tuple 0,5190708618) >>>>>>>>>>>> (Tuple 0,10060360311) >>>>>>>>>>>> (Tuple 0,-94254951) >>>>>>>>>>>> (Tuple 0,-219806321293) >>>>>>>>>>>> (Tuple 0,-1258895232699) >>>>>>>>>>>> (Tuple 0,-4074432596329) >>>>>>>>>>>> >>>>>>>>>>>> One line is one emitted window count. This is what happens when >> I >>>>>>>>> remove >>>>>>>>>>>> the Thread.sleep(1): >>>>>>>>>>>> (Tuple 0,660676) >>>>>>>>>>>> (Tuple 0,2553733) >>>>>>>>>>>> (Tuple 0,3542696) >>>>>>>>>>>> (Tuple 0,1) >>>>>>>>>>>> (Tuple 0,1107035) >>>>>>>>>>>> (Tuple 0,2549491) >>>>>>>>>>>> (Tuple 0,4100387) >>>>>>>>>>>> (Tuple 0,-8406583360092) >>>>>>>>>>>> (Tuple 0,-8406582150743) >>>>>>>>>>>> (Tuple 0,-8406580427190) >>>>>>>>>>>> (Tuple 0,-8406580427190) >>>>>>>>>>>> (Tuple 0,-8406580427190) >>>>>>>>>>>> (Tuple 0,6847279255682044995) >>>>>>>>>>>> (Tuple 0,6847279255682044995) >>>>>>>>>>>> (Tuple 0,-5390528042713628318) >>>>>>>>>>>> (Tuple 0,-5390528042711551780) >>>>>>>>>>>> (Tuple 0,-5390528042711551780) >>>>>>>>>>>> >>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and does >> not >>>>>>>>> recover >>>>>>>>>>>> from it. The good thing is that it does produce results now, >> where >>>>>>> the >>>>>>>>>>>> previous Current/Reduce would simply hang and not produce any >>>>>> output. 
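For context, the inverse-reduce optimisation being tested above maintains a running aggregate, applies the reduce for each arriving element, and "un-applies" it for each evicted element, instead of re-reducing the whole window on every slide. A minimal self-contained sketch of the idea for a sliding sum (an illustration only, not the actual PR 856 code; the class name is invented):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding-window sum via an inverse reduce: each new element is added to the
// running sum, and each evicted element is subtracted, so the window is never
// re-aggregated from scratch. Cost per element is O(1) regardless of window size.
public class InverseSumWindow {
    private final int size;                        // window size in elements
    private final Deque<Long> window = new ArrayDeque<>();
    private long sum = 0;                          // running aggregate

    public InverseSumWindow(int size) { this.size = size; }

    // Inserts v and returns the sum over the current window.
    public long add(long v) {
        window.addLast(v);
        sum += v;                                  // apply the reduce
        if (window.size() > size) {
            sum -= window.removeFirst();           // inverse-apply on eviction
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseSumWindow w = new InverseSumWindow(3);
        long last = 0;
        for (long v = 1; v <= 5; v++) last = w.add(v);
        System.out.println(last); // sum over {3, 4, 5}: prints 12
    }
}
```

Note that the technique only stays correct if the aggregate is exactly invertible and evictions happen in precisely the order the pre-reducer assumes; once the running value drifts (through a bug or mismatched retraction order), it never recovers — which is consistent with the diverging counts shown above.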
>>>>>>>>>>>> >>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> >>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hello, >>>>>>>>>>>>> >>>>>>>>>>>>> Aljoscha, can you please try the performance test of >>>>>> Current/Reduce >>>>>>>>>>>>> with the InversePreReducer in PR 856? (If you just call sum, it >>>>>> will >>>>>>>>>>>>> use an InversePreReducer.) It would be an interesting test, >>>>>> because >>>>>>>>>>>>> the inverse function optimization really depends on the stream >>>>>> being >>>>>>>>>>>>> ordered, and I think it has the potential of being faster then >>>>>>>>>>>>> Next/Reduce. Especially if the window size is much larger than >>>> the >>>>>>>>>>>>> slide size. >>>>>>>>>>>>> >>>>>>>>>>>>> Best regards, >>>>>>>>>>>>> Gabor >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek < >> aljos...@apache.org >>>>>>> : >>>>>>>>>>>>>> I think I'll have to elaborate a bit so I created a >>>>>>> proof-of-concept >>>>>>>>>>>>>> implementation of my Ideas and ran some throughput >> measurements >>>>>> to >>>>>>>>>>>>>> alleviate concerns about performance. >>>>>>>>>>>>>> >>>>>>>>>>>>>> First, though, I want to highlight again why the current >>>> approach >>>>>>>>> does >>>>>>>>>>>>> not >>>>>>>>>>>>>> work with out-of-order elements (which, again, occur >> constantly >>>>>> due >>>>>>>>> to >>>>>>>>>>>>> the >>>>>>>>>>>>>> distributed nature of the system). This is the example I >> posted >>>>>>>>>>> earlier: >>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. 
The >> plan >>>>>>>>> looks >>>>>>>>>>>>> like >>>>>>>>>>>>>> this: >>>>>>>>>>>>>> >>>>>>>>>>>>>> +--+ >>>>>>>>>>>>>> | | Source >>>>>>>>>>>>>> +--+ >>>>>>>>>>>>>> | >>>>>>>>>>>>>> +-----+ >>>>>>>>>>>>>> | | >>>>>>>>>>>>>> | +--+ >>>>>>>>>>>>>> | | | Identity Map >>>>>>>>>>>>>> | +--+ >>>>>>>>>>>>>> | | >>>>>>>>>>>>>> +-----+ >>>>>>>>>>>>>> | >>>>>>>>>>>>>> +--+ >>>>>>>>>>>>>> | | Window >>>>>>>>>>>>>> +--+ >>>>>>>>>>>>>> | >>>>>>>>>>>>>> | >>>>>>>>>>>>>> +--+ >>>>>>>>>>>>>> | | Sink >>>>>>>>>>>>>> +--+ >>>>>>>>>>>>>> >>>>>>>>>>>>>> So all it does is pass the elements through an identity map >> and >>>>>>> then >>>>>>>>>>>>> merge >>>>>>>>>>>>>> them again before the window operator. The source emits >>>> ascending >>>>>>>>>>>>> integers >>>>>>>>>>>>>> and the window operator has a custom timestamp extractor that >>>>>> uses >>>>>>>>> the >>>>>>>>>>>>>> integer itself as the timestamp and should create windows of >>>>>> size 4 >>>>>>>>>>> (that >>>>>>>>>>>>>> is elements with timestamp 0-3 are one window, the next are >> the >>>>>>>>>>> elements >>>>>>>>>>>>>> with timestamp 4-8, and so on). Since the topology basically >>>>>>> doubles >>>>>>>>>>> the >>>>>>>>>>>>>> elements form the source I would expect to get these windows: >>>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3 >>>>>>>>>>>>>> Window: 4, 4, 6, 6, 7, 7, 8, 8 >>>>>>>>>>>>>> >>>>>>>>>>>>>> The output is this, however: >>>>>>>>>>>>>> Window: 0, 1, 2, 3, >>>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, >>>>>>>>>>>>>> Window: 8, 9, 10, 11, >>>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, >>>>>>>>>>>>>> Window: 16, 17, 18, 19, >>>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, >>>>>>>>>>>>>> Window: 24, 25, 26, 27, >>>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, >>>>>>>>>>>>>> >>>>>>>>>>>>>> The reason is that the elements simply arrive out-of-order. 
>>>>>> Imagine >>>>>>>>>>> what >>>>>>>>>>>>>> would happen if the elements actually arrived with some delay >>>>>> from >>>>>>>>>>>>>> different operations. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I >>>>>> created >>>>>>> is >>>>>>>>>>>>>> available here: >>>>>>>>>>>>>> >>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock >>>>>> . >>>>>>>>> The >>>>>>>>>>>>> basic >>>>>>>>>>>>>> idea is that sources assign the current timestamp when >> emitting >>>>>>>>>>> elements. >>>>>>>>>>>>>> They also periodically emit watermarks that tell us that no >>>>>>> elements >>>>>>>>>>> with >>>>>>>>>>>>>> an earlier timestamp will be emitted. The watermarks propagate >>>>>>>>> through >>>>>>>>>>>>> the >>>>>>>>>>>>>> operators. The window operator looks at the timestamp of an >>>>>> element >>>>>>>>>>> and >>>>>>>>>>>>>> puts it into the buffer that corresponds to that window. When >>>> the >>>>>>>>>>> window >>>>>>>>>>>>>> operator receives a watermark it will look at the in-flight >>>>>> windows >>>>>>>>>>>>>> (basically the buffers) and emit those windows where the >>>>>> window-end >>>>>>>>> is >>>>>>>>>>>>>> before the watermark. >>>>>>>>>>>>>> >>>>>>>>>>>>>> For measuring throughput I did the following: The source emits >>>>>>> tuples >>>>>>>>>>> of >>>>>>>>>>>>>> the form ("tuple", 1) in an infinite loop. The window operator >>>>>> sums >>>>>>>>> up >>>>>>>>>>>>> the >>>>>>>>>>>>>> tuples, thereby counting how many tuples the window operator >> can >>>>>>>>>>> handle >>>>>>>>>>>>> in >>>>>>>>>>>>>> a given time window. There are two different implementations >> for >>>>>>> the >>>>>>>>>>>>>> summation: 1) simply summing up the values in a mapWindow(), >>>>>> there >>>>>>>>> you >>>>>>>>>>>>> get >>>>>>>>>>>>>> a List of all tuples and simple iterate over it. 2) using >>>> sum(1), >>>>>>>>>>> which >>>>>>>>>>>>> is >>>>>>>>>>>>>> implemented as a reduce() (that uses the pre-reducer >>>>>>> optimisations). 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> These are the performance numbers (Current is the current >>>>>>>>>>> implementation, >>>>>>>>>>>>>> Next is my proof-of-concept): >>>>>>>>>>>>>> >>>>>>>>>>>>>> Tumbling (1 sec): >>>>>>>>>>>>>> - Current/Map: 1.6 mio >>>>>>>>>>>>>> - Current/Reduce: 2 mio >>>>>>>>>>>>>> - Next/Map: 2.2 mio >>>>>>>>>>>>>> - Next/Reduce: 4 mio >>>>>>>>>>>>>> >>>>>>>>>>>>>> Sliding (5 sec, slide 1 sec): >>>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot) >>>>>>>>>>>>>> - Current/Reduce: No output >>>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates) >>>>>>>>>>>>>> - Next/Reduce: 10 mio >>>>>>>>>>>>>> >>>>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with >>>>>>> window >>>>>>>>>>> size >>>>>>>>>>>>>> because the internal state does not rely on the number of >>>>>> elements >>>>>>>>>>> (it is >>>>>>>>>>>>>> just the current sum). The pre-reducer for sliding elements >>>>>> cannot >>>>>>>>>>> handle >>>>>>>>>>>>>> the amount of tuples, it produces no output. For the two Map >>>>>>> variants >>>>>>>>>>> the >>>>>>>>>>>>>> performance fluctuates because they always keep all the >> elements >>>>>> in >>>>>>>>> an >>>>>>>>>>>>>> internal buffer before emission, this seems to tax the garbage >>>>>>>>>>> collector >>>>>>>>>>>>> a >>>>>>>>>>>>>> bit and leads to random pauses. >>>>>>>>>>>>>> >>>>>>>>>>>>>> One thing that should be noted is that I had to disable the >>>>>>>>>>> fake-element >>>>>>>>>>>>>> emission thread, otherwise the Current versions would >> deadlock. >>>>>>>>>>>>>> >>>>>>>>>>>>>> So, I started working on this because I thought that >>>> out-of-order >>>>>>>>>>>>>> processing would be necessary for correctness. And it is >>>>>> certainly, >>>>>>>>>>> But >>>>>>>>>>>>> the >>>>>>>>>>>>>> proof-of-concept also shows that performance can be greatly >>>>>>> improved. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com >>> >>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I agree lets separate these topics from each other so we can >>>> get >>>>>>>>>>> faster >>>>>>>>>>>>>>> resolution. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> There is already a state discussion in the thread we started >>>>>> with >>>>>>>>>>> Paris. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas < >>>>>>> ktzou...@apache.org >>>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-), >> even >>>>>> if >>>>>>>>>>> this >>>>>>>>>>>>>> means >>>>>>>>>>>>>>>> a major refactoring. This is the right time to refactor the >>>>>>>>>>> streaming >>>>>>>>>>>>>> API >>>>>>>>>>>>>>>> before we pull it out of beta. I think that this is more >>>>>>> important >>>>>>>>>>>>> than >>>>>>>>>>>>>> new >>>>>>>>>>>>>>>> features in the streaming API, which can be prioritized once >>>>>> the >>>>>>>>>>> API >>>>>>>>>>>>> is >>>>>>>>>>>>>> out >>>>>>>>>>>>>>>> of beta (meaning, that IMO this is the right time to stall >> PRs >>>>>>>>>>> until >>>>>>>>>>>>> we >>>>>>>>>>>>>>>> agree on the design). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> There are three sections in the document: windowing, state, >>>> and >>>>>>>>>>> API. >>>>>>>>>>>>> How >>>>>>>>>>>>>>>> convoluted are those with each other? Can we separate the >>>>>>>>>>> discussion >>>>>>>>>>>>> or >>>>>>>>>>>>>> do >>>>>>>>>>>>>>>> we need to discuss those all together? I think part of the >>>>>>>>>>> difficulty >>>>>>>>>>>>> is >>>>>>>>>>>>>>>> that we are discussing three design choices at once. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning < >>>>>>>>>>> ted.dunn...@gmail.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Out of order is ubiquitous in the real-world. 
Typically, >>>> what >>>>>>>>>>>>>> happens is >>>>>>>>>>>>>>>>> that businesses will declare a maximum allowable delay for >>>>>>>>>>> delayed >>>>>>>>>>>>>>>>> transactions and will commit to results when that delay is >>>>>>>>>>> reached. >>>>>>>>>>>>>>>>> Transactions that arrive later than this cutoff are >> collected >>>>>>>>>>>>>> specially >>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>> corrections which are reported/used when possible. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Clearly, ordering can also be violated during processing, >> but >>>>>> if >>>>>>>>>>> the >>>>>>>>>>>>>> data >>>>>>>>>>>>>>>>> is originally out of order the situation can't be repaired >> by >>>>>>> any >>>>>>>>>>>>>>>> protocol >>>>>>>>>>>>>>>>> fixes that prevent transactions from becoming disordered >> but >>>>>> has >>>>>>>>>>> to >>>>>>>>>>>>>>>> handled >>>>>>>>>>>>>>>>> at the data level. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek < >>>>>>>>>>>>> aljos...@apache.org >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I also don't like big changes but sometimes they are >>>>>> necessary. >>>>>>>>>>>>> The >>>>>>>>>>>>>>>>> reason >>>>>>>>>>>>>>>>>> why I'm so adamant about out-of-order processing is that >>>>>>>>>>>>>> out-of-order >>>>>>>>>>>>>>>>>> elements are not some exception that occurs once in a >> while; >>>>>>>>>>> they >>>>>>>>>>>>>> occur >>>>>>>>>>>>>>>>>> constantly in a distributed system. For example, in this: >>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the >>>>>>>>>>>>> resulting >>>>>>>>>>>>>>>>>> windows >>>>>>>>>>>>>>>>>> are completely bogus because the current windowing system >>>>>>>>>>> assumes >>>>>>>>>>>>>>>>> elements >>>>>>>>>>>>>>>>>> to globally arrive in order, which is simply not true. >> (The >>>>>>>>>>>>> example >>>>>>>>>>>>>>>> has a >>>>>>>>>>>>>>>>>> source that generates increasing integers. 
Then these pass >>>>>>>>>>>>> through a >>>>>>>>>>>>>>>> map >>>>>>>>>>>>>>>>>> and are unioned with the original DataStream before a >> window >>>>>>>>>>>>>> operator.) >>>>>>>>>>>>>>>>>> This simulates elements arriving from different operators >> at >>>>>> a >>>>>>>>>>>>>>>> windowing >>>>>>>>>>>>>>>>>> operator. The example is also DOP=1, I imagine this to get >>>>>>>>>>> worse >>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>> higher DOP. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a >>>>>>>>>>> proof-of-concept >>>>>>>>>>>>>>>>> windowing >>>>>>>>>>>>>>>>>> operator that can handle out-or-order elements. This is an >>>>>>>>>>> example >>>>>>>>>>>>>>>> using >>>>>>>>>>>>>>>>>> the current Flink API: >>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8. >>>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window >>>>>>>>>>> operator >>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>> counts the tuples.) The first problem is that this code >>>>>>>>>>> deadlocks >>>>>>>>>>>>>>>> because >>>>>>>>>>>>>>>>>> of the thread that emits fake elements. If I disable the >>>> fake >>>>>>>>>>>>>> element >>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>>> it works, but the throughput using my mockup is 4 times >>>>>> higher >>>>>>>>>>> . >>>>>>>>>>>>> The >>>>>>>>>>>>>>>> gap >>>>>>>>>>>>>>>>>> widens dramatically if the window size increases. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a >>>>>>>>>>> mistake >>>>>>>>>>>>>> in >>>>>>>>>>>>>>>> my >>>>>>>>>>>>>>>>>> explorations) and can handle elements that arrive >>>>>> out-of-order >>>>>>>>>>>>>> (which >>>>>>>>>>>>>>>>>> happens basically always in a real-world windowing >>>>>> use-cases). 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen < >> se...@apache.org >>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is >> that >>>>>> we >>>>>>>>>>>>>> need no >>>>>>>>>>>>>>>>>>> different code for "system time" vs. "event time". It >> only >>>>>>>>>>>>>> differs in >>>>>>>>>>>>>>>>>> where >>>>>>>>>>>>>>>>>>> the timestamps are assigned. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The OOP approach also gives you the semantics of total >>>>>>>>>>> ordering >>>>>>>>>>>>>>>> without >>>>>>>>>>>>>>>>>>> imposing merges on the streams. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax < >>>>>>>>>>>>>>>>>>> mj...@informatik.hu-berlin.de> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I agree that there should be multiple alternatives the >>>>>>>>>>> user(!) >>>>>>>>>>>>>> can >>>>>>>>>>>>>>>>>>>> choose from. Partial out-of-order processing works for >>>>>>>>>>>>> many/most >>>>>>>>>>>>>>>>>>>> aggregates. However, if you consider >>>>>>>>>>> Event-Pattern-Matching, >>>>>>>>>>>>>> global >>>>>>>>>>>>>>>>>>>> ordering in necessary (even if the performance penalty >>>>>>>>>>> might >>>>>>>>>>>>> be >>>>>>>>>>>>>>>>> high). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an >> alternative >>>>>>>>>>> to >>>>>>>>>>>>>>>> "source >>>>>>>>>>>>>>>>>>>> assigned ts-windows". 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> It might also be interesting to consider the following >>>>>>>>>>> paper >>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>> overlapping windows: "Resource sharing in continuous >>>>>>>>>>>>>> sliding-window >>>>>>>>>>>>>>>>>>>> aggregates" >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720 >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote: >>>>>>>>>>>>>>>>>>>>> Hey >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your >>>>>>>>>>>>>> suggested >>>>>>>>>>>>>>>>>> changes >>>>>>>>>>>>>>>>>>>>> might touch them at some point. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Also I still think we should not put everything in the >>>>>>>>>>>>>> Datastream >>>>>>>>>>>>>>>>>>> because >>>>>>>>>>>>>>>>>>>>> it will be a huge mess. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Also we need to agree on the out of order processing, >>>>>>>>>>>>> whether >>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>> want >>>>>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>>> the way you proposed it(which is quite costly). Another >>>>>>>>>>>>>>>> alternative >>>>>>>>>>>>>>>>>>>>> approach there which fits in the current windowing is >> to >>>>>>>>>>>>>> filter >>>>>>>>>>>>>>>> out >>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>>>> order events and apply a special handling operator on >>>>>>>>>>> them. >>>>>>>>>>>>>> This >>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>> fairly lightweight. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> My point is that we need to consider some alternative >>>>>>>>>>>>>> solutions. >>>>>>>>>>>>>>>>> And >>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>>> should not block contributions along the way. 
>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Cheers >>>>>>>>>>>>>>>>>>>>> Gyula >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek < >>>>>>>>>>>>>>>>>> aljos...@apache.org> >>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to think >>>>>>>>>>> about >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> API >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor >>>>>>>>>>> (inverse >>>>>>>>>>>>>>>> reduce) >>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on >> DataStream). >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current model does >>>>>>>>>>> not >>>>>>>>>>>>>> work >>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> out-of-order processing. Therefore, the whole >> windowing >>>>>>>>>>>>>>>>>> infrastructure >>>>>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>>>>>>> basically have to be redone. Meaning also that any >> work >>>>>>>>>>> on >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do now >> becomes >>>>>>>>>>>>>> useless. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the >> interactions >>>>>>>>>>>>>> between >>>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> different *DataStream classes and grouping/windowing. >>>>>>>>>>> (See >>>>>>>>>>>>>> API >>>>>>>>>>>>>>>>>> section >>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>> the doc I posted.) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra < >>>>>>>>>>>>> gyula.f...@gmail.com >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good >>>>>>>>>>>>> initiative. 
>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective sections >>>>>>>>>>> (where I >>>>>>>>>>>>>> didnt >>>>>>>>>>>>>>>>>> fully >>>>>>>>>>>>>>>>>>>>>> agree >>>>>>>>>>>>>>>>>>>>>>> :).). >>>>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a >> public >>>>>>>>>>>>>> hangout >>>>>>>>>>>>>>>>>>> session >>>>>>>>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>>>>>>> this, which could make a more dynamic discussion. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>> Gyula >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> ezt írta >>>>>>>>>>> (időpont: >>>>>>>>>>>>>>>> 2015. >>>>>>>>>>>>>>>>>> jún. >>>>>>>>>>>>>>>>>>>>>> 22., >>>>>>>>>>>>>>>>>>>>>>> H, 21:34): >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part >> I >>>>>>>>>>>>> also >>>>>>>>>>>>>>>>> wanted >>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> throw >>>>>>>>>>>>>>>>>>>>>>>> my hat into the ring. :D >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting >>>>>>>>>>>>> acquainted >>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had >>>>>>>>>>> about >>>>>>>>>>>>>> how >>>>>>>>>>>>>>>>>> things >>>>>>>>>>>>>>>>>>>>>> could >>>>>>>>>>>>>>>>>>>>>>>> be improved. 
Hopefully, they are in somewhat >> coherent >>>>>>>>>>>>> shape >>>>>>>>>>>>>>>> now, >>>>>>>>>>>>>>>>>> so >>>>>>>>>>>>>>>>>>>>>>> please >>>>>>>>>>>>>>>>>>>>>>>> have a look if you are interested in this: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>> >>>>>> >>>> >> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> This mostly covers: >>>>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources >>>>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window >>>>>>>>>>>>> operators >>>>>>>>>>>>>>>>>>>>>>>> - API design >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the >>>>>>>>>>>>> document >>>>>>>>>>>>>> or >>>>>>>>>>>>>>>>> here >>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>> mailing list. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I have a PR in the makings that would introduce >> source >>>>>>>>>>>>>>>>> timestamps >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked >> a >>>>>>>>>>>>>>>>>>> proof-of-concept >>>>>>>>>>>>>>>>>>>>>>> of a >>>>>>>>>>>>>>>>>>>>>>>> windowing system that is able to process >> out-of-order >>>>>>>>>>>>>> elements >>>>>>>>>>>>>>>>>>> using a >>>>>>>>>>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform >> efficient >>>>>>>>>>>>>>>>>>>>>> pre-aggregations.) 
>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>> Aljoscha >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>> >>>> >>> >> >> >
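The watermark mechanism described in this thread — buffer each element into the window its timestamp belongs to, and emit a window once a watermark passes its end — can be sketched as follows. This is a simplified illustration (class name invented, timestamps are the element values themselves as in the integer example), not the proof-of-concept code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Watermark-driven tumbling windows: elements are buffered by window start;
// a watermark W triggers every in-flight window whose end is <= W. This
// tolerates out-of-order arrival up to the bound promised by the watermark.
public class WatermarkWindows {
    private final long size;                             // window length
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    public WatermarkWindows(long size) { this.size = size; }

    public void element(long timestamp) {
        long start = (timestamp / size) * size;          // window assignment
        buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
    }

    // Emit (return) all complete windows: those with start + size <= watermark.
    public List<List<Long>> watermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + size <= watermark) {
            emitted.add(buffers.pollFirstEntry().getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindows op = new WatermarkWindows(4);
        // Out-of-order arrival, e.g. after a union of two stream partitions.
        for (long t : new long[] {0, 2, 1, 3, 0, 5, 4, 2}) op.element(t);
        System.out.println(op.watermark(4)); // emits the [0, 4) window
    }
}
```

With this structure the duplicated-integer example from the thread would produce the expected windows (both copies of 0–3 in the first window), since elements are grouped by timestamp rather than by arrival order.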