This reasoning makes absolute sense. That's why I suggested that the
user should actively choose ordered data processing...

About deadlocks: those can be avoided if the buffers are consumed
continuously into an in-memory merge buffer (maybe with spilling to disk
if necessary). Of course, latency might suffer, and punctuations should
be used to mitigate this problem.
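
To make the idea concrete, here is a minimal sketch of such a merge buffer. It is not Flink code: the class OrderedMergeBuffer, its Record type, and the add/punctuate methods are hypothetical names, and I assume each channel delivers records in non-decreasing timestamp order. Spilling to disk and tie-breaking for duplicate timestamps are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Comparator;

/** Hypothetical sketch (not Flink code): merges per-channel ordered input by timestamp. */
class OrderedMergeBuffer {

    /** A timestamped record; the payload is a String only to keep the sketch short. */
    static final class Record {
        final long timestamp;
        final String value;

        Record(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Per channel: no record with a timestamp below this value can still arrive.
    private final long[] progress;

    // Records that were accepted but cannot be emitted in order yet.
    private final PriorityQueue<Record> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Record r) -> r.timestamp));

    OrderedMergeBuffer(int numChannels) {
        progress = new long[numChannels];
        Arrays.fill(progress, Long.MIN_VALUE);
    }

    /** Always accepts the record, so the sending channel is never stalled. */
    List<Record> add(int channel, Record record) {
        buffer.add(record);
        progress[channel] = Math.max(progress[channel], record.timestamp);
        return drain();
    }

    /** Punctuation: the channel promises not to send records with a timestamp below ts. */
    List<Record> punctuate(int channel, long ts) {
        progress[channel] = Math.max(progress[channel], ts);
        return drain();
    }

    /** Emits, in timestamp order, every buffered record that all channels have passed. */
    private List<Record> drain() {
        long safe = Long.MAX_VALUE;
        for (long p : progress) {
            safe = Math.min(safe, p);
        }
        List<Record> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= safe) {
            out.add(buffer.poll());
        }
        return out;
    }
}
```

Because add() never blocks, a slow channel costs memory and latency instead of stalling the other channels. A punctuation from an idle channel is what releases the records buffered from the busy ones.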

As far as I understood, CEP might be an interesting use-case for Flink,
and it depends on ordered data streams.


-Matthias



On 04/08/2015 11:50 AM, Stephan Ewen wrote:
> Here is the state in Flink and why we have chosen not to do global ordering
> at the moment:
> 
>  - Individual streams are FIFO, that means if the sender emits in order,
> the receiver receives in order.
> 
>  - When streams are combined (think shuffle / partition-by), the streams
> are not merged in order; buffers from the streams are taken as they come in.
> 
>   - We had a version that merged streams (for sort merging in batch
> programs, actually) long ago, and it performed either horribly or
> deadlocked. The reason is that all streams are always stalled if a buffer
> is missing from one stream, since the merge cannot continue in such a case
> (head-of-the-line waiting). That backpressures streams unnecessarily,
> slowing down computation. If the streams depend mutually on each other
> (think two partitioning steps), they frequently deadlock completely.
> 
>   - The only way to do that is by stalling/buffering/punctuating streams
> continuously, which is a lot of work to implement and will definitely cost
> performance.
> 
> Therefore we have decided to go for a simpler model without global ordering
> for now. If we start seeing that this has severe limitations in practice, we
> may reconsider that.
> 
> 
> 
> On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <
> cado...@informatik.hu-berlin.de> wrote:
> 
> Hi Paris,
> 
> what's the reason for not guaranteeing global ordering across partitions
> in the stream model? Is it the smaller overhead or are there any
> operations not computable in a distributed environment with global
> ordering?
> 
> In any case, I agree with Matthias that the user should choose. If
> operations were not computable with a global ordering, I would
> restrict the set of operations for that mode.
> 
> Maybe, it would also be helpful to collect use cases for each of the
> modes proposed by Matthias to understand the requirements for both modes.
> 
> Some (researchy) thoughts about indeterminism: How can the
> indeterminism of the current setting be quantified? How "large" can it
> grow with the current setting? Are there any limits that can be
> guaranteed?
> 
> Cheers,
> Bruno
> 
> On 07.04.2015 12:38, Paris Carbone wrote:
>>>> Hello Matthias,
>>>>
>>>> Sure, ordering guarantees are indeed a tricky thing, I recall
>>>> having that discussion back in TU Berlin. Bear in mind though that
>>>> DataStream, our abstract data type, represents a *partitioned*
>>>> unbounded sequence of events. There are no *global* ordering
>>>> guarantees made whatsoever in that model across partitions. If you
>>>> see it more generally there are many “race conditions” in a
>>>> distributed execution graph of vertices that process multiple
>>>> inputs asynchronously, especially when you add joins and iterations
>>>> into the mix (how do you deal with reprocessing “old” tuples that
>>>> iterate in the graph). Btw have you checked the Naiad paper [1]?
>>>> Stephan cited it a while ago and it is quite relevant to that
>>>> discussion.
>>>>
>>>> Also, can you cite the paper with the joining semantics you are
>>>> referring to? That would be of great help, I think.
>>>>
>>>> Paris
>>>>
>>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>
>>>> On 07
>>>> Apr 2015, at 11:50, Matthias J. Sax
>>>> <mj...@informatik.hu-berlin.de<mailto:mj...@informatik.hu-berlin.de>>
>>>> wrote:
>>>>
>>>> Hi @all,
>>>>
>>>> please keep me in the loop for this work. I am highly interested
>>>> and I want to help on it.
>>>>
>>>> My initial thoughts are as follows:
>>>>
>>>> 1) Currently, system timestamps are used and the suggested approach
>>>> can be seen as state-of-the-art (there is actually a research paper
>>>> using the exact same join semantics). Of course, the current
>>>> approach is inherently non-deterministic. The advantage is that
>>>> there is no overhead in keeping track of the order of records and
>>>> the latency should be very low. (Additionally, state recovery is
>>>> simplified: because the processing is inherently
>>>> non-deterministic, recovery can be done with relaxed guarantees.)
>>>>
>>>> 2) The user should be able to "switch on" deterministic
>>>> processing, i.e., records are timestamped (either externally when
>>>> generated, or timestamped at the sources). Because deterministic
>>>> processing adds some overhead, the user should decide for it
>>>> actively. In this case, the order must be preserved in each
>>>> re-distribution step (merging is sufficient, if order is preserved
>>>> within each incoming channel). Furthermore, deterministic
>>>> processing can be achieved by sound window semantics (and there is
>>>> a bunch of them). Even for single-stream-windows it's a tricky
>>>> problem; for join-windows it's even harder. From my point of view,
>>>> it is less important which semantics are chosen; however, the user
>>>> must be aware of how it works. The trickiest part of deterministic
>>>> processing is dealing with duplicate timestamps (which cannot be
>>>> avoided). The timestamping of (intermediate) result tuples is
>>>> also an important question to be answered.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>>
>>>> Hey,
>>>>
>>>> I agree with Kostas: if we define the exact semantics of how this
>>>> works, this is no more ad-hoc than any other stateful operator
>>>> with multiple inputs. (And I don't think any other system supports
>>>> something similar.)
>>>>
>>>> We need to make some design choices that are similar to the issues
>>>> we had for windowing. We need to choose how we want to evaluate the
>>>> windowing policies (global or local) because that affects what kind
>>>> of policies can be parallel, but I can work on these things.
>>>>
>>>> I think this is an amazing feature, so I wouldn't necessarily rush
>>>> the implementation for 0.9 though.
>>>>
>>>> And thanks for helping to write these down.
>>>>
>>>> Gyula
>>>>
>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas
>>>> <ktzou...@apache.org<mailto:ktzou...@apache.org>> wrote:
>>>>
>>>> Yes, we should write these semantics down. I volunteer to help.
>>>>
>>>> I don't think that this is very ad-hoc. The semantics are basically
>>>> the following. Assuming an arriving element from the left side:
>>>>
>>>> (1) We find the right-side matches
>>>> (2) We insert the left-side arrival into the left window
>>>> (3) We recompute the left window
>>>>
>>>> We need to see whether right window re-computation needs to be
>>>> triggered as well. I think that this way of joining streams is also
>>>> what the symmetric hash join algorithms were meant to support.
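
(Inline note: a minimal sketch of the three steps listed here, assuming count-based windows and string elements of the form "key:payload". The class ElementWindowJoin and its onLeft/onRight methods are hypothetical names for illustration only, not the Flink API or the actual prototype.)

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch (not the Flink API): element-against-window join over count windows. */
class ElementWindowJoin {
    private final Deque<String> leftWindow = new ArrayDeque<>();
    private final Deque<String> rightWindow = new ArrayDeque<>();
    private final int leftSize;
    private final int rightSize;

    ElementWindowJoin(int leftSize, int rightSize) {
        this.leftSize = leftSize;
        this.rightSize = rightSize;
    }

    /** An element arrives on the left input; returns joined pairs as "left|right". */
    List<String> onLeft(String element) {
        return process(element, leftWindow, leftSize, rightWindow, true);
    }

    /** Symmetric case for the right input. */
    List<String> onRight(String element) {
        return process(element, rightWindow, rightSize, leftWindow, false);
    }

    private static List<String> process(String element, Deque<String> own, int ownSize,
                                        Deque<String> other, boolean fromLeft) {
        List<String> out = new ArrayList<>();
        // (1) Find the matches in the other side's current window.
        for (String candidate : other) {
            if (key(candidate).equals(key(element))) {
                out.add(fromLeft ? element + "|" + candidate : candidate + "|" + element);
            }
        }
        // (2) Insert the arrival into its own window.
        own.addLast(element);
        // (3) Recompute the own window: evict the oldest elements beyond the count.
        while (own.size() > ownSize) {
            own.removeFirst();
        }
        return out;
    }

    /** Join key: the text before the first ':' (an assumption made for this sketch). */
    private static String key(String element) {
        int i = element.indexOf(':');
        return i < 0 ? element : element.substring(0, i);
    }
}
```

The result depends on the interleaving of onLeft and onRight calls: an element joins only against what is in the other window at the moment it arrives.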
>>>>
>>>> Kostas
>>>>
>>>>
>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen
>>>> <se...@apache.org<mailto:se...@apache.org>> wrote:
>>>>
>>>> Is the approach of joining an element at a time from one input
>>>> against a window on the other input not a bit arbitrary?
>>>>
>>>> This just joins whatever currently happens to be the window by the
>>>> time the single element arrives - that is a bit non-predictable,
>>>> right?
>>>>
>>>> As a more general point: The whole semantics of windowing and when
>>>> they are triggered are a bit ad-hoc now. It would be really good to
>>>> start formalizing that a bit and put it down somewhere. Users need
>>>> to be able to clearly understand and predict the output.
>>>>
>>>>
>>>>
>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra
>>>> <gyula.f...@gmail.com<mailto:gyula.f...@gmail.com>> wrote:
>>>>
>>>> I think it should be possible to make this compatible with the
>>>> .window().every() calls. Maybe if there is some trigger set in
>>>> "every" we would not join that stream 1 by 1 but every so many
>>>> elements. The problem here is that the window and every in this
>>>> case are very different from the normal windowing semantics.
>>>> The window would define the join window for each element of the
>>>> other stream, while every would define how often I join this stream
>>>> with the other one.
>>>>
>>>> We need to think about how to make this intuitive.
>>>>
>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
>>>> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>> wrote:
>>>>
>>>> That would be really neat. The problem I see there is that we do not
>>>> distinguish between dataStream.window() and
>>>> dataStream.window().every() currently: they both return
>>>> WindowedDataStreams, and the TriggerPolicies of the every call do not
>>>> make much sense in this setting (in fact, in practice the trigger is
>>>> always set to a count of one).
>>>>
>>>> But of course we could handle it by checking that the
>>>> eviction is either null or a count of 1; in every other case
>>>> we throw an exception while building the JobGraph.
>>>>
>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
>>>> aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
>>>>
>>>> Or you could define it like this:
>>>>
>>>> stream_A = a.window(...)
>>>> stream_B = b.window(...)
>>>>
>>>> stream_A.join(stream_B).where().equals().with()
>>>>
>>>> So a join would just be a join of two WindowedDataStreams. This
>>>> would neatly move the windowing stuff into one place.
>>>>
>>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
>>>> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>> wrote:
>>>>
>>>> Big +1 for the proposal from Peter and Gyula. I'm really for
>>>> bringing the windowing and window join API in sync.
>>>>
>>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra
>>>> <gyf...@apache.org<mailto:gyf...@apache.org>> wrote:
>>>>
>>>> Hey guys,
>>>>
>>>> As Aljoscha has highlighted earlier, the current window join
>>>> semantics in the streaming API don't follow the changes in the
>>>> windowing API. More precisely, we currently only support joins over
>>>> time windows of equal size on both streams. The reason for this is
>>>> that we now take a window of each of the two streams and do joins
>>>> over these pairs. This would be a blocking operation if the windows
>>>> are not closed at exactly the same time (and since we don't want
>>>> this, we only allow time windows).
>>>>
>>>> I talked with Peter who came up with the initial idea of an
>>>> alternative approach for stream joins which works as follows:
>>>>
>>>> Instead of pairing windows for joins, we do element-against-window
>>>> joins. What this means is that whenever we receive an element from
>>>> one of the streams, we join this element with the current
>>>> window (this window is constantly updated) of the other stream. This
>>>> is non-blocking for any window definition, as we don't have to wait
>>>> for windows to be completed, and we can use this with any of our
>>>> predefined policies like Time.of(...), Count.of(...),
>>>> Delta.of(...).
>>>>
>>>> Additionally, this allows some very flexible ways of defining
>>>> window joins. With this we could also define grouped windowing
>>>> inside of a join. An example of this would be: Join all elements of
>>>> Stream1 with the last 5 elements by a given windowkey of Stream2 on
>>>> some join key.
>>>>
>>>> This feature can be easily implemented over the current operators,
>>>> so I already have a working prototype for the simple non-grouped
>>>> case. My only concern is the API; the best thing I could come up
>>>> with is something like this:
>>>>
>>>> stream_A.join(stream_B).onWindow(windowDefA,
>>>> windowDefB).by(windowKey1,
>>>> windowKey2).where(...).equalTo(...).with(...)
>>>>
>>>> (the user can omit the "by" and "with" calls)
>>>>
>>>> I think this new approach would be worthy of our "flexible
>>>> windowing" in contrast with the current approach.
>>>>
>>>> Regards, Gyula
>>>>
>>>>
