Thanks for clarifying.

For map/flatMap it makes sense. However, I would handle filter differently. Even if the re-grouping can be removed internally in an optimization step (which might also be possible for map/flatMap), I think it is annoying for the user to specify .byKey() twice:

-> stream.byKey(MYKEY).filter(...).byKey(MYKEY).window(...)

instead of the shorter

-> stream.byKey(MYKEY).filter(...).window(...)

Furthermore, I agree that the user should choose global order explicitly.

However, I disagree on some points:

1) OrderedDataStream is not non-parallel from my point of view. It only requires receiving all tuples within a stream partition in order (not sure if there is any use case for this, though).

2) Why not support OrderedNonParallelStream? NonParallelStream is already non-parallel, so adding global ordering on top of it should be no problem (and sounds useful to me).

-Matthias

On 07/23/2015 11:37 AM, Aljoscha Krettek wrote:
> Hi,
> I'll try to answer these one at a time:
>
> 1) After a map/flatMap the key (and partitioning) of the data is lost, that's why it goes back to a vanilla DataStream. I think filter also has this behavior just to fit in with the other ones. Also, any chain of filter/map/flatMap can also be expressed in a single flatMap.
>
> 2) An OrderedDataStream would require a global ordering of the elements, which is an inherently non-parallel operation. With the API rework we want to make fast (i.e. parallel, in most cases) operations more prominent while making it hard to specify operations that cannot run in parallel. As for windows, the ordering of the elements inside a window is decoupled from any ordering of a DataStream; the user should be able to specify whether his windowing policies or windowing functions require the input elements to be ordered by timestamp.
>
> Now for the Discretization. I think it makes sense to keep it in the same document at the beginning but this has the potential to grow very large, at which point it should be moved to its own document. (Exactly what Gyula suggested.)
>
> On Thu, 23 Jul 2015 at 11:22 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>
>> I just had a look into the "Streams+and+Operations+on+Streams" document.
>>
>> The initial figure is confusing... (it makes sense after reading the document but is a stumbling block in the beginning)
>>
>> A few comments/questions:
>>
>> 1) Why is a (Ordered)KeyedDataStream converted into a DataStream if map/flatMap/filter is applied? (for broadcast/rebalance it makes sense to me)
>>
>> 2) Why is there only an ordered KEYED-DataStream? A NonParallelWindowStream might require ordered input, too. Maybe even an OrderedDataStream, i.e., non-keyed, would make sense, too.
>>
>> @Aljoscha: Decoupling API and window implementation is mandatory of course (from my point of view).
>>
>> @Gyula: I would prefer to extend the current document. It is easier to follow if all information is in a single place and not spread out.
>>
>> -Matthias
>>
>> On 07/23/2015 10:57 AM, Gyula Fóra wrote:
>>> I think aside from the Discretization part we reached a consensus. I think you can start with the implementation for the rest.
>>>
>>> I will do some updates to the Discretization part, and might even start a new doc if it gets too long.
>>>
>>> Gyula
>>>
>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 23 Jul 2015, 10:47):
>>>
>>>> What's the status of the discussion? What are the opinions on the reworking of the Streaming API as presented here:
>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
>>>>
>>>> If we could reach a consensus I would like to start working on this to have it done before the next release.
>>>> In the process of this I would also like to decouple the current windowing implementation from the API to make it possible to select different windowing systems per job, as outlined here:
>>>>
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=60624830
>>>>
>>>> On Mon, 29 Jun 2015 at 10:55 Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> @Matthias:
>>>>>
>>>>> I think using the KeyedDataStream will simply result in smaller programs. It may be hard for some users to make the connection to a 1-element-tumbling-window, simply because they want to use state. Not everyone is as deep into that stuff as you are ;-)
>>>>>
>>>>> On Sun, Jun 28, 2015 at 1:13 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> Yes. But as I said, you can get the same behavior with a GroupedDataStream using a tumbling 1-tuple-size window. Thus, there is no conceptual advantage in using KeyedDataStream and no disadvantage in binding stateful operations to GroupedDataStreams.
>>>>>>
>>>>>> On 06/27/2015 06:54 PM, Márton Balassi wrote:
>>>>>>> @Matthias: Your point of working with a minimal number of clear concepts is desirable to say the least. :)
>>>>>>>
>>>>>>> The reasoning behind the KeyedDataStream is to associate Flink persisted operator state with the keys of the data that produced it, so that stateful computation becomes scalable in the future. This should not be tied to the GroupedDataStream, especially not if we are removing the option to create groups without windows as proposed on the Wiki by Stephan.
>>>>>>>
>>>>>>> On Sat, Jun 27, 2015 at 4:15 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>>>>
>>>>>>>> This was more a conceptual point-of-view argument.
>>>>>>>> From an implementation point of view, skipping the window building step is a good idea if a tumbling 1-tuple-size window is detected.
>>>>>>>>
>>>>>>>> I prefer to work with a minimum number of concepts (and apply internal optimization if possible) instead of using redundant concepts for special cases. Of course, this is my personal point of view.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 06/27/2015 03:47 PM, Aljoscha Krettek wrote:
>>>>>>>>> What do you mean by Comment 2? Using the whole window apparatus if you just want to have, for example, a simple partitioned filter with partitioned state seems a bit extravagant.
>>>>>>>>>
>>>>>>>>> On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>
>>>>>>>>>> Nice starting point.
>>>>>>>>>>
>>>>>>>>>> Comment 1:
>>>>>>>>>> "Each individual stream partition delivers elements strictly in order." (in 'Parallel Streams, Partitions, Time, and Ordering')
>>>>>>>>>>
>>>>>>>>>> I would say "FIFO" and not "strictly in order". If data is not emitted in-order, the stream partition will not be in-order either.
>>>>>>>>>>
>>>>>>>>>> Comment 2:
>>>>>>>>>> Why do we need KeyedDataStream? You can get everything done with GroupedDataStream (using a tumbling window of size = 1 tuple).
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 06/26/2015 07:42 PM, Stephan Ewen wrote:
>>>>>>>>>>> Here is a first bit of what I have been writing down.
>>>>>>>>>>> Will add more over the next days:
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> +1 for writing this down
>>>>>>>>>>>>
>>>>>>>>>>>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 go ahead
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This thread covers many different topics. Let's break this up into separate discussions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Operator State is already driven by Gyula and Paris and happening on the above mentioned pull request and the followup discussions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - For windowing, this discussion has brought some results that we should sum up and clearly write down. I would like to chime in to do that based on what I learned from the document and this discussion. I also got some input from Marton about what he learned from mapping the Cloud DataFlow constructs to Flink. I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums this up and documents it for users (if we decide to adopt this). Then we run this by Gyula, Matthias Sax and Kostas for feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - API style discussions should be yet another thread.
>>>>>>>>>>>>>> This will probably be opened as people start to address that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'll try to get a draft of the wiki version out tomorrow noon and send the link around.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Greetings,
>>>>>>>>>>>>>> Stephan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sure. I picked this up. Using the current model for "occurrence time semantics" does not work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I elaborated on this in the past many times (but nobody cared). It is important to make it clear to the user what semantics are supported. Claiming to support "sliding windows" doesn't mean anything; there are too many different semantics out there. :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>> Yes, I am aware of this requirement and it would also be supported in my proposed model.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The problem is that the "custom timestamp" feature gives the impression that the elements would be windowed according to a user-timestamp. The results, however, are wrong because of the assumption about elements arriving in order. (This is what I was trying to show with my fancy ASCII art and result output.)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in-order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created). There is only the notion of "arrival-order" at the operator. From this "arrival-order" perspective, the results are correct(!).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Windowing in the current approach means, for example, "sum up an attribute of all events you *received* in the last 5 seconds". That is a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds". Both queries are valid and Flink should support both IMHO.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>>>> Yes, now this also processes about 3 mio elements (window size 5 sec, slide 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Performance is not my main concern, however. My concern is that the current model assumes elements to arrive in order, which is simply not true.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In your code you have these lines for specifying the window:
>>>>>>>>>>>>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Although this semantically specifies a tumbling window of size 1 sec, I'm afraid it uses the sliding window logic internally (because of the .every()).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In my tests I only have the first line.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I also tried to reproduce some of your performance numbers, but I'm getting only less than 1/10th of yours. For example, in the Tumbling case, Current/Reduce produces only ~100000 for me. Do you have any idea what I could be doing wrong? My code:
>>>>>>>>>>>>>>>>>>> http://pastebin.com/zbEjmGhk
>>>>>>>>>>>>>>>>>>> I am running it on a 2 GHz Core i7.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>> Gabor
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect.
>>>>>>>>>>>>>>>>>>>> When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec). (Theoretically there would be 5000 tuples in 5 seconds, but this is due to overhead.) These are the results for the inverse reduce optimisation:
>>>>>>>>>>>>>>>>>>>> (Tuple 0,38)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,829)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,1625)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2424)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,3190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,3198)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-339368)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-1315725)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-2932932)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5082735)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-7743256)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,75701046)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,642829470)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2242018381)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,5190708618)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,10060360311)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-94254951)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-219806321293)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-1258895232699)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-4074432596329)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> One line is one emitted window count.
>>>>>>>>>>>>>>>>>>>> This is what happens when I remove the Thread.sleep(1):
>>>>>>>>>>>>>>>>>>>> (Tuple 0,660676)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2553733)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,3542696)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,1)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,1107035)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,2549491)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,4100387)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406583360092)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406582150743)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042713628318)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and does not recover from it. The good thing is that it does produce results now, where the previous Current/Reduce would simply hang and not produce any output.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Aljoscha, can you please try the performance test of Current/Reduce with the InversePreReducer in PR 856? (If you just call sum, it will use an InversePreReducer.)
>>>>>>>>>>>>>>>>>>>>> It would be an interesting test, because the inverse function optimization really depends on the stream being ordered, and I think it has the potential of being faster than Next/Reduce, especially if the window size is much larger than the slide size.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>>>> Gabor
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>>>>>>>>>>>>>>>>>> I think I'll have to elaborate a bit so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to alleviate concerns about performance.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier:
>>>>>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27
>>>>>>>>>>>>>>>>>>>>>> The plan looks like this:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>   +--+
>>>>>>>>>>>>>>>>>>>>>>   |  | Source
>>>>>>>>>>>>>>>>>>>>>>   +--+
>>>>>>>>>>>>>>>>>>>>>>    |
>>>>>>>>>>>>>>>>>>>>>>    +-----+
>>>>>>>>>>>>>>>>>>>>>>    |     |
>>>>>>>>>>>>>>>>>>>>>>    |    +--+
>>>>>>>>>>>>>>>>>>>>>>    |    |  | Identity Map
>>>>>>>>>>>>>>>>>>>>>>    |    +--+
>>>>>>>>>>>>>>>>>>>>>>    |     |
>>>>>>>>>>>>>>>>>>>>>>    +-----+
>>>>>>>>>>>>>>>>>>>>>>    |
>>>>>>>>>>>>>>>>>>>>>>   +--+
>>>>>>>>>>>>>>>>>>>>>>   |  | Window
>>>>>>>>>>>>>>>>>>>>>>   +--+
>>>>>>>>>>>>>>>>>>>>>>    |
>>>>>>>>>>>>>>>>>>>>>>    |
>>>>>>>>>>>>>>>>>>>>>>   +--+
>>>>>>>>>>>>>>>>>>>>>>   |  | Sink
>>>>>>>>>>>>>>>>>>>>>>   +--+
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So all it does is pass the elements through an identity map and then merge them again before the window operator. The source emits ascending integers and the window operator has a custom timestamp extractor that uses the integer itself as the timestamp and should create windows of size 4 (that is, elements with timestamp 0-3 are one window, the next are the elements with timestamp 4-7, and so on).
>>>>>>>>>>>>>>>>>>>>>> Since the topology basically doubles the elements from the source I would expect to get these windows:
>>>>>>>>>>>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>>>>>>>>>>>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The output is this, however:
>>>>>>>>>>>>>>>>>>>>>> Window: 0, 1, 2, 3,
>>>>>>>>>>>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>>>>>>>>>>>>>>>>>>> Window: 8, 9, 10, 11,
>>>>>>>>>>>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>>>>>>>>>>>>>>>>>>> Window: 16, 17, 18, 19,
>>>>>>>>>>>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>>>>>>>>>>>>>>>>>>> Window: 24, 25, 26, 27,
>>>>>>>>>>>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The reason is that the elements simply arrive out-of-order. Imagine what would happen if the elements actually arrived with some delay from different operations.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I created is available here:
>>>>>>>>>>>>>>>>>>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock
>>>>>>>>>>>>>>>>>>>>>> The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks that tell us that no elements with an earlier timestamp will be emitted.
>>>>>>>>>>>>>>>>>>>>>> The watermarks propagate through the operators. The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to that window. When the window operator receives a watermark it will look at the in-flight windows (basically the buffers) and emit those windows where the window-end is before the watermark.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For measuring throughput I did the following: The source emits tuples of the form ("tuple", 1) in an infinite loop. The window operator sums up the tuples, thereby counting how many tuples the window operator can handle in a given time window. There are two different implementations for the summation: 1) simply summing up the values in a mapWindow(), where you get a List of all tuples and simply iterate over it. 2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimisations).
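A minimal sketch of the buffering and watermark logic described above, for illustration only. It assumes tumbling windows with an exclusive end, and the names WatermarkWindowSketch, onElement and onWatermark are hypothetical; they are not taken from the proof-of-concept branch or from Flink. The arrival order in main() is made up to mimic the doubled, out-of-order stream from the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical illustration; none of these names are Flink classes.
class WatermarkWindowSketch {
    private final long windowSize;
    // In-flight windows: window start timestamp -> buffered element values.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    WatermarkWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    // Put the element into the buffer of the window its timestamp falls into,
    // regardless of the order in which elements arrive.
    void onElement(long timestamp, long value) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // A watermark w promises that no element with timestamp < w will follow,
    // so every window [start, start + size) with start + size <= w is complete.
    List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + windowSize <= watermark) {
            emitted.add(buffers.pollFirstEntry().getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindowSketch op = new WatermarkWindowSketch(4);
        // Each integer arrives twice, interleaved out of order; value == timestamp.
        long[] arrivals = {0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 5, 6, 7};
        for (long t : arrivals) {
            op.onElement(t, t);
        }
        System.out.println(op.onWatermark(4)); // [[0, 1, 2, 3, 0, 1, 2, 3]]
        System.out.println(op.onWatermark(8)); // [[4, 4, 5, 6, 7, 5, 6, 7]]
    }
}
```

Each emitted window contains exactly the elements whose timestamps fall into it, even though they arrived interleaved; only the order inside a window reflects arrival order.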
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Tumbling (1 sec):
>>>>>>>>>>>>>>>>>>>>>> - Current/Map: 1.6 mio
>>>>>>>>>>>>>>>>>>>>>> - Current/Reduce: 2 mio
>>>>>>>>>>>>>>>>>>>>>> - Next/Map: 2.2 mio
>>>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 4 mio
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
>>>>>>>>>>>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>>>>>>>>>>>>>>>>>> - Current/Reduce: No output
>>>>>>>>>>>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
>>>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 10 mio
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not rely on the number of elements (it is just the current sum). The pre-reducer for sliding elements cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.
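To illustrate why the state of the Reduce variant stays constant while the Map variants grow with the number of buffered elements, here is a rough sketch. BufferingWindow and ReducingWindow are made-up names for this illustration, not classes from Flink or from the proof-of-concept, and the sketch says nothing about the actual throughput numbers above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two summation styles; not Flink code.
class WindowStateSketch {

    // "Map" style: keep every element until the window is emitted, then iterate.
    static final class BufferingWindow {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) {
            elements.add(value);
        }

        // State grows linearly with the number of elements in the window.
        int stateSize() {
            return elements.size();
        }

        long emitSum() {
            long sum = 0;
            for (long v : elements) {
                sum += v;
            }
            elements.clear();
            return sum;
        }
    }

    // "Reduce" style: fold each element into a running sum as it arrives.
    static final class ReducingWindow {
        private long sum = 0;

        void add(long value) {
            sum += value;
        }

        // State is a single value, independent of the window size.
        int stateSize() {
            return 1;
        }

        long emitSum() {
            long result = sum;
            sum = 0;
            return result;
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        ReducingWindow reducing = new ReducingWindow();
        for (int i = 0; i < 1000; i++) {
            buffering.add(1);
            reducing.add(1);
        }
        System.out.println(buffering.stateSize() + " vs " + reducing.stateSize()); // 1000 vs 1
        System.out.println(buffering.emitSum() + " == " + reducing.emitSum());     // 1000 == 1000
    }
}
```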
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is, but the proof-of-concept also shows that performance can be greatly improved.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I agree, let's separate these topics from each other so we can get faster resolution.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> There is already a state discussion in the thread we started with Paris.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API before we pull it out of beta.
>>>>>>>>>>>>>>>>>>>>>>>> I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that IMO this is the right time to stall PRs until we agree on the design).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> There are three sections in the document: windowing, state, and API. How convoluted are those with each other? Can we separate the discussion or do we need to discuss those all together? I think part of the difficulty is that we are discussing three design choices at once.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Out of order is ubiquitous in the real-world. Typically, what happens is that businesses will declare a maximum allowable delay for delayed transactions and will commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected specially as corrections which are reported/used when possible.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Clearly, ordering can also be violated during processing, but if the data is originally out of order the situation can't be repaired by any protocol fixes that prevent transactions from becoming disordered but has to be handled at the data level.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I also don't like big changes but sometimes they are necessary. The reason why I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus because the current windowing system assumes elements to globally arrive in order, which is simply not true.
>>>>>>>>>>>>>>>>>>>>>>>>>> (The example has a source that generates increasing integers. Then these pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example is also DOP=1, I imagine this to get worse with higher DOP.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API:
>>>>>>>>>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8
>>>>>>>>>>>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically if the window size increases.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out-of-order (which happens basically always in real-world windowing use cases).
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". It only differs in where the timestamps are assigned.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> The OOP approach also gives you the semantics of total ordering without imposing merges on the streams.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> However, if you consider Event-Pattern-Matching, global ordering is necessary (even if the performance penalty might be high).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an alternative to "source assigned ts-windows".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your suggested changes might touch them at some point.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also I still think we should not put everything in the DataStream because it will be a huge mess.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also we need to agree on the out-of-order processing, whether we want it the way you proposed it (which is quite costly).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another alternative approach there, which fits in the current windowing, is to filter out out-of-order events and apply a special handling operator on them. This would be fairly lightweight.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My point is that we need to consider some alternative solutions. And we should not block contributions along the way.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current model does not work for out-of-order processing.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore, the whole windowing infrastructure will basically have to be redone. This also means that any work on the pre-aggregators or optimizations that we do now becomes useless.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good initiative.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective sections (where I didn't fully agree :) ).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a public hangout session on this, which could make for a more dynamic discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 22 Jun 2015, 21:34):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part I also wanted to throw my hat into the ring. :D
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts I had about how things could be improved.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hopefully, they are in somewhat coherent shape now, so please have a look if you are interested in this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window operators
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - API design
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the document or here in the mailing list.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have a PR in the making that would introduce source timestamps and watermarks for keeping track of them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha
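
As a rough illustration of the idea mentioned at the bottom of the thread (panes for pre-aggregation, with watermarks deciding when a window is complete), here is a minimal sketch in Python. It is not the proof-of-concept from the thread and does not use any Flink API; the class `PaneWindowCounter`, its methods, and the choice to drop late elements are all assumptions made for illustration only.

```python
from collections import defaultdict


class PaneWindowCounter:
    """Hypothetical sketch: counts elements per sliding window using panes."""

    def __init__(self, window_size, slide):
        # Assumption: the pane width equals the slide, so every window
        # is made up of a whole number of panes.
        assert window_size % slide == 0
        self.window_size = window_size
        self.slide = slide
        self.panes = defaultdict(int)      # pane start -> partial count
        self.next_window_end = None        # end of the next window to emit
        self.watermark = float("-inf")     # highest watermark seen so far

    def on_element(self, timestamp):
        # Elements may arrive in any order; each one only updates its pane.
        if timestamp < self.watermark:
            return  # late element; this sketch simply drops it
        pane_start = timestamp - timestamp % self.slide
        self.panes[pane_start] += 1
        first_end = pane_start + self.slide
        if self.next_window_end is None or first_end < self.next_window_end:
            self.next_window_end = first_end

    def on_watermark(self, watermark):
        # A watermark promises that no element with a smaller timestamp
        # will follow, so every window ending at or before it is complete.
        self.watermark = max(self.watermark, watermark)
        results = []
        while (self.next_window_end is not None
               and self.next_window_end <= self.watermark):
            end = self.next_window_end
            start = end - self.window_size
            count = sum(self.panes.get(p, 0)
                        for p in range(start, end, self.slide))
            if count > 0:
                results.append((start, end, count))
            # The oldest pane is not part of any later window.
            self.panes.pop(start, None)
            self.next_window_end = end + self.slide
        return results


counter = PaneWindowCounter(window_size=10, slide=5)
for ts in [7, 3, 12, 9, 1]:          # timestamps arrive out of order
    counter.on_element(ts)
print(counter.on_watermark(10))      # windows that end at or before 10
print(counter.on_watermark(20))      # windows that end at or before 20
```

Because each element only touches one pane, arrival order does not matter until a watermark arrives, and overlapping windows share the per-pane partial counts instead of recounting elements. A real operator would also need a policy for late elements and for keyed state, which this sketch leaves out.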