I like the bit about the API a lot. What I don't see yet is how delta window can work in a distributed way with out-of-order elements.
On Fri, 26 Jun 2015 at 19:43 Stephan Ewen <se...@apache.org> wrote:

Here is a first bit of what I have been writing down. Will add more over the next days:

https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering

On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:

+1 for writing this down

On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> wrote:

+1 go ahead

On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:

Hey!

This thread covers many different topics. Let's break this up into separate discussions.

- Operator state is already driven by Gyula and Paris and happening on the above-mentioned pull request and the follow-up discussions.

- For windowing, this discussion has brought some results that we should sum up and clearly write down. I would like to chime in to do that based on what I learned from the document and this discussion. I also got some input from Marton about what he learned from mapping the Cloud Dataflow constructs to Flink. I'll draft a wiki page (with the help of Aljoscha and Marton) that sums this up and documents it for users (if we decide to adopt this). Then we run this by Gyula, Matthias Sax and Kostas for feedback.

- API style discussions should be yet another thread. This will probably be opened as people start to address that.

I'll try to get a draft of the wiki version out tomorrow noon and send the link around.

Greetings,
Stephan

On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Sure. I picked this up.
Using the current model for "occurrence time semantics" does not work.

I elaborated on this in the past many times (but nobody cared). It is important to make it clear to the user what semantics are supported. Claiming to support "sliding windows" doesn't mean anything; there are too many different semantics out there. :)

On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:

Yes, I am aware of this requirement and it would also be supported in my proposed model.

The problem is that the "custom timestamp" feature gives the impression that the elements would be windowed according to a user timestamp. The results, however, are wrong because of the assumption about elements arriving in order. (This is what I was trying to show with my fancy ASCII art and result output.)

On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi Aljoscha,

I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created). There is only the notion of "arrival order" at the operator. From this "arrival order" perspective, the results are correct(!).

Windowing in the current approach means, for example, "sum up an attribute of all events you *received* in the last 5 seconds". That is a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds". Both queries are valid and Flink should support both IMHO.
-Matthias

On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:

Yes, now this also processes about 3 mio elements (window size 5 sec, slide 1 sec), but it still fluctuates a lot between 1 mio. and 5 mio.

Performance is not my main concern, however. My concern is that the current model assumes elements to arrive in order, which is simply not true.

In your code you have these lines for specifying the window:
.window(Time.of(1l, TimeUnit.SECONDS))
.every(Time.of(1l, TimeUnit.SECONDS))

Although this semantically specifies a tumbling window of size 1 sec, I'm afraid it uses the sliding window logic internally (because of the .every()). In my tests I only have the first line.

On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:

I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm getting only less than 1/10th of yours. For example, in the Tumbling case, Current/Reduce produces only ~100000 for me. Do you have any idea what I could be doing wrong? My code: http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor

2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect.
When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (size 5 sec, slide 1 sec; theoretically there would be 5000 tuples in 5 seconds, but this is due to overhead). These are the results for the inverse-reduce optimisation:

(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

One line is one emitted window count. This is what happens when I remove the Thread.sleep(1):

(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover from it.
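For reference, the core of the inverse-reduce idea can be sketched in a few lines (plain Java, simplified; this is not the actual InversePreReducer code from PR 856): the window keeps a running aggregate, adds each arriving element, and applies the inverse of the reduce function (here: subtraction for sum) to each evicted element. Because the state is cumulative, a single add/evict mismatch, e.g. from out-of-order arrival or a bug in the eviction path, poisons every subsequent window, which is consistent with the diverging numbers shown above.

```java
import java.util.ArrayDeque;

// Sketch of inverse-reduce pre-aggregation for a sliding sum window
// (simplified names, not the actual PR 856 implementation).
class InverseSumWindow {
    private final ArrayDeque<Long> window = new ArrayDeque<>();
    private final int size;
    private long sum = 0;

    InverseSumWindow(int size) { this.size = size; }

    // Add one element; evict the oldest once the window is full.
    // Returns the current window sum.
    long add(long value) {
        window.addLast(value);
        sum += value;
        if (window.size() > size) {
            // Inverse step: O(1) per slide instead of re-reducing the window.
            sum -= window.removeFirst();
        }
        return sum;
    }
}
```

The O(1) update per element is why this can beat re-reducing the whole window, but note that correctness hinges on every evicted value exactly matching a previously added one.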
The good thing is that it does produce results now, where the previous Current/Reduce would simply hang and not produce any output.

On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:

Hello,

Aljoscha, can you please try the performance test of Current/Reduce with the InversePreReducer in PR 856? (If you just call sum, it will use an InversePreReducer.) It would be an interesting test, because the inverse function optimization really depends on the stream being ordered, and I think it has the potential of being faster than Next/Reduce, especially if the window size is much larger than the slide size.

Best regards,
Gabor

2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

I think I'll have to elaborate a bit, so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to alleviate concerns about performance.

First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier: https://gist.github.com/aljoscha/a367012646ab98208d27.
The plan looks like this:

    +--+
    |  |  Source
    +--+
      |
    +-----+
    |     |
    |   +--+
    |   |  |  Identity Map
    |   +--+
    |     |
    +-----+
      |
    +--+
    |  |  Window
    +--+
      |
    +--+
    |  |  Sink
    +--+

So all it does is pass the elements through an identity map and then merge them again before the window operator. The source emits ascending integers, and the window operator has a custom timestamp extractor that uses the integer itself as the timestamp and should create windows of size 4 (that is, elements with timestamps 0-3 are one window, the next window holds the elements with timestamps 4-7, and so on). Since the topology basically doubles the elements from the source, I would expect to get these windows:

Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7

The output is this, however:

Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out-of-order.
Imagine what would happen if the elements actually arrived with some delay from different operations.

Now, on to the performance numbers. The proof-of-concept I created is available here: https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks that tell us that no elements with an earlier timestamp will be emitted. The watermarks propagate through the operators. The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to that window. When the window operator receives a watermark, it looks at the in-flight windows (basically the buffers) and emits those windows where the window-end is before the watermark.

For measuring throughput I did the following: the source emits tuples of the form ("tuple", 1) in an infinite loop. The window operator sums up the tuples, thereby counting how many tuples the window operator can handle in a given time window. There are two different implementations for the summation: 1) simply summing up the values in a mapWindow(), where you get a List of all tuples and simply iterate over it; 2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimisations).
These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):

Tumbling (1 sec):
- Current/Map: 1.6 mio
- Current/Reduce: 2 mio
- Next/Map: 2.2 mio
- Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
- Current/Map: ca. 3 mio (fluctuates a lot)
- Current/Reduce: no output
- Next/Map: ca. 4 mio (fluctuates)
- Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not depend on the number of elements (it is just the current sum). The pre-reducer for sliding windows cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.

One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is. But the proof-of-concept also shows that performance can be greatly improved.
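The watermark-driven windowing described above can be sketched roughly as follows (plain Java, heavily simplified relative to the actual proof-of-concept branch; class and method names are made up for illustration): elements are buffered by target window, and a watermark flushes every window whose end is not after the watermark. Out-of-order elements are unproblematic because they simply land in the buffer their timestamp selects.

```java
import java.util.*;

// Minimal sketch of a watermark-based tumbling-window operator.
// Elements are just timestamps here; real elements would carry a payload.
class WatermarkWindowOperator {
    private final long windowSize;
    // window start -> buffered elements of that window
    private final SortedMap<Long, List<Long>> buffers = new TreeMap<>();

    WatermarkWindowOperator(long windowSize) { this.windowSize = windowSize; }

    // Out-of-order elements are fine: each lands in the right buffer.
    void processElement(long timestamp) {
        long windowStart = timestamp - (timestamp % windowSize);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    // A watermark closes (here: returns) all windows ending at or before it.
    List<List<Long>> processWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSize <= watermark) {
                Collections.sort(e.getValue());
                emitted.add(e.getValue());
                it.remove();
            } else {
                break; // TreeMap is key-sorted; later windows are still open
            }
        }
        return emitted;
    }
}
```

For example, feeding timestamps 0, 1, 4, 2, 3 (note the out-of-order 4) and then a watermark of 4 emits the single complete window containing 0, 1, 2, 3, while the window starting at 4 stays buffered.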
On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:

I agree, let's separate these topics from each other so we can get faster resolution.

There is already a state discussion in the thread we started with Paris.

On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:

I agree with supporting out-of-order out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API before we pull it out of beta. I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that IMO this is the right time to stall PRs until we agree on the design).

There are three sections in the document: windowing, state, and API. How intertwined are those with each other? Can we separate the discussion, or do we need to discuss those all together? I think part of the difficulty is that we are discussing three design choices at once.

On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com> wrote:

Out of order is ubiquitous in the real world.
Typically, what happens is that businesses will declare a maximum allowable delay for delayed transactions and will commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected specially as corrections, which are reported/used when possible.

Clearly, ordering can also be violated during processing, but if the data is originally out of order, the situation can't be repaired by any protocol fixes that prevent transactions from becoming disordered; it has to be handled at the data level.

On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I also don't like big changes, but sometimes they are necessary. The reason why I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus because the current windowing system assumes elements to globally arrive in order, which is simply not true. (The example has a source that generates increasing integers.
Then these pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example also runs at DOP=1; I imagine this gets worse with higher DOP.

What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8. (It is an infinite source of tuples and a 5-second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake-element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically if the window size increases.

So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out-of-order (which happens basically always in real-world windowing use cases).
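To make the arrival-order vs. occurrence-time distinction discussed earlier in the thread concrete, here is a toy sketch (plain Java with made-up data, not the Flink API): the same out-of-order stream is summed once by arrival order and once by element timestamp, and the two window semantics give different answers.

```java
import java.util.*;

// Toy illustration: one out-of-order stream, two windowing semantics.
class WindowSemantics {
    // (timestamp, value) pairs in *arrival* order; note ts=1 arrives last.
    static final int[][] STREAM = {{0, 1}, {2, 10}, {3, 10}, {1, 1}};

    // Arrival-order tumbling window of 2 elements: groups by position.
    static List<Integer> arrivalWindows() {
        List<Integer> sums = new ArrayList<>();
        for (int i = 0; i < STREAM.length; i += 2) {
            sums.add(STREAM[i][1] + STREAM[i + 1][1]);
        }
        return sums;
    }

    // Occurrence-time tumbling window of size 2: groups by timestamp / 2,
    // so the late element still lands in its correct window.
    static List<Integer> eventTimeWindows() {
        SortedMap<Integer, Integer> sums = new TreeMap<>();
        for (int[] e : STREAM) {
            sums.merge(e[0] / 2, e[1], Integer::sum);
        }
        return new ArrayList<>(sums.values());
    }
}
```

Arrival-order windows yield the sums 11 and 11, while occurrence-time windows yield 2 and 20: both are well-defined queries, but only the latter is stable under reordering of the input.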
On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:

What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". It only differs in where the timestamps are assigned.

The OOP approach also gives you the semantics of total ordering without imposing merges on the streams.

On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates. However, if you consider Event-Pattern-Matching, global ordering is necessary (even if the performance penalty might be high).

I would also keep "system-time windows" as an alternative to "source assigned ts-windows".
It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates"
https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:

Hey

I think we should not block PRs unnecessarily if your suggested changes might touch them at some point.

Also, I still think we should not put everything in the DataStream because it will be a huge mess.

Also, we need to agree on the out-of-order processing, whether we want it the way you proposed it (which is quite costly). Another alternative approach that fits in the current windowing is to filter out out-of-order events and apply a special handling operator on them. This would be fairly lightweight.

My point is that we need to consider some alternative solutions. And we should not block contributions along the way.
Cheers
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:

The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone, meaning also that any work on the pre-aggregators or optimizations that we do now becomes useless.

For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)

On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Aljoscha,

Thanks for the nice summary, this is a very good initiative.
I added some comments to the respective sections (where I didn't fully agree :)).

At some point I think it would be good to have a public hangout session on this, which could make for a more dynamic discussion.

Cheers,
Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun 2015, 21:34):

Hi,
with people proposing changes to the streaming part I also wanted to throw my hat into the ring. :D

During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts I had about how things could be improved.
Hopefully, they are in somewhat coherent shape now, so please have a look if you are interested in this:

https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
- Timestamps assigned at sources
- Out-of-order processing of elements in window operators
- API design

Please let me know what you think. Comment in the document or here on the mailing list.

I have a PR in the makings that would introduce source timestamps and watermarks for keeping track of them. I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)
Cheers,
Aljoscha
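The pane-based pre-aggregation mentioned above (cf. the "Resource sharing in continuous sliding-window aggregates" paper cited earlier in the thread) can be sketched as follows (plain Java, simplified; assumes the input length is a multiple of the pane size): each element is aggregated into exactly one pane of size gcd(window, slide), and each window sum is then combined from a handful of pane sums instead of all its elements.

```java
// Sketch of pane-based sliding-window sums: aggregate each element once
// into its pane, then combine pane sums per window.
class PaneSum {
    // Sums of windows [start, start + size) sliding by `slide` over `values`.
    static long[] slidingSums(long[] values, int size, int slide) {
        int pane = gcd(size, slide);
        int numPanes = values.length / pane;
        long[] paneSums = new long[numPanes];
        for (int i = 0; i < values.length; i++) {
            paneSums[i / pane] += values[i]; // each element touched exactly once
        }
        int panesPerWindow = size / pane;
        int numWindows = (numPanes - panesPerWindow) * pane / slide + 1;
        long[] sums = new long[numWindows];
        for (int w = 0; w < numWindows; w++) {
            int firstPane = w * slide / pane;
            for (int p = 0; p < panesPerWindow; p++) {
                sums[w] += paneSums[firstPane + p]; // combine pane sums
            }
        }
        return sums;
    }

    static int gcd(int a, int b) { return b == 0 ? a : gcd(b, a % b); }
}
```

For a window of size 4 sliding by 2 over the values 1..8, each window sum is built from just 2 pane sums: 10 (1+2+3+4), 18 (3+4+5+6), and 26 (5+6+7+8).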