Here is a first bit of what I have been writing down; I will add more over the next few days:
https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering

On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:

+1 for writing this down

On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> wrote:

+1 go ahead

On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:

Hey!

This thread covers many different topics. Let's break this up into separate discussions.

- Operator state is already driven by Gyula and Paris and happening on the above-mentioned pull request and the follow-up discussions.

- For windowing, this discussion has brought some results that we should sum up and clearly write down. I would like to chime in to do that based on what I learned from the document and this discussion. I also got some input from Marton about what he learned from mapping the Cloud Dataflow constructs to Flink. I'll draft a wiki page (with the help of Aljoscha and Marton) that sums this up and documents it for users (if we decide to adopt this). Then we run this by Gyula, Matthias Sax, and Kostas for feedback.

- API style discussions should be yet another thread. This will probably be opened as people start to address that.

I'll try to get a draft of the wiki version out tomorrow noon and send the link around.

Greetings,
Stephan

On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Sure. I picked this up. Using the current model for "occurrence time semantics" does not work.

I elaborated on this many times in the past (but nobody cared). It is important to make it clear to the user what semantics are supported.
Claiming to support "sliding windows" doesn't mean anything; there are too many different semantics out there. :)

On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:

Yes, I am aware of this requirement, and it would also be supported in my proposed model.

The problem is that the "custom timestamp" feature gives the impression that the elements would be windowed according to a user timestamp. The results, however, are wrong because of the assumption that elements arrive in order. (This is what I was trying to show with my fancy ASCII art and result output.)

On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi Aljoscha,

I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created). There is only the notion of "arrival order" at the operator. From this "arrival order" perspective, the results are correct(!).

Windowing in the current approach means, for example, "sum up an attribute of all events you *received* in the last 5 seconds". That has a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds". Both queries are valid, and IMHO Flink should support both.

-Matthias

On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:

Yes, now this also processes about 3 mio elements (window size 5 sec, slide 1 sec), but it still fluctuates a lot between 1 mio and 5 mio.

Performance is not my main concern, however.
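Matthias's distinction between windows over events you *received* and windows over events that *occurred* can be made concrete with a small, Flink-independent sketch. All class and method names below are illustrative, not Flink API:

```java
import java.util.*;

// Illustrative sketch of the two window semantics discussed in this thread.
class WindowSemanticsDemo {

    // "Arrival order" grouping: batch elements purely by the order in which
    // the operator received them (here simplified to count-based batches).
    static List<List<Integer>> byArrival(int[] timestamps, int batchSize) {
        List<List<Integer>> windows = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (int ts : timestamps) {
            current.add(ts);
            if (current.size() == batchSize) {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        return windows;
    }

    // "Occurrence time" grouping: assign each element to the window its own
    // timestamp falls into, regardless of arrival order.
    static Map<Integer, List<Integer>> byEventTime(int[] timestamps, int windowSize) {
        Map<Integer, List<Integer>> windows = new TreeMap<>();
        for (int ts : timestamps) {
            int windowStart = ts - (ts % windowSize);
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

For the arrival sequence 0, 4, 1, 5 the two groupings disagree: by arrival (batch size 2) the first window is [0, 4], while by occurrence time (window size 4) the elements 0 and 1 land in one window and 4 and 5 in another. Both are well-defined semantics; the point of the thread is that they must not be confused.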
My concern is that the current model assumes elements to arrive in order, which is simply not true.

In your code you have these lines for specifying the window:

.window(Time.of(1l, TimeUnit.SECONDS))
.every(Time.of(1l, TimeUnit.SECONDS))

Although this semantically specifies a tumbling window of size 1 sec, I'm afraid it uses the sliding window logic internally (because of the .every()).

In my tests I only have the first line.

On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:

I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm getting less than 1/10th of yours. For example, in the Tumbling case, Current/Reduce produces only ~100000 for me. Do you have any idea what I could be doing wrong? My code: http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor

2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (size 5 sec, slide 1 sec). (Theoretically there would be 5000 tuples in 5 seconds, but this is due to overhead.)
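For readers following the inverse-reduce discussion: a pre-reducer with an invertible reduce function keeps a single running aggregate, adding arriving elements and "subtracting" evicted ones instead of recomputing each window from scratch. A minimal sketch of the idea, assuming a count-based sliding window and a sum aggregate (this is illustrative, not the PR 856 code):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative inverse-reduce pre-aggregator for sum over a sliding count window.
// On arrival the element is reduced into the running sum; on eviction the
// inverse function (subtraction) removes the evicted element's contribution.
class InverseSumPreReducer {
    private final int windowSize;
    private final Deque<Long> buffer = new ArrayDeque<>();
    private long runningSum = 0;

    InverseSumPreReducer(int windowSize) {
        this.windowSize = windowSize;
    }

    // Returns the current window sum after adding the new value.
    long add(long value) {
        buffer.addLast(value);
        runningSum += value;                    // reduce
        if (buffer.size() > windowSize) {
            runningSum -= buffer.removeFirst(); // inverse reduce on eviction
        }
        return runningSum;
    }
}
```

The appeal is O(1) work per element instead of re-summing the whole window. The fragility is also visible here: the value subtracted on eviction must be exactly the value that was added earlier, so any mismatch between insertion and eviction (for example, eviction driven by timestamps over an out-of-order stream) corrupts the running sum permanently, which fits the runaway counts reported in this thread.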
These are the results for the inverse reduce optimisation:

(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

One line is one emitted window count. This is what happens when I remove the Thread.sleep(1):

(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover from it. The good thing is that it does produce results now, where the previous Current/Reduce would simply hang and not produce any output.

On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:

Hello,

Aljoscha, can you please try the performance test of Current/Reduce with the InversePreReducer in PR 856?
(If you just call sum, it will use an InversePreReducer.) It would be an interesting test, because the inverse function optimization really depends on the stream being ordered, and I think it has the potential of being faster than Next/Reduce, especially if the window size is much larger than the slide size.

Best regards,
Gabor

2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

I think I'll have to elaborate a bit, so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to alleviate concerns about performance.

First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier: https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like this:

    +--+
    |  | Source
    +--+
      |
    +-----+
    |     |
    |   +--+
    |   |  | Identity Map
    |   +--+
    |     |
    +-----+
      |
    +--+
    |  | Window
    +--+
      |
    +--+
    |  | Sink
    +--+

So all it does is pass the elements through an identity map and then merge them again before the window operator.
The source emits ascending integers, and the window operator has a custom timestamp extractor that uses the integer itself as the timestamp and should create windows of size 4 (that is, elements with timestamps 0-3 are one window, the next are the elements with timestamps 4-7, and so on). Since the topology basically doubles the elements from the source, I would expect to get these windows:

Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7

The output is this, however:

Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out-of-order. Imagine what would happen if the elements actually arrived with some delay from different operations.

Now, on to the performance numbers. The proof-of-concept I created is available here: https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks that tell us that no elements with an earlier timestamp will be emitted. The watermarks propagate through the operators.
The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to that window. When the window operator receives a watermark, it will look at the in-flight windows (basically the buffers) and emit those windows whose window end is before the watermark.

For measuring throughput I did the following: the source emits tuples of the form ("tuple", 1) in an infinite loop. The window operator sums up the tuples, thereby counting how many tuples the window operator can handle in a given time window. There are two different implementations of the summation: 1) simply summing up the values in a mapWindow(), where you get a List of all tuples and simply iterate over it; 2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimisations).

These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):

Tumbling (1 sec):
- Current/Map: 1.6 mio
- Current/Reduce: 2 mio
- Next/Map: 2.2 mio
- Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
- Current/Map: ca. 3 mio (fluctuates a lot)
- Current/Reduce: no output
- Next/Map: ca. 4 mio (fluctuates)
- Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not depend on the number of elements (it is just the current sum).
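The buffering-and-watermark mechanism Aljoscha describes can be sketched as follows. This is a simplification assuming tumbling windows keyed by window start, not the actual branch code:

```java
import java.util.*;

// Sketch of an event-time window operator: elements are buffered per window,
// and a window is emitted once a watermark passes its end.
class WatermarkWindowOperator {
    private final int windowSize;
    // Window start -> buffered element timestamps; TreeMap keeps windows sorted.
    private final TreeMap<Long, List<Long>> inFlight = new TreeMap<>();

    WatermarkWindowOperator(int windowSize) {
        this.windowSize = windowSize;
    }

    // Buffer the element in the window its timestamp falls into,
    // regardless of arrival order.
    void element(long timestamp) {
        long windowStart = timestamp - (timestamp % windowSize);
        inFlight.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    // A watermark t promises that no element with timestamp < t will arrive,
    // so every buffered window ending at or before t is complete: emit it.
    List<List<Long>> watermark(long t) {
        List<List<Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSize <= t) {
                emitted.add(e.getValue());
                it.remove();
            } else {
                break; // windows are sorted by start, the rest are still open
            }
        }
        return emitted;
    }
}
```

With windows of size 4 and the out-of-order arrivals 1, 5, 0, a watermark of 3 emits nothing, while a watermark of 4 emits the completed first window containing 1 and 0 and keeps the window holding 5 in flight.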
The pre-reducer for sliding windows cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.

One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is. But the proof-of-concept also shows that performance can be greatly improved.

On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:

I agree, let's separate these topics from each other so we can get faster resolution.

There is already a state discussion in the thread we started with Paris.

On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:

I agree with supporting out-of-order out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API, before we pull it out of beta.
I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that IMO this is the right time to stall PRs until we agree on the design).

There are three sections in the document: windowing, state, and API. How intertwined are those with each other? Can we separate the discussions, or do we need to discuss those all together? I think part of the difficulty is that we are discussing three design choices at once.

On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com> wrote:

Out-of-order is ubiquitous in the real world. Typically, what happens is that businesses will declare a maximum allowable delay for delayed transactions and will commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected specially as corrections, which are reported/used when possible.

Clearly, ordering can also be violated during processing, but if the data is originally out of order the situation can't be repaired by any protocol fixes that prevent transactions from becoming disordered; it has to be handled at the data level.
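Ted's cutoff pattern can be sketched as follows, assuming a fixed maximum allowable delay; the class and field names are illustrative:

```java
import java.util.*;

// Sketch of the cutoff pattern: results are committed once the allowed delay
// has passed; anything arriving later is routed to a side channel of corrections.
class LatenessRouter {
    private final long maxDelay;
    private long committedUpTo = Long.MIN_VALUE; // event time up to which results are committed
    final List<Long> onTime = new ArrayList<>();
    final List<Long> corrections = new ArrayList<>();

    LatenessRouter(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    void element(long eventTime, long arrivalTime) {
        // Everything older than (arrival - maxDelay) is already committed.
        committedUpTo = Math.max(committedUpTo, arrivalTime - maxDelay);
        if (eventTime >= committedUpTo) {
            onTime.add(eventTime);          // still within the allowed delay
        } else {
            corrections.add(eventTime);     // too late: reported separately
        }
    }
}
```

With a maximum delay of 10, an element with event time 100 arriving at 105 is on time, but once an arrival at 110 has moved the commit point to 100, an element with event time 95 is diverted to the corrections channel.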
On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I also don't like big changes, but sometimes they are necessary. The reason why I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus because the current windowing system assumes elements to globally arrive in order, which is simply not true. (The example has a source that generates increasing integers. Then these pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example is also DOP=1; I imagine this gets worse with higher DOP.

What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8. (It is an infinite source of tuples and a 5 second window operator that counts the tuples.)
The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically if the window size increases.

So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out-of-order (which happens basically always in real-world windowing use cases).

On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:

What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". It only differs in where the timestamps are assigned.

The OOP (out-of-order processing) approach also gives you the semantics of total ordering without imposing merges on the streams.

On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates. However, if you consider event-pattern matching, global ordering is necessary (even if the performance penalty might be high).
I would also keep "system-time windows" as an alternative to "source-assigned ts-windows".

It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates"
https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:

Hey,

I think we should not block PRs unnecessarily if your suggested changes might touch them at some point.

Also, I still think we should not put everything in the DataStream because it will be a huge mess.

Also, we need to agree on the out-of-order processing, whether we want it the way you proposed it (which is quite costly). Another alternative approach, which fits in the current windowing, is to filter out out-of-order events and apply a special handling operator on them. This would be fairly lightweight.

My point is that we need to consider some alternative solutions. And we should not block contributions along the way.
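Gyula's lightweight alternative, splitting off out-of-order events for special handling while keeping the existing in-order windowing, might look roughly like this. This is an illustrative sketch, not actual operator code:

```java
import java.util.*;

// Sketch of the filter-based alternative: keep the existing in-order windowing
// path, but divert elements that arrive behind the highest timestamp seen so
// far to a special handling operator.
class OutOfOrderSplitter {
    private long maxSeen = Long.MIN_VALUE;
    final List<Long> inOrder = new ArrayList<>();
    final List<Long> outOfOrder = new ArrayList<>();

    void element(long timestamp) {
        if (timestamp >= maxSeen) {
            maxSeen = timestamp;
            inOrder.add(timestamp);      // normal windowing path
        } else {
            outOfOrder.add(timestamp);   // routed to the special handling operator
        }
    }
}
```

For arrivals 1, 3, 2, 4 only the element with timestamp 2 is diverted; the trade-off versus full out-of-order support is that the special path must somehow merge its results back into the already-emitted windows.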
Cheers,
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:

The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone. Meaning also that any work on the pre-aggregators or optimizations that we do now becomes useless.

For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)

On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Aljoscha,

Thanks for the nice summary, this is a very good initiative.

I added some comments to the respective sections (where I didn't fully agree :) ).

At some point I think it would be good to have a public hangout session on this, which could make for a more dynamic discussion.

Cheers,
Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun 2015, 21:34):

Hi,
with people proposing changes to the streaming part I also wanted to throw my hat into the ring. :D

During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts I had about how things could be improved. Hopefully, they are in somewhat coherent shape now, so please have a look if you are interested:

https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
- Timestamps assigned at sources
- Out-of-order processing of elements in window operators
- API design

Please let me know what you think.
Comment in the document or here in the mailing list.

I have a PR in the making that would introduce source timestamps and watermarks for keeping track of them. I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)

Cheers,
Aljoscha