What do you mean by Comment 2? Using the whole window apparatus if you just want to have, for example, a simple partitioned filter with partitioned state seems a bit extravagant.
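A partitioned filter with partitioned state needs very little machinery. The minimal sketch below is plain Java with no Flink dependency. The class `KeyedChangeFilter` and its rule (forward an element only when its value differs from the last value seen for the same key) are hypothetical illustrations, not part of any Flink API. The point is only that per-key state can live in a map indexed by the partition key, with no window buffers or triggers involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical partitioned filter: forwards an element only if its value
 * differs from the last value seen for the same key. The only state is one
 * map entry per key; no windows are involved.
 */
final class KeyedChangeFilter<K, V> {
    // Partitioned state: last value seen per key.
    private final Map<K, V> lastSeen = new HashMap<>();

    /** Returns true if the element should be forwarded downstream. */
    boolean filter(K key, V value) {
        boolean seenBefore = lastSeen.containsKey(key);
        V previous = lastSeen.put(key, value);
        // Forward the first element of each key and every change of its value.
        return !seenBefore || !Objects.equals(previous, value);
    }

    /** Number of keys currently holding state. */
    int stateSize() {
        return lastSeen.size();
    }
}
```

Each key costs one map entry no matter how many elements pass through. Emulating the same filter with a tumbling window of size 1 would presumably send every element through window assignment and emission as well.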
On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
Nice starting point.

Comment 1:
"Each individual stream partition delivers elements strictly in order."
(in 'Parallel Streams, Partitions, Time, and Ordering')

I would say "FIFO" and not "strictly in order". If data is not emitted in order, the stream partition will not be in order either.

Comment 2:
Why do we need KeyedDataStream? You can get everything done with GroupedDataStream (using a tumbling window of size = 1 tuple).

-Matthias

On 06/26/2015 07:42 PM, Stephan Ewen wrote:
Here is a first bit of what I have been writing down. I will add more over the next days:

https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows

https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering

On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:
+1 for writing this down

On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> wrote:
+1 go ahead

On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:
Hey!

This thread covers many different topics. Let's break it up into separate discussions.

- Operator state is already being driven by Gyula and Paris, in the above-mentioned pull request and the follow-up discussions.

- For windowing, this discussion has brought some results that we should sum up and clearly write down.
I would like to chime in to do that, based on what I learned from the document and this discussion. I also got some input from Marton about what he learned from mapping the Cloud DataFlow constructs to Flink.
I'll draft a Wiki page (with the help of Aljoscha and Marton) that sums this up and documents it for users (if we decide to adopt this).
Then we run this by Gyula, Matthias Sax and Kostas for feedback.

- API style discussions should be yet another thread. This will probably be opened as people start to address that.

I'll try to get a draft of the wiki version out tomorrow noon and send the link around.

Greetings,
Stephan

On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
Sure. I picked this up. Using the current model for "occurrence time semantics" does not work.

I have elaborated on this many times in the past (but nobody cared). It is important to make clear to the user which semantics are supported. Claiming to support "sliding windows" doesn't mean anything; there are too many different semantics out there. :)

On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
Yes, I am aware of this requirement, and it would also be supported in my proposed model.

The problem is that the "custom timestamp" feature gives the impression that the elements would be windowed according to a user timestamp. The results, however, are wrong because of the assumption that elements arrive in order. (This is what I was trying to show with my fancy ASCII art and result output.)

On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
Hi Aljoscha,

I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created).
There is only the notion of "arrival order" at the operator. From this "arrival order" perspective, the results are correct(!).

Windowing in the current approach means, for example, "sum up an attribute of all events you *received* in the last 5 seconds". That has a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds". Both queries are valid, and Flink should support both IMHO.

-Matthias

On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
Yes, now this also processes about 3 million elements (window size 5 sec, slide 1 sec), but it still fluctuates a lot between 1 million and 5 million.

Performance is not my main concern, however. My concern is that the current model assumes elements arrive in order, which is simply not true.

In your code you have these lines for specifying the window:
    .window(Time.of(1L, TimeUnit.SECONDS))
    .every(Time.of(1L, TimeUnit.SECONDS))

Although this semantically specifies a tumbling window of size 1 sec, I'm afraid it uses the sliding-window logic internally (because of the .every()).

In my tests I only have the first line.

On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:
I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm getting less than 1/10th of yours. For example, in the Tumbling case, Current/Reduce produces only ~100000 for me.
Do you have any idea what I could be doing wrong? My code:
http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor

2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (size 5 sec, slide 1 sec). (Theoretically there would be 5000 tuples in 5 seconds; the difference is due to overhead.) These are the results for the inverse reduce optimisation:
(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

Each line is one emitted window count.
This is what happens when I remove the Thread.sleep(1):
(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover. The good thing is that it does produce results now, whereas the previous Current/Reduce would simply hang and not produce any output.

On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:
Hello,

Aljoscha, can you please try the performance test of Current/Reduce with the InversePreReducer in PR 856? (If you just call sum, it will use an InversePreReducer.) It would be an interesting test, because the inverse function optimization really depends on the stream being ordered, and I think it has the potential to be faster than Next/Reduce, especially if the window size is much larger than the slide size.
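A minimal sketch shows why the inverse-function optimization depends on ordered input. It is not the InversePreReducer from PR 856, and `InverseReduceSlidingSum` and its methods are hypothetical names. The sketch keeps a running sum and adds each element once. When an element's timestamp leaves the window (now - size, now], it subtracts that element from the head of a FIFO buffer. Because eviction only inspects the head, an older element that arrives after a newer one is not subtracted on time.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical "inverse reduce" sliding sum: add each value once, subtract it
 * when it leaves the window, instead of re-summing the window on every slide.
 * The window evaluated at time `now` is the half-open interval (now - size, now].
 */
final class InverseReduceSlidingSum {
    private final long windowSize;
    // Elements in arrival order; eviction assumes this is also timestamp order.
    private final Deque<long[]> buffer = new ArrayDeque<>();
    private long sum = 0;

    InverseReduceSlidingSum(long windowSize) {
        this.windowSize = windowSize;
    }

    void add(long timestamp, long value) {
        buffer.addLast(new long[] {timestamp, value});
        sum += value; // the "reduce" step
    }

    long sumAt(long now) {
        // The "inverse" step: subtract elements that fell out of the window.
        // Only the head is checked, so an older element stuck behind a newer
        // one (out-of-order arrival) is not evicted on time.
        while (!buffer.isEmpty() && buffer.peekFirst()[0] <= now - windowSize) {
            sum -= buffer.pollFirst()[1];
        }
        return sum;
    }

    /** Reference result: re-sum every element whose timestamp is in the window. */
    static long recompute(long[][] elements, long now, long windowSize) {
        long total = 0;
        for (long[] e : elements) {
            if (e[0] > now - windowSize && e[0] <= now) {
                total += e[1];
            }
        }
        return total;
    }
}
```

With ordered input, `sumAt` matches a full recomputation. With the arrivals (3, 30) then (1, 10) and a window size of 2, `sumAt(3)` returns 40, while `recompute` returns the correct 30.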
Best regards,
Gabor

2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
I think I have to elaborate a bit, so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to address concerns about performance.

First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier: https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like this:

    +--+
    |  | Source
    +--+
     |
     +-----+
     |     |
     |    +--+
     |    |  | Identity Map
     |    +--+
     |     |
     +-----+
     |
    +--+
    |  | Window
    +--+
     |
     |
    +--+
    |  | Sink
    +--+

So all it does is pass the elements through an identity map and then merge them again before the window operator. The source emits ascending integers. The window operator has a custom timestamp extractor that uses the integer itself as the timestamp, and it should create windows of size 4 (that is, elements with timestamps 0-3 form one window, elements with timestamps 4-7 form the next, and so on).
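Assigning an element to a tumbling event-time window depends only on its timestamp, not on when the element arrives. The minimal sketch below assumes, as in the example above, that each element is its own timestamp. `TumblingAssigner` is a hypothetical name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper: assigns elements to tumbling event-time windows by
 * timestamp, independent of arrival order.
 */
final class TumblingAssigner {
    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Groups elements (each element is its own timestamp here) by window start. */
    static Map<Long, List<Long>> assign(List<Long> arrivals, long size) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : arrivals) {
            windows.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

Feed it the interleaved arrival order 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 5, 6, 7 with size 4. It yields the expected windows {0, 1, 2, 3, 0, 1, 2, 3} and {4, 4, 5, 6, 7, 5, 6, 7}. The sketch does not decide when a window may be emitted. That requires knowing that no more elements for the window can arrive.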
Since the topology basically doubles the elements from the source, I would expect to get these windows:
Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7

The output is this, however:
Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out of order. Imagine what would happen if the elements actually arrived with some delay from different operations.

Now, on to the performance numbers. The proof-of-concept I created is available here: https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks, which tell us that no elements with an earlier timestamp will be emitted. The watermarks propagate through the operators. The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to its window. When the window operator receives a watermark, it looks at the in-flight windows (basically the buffers) and emits those windows whose window-end is before the watermark.
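The watermark mechanism described above fits in a few lines of plain Java. This is not code from the proof-of-concept branch, and `WatermarkWindowSum` and its methods are hypothetical. The sketch makes three assumptions:

- Windows are tumbling windows [start, start + size).
- A watermark W means "no element with a timestamp below W will arrive".
- Each in-flight window keeps a running sum instead of a buffer of elements, which corresponds to the Reduce variants in the measurements.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical watermark-driven tumbling-window sum. Elements may arrive in
 * any order; a window is emitted only once a watermark guarantees that no
 * element with a smaller timestamp can still arrive.
 */
final class WatermarkWindowSum {
    private final long size;
    // In-flight windows keyed by window start. The state per window is a
    // running sum, so it does not grow with the number of elements.
    private final TreeMap<Long, Long> inFlight = new TreeMap<>();

    WatermarkWindowSum(long size) {
        this.size = size;
    }

    void onElement(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        inFlight.merge(start, value, Long::sum);
    }

    /** Emits {windowStart, sum} for every window whose end is at or before the watermark. */
    List<long[]> onWatermark(long watermark) {
        List<long[]> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> window = it.next();
            if (window.getKey() + size > watermark) {
                break; // ordered by start, so later windows are not complete either
            }
            emitted.add(new long[] {window.getKey(), window.getValue()});
            it.remove();
        }
        return emitted;
    }

    int inFlightWindows() {
        return inFlight.size();
    }
}
```

Take size 4 and the elements 0, 5, 1, 3 (each with value 1) arriving in that order. A watermark of 3 emits nothing. A watermark of 4 emits the window starting at 0 with sum 3, while the window starting at 4 stays in flight. As Stephan points out elsewhere in the thread, the same operator would serve system time and event time; only the place where timestamps are assigned differs.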
For measuring throughput I did the following: the source emits tuples of the form ("tuple", 1) in an infinite loop. The window operator sums up the tuples, thereby counting how many tuples the window operator can handle in a given time window. There are two different implementations of the summation:
1) simply summing up the values in a mapWindow(), where you get a List of all tuples and simply iterate over it;
2) using sum(1), which is implemented as a reduce() (which uses the pre-reducer optimisations).

These are the performance numbers, in tuples counted per window (Current is the current implementation, Next is my proof-of-concept):

Tumbling (1 sec):
- Current/Map: 1.6 million
- Current/Reduce: 2 million
- Next/Map: 2.2 million
- Next/Reduce: 4 million

Sliding (5 sec, slide 1 sec):
- Current/Map: ca. 3 million (fluctuates a lot)
- Current/Reduce: no output
- Next/Map: ca. 4 million (fluctuates)
- Next/Reduce: 10 million

The Next/Reduce variant can basically scale indefinitely with window size, because the internal state does not depend on the number of elements (it is just the current sum). The pre-reducer for sliding windows cannot handle this amount of tuples; it produces no output.
For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission. This seems to tax the garbage collector a bit and leads to random pauses.

One thing that should be noted is that I had to disable the fake-element emission thread; otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is, but the proof-of-concept also shows that performance can be greatly improved.

On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
I agree, let's separate these topics from each other so we can get faster resolution.

There is already a state discussion in the thread we started with Paris.

On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:
I agree with supporting out-of-order out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API, before we pull it out of beta.
I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that IMO this is the right time to stall PRs until we agree on the design).

There are three sections in the document: windowing, state, and API. How intertwined are they? Can we separate the discussions, or do we need to discuss them all together? I think part of the difficulty is that we are discussing three design choices at once.

On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com> wrote:
Out-of-order data is ubiquitous in the real world. Typically, businesses declare a maximum allowable delay for delayed transactions and commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected separately as corrections, which are reported/used when possible.

Clearly, ordering can also be violated during processing. But if the data is out of order to begin with, no protocol fix that keeps transactions from becoming disordered can repair the situation; it has to be handled at the data level.
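Ted's policy is to commit after a maximum allowable delay and collect later arrivals as corrections. The sketch below implements it for tumbling-window sums. The cutoff rule (largest timestamp seen so far minus a fixed `maxDelay`) is an assumption chosen for illustration, and so are the class and method names. A real system could derive the cutoff from watermarks instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical "commit after a maximum allowable delay" for tumbling-window
 * sums. A window [start, start + size) is committed once the largest timestamp
 * seen so far, minus maxDelay, has passed its end. Elements whose window is
 * already past that cutoff are kept aside as corrections.
 */
final class DelayedCommitWindows {
    private final long size;
    private final long maxDelay;
    private final TreeMap<Long, Long> open = new TreeMap<>();   // window start -> running sum
    private final List<long[]> committed = new ArrayList<>();   // {window start, sum}
    private final List<long[]> corrections = new ArrayList<>(); // {timestamp, value}
    private long maxTimestamp = Long.MIN_VALUE;                 // no element seen yet

    DelayedCommitWindows(long size, long maxDelay) {
        this.size = size;
        this.maxDelay = maxDelay;
    }

    void onElement(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        if (isPastCutoff(start)) {
            // Later than the allowed delay: leave committed results untouched.
            corrections.add(new long[] {timestamp, value});
            return;
        }
        open.merge(start, value, Long::sum);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        // Commit every open window whose end is now at or before the cutoff.
        while (!open.isEmpty() && isPastCutoff(open.firstKey())) {
            Map.Entry<Long, Long> window = open.pollFirstEntry();
            committed.add(new long[] {window.getKey(), window.getValue()});
        }
    }

    private boolean isPastCutoff(long windowStart) {
        return maxTimestamp != Long.MIN_VALUE && windowStart + size <= maxTimestamp - maxDelay;
    }

    List<long[]> committed() {
        return committed;
    }

    List<long[]> corrections() {
        return corrections;
    }
}
```

For example, take size 4 and maxDelay 2 with the arrivals (1, 10), (5, 20), (3, 5), (6, 1). The window starting at 0 is committed with sum 15 once timestamp 6 is seen. A subsequent (2, 7) then ends up in `corrections()` instead of changing the committed result.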
On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
I also don't like big changes, but sometimes they are necessary. The reason I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus, because the current windowing system assumes that elements arrive globally in order, which is simply not true. (The example has a source that generates increasing integers. These then pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example also runs with DOP=1; I imagine this gets worse with higher DOP.

What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
(It is an infinite source of tuples and a 5-second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake-element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically as the window size increases.

So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out of order (which happens basically always in real-world windowing use cases).

On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:
What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". It only differs in where the timestamps are assigned.

The OOP (out-of-order processing) approach also gives you the semantics of total ordering without imposing merges on the streams.

On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
I agree that there should be multiple alternatives the user(!) can choose from.
Partial out-of-order processing works for many/most aggregates. However, if you consider event pattern matching, global ordering is necessary (even if the performance penalty might be high).

I would also keep "system-time windows" as an alternative to "source-assigned ts-windows".

It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates"

https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:
Hey

I think we should not block PRs unnecessarily just because your suggested changes might touch them at some point.

Also, I still think we should not put everything in the DataStream, because it will be a huge mess.

We also need to agree on whether we want out-of-order processing the way you proposed it (which is quite costly). Another approach, which fits into the current windowing, is to filter out out-of-order events and apply a special handling operator to them.
This would be fairly lightweight.

My point is that we need to consider some alternative solutions, and we should not block contributions along the way.

Cheers
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:
The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone. This also means that any work we do now on the pre-aggregators or optimizations becomes useless.

For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)
On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com> wrote:
Hi Aljoscha,

Thanks for the nice summary; this is a very good initiative.

I added some comments to the respective sections (where I didn't fully agree :)).
At some point I think it would be good to have a public hangout session on this, which would make for a more dynamic discussion.

Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun 2015, 21:34):
Hi,
with people proposing changes to the streaming part, I also wanted to throw my hat into the ring. :D

During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts I had about how things could be improved.
Hopefully, they are in somewhat coherent shape now, so please have a look if you are interested:

https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
- Timestamps assigned at sources
- Out-of-order processing of elements in window operators
- API design

Please let me know what you think. Comment in the document or here on the mailing list.

I have a PR in the making that would introduce source timestamps and watermarks for keeping track of them. I also hacked together a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)
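Panes split each sliding window into non-overlapping slices whose length divides both the window size and the slide, commonly gcd(size, slide). Every element updates exactly one pane, and a window result combines size / paneLength partial aggregates. The minimal sketch below does this for sums. It is not the code of the proof-of-concept mentioned above, and `PaneSlidingSum` is a hypothetical name.

```java
import java.util.TreeMap;

/**
 * Hypothetical pane-based pre-aggregation for sliding-window sums. Time is cut
 * into panes of length gcd(size, slide); each element updates one pane, and the
 * window [start, start + size) is the sum of the panes it covers.
 */
final class PaneSlidingSum {
    private final long size;
    private final long slide;
    private final long paneLength;
    private final TreeMap<Long, Long> paneSums = new TreeMap<>(); // pane start -> partial sum

    PaneSlidingSum(long size, long slide) {
        this.size = size;
        this.slide = slide;
        this.paneLength = gcd(size, slide);
    }

    static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    void onElement(long timestamp, long value) {
        long paneStart = timestamp - Math.floorMod(timestamp, paneLength);
        // One update per element, regardless of how many windows overlap it.
        paneSums.merge(paneStart, value, Long::sum);
    }

    /** Sum over [windowStart, windowStart + size); windowStart must be a multiple of slide. */
    long windowSum(long windowStart) {
        if (Math.floorMod(windowStart, slide) != 0) {
            throw new IllegalArgumentException("window start must be a multiple of the slide");
        }
        long total = 0;
        // Panes whose start lies inside the window; since size is a multiple of
        // paneLength, these panes cover the window exactly.
        for (long partial : paneSums.subMap(windowStart, windowStart + size).values()) {
            total += partial;
        }
        return total;
    }

    long paneLength() {
        return paneLength;
    }
}
```

With size 6 and slide 4, the pane length is 2. After adding the values 1 to 8 at timestamps 0 to 7, `windowSum(0)` returns 21 and `windowSum(4)` returns 26. Each element was touched once, even though the windows overlap.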
Cheers,
Aljoscha