Here is the state in Flink and why we have chosen not to do global ordering at the moment:
- Individual streams are FIFO: if the sender emits in order, the receiver receives in order.
- When streams are merged (think shuffle / partition-by), the streams are not merged in order; instead, buffers from the streams are taken as they come in.
- Long ago we had a version that merged streams (for sort-merging in batch programs, actually), and it either performed horribly or deadlocked. The reason is that all streams are stalled whenever a buffer is missing from one stream, since the merge cannot continue in that case (head-of-line blocking). That backpressures streams unnecessarily, slowing down computation. If the streams depend mutually on each other (think two partitioning steps), they frequently deadlock completely.
- The only way to get global ordering anyway is by continuously stalling/buffering/punctuating streams, which is a lot of work to implement and will definitely cost performance.

Therefore we have decided to go for a simpler model without global ordering for now. If we start seeing that this has severe limitations in practice, we may reconsider.

On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <cado...@informatik.hu-berlin.de> wrote:

> Hi Paris,
>
> what's the reason for not guaranteeing global ordering across partitions in the stream model? Is it the smaller overhead, or are there operations that are not computable in a distributed environment with global ordering?
>
> In any case, I agree with Matthias that the user should choose. If some operations were not computable with global ordering, I would restrict the set of operations for that mode.
>
> Maybe it would also be helpful to collect use cases for each of the modes proposed by Matthias, to understand the requirements for both modes.
>
> Some (researchy) thoughts about indeterminism: How can the indeterminism of the current setting be quantified? How "large" can it grow with the current setting? Are there any limits that can be guaranteed?
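Stephan's head-of-line argument can be illustrated with a toy model. This is a minimal sketch, not Flink's network stack: each input channel is assumed to be a FIFO deque of already-timestamped records, and the helper names (`ordered_merge_step`, `unordered_merge_step`, `drain`) are hypothetical. An order-preserving merge may only emit when every channel has a record available, because a missing buffer could hold a smaller timestamp; taking records as they come never waits.

```python
from collections import deque
from typing import Callable, Deque, List, Optional

Channels = List[Deque[int]]


def ordered_merge_step(channels: Channels) -> Optional[int]:
    """Emit the globally smallest record, or None if the merge must stall.

    Each channel is FIFO, so the smallest pending record is one of the heads,
    but only if every channel has a head. If one channel's buffer has not
    arrived yet, its next record might be smaller than every visible head,
    so nothing can be emitted (head-of-line blocking).
    """
    if any(not ch for ch in channels):
        return None  # one missing buffer stalls all channels
    idx = min(range(len(channels)), key=lambda i: channels[i][0])
    return channels[idx].popleft()


def unordered_merge_step(channels: Channels) -> Optional[int]:
    """Take the next available record from any non-empty channel; never waits."""
    for ch in channels:
        if ch:
            return ch.popleft()
    return None


def drain(channels: Channels, step: Callable[[Channels], Optional[int]]) -> List[int]:
    """Apply `step` until it yields nothing more (stall or all channels empty)."""
    out: List[int] = []
    while (record := step(channels)) is not None:
        out.append(record)
    return out


if __name__ == "__main__":
    # Channel 2's buffer is still in flight.
    print(drain([deque([1, 4, 7]), deque([2, 5]), deque()], ordered_merge_step))    # []
    print(drain([deque([1, 4, 7]), deque([2, 5]), deque()], unordered_merge_step))  # [1, 4, 7, 2, 5]
    # Once it arrives, the ordered merge progresses until channel 1 runs dry.
    print(drain([deque([1, 4, 7]), deque([2, 5]), deque([3, 6])], ordered_merge_step))  # [1, 2, 3, 4, 5]
```

In the last run, records 6 and 7 stay stuck until channel 1 delivers more data; releasing them would need extra end-of-stream or progress signals, roughly the kind of punctuation Stephan says costs implementation effort and performance.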
>
> Cheers,
> Bruno
>
> On 07.04.2015 12:38, Paris Carbone wrote:
> > Hello Matthias,
> >
> > Sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back at TU Berlin. Bear in mind, though, that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. No *global* ordering guarantees whatsoever are made in that model across partitions. More generally, there are many "race conditions" in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing "old" tuples that iterate in the graph?). Btw, have you checked the Naiad paper [1]? Stephan cited it a while ago, and it is quite relevant to this discussion.
> >
> > Also, can you cite the paper with the join semantics you are referring to? That would help a lot, I think.
> >
> > Paris
> >
> > [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
> >
> > On 07 Apr 2015, at 11:50, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
> >
> > Hi @all,
> >
> > please keep me in the loop on this work. I am highly interested and I want to help with it.
> >
> > My initial thoughts are as follows:
> >
> > 1) Currently, system timestamps are used, and the suggested approach can be seen as state of the art (there is actually a research paper using the exact same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead in keeping track of the order of records, and the latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)
> >
> > 2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or at the sources). Because deterministic processing adds some overhead, the user should actively opt in to it. In this case, the order must be preserved in each redistribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there are a bunch of them). Even for single-stream windows it's a tricky problem; for join windows it's even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). How to timestamp (intermediate) result tuples is also an important question to be answered.
> >
> > -Matthias
> >
> > On 04/07/2015 11:37 AM, Gyula Fóra wrote:
> >
> > Hey,
> >
> > I agree with Kostas: if we define the exact semantics of how this works, this is no more ad hoc than any other stateful operator with multiple inputs. (And I don't think any other system supports something similar.)
> >
> > We need to make some design choices that are similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (global or local), because that affects what kind of policies can be parallel, but I can work on these things.
> >
> > I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9, though.
> >
> > And thanks for helping write these down.
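Matthias's point that merging is enough once each channel preserves order, and that duplicate timestamps are the tricky part, can be sketched as follows. This is a hypothetical illustration under stated assumptions: records carry a source-assigned timestamp, channel indices are fixed, and ties are broken by channel index (an arbitrary but deterministic rule chosen here, not one proposed in the thread). For brevity it sorts a finished batch; a streaming version would still need a stalling merge like the one Stephan describes.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Record:
    ts: int       # timestamp assigned at the source (assumption)
    channel: int  # index of the input channel the record arrived on
    value: str


def arrival_order(arrivals: List[Record]) -> List[str]:
    """Non-deterministic baseline: emit records in whatever order they arrived."""
    return [r.value for r in arrivals]


def timestamp_only(arrivals: List[Record]) -> List[str]:
    """Stable sort on timestamp alone: duplicates keep their arrival order,
    so the output still depends on how the channels happened to interleave."""
    return [r.value for r in sorted(arrivals, key=lambda r: r.ts)]


def deterministic(arrivals: List[Record]) -> List[str]:
    """Order by (timestamp, channel). Duplicates across channels are broken by
    channel index; duplicates within one channel keep FIFO order because the
    sort is stable and every interleaving preserves per-channel order."""
    return [r.value for r in sorted(arrivals, key=lambda r: (r.ts, r.channel))]


if __name__ == "__main__":
    a, b = Record(1, 0, "a"), Record(3, 0, "b")  # channel 0, in FIFO order
    c, d = Record(1, 1, "c"), Record(2, 1, "d")  # channel 1, in FIFO order
    run1 = [a, c, d, b]  # two legal interleavings of the same channels
    run2 = [c, d, a, b]
    print(timestamp_only(run1), timestamp_only(run2))  # ['a', 'c', 'd', 'b'] ['c', 'a', 'd', 'b']
    print(deterministic(run1), deterministic(run2))    # ['a', 'c', 'd', 'b'] ['a', 'c', 'd', 'b']
```

Sorting on the timestamp alone is not enough: records `a` and `c` share timestamp 1, so their relative order still depends on arrival. Only the extra tie-break makes both interleavings produce the same output.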
> >
> > Gyula
> >
> > On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:
> >
> > Yes, we should write these semantics down. I volunteer to help.
> >
> > I don't think that this is very ad hoc. The semantics are basically the following. Assuming an element arrives from the left side:
> > (1) we find the right-side matches;
> > (2) we insert the left-side arrival into the left window;
> > (3) we recompute the left window.
> > We need to see whether right-window recomputation needs to be triggered as well. I think that this way of joining streams is also what the symmetric hash join algorithms were meant to support.
> >
> > Kostas
> >
> > On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
> >
> > Isn't the approach of joining one element at a time from one input against a window on the other input a bit arbitrary?
> >
> > This just joins whatever currently happens to be in the window at the time the single element arrives; that is a bit unpredictable, right?
> >
> > As a more general point: the whole semantics of windowing, and of when windows are triggered, are a bit ad hoc now. It would be really good to start formalizing that a bit and write it down somewhere. Users need to be able to clearly understand and predict the output.
> >
> > On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every", we would not join that stream one element at a time but every so many elements. The problem here is that window and every in this case are very, very different from the normal windowing semantics.
> > The window would define the join window for each element of the other stream, while every would define how often I join this stream with the other one.
> >
> > We need to think about how to make this intuitive.
> >
> > On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> >
> > That would be really neat. The problem I see there is that we currently do not distinguish between dataStream.window() and dataStream.window().every(): they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, the trigger is practically always set to a count of one).
> >
> > But of course we could make it so that we check that the eviction is either null or a count of 1, and in every other case we throw an exception while building the JobGraph.
> >
> > On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Or you could define it like this:
> >
> >     stream_A = a.window(...)
> >     stream_B = b.window(...)
> >
> >     stream_A.join(stream_B).where().equals().with()
> >
> > So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.
> >
> > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> >
> > Big +1 for the proposal from Peter and Gyula. I'm really for bringing the windowing and window join APIs in sync.
> >
> > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
> >
> > Hey guys,
> >
> > As Aljoscha has highlighted earlier, the current window join semantics in the streaming API don't follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams.
> > The reason for this is that we currently take a window of each of the two streams and do joins over these pairs. This would be a blocking operation if the windows are not closed at exactly the same time (and since we don't want this, we only allow time windows).
> >
> > I talked with Peter, who came up with the initial idea of an alternative approach for stream joins, which works as follows:
> >
> > Instead of pairing windows for joins, we do element-against-window joins. This means that whenever we receive an element from one of the streams, we join this element with the current window of the other stream (this window is constantly updated). This is non-blocking for any window definition, as we don't have to wait for windows to be completed, and we can use it with any of our predefined policies like Time.of(...), Count.of(...), Delta.of(...).
> >
> > Additionally, this allows a very flexible way of defining window joins. With this we could also define grouped windowing inside of a join. An example would be: join all elements of Stream1 with the last 5 elements per window key of Stream2 on some join key.
> >
> > This feature can easily be implemented over the current operators, so I already have a working prototype for the simple non-grouped case. My only concern is the API; the best thing I could come up with is something like this:
> >
> >     stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
> >
> > (the user can omit the "by" and "with" calls)
> >
> > I think this new approach would be worthy of our "flexible windowing", in contrast to the current approach.
> >
> > Regards,
> > Gyula
>
> --
> Dr. Bruno Cadonna
> Postdoctoral Researcher
>
> Databases and Information Systems
> Department of Computer Science
> Humboldt-Universität zu Berlin
>
> http://www.informatik.hu-berlin.de/~cadonnab
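Kostas's three steps and Peter and Gyula's element-against-window proposal can be sketched as a symmetric join. This is a hypothetical sketch, not Flink's API: the class name, its parameters, and count-only windows implemented with `deque(maxlen=...)` are assumptions made here, and time or delta policies (`Time.of`, `Delta.of`) are not modeled. An optional window-key function gives the grouped variant (the last N elements per window key).

```python
from collections import defaultdict, deque
from typing import Any, Callable, Dict, Hashable, List, Tuple

KeyFn = Callable[[Any], Hashable]


class ElementWindowJoin:
    """Hypothetical element-against-window join with count-based windows.

    Each side keeps the last `size` elements per window key (one global
    window when no window-key function is given). Not Flink's API.
    """

    def __init__(self, size_left: int, size_right: int,
                 join_key_left: KeyFn, join_key_right: KeyFn,
                 window_key_left: KeyFn = lambda e: None,
                 window_key_right: KeyFn = lambda e: None) -> None:
        # windows[0] is the left side, windows[1] the right side.
        self.windows: Tuple[Dict[Hashable, deque], Dict[Hashable, deque]] = (
            defaultdict(lambda: deque(maxlen=size_left)),
            defaultdict(lambda: deque(maxlen=size_right)),
        )
        self.join_keys = (join_key_left, join_key_right)
        self.window_keys = (window_key_left, window_key_right)

    def _on_element(self, side: int, element: Any) -> List[Tuple[Any, Any]]:
        other = 1 - side
        key = self.join_keys[side](element)
        # (1) Probe the other side's windows as they are at this moment.
        matches = [o for window in self.windows[other].values() for o in window
                   if self.join_keys[other](o) == key]
        # (2) Insert the arrival into its own window; (3) with a count policy,
        # "recomputing" the window reduces to deque(maxlen) evicting the oldest.
        self.windows[side][self.window_keys[side](element)].append(element)
        # Always report pairs as (left, right).
        return [(element, o) if side == 0 else (o, element) for o in matches]

    def on_left(self, element: Any) -> List[Tuple[Any, Any]]:
        return self._on_element(0, element)

    def on_right(self, element: Any) -> List[Tuple[Any, Any]]:
        return self._on_element(1, element)


if __name__ == "__main__":
    join = ElementWindowJoin(2, 2, lambda e: e[0], lambda e: e[0])
    for v in (1, 2, 3):
        join.on_right(("k", v))          # right window now holds v=2 and v=3
    print(join.on_left(("k", "x")))      # [(('k', 'x'), ('k', 2)), (('k', 'x'), ('k', 3))]
```

The sketch also makes Stephan's objection concrete: had `("k", "x")` arrived right after `("k", 1)`, it would have matched only that one record, so the result depends on arrival timing rather than on any property of the data.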