Yes. But as I said, you can get the same behavior with a GroupedDataStream using a tumbling 1-tuple-size window. Thus, there is no conceptual advantage in using a KeyedDataStream, and no disadvantage in binding stateful operations to GroupedDataStreams.
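For illustration, the equivalence argued above can be sketched outside of Flink. The following toy Java class (hypothetical names, not Flink API) shows that a per-key reduce over tumbling windows of size one tuple degenerates to updating one piece of keyed state per arriving element, which is exactly what a keyed stateful operator does:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy sketch (not Flink code): a "tumbling window of size 1 tuple" per key
// fires immediately for each element, folding the single-element window
// into the state kept for that element's key.
public class KeyedStateViaUnitWindow {
    private final Map<String, Integer> stateByKey = new HashMap<>();
    private final BinaryOperator<Integer> reducer;

    public KeyedStateViaUnitWindow(BinaryOperator<Integer> reducer) {
        this.reducer = reducer;
    }

    // Each element is its own "window": reduce it into the keyed state
    // and emit the updated value right away.
    public int onElement(String key, int value) {
        return stateByKey.merge(key, value, reducer);
    }

    public static void main(String[] args) {
        KeyedStateViaUnitWindow sum = new KeyedStateViaUnitWindow(Integer::sum);
        System.out.println(sum.onElement("a", 1)); // 1
        System.out.println(sum.onElement("a", 2)); // 3
        System.out.println(sum.onElement("b", 5)); // 5
    }
}
```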
On 06/27/2015 06:54 PM, Márton Balassi wrote:

@Matthias: Your point of working with a minimal number of clear concepts is desirable, to say the least. :)

The reasoning behind the KeyedDataStream is to associate Flink-persisted operator state with the keys of the data that produced it, so that stateful computation becomes scalable in the future. This should not be tied to the GroupedDataStream, especially not if we are removing the option to create groups without windows, as proposed on the Wiki by Stephan.

On Sat, Jun 27, 2015 at 4:15 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

This was more a conceptual point-of-view argument. From an implementation point of view, skipping the window-building step is a good idea if a tumbling 1-tuple-size window is detected.

I prefer to work with a minimum number of concepts (and apply internal optimizations where possible) instead of using redundant concepts for special cases. Of course, this is my personal point of view.

-Matthias

On 06/27/2015 03:47 PM, Aljoscha Krettek wrote:

What do you mean by Comment 2? Using the whole window apparatus if you just want to have, for example, a simple partitioned filter with partitioned state seems a bit extravagant.

On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Nice starting point.

Comment 1: "Each individual stream partition delivers elements strictly in order." (in 'Parallel Streams, Partitions, Time, and Ordering')

I would say "FIFO" and not "strictly in order". If data is not emitted in order, the stream partition will not be in order either.

Comment 2: Why do we need KeyedDataStream? You can get everything done with GroupedDataStream (using a tumbling window of size = 1 tuple).

-Matthias

On 06/26/2015 07:42 PM, Stephan Ewen wrote:

Here is a first bit of what I have been writing down. Will add more over the next days:

https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering

On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <par...@kth.se> wrote:

+1 for writing this down

On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljos...@apache.org> wrote:

+1, go ahead

On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <se...@apache.org> wrote:

Hey!

This thread covers many different topics. Let's break this up into separate discussions.

- Operator state is already driven by Gyula and Paris and is happening on the above-mentioned pull request and the follow-up discussions.

- For windowing, this discussion has brought some results that we should sum up and clearly write down. I would like to chime in to do that, based on what I learned from the document and this discussion. I also got some input from Marton about what he learned from mapping the Cloud Dataflow constructs to Flink. I'll draft a Wiki page (with the help of Aljoscha and Marton) that sums this up and documents it for users (if we decide to adopt this). Then we run this by Gyula, Matthias Sax, and Kostas for feedback.

- API style discussions should be yet another thread. This will probably be opened as people start to address that.

I'll try to get a draft of the wiki version out tomorrow noon and send the link around.

Greetings,
Stephan

On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Sure. I picked this up. Using the current model for "occurrence time semantics" does not work.

I have elaborated on this many times in the past (but nobody cared). It is important to make clear to the user what semantics are supported. Claiming to support "sliding windows" doesn't mean anything; there are too many different semantics out there. :)

On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:

Yes, I am aware of this requirement, and it would also be supported in my proposed model.

The problem is that the "custom timestamp" feature gives the impression that the elements would be windowed according to a user timestamp. The results, however, are wrong because of the assumption about elements arriving in order. (This is what I was trying to show with my fancy ASCII art and result output.)

On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi Aljoscha,

I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created). There is only the notion of "arrival order" at the operator. From this "arrival order" perspective, the results are correct(!).

Windowing in the current approach means, for example, "sum up an attribute of all events you *received* in the last 5 seconds". That is a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds". Both queries are valid, and Flink should support both, IMHO.

-Matthias

On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:

Yes, now this also processes about 3 mio elements (window size 5 sec, slide 1 sec), but it still fluctuates a lot between 1 mio and 5 mio.

Performance is not my main concern, however. My concern is that the current model assumes elements to arrive in order, which is simply not true.

In your code you have these lines for specifying the window:
.window(Time.of(1L, TimeUnit.SECONDS))
.every(Time.of(1L, TimeUnit.SECONDS))

Although this semantically specifies a tumbling window of size 1 sec, I'm afraid it uses the sliding-window logic internally (because of the .every()).

In my tests I only have the first line.

On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <gga...@gmail.com> wrote:

I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm getting less than 1/10th of yours. For example, in the tumbling case, Current/Reduce produces only ~100000 for me. Do you have any idea what I could be doing wrong? My code: http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor

2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (size 5 sec, slide 1 sec). (Theoretically there would be 5000 tuples in 5 seconds, but this is due to overhead.) These are the results for the inverse-reduce optimisation:

(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

One line is one emitted window count. This is what happens when I remove the Thread.sleep(1):

(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover from it. The good thing is that it does produce results now, where the previous Current/Reduce would simply hang and not produce any output.

On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <gga...@gmail.com> wrote:

Hello,

Aljoscha, can you please try the performance test of Current/Reduce with the InversePreReducer in PR 856? (If you just call sum, it will use an InversePreReducer.) It would be an interesting test, because the inverse-function optimization really depends on the stream being ordered, and I think it has the potential of being faster than Next/Reduce. Especially if the window size is much larger than the slide size.

Best regards,
Gabor

2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

I think I'll have to elaborate a bit, so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to alleviate concerns about performance.

First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier: https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like this:

  +--+
  |  | Source
  +--+
    |
+-----+
|     |
|   +--+
|   |  | Identity Map
|   +--+
|     |
+-----+
    |
  +--+
  |  | Window
  +--+
    |
    |
  +--+
  |  | Sink
  +--+

So all it does is pass the elements through an identity map and then merge them again before the window operator. The source emits ascending integers, and the window operator has a custom timestamp extractor that uses the integer itself as the timestamp and should create windows of size 4 (that is, elements with timestamps 0-3 are one window, the next window holds the elements with timestamps 4-7, and so on). Since the topology basically doubles the elements from the source, I would expect to get these windows:

Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7

The output is this, however:

Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out of order. Imagine what would happen if the elements actually arrived with some delay from different operations.

Now, on to the performance numbers. The proof-of-concept I created is available here: https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks that tell us that no elements with an earlier timestamp will be emitted. The watermarks propagate through the operators. The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to that window. When the window operator receives a watermark, it looks at the in-flight windows (basically the buffers) and emits those windows whose window end is before the watermark.

For measuring throughput I did the following: the source emits tuples of the form ("tuple", 1) in an infinite loop. The window operator sums up the tuples, thereby counting how many tuples the window operator can handle in a given time window. There are two different implementations for the summation: 1) simply summing up the values in a mapWindow(), where you get a list of all tuples and iterate over it; 2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimisations).

These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):

Tumbling (1 sec):
- Current/Map: 1.6 mio
- Current/Reduce: 2 mio
- Next/Map: 2.2 mio
- Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
- Current/Map: ca. 3 mio (fluctuates a lot)
- Current/Reduce: no output
- Next/Map: ca. 4 mio (fluctuates)
- Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not depend on the number of elements (it is just the current sum). The pre-reducer for sliding windows cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.

One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is. But the proof-of-concept also shows that performance can be greatly improved.
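The watermark mechanism described above (buffer elements per window, emit a window once a watermark guarantees no earlier element can still arrive) can be sketched in plain Java. This is an illustrative toy with hypothetical names, not the actual proof-of-concept code:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy event-time tumbling window: elements carry timestamps, are buffered
// by window start, and a window is emitted when a watermark passes its end.
public class WatermarkWindows {
    private final long windowSize;
    // window start -> buffered element timestamps, sorted so the oldest
    // in-flight window is checked first on each watermark
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();
    private final List<List<Long>> emitted = new ArrayList<>();

    public WatermarkWindows(long windowSize) { this.windowSize = windowSize; }

    // Assign the element to its window by timestamp, regardless of
    // arrival order.
    public void onElement(long timestamp) {
        long windowStart = timestamp - (timestamp % windowSize);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    // Emit every in-flight window whose end is at or before the watermark.
    public void onWatermark(long watermark) {
        Iterator<Map.Entry<Long, List<Long>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSize <= watermark) {
                emitted.add(e.getValue());
                it.remove();
            } else {
                break; // TreeMap is sorted: later windows cannot be ready yet
            }
        }
    }

    public List<List<Long>> getEmitted() { return emitted; }
}
```

With window size 4, feeding the out-of-order timestamps 0, 2, 1, 3, 5, 4 and then a watermark of 4 emits exactly the elements with timestamps 0-3, grouped by event time rather than arrival order.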
On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.f...@gmail.com> wrote:

I agree, let's separate these topics from each other so we can get faster resolution.

There is already a state discussion in the thread we started with Paris.

On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzou...@apache.org> wrote:

I agree with supporting out-of-order out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API, before we pull it out of beta. I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that, IMO, this is the right time to stall PRs until we agree on the design).

There are three sections in the document: windowing, state, and API. How convoluted are those with each other? Can we separate the discussion, or do we need to discuss those all together? I think part of the difficulty is that we are discussing three design choices at once.

On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunn...@gmail.com> wrote:

Out-of-order is ubiquitous in the real world. Typically, what happens is that businesses will declare a maximum allowable delay for delayed transactions and will commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected specially as corrections, which are reported/used when possible.

Clearly, ordering can also be violated during processing, but if the data is originally out of order, the situation can't be repaired by any protocol fixes that prevent transactions from becoming disordered; it has to be handled at the data level.

On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I also don't like big changes, but sometimes they are necessary. The reason why I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus because the current windowing system assumes elements to globally arrive in order, which is simply not true. (The example has a source that generates increasing integers. Then these pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example is also DOP=1; I imagine this gets worse with higher DOP.

What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8. (It is an infinite source of tuples and a 5-second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake-element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically if the window size increases.

So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out of order (which happens basically always in real-world windowing use cases).

On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <se...@apache.org> wrote:

What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". It only differs in where the timestamps are assigned.

The OOP approach also gives you the semantics of total ordering without imposing merges on the streams.

On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates. However, if you consider Event-Pattern-Matching, global ordering is necessary (even if the performance penalty might be high).

I would also keep "system-time windows" as an alternative to "source-assigned ts-windows".

It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates": https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:

Hey,

I think we should not block PRs unnecessarily if your suggested changes might touch them at some point.

Also, I still think we should not put everything in the DataStream because it will be a huge mess.

Also, we need to agree on the out-of-order processing: whether we want it the way you proposed it (which is quite costly). Another alternative approach, which fits in the current windowing, is to filter out out-of-order events and apply a special handling operator on them. This would be fairly lightweight.

My point is that we need to consider some alternative solutions. And we should not block contributions along the way.

Cheers,
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:

The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone. Meaning also that any work on the pre-aggregators or optimizations that we do now becomes useless.

For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)

On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Aljoscha,

Thanks for the nice summary; this is a very good initiative.

I added some comments to the respective sections (where I didn't fully agree :).

At some point I think it would be good to have a public hangout session on this, which could make for a more dynamic discussion.

Cheers,
Gyula

On Mon, 22 Jun 2015 at 21:34 Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
With people proposing changes to the streaming part, I also wanted to throw my hat into the ring. :D

During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts I had about how things could be improved. Hopefully they are in somewhat coherent shape now, so please have a look if you are interested in this:

https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
- Timestamps assigned at sources
- Out-of-order processing of elements in window operators
- API design

Please let me know what you think. Comment in the document or here on the mailing list.

I have a PR in the making that would introduce source timestamps and watermarks for keeping track of them. I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)

Cheers,
Aljoscha
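For reference, the inverse-reduce pre-aggregation debated above (the idea behind the InversePreReducer in PR 856) can be sketched as follows. This is an illustrative toy with hypothetical names, not the PR's implementation: keep a running aggregate, fold each new element in, and "subtract" evicted elements with the inverse of the reduce function. It only works when the reduce function has an inverse (e.g. sum) and elements leave the window in the order they arrived, which is exactly why the thread ties it to ordered streams:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding count window with an invertible reduce (sum): O(1) work per
// element instead of re-aggregating the whole window on every slide.
public class InverseReduceWindow {
    private final int windowSize;
    private final Deque<Long> window = new ArrayDeque<>();
    private long sum = 0; // running aggregate

    public InverseReduceWindow(int windowSize) { this.windowSize = windowSize; }

    // Returns the aggregate of the last windowSize elements.
    public long add(long value) {
        window.addLast(value);
        sum += value;                      // reduce step
        if (window.size() > windowSize) {
            sum -= window.removeFirst();   // inverse-reduce step on eviction
        }
        return sum;
    }
}
```

If evictions and additions get out of sync with the true element order, the running aggregate silently drifts, which is consistent with the broken counts reported earlier in the thread once out-of-order effects appear.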