I agree, any ordering guarantees would need to be actively enabled.

How much of CEP depends on fully ordered streams? There is a lot you can do
with windows on event time, which are triggered by punctuations.

This is like a "soft" variant of the ordered streams, where order relation
occurs only between windows, rather than between all events. That
makes it much cheaper to maintain.
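
To make the "soft" ordering concrete, here is a minimal sketch in plain Java. It is not Flink API: the class name PunctuatedWindows, the tumbling window layout, and the String event type are all assumptions made for illustration. Events are buffered per event-time window in whatever order they arrive, and a punctuation closes every window that ends at or before its bound, so order is only established between windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch (not Flink API): tumbling event-time windows closed by punctuations. */
public class PunctuatedWindows {
    private final long size; // window length in event-time units (assumed tumbling windows)
    // Open windows keyed by window start; TreeMap keeps the windows themselves ordered.
    private final TreeMap<Long, List<String>> open = new TreeMap<>();

    public PunctuatedWindows(long size) {
        this.size = size;
    }

    /** Buffers an event in the window covering its timestamp; arrival order does not matter. */
    public void add(long timestamp, String event) {
        long start = Math.floorDiv(timestamp, size) * size;
        open.computeIfAbsent(start, k -> new ArrayList<>()).add(event);
    }

    /**
     * A punctuation promises that no event with a timestamp below bound will still arrive.
     * Every window ending at or before bound is therefore complete. Closed windows are
     * returned in ascending window order; events inside a window stay in arrival order.
     */
    public List<List<String>> punctuate(long bound) {
        List<List<String>> closed = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + size <= bound) {
            closed.add(open.pollFirstEntry().getValue());
        }
        return closed;
    }
}
```

Only the map of open windows is kept sorted, not the events themselves, which is where the savings over a fully ordered stream would come from in this sketch.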

On Wed, Apr 8, 2015 at 1:19 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> This reasoning makes absolute sense. That's why I suggested that the
> user should actively choose ordered data processing...
>
> About deadlocks: Those can be avoided, if the buffers are consumed
> continuously in an in-memory merge buffer (maybe with spilling to disk
> if necessary). Of course, latency might suffer and punctuations should
> be used to relax this problem.
>
> As far as I understood, CEP might be an interesting use-case for Flink,
> and it depends on ordered data streams.
>
>
> -Matthias
>
>
>
> On 04/08/2015 11:50 AM, Stephan Ewen wrote:
> > Here is the state in Flink and why we have chosen not to do global
> > ordering at the moment:
> >
> >  - Individual streams are FIFO, that means if the sender emits in order,
> > the receiver receives in order.
> >
> >  - When streams are merged (think shuffle / partition-by), then the
> > streams are not merged, but buffers from the streams are taken as they
> > come in.
> >
> >   - We had a version that merged streams (for sort merging in batch
> > programs, actually) long ago, and it performed either horribly or
> > deadlocked. The reason is that all streams are always stalled if a buffer
> > is missing from one stream, since the merge cannot continue in such a
> > case (head-of-the-line waiting). That backpressures streams unnecessarily,
> > slowing down computation. If the streams depend mutually on each other
> > (think two partitioning steps), they frequently deadlock completely.
> >
> >   - The only way to do that is by stalling/buffering/punctuating streams
> > continuously, which is a lot of work to implement and will definitely
> > cost performance.
> >
> > Therefore we have decided to go for a simpler model without global
> > ordering for now. If we start seeing that this has severe limitations in
> > practice, we may reconsider that.
> >
> >
> >
> > On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <
> > cado...@informatik.hu-berlin.de> wrote:
> >
> > Hi Paris,
> >
> > what's the reason for not guaranteeing global ordering across partitions
> > in the stream model? Is it the smaller overhead or are there any
> > operations not computable in a distributed environment with global
> > ordering?
> >
> > In any case, I agree with Matthias that the user should choose. If
> > operations were not computable with a global ordering, I would
> > restrict the set of operations for that mode.
> >
> > Maybe, it would also be helpful to collect use cases for each of the
> > modes proposed by Matthias to understand the requirements for both modes.
> >
> > Some (researchy) thoughts about indeterminism: How can the
> > indeterminism of the current setting be quantified? How "large" can it
> > grow with the current setting? Are there any limits that can be
> > guaranteed?
> >
> > Cheers,
> > Bruno
> >
> > On 07.04.2015 12:38, Paris Carbone wrote:
> >>>> Hello Matthias,
> >>>>
> >>>> Sure, ordering guarantees are indeed a tricky thing, I recall
> >>>> having that discussion back in TU Berlin. Bear in mind though that
> >>>> DataStream, our abstract data type, represents a *partitioned*
> >>>> unbounded sequence of events. There are no *global* ordering
> >>>> guarantees made whatsoever in that model across partitions. If you
> >>>> see it more generally there are many “race conditions” in a
> >>>> distributed execution graph of vertices that process multiple
> >>>> inputs asynchronously, especially when you add joins and iterations
> >>>> into the mix (how do you deal with reprocessing “old” tuples that
> >>>> iterate in the graph). Btw have you checked the Naiad paper [1]?
> >>>> Stephan cited it a while ago and it is quite relevant to that
> >>>> discussion.
> >>>>
> >>>> Also, can you cite the paper with the joining semantics you are
> >>>> referring to? That would be of good help I think.
> >>>>
> >>>> Paris
> >>>>
> >>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
> >>>>
> >>>> On 07 Apr 2015, at 11:50, Matthias J. Sax
> >>>> <mj...@informatik.hu-berlin.de<mailto:mj...@informatik.hu-berlin.de>>
> >>>> wrote:
> >>>>
> >>>> Hi @all,
> >>>>
> >>>> please keep me in the loop for this work. I am highly interested
> >>>> and I want to help on it.
> >>>>
> >>>> My initial thoughts are as follows:
> >>>>
> >>>> 1) Currently, system timestamps are used and the suggested approach
> >>>> can be seen as state-of-the-art (there is actually a research paper
> >>>> using the exact same join semantic). Of course, the current
> >>>> approach is inherently non-deterministic. The advantage is, that
> >>>> there is no overhead in keeping track of the order of records and
> >>>> the latency should be very low. (Additionally, state-recovery is
> >>>> simplified. Because the processing is inherently
> >>>> non-deterministic, recovery can be done with relaxed guarantees).
> >>>>
> >>>> 2) The user should be able to "switch on" deterministic
> >>>> processing, ie, records are timestamped (either externally when
> >>>> generated, or timestamped at the sources). Because deterministic
> >>>> processing adds some overhead, the user should decide for it
> >>>> actively. In this case, the order must be preserved in each
> >>>> re-distribution step (merging is sufficient, if order is preserved
> >>>> within each incoming channel). Furthermore, deterministic
> >>>> processing can be achieved by sound window semantics (and there is
> >>>> a bunch of them). Even for single-stream-windows it's a tricky
> >>>> problem; for join-windows it's even harder. From my point of view,
> >>>> it is less important which semantics are chosen; however, the user
> >>>> must be aware how it works. The most tricky part for deterministic
> >>>> processing, is to deal with duplicate timestamps (which cannot be
> >>>> avoided). The timestamping for (intermediate) result tuples, is
> >>>> also an important question to be answered.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
> >>>>
> >>>> Hey,
> >>>>
> >>>> I agree with Kostas, if we define the exact semantics how this
> >>>> works, this is not more ad-hoc than any other stateful operator
> >>>> with multiple inputs. (And I don't think any other system supports
> >>>> something similar.)
> >>>>
> >>>> We need to make some design choices that are similar to the issues
> >>>> we had for windowing. We need to choose how we want to evaluate the
> >>>> windowing policies (global or local) because that affects what kind
> >>>> of policies can be parallel, but I can work on these things.
> >>>>
> >>>> I think this is an amazing feature, so I wouldn't necessarily rush
> >>>> the implementation for 0.9 though.
> >>>>
> >>>> And thanks for helping writing these down.
> >>>>
> >>>> Gyula
> >>>>
> >>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas
> >>>> <ktzou...@apache.org<mailto:ktzou...@apache.org>> wrote:
> >>>>
> >>>> Yes, we should write these semantics down. I volunteer to help.
> >>>>
> >>>> I don't think that this is very ad-hoc. The semantics are basically
> >>>> the following. Assuming an arriving element from the left side:
> >>>> (1) We find the right-side matches.
> >>>> (2) We insert the left-side arrival into the left window.
> >>>> (3) We recompute the left window.
> >>>> We need to see whether right window re-computation needs to be
> >>>> triggered as well. I think that this way of joining streams is also
> >>>> what the symmetric hash join algorithms were meant to support.
> >>>>
> >>>> Kostas
> >>>>
> >>>>
> >>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen
> >>>> <se...@apache.org<mailto:se...@apache.org>> wrote:
> >>>>
> >>>> Is the approach of joining an element at a time from one input
> >>>> against a window on the other input not a bit arbitrary?
> >>>>
> >>>> This just joins whatever currently happens to be the window by the
> >>>> time the single element arrives - that is a bit non-predictable,
> >>>> right?
> >>>>
> >>>> As a more general point: The whole semantics of windowing and when
> >>>> they are triggered are a bit ad-hoc now. It would be really good to
> >>>> start formalizing that a bit and put it down somewhere. Users need
> >>>> to be able to clearly understand and predict the output.
> >>>>
> >>>>
> >>>>
> >>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra
> >>>> <gyula.f...@gmail.com<mailto:gyula.f...@gmail.com>> wrote:
> >>>>
> >>>> I think it should be possible to make this compatible with the
> >>>> .window().every() calls. Maybe if there is some trigger set in
> >>>> "every" we would not join that stream 1 by 1 but every so many
> >>>> elements. The problem here is that the window and every in this
> >>>> case are very different from the normal windowing semantics.
> >>>> The window would define the join window for each element of the
> >>>> other stream, while every would define how often I join this stream
> >>>> with the other one.
> >>>>
> >>>> We need to think about how to make this intuitive.
> >>>>
> >>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
> >>>> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>> wrote:
> >>>>
> >>>> That would be really neat. The problem I see there is that we do not
> >>>> distinguish between dataStream.window() and
> >>>> dataStream.window().every() currently, they both return
> >>>> WindowedDataStreams and TriggerPolicies of the every call do not
> >>>> make much sense in this setting (in fact practically the trigger is
> >>>> always set to count of one).
> >>>>
> >>>> But of course we could make it in a way, that we check that the
> >>>> eviction should be either null or count of 1, in every other case
> >>>> we throw an exception while building the JobGraph.
> >>>>
> >>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
> >>>> aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
> >>>>
> >>>> Or you could define it like this:
> >>>>
> >>>> stream_A = a.window(...)
> >>>> stream_B = b.window(...)
> >>>>
> >>>> stream_A.join(stream_B).where().equals().with()
> >>>>
> >>>> So a join would just be a join of two WindowedDataStreams. This
> >>>> would neatly move the windowing stuff into one place.
> >>>>
> >>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
> >>>> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>> wrote:
> >>>>
> >>>> Big +1 for the proposal from Peter and Gyula. I'm really for
> >>>> bringing the windowing and window join API in sync.
> >>>>
> >>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra
> >>>> <gyf...@apache.org<mailto:gyf...@apache.org>> wrote:
> >>>>
> >>>> Hey guys,
> >>>>
> >>>> As Aljoscha has highlighted earlier, the current window join
> >>>> semantics in the streaming API don't follow the changes in the
> >>>> windowing API. More precisely, we currently only support joins over
> >>>> time windows of equal size on both streams. The reason for this is
> >>>> that we now take a window of each of the two streams and do joins
> >>>> over these pairs. This would be a blocking operation if the windows
> >>>> are not closed at exactly the same time (and since we don't want
> >>>> this, we only allow time windows).
> >>>>
> >>>> I talked with Peter who came up with the initial idea of an
> >>>> alternative approach for stream joins which works as follows:
> >>>>
> >>>> Instead of pairing windows for joins, we do element against window
> >>>> joins. What this means is that whenever we receive an element from
> >>>> one of the streams, we join this element with the current
> >>>> window (this window is constantly updated) of the other stream. This
> >>>> is non-blocking on any window definitions as we don't have to wait
> >>>> for windows to be completed and we can use this with any of our
> >>>> predefined policies like Time.of(...), Count.of(...),
> >>>> Delta.of(...).
> >>>>
> >>>> Additionally this also allows some very flexible way of defining
> >>>> window joins. With this we could also define grouped windowing
> >>>> inside of a join. An example of this would be: Join all elements of
> >>>> Stream1 with the last 5 elements by a given windowkey of Stream2 on
> >>>> some join key.
> >>>>
> >>>> This feature can be easily implemented over the current operators,
> >>>> so I already have a working prototype for the simple non-grouped
> >>>> case. My only concern is the API; the best thing I could come up
> >>>> with is something like this:
> >>>>
> >>>> stream_A.join(stream_B).onWindow(windowDefA,
> >>>> windowDefB).by(windowKey1,
> >>>> windowKey2).where(...).equalTo(...).with(...)
> >>>>
> >>>> (the user can omit the "by" and "with" calls)
> >>>>
> >>>> I think this new approach would be worthy of our "flexible
> >>>> windowing" in contrast with the current approach.
> >>>>
> >>>> Regards, Gyula
> >>>>
