Here is the current state in Flink, and why we have chosen not to do global
ordering at the moment:

 - Individual streams are FIFO: if the sender emits in order, the receiver
receives in order.

 - When streams are merged (think shuffle / partition-by), they are not
merged in order; instead, buffers from the streams are taken as they come in.

  - We had a version that merged streams in order (for sort merging in batch
programs, actually) long ago, and it either performed horribly or
deadlocked. The reason is that all streams are stalled whenever a buffer
is missing from one stream, since the merge cannot continue in such a case
(head-of-the-line waiting). That backpressures streams unnecessarily,
slowing down computation. If the streams depend mutually on each other
(think two partitioning steps), they frequently deadlock completely.

  - The only way to make such an ordered merge work is by
stalling/buffering/punctuating streams continuously, which is a lot of work
to implement and will definitely cost performance.
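
To make the head-of-the-line waiting concrete, here is a minimal sketch in
plain Python (not Flink code; the function names and the (channel, timestamp)
arrival format are made up for illustration). It contrasts taking buffers as
they come in with an ordered merge that may only emit while every input
channel has a buffered element.

```python
from collections import deque


def take_as_they_arrive(arrivals):
    """Forward buffers in arrival order.

    `arrivals` is a list of (channel, timestamp) pairs in the order in which
    they reach the receiver. Per-channel FIFO order is preserved, but there
    is no global order across channels.
    """
    return list(arrivals)


def ordered_merge(arrivals, num_channels):
    """Merge by timestamp, emitting only when it is safe to do so.

    The smallest head element is only known to be globally smallest if every
    channel has at least one buffered element. If any channel is empty, the
    merge stalls (head-of-the-line waiting).

    Returns (emitted, stalled): the elements emitted in timestamp order and
    the elements still held back when the input ends.
    """
    queues = [deque() for _ in range(num_channels)]
    emitted = []
    for channel, timestamp in arrivals:
        queues[channel].append(timestamp)
        # An empty deque is falsy, so all(queues) means "no channel is empty".
        while all(queues):
            source = min(range(num_channels), key=lambda c: queues[c][0])
            emitted.append((source, queues[source].popleft()))
    stalled = [(c, ts) for c, q in enumerate(queues) for ts in q]
    return emitted, stalled


# Channel 0 delivers steadily, channel 1 delivers a single buffer.
arrivals = [(0, 1), (0, 2), (0, 3), (1, 2), (0, 5)]
emitted, stalled = ordered_merge(arrivals, num_channels=2)
```

With these arrivals, the ordered merge holds back (0, 3) and (0, 5) until
channel 1 delivers again, whereas the arrival-order variant forwards every
buffer immediately. In a real system the held-back buffers occupy bounded
memory, so the stall turns into backpressure on the sender.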

Therefore we have decided to go for a simpler model without global ordering
for now. If we start seeing that this has severe limitations in practice, we
may reconsider that.



On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <
cado...@informatik.hu-berlin.de> wrote:

> Hi Paris,
>
> what's the reason for not guaranteeing global ordering across partitions
> in the stream model? Is it the smaller overhead or are there any
> operations not computable in a distributed environment with global
> ordering?
>
> In any case, I agree with Matthias that the user should choose. If
> operations were not computable with a global ordering, I would
> restrict the set of operations for that mode.
>
> Maybe, it would also be helpful to collect use cases for each of the
> modes proposed by Matthias to understand the requirements for both modes.
>
> Some (researchy) thoughts about indeterminism: How can the
> indeterminism of the current setting be quantified? How "large" can it
> grow with the current setting? Are there any limits that can be
> guaranteed?
>
> Cheers,
> Bruno
>
> On 07.04.2015 12:38, Paris Carbone wrote:
> > Hello Matthias,
> >
> > Sure, ordering guarantees are indeed a tricky thing, I recall
> > having that discussion back in TU Berlin. Bear in mind though that
> > DataStream, our abstract data type, represents a *partitioned*
> > unbounded sequence of events. There are no *global* ordering
> > guarantees made whatsoever in that model across partitions. If you
> > see it more generally there are many “race conditions” in a
> > distributed execution graph of vertices that process multiple
> > inputs asynchronously, especially when you add joins and iterations
> > into the mix (how do you deal with reprocessing “old” tuples that
> > iterate in the graph). Btw have you checked the Naiad paper [1]?
> > Stephan cited it a while ago and it is quite relevant to that
> > discussion.
> >
> > Also, can you cite the paper with the joining semantics you are
> > referring to? That would be of great help, I think.
> >
> > Paris
> >
> > [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
> >
> > On 07 Apr 2015, at 11:50, Matthias J. Sax
> > <mj...@informatik.hu-berlin.de<mailto:mj...@informatik.hu-berlin.de>>
> > wrote:
> >
> > Hi @all,
> >
> > please keep me in the loop for this work. I am highly interested
> > and I want to help on it.
> >
> > My initial thoughts are as follows:
> >
> > 1) Currently, system timestamps are used and the suggested approach
> > can be seen as state-of-the-art (there is actually a research paper
> > using the exact same join semantics). Of course, the current
> > approach is inherently non-deterministic. The advantage is that
> > there is no overhead in keeping track of the order of records and
> > the latency should be very low. (Additionally, state recovery is
> > simplified: because the processing is inherently
> > non-deterministic, recovery can be done with relaxed guarantees.)
> >
> > 2) The user should be able to "switch on" deterministic
> > processing, i.e., records are timestamped (either externally when
> > generated, or timestamped at the sources). Because deterministic
> > processing adds some overhead, the user should decide for it
> > actively. In this case, the order must be preserved in each
> > re-distribution step (merging is sufficient, if order is preserved
> > within each incoming channel). Furthermore, deterministic
> > processing can be achieved by sound window semantics (and there is
> > a bunch of them). Even for single-stream-windows it's a tricky
> > problem; for join-windows it's even harder. From my point of view,
> > it is less important which semantics are chosen; however, the user
> > must be aware how it works. The trickiest part of deterministic
> > processing is dealing with duplicate timestamps (which cannot be
> > avoided). The timestamping of (intermediate) result tuples is
> > also an important question to be answered.
> >
> >
> > -Matthias
> >
> >
> > On 04/07/2015 11:37 AM, Gyula Fóra wrote:
> >
> > Hey,
> >
> > I agree with Kostas: if we define the exact semantics of how this
> > works, this is not more ad-hoc than any other stateful operator
> > with multiple inputs. (And I don't think any other system supports
> > something similar.)
> >
> > We need to make some design choices that are similar to the issues
> > we had for windowing. We need to choose how we want to evaluate the
> > windowing policies (global or local) because that affects what kind
> > of policies can be parallel, but I can work on these things.
> >
> > I think this is an amazing feature, so I wouldn't necessarily rush
> > the implementation for 0.9 though.
> >
> > And thanks for helping to write these down.
> >
> > Gyula
> >
> > On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas
> > <ktzou...@apache.org<mailto:ktzou...@apache.org>> wrote:
> >
> > Yes, we should write these semantics down. I volunteer to help.
> >
> > I don't think that this is very ad-hoc. The semantics are basically
> > the following. Assuming an arriving element from the left side:
> >
> >   (1) We find the right-side matches
> >   (2) We insert the left-side arrival into the left window
> >   (3) We recompute the left window
> >
> > We need to see whether right window re-computation needs to be
> > triggered as well. I think that this way of joining streams is also
> > what the symmetric hash join algorithms were meant to support.
> >
> > Kostas
> >
> >
> > On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen
> > <se...@apache.org<mailto:se...@apache.org>> wrote:
> >
> > Is the approach of joining an element at a time from one input
> > against a window on the other input not a bit arbitrary?
> >
> > This just joins whatever currently happens to be the window by the
> > time the single element arrives - that is a bit non-predictable,
> > right?
> >
> > As a more general point: The whole semantics of windowing and when
> > they are triggered are a bit ad-hoc now. It would be really good to
> > start formalizing that a bit and put it down somewhere. Users need
> > to be able to clearly understand and predict the output.
> >
> >
> >
> > On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra
> > <gyula.f...@gmail.com<mailto:gyula.f...@gmail.com>> wrote:
> >
> > I think it should be possible to make this compatible with the
> > .window().every() calls. Maybe if there is some trigger set in
> > "every" we would not join that stream 1 by 1 but every so many
> > elements. The problem here is that the window and every in this
> > case are very-very different than the normal windowing semantics.
> > The window would define the join window for each element of the
> > other stream while every would define how often I join this stream
> > with the other one.
> >
> > We need to think about how to make this intuitive.
> >
> > On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
> > balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>> wrote:
> >
> > That would be really neat. The problem I see there is that we do not
> > distinguish between dataStream.window() and
> > dataStream.window().every() currently, they both return
> > WindowedDataStreams and TriggerPolicies of the every call do not
> > make much sense in this setting (in fact practically the trigger is
> > always set to count of one).
> >
> > But of course we could make it in a way, that we check that the
> > eviction should be either null or count of 1, in every other case
> > we throw an exception while building the JobGraph.
> >
> > On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
> > aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
> >
> > Or you could define it like this:
> >
> > stream_A = a.window(...)
> > stream_B = b.window(...)
> >
> > stream_A.join(stream_B).where().equals().with()
> >
> > So a join would just be a join of two WindowedDataStreams. This
> > would neatly move the windowing stuff into one place.
> >
> > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
> > balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>> wrote:
> >
> > Big +1 for the proposal of Peter and Gyula. I'm really for
> > bringing the windowing and window join API in sync.
> >
> > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra
> > <gyf...@apache.org<mailto:gyf...@apache.org>> wrote:
> >
> > Hey guys,
> >
> > As Aljoscha has highlighted earlier, the current window join
> > semantics in the streaming API don't follow the changes in the
> > windowing API. More precisely, we currently only support joins over
> > time windows of equal size on both streams. The reason for this is
> > that we now take a window of each of the two streams and do joins
> > over these pairs. This would be a blocking operation if the windows
> > were not closed at exactly the same time (and since we don't want
> > this, we only allow time windows).
> >
> > I talked with Peter who came up with the initial idea of an
> > alternative approach for stream joins which works as follows:
> >
> > Instead of pairing windows for joins, we do element-against-window
> > joins. What this means is that whenever we receive an element from
> > one of the streams, we join this element with the current
> > window (this window is constantly updated) of the other stream. This
> > is non-blocking for any window definition, as we don't have to wait
> > for windows to be completed, and we can use this with any of our
> > predefined policies like Time.of(...), Count.of(...),
> > Delta.of(...).
> >
> > Additionally this also allows some very flexible way of defining
> > window joins. With this we could also define grouped windowing
> > inside of a join. An example of this would be: Join all elements of
> > Stream1 with the last 5 elements by a given windowkey of Stream2 on
> > some join key.
> >
> > This feature can be easily implemented over the current operators,
> > so I already have a working prototype for the simple non-grouped
> > case. My only concern is the API; the best thing I could come up
> > with is something like this:
> >
> > stream_A.join(stream_B)
> >     .onWindow(windowDefA, windowDefB)
> >     .by(windowKey1, windowKey2)
> >     .where(...).equalTo(...).with(...)
> >
> > (the user can omit the "by" and "with" calls)
> >
> > I think this new approach would be worthy of our "flexible
> > windowing" in contrast with the current approach.
> >
> > Regards, Gyula
> >
>
> --
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
>   Dr. Bruno Cadonna
>   Postdoctoral Researcher
>
>   Databases and Information Systems
>   Department of Computer Science
>   Humboldt-Universität zu Berlin
>
>   http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
>
