This reasoning makes complete sense. That's why I suggested that the user should actively choose ordered data processing...
About deadlocks: those can be avoided if the buffers are consumed continuously into an in-memory merge buffer (maybe with spilling to disk if necessary). Of course, latency might suffer, and punctuations should be used to relax this problem. As far as I understand, CEP might be an interesting use case for Flink, and it depends on ordered data streams.

-Matthias

On 04/08/2015 11:50 AM, Stephan Ewen wrote:
> Here is the state in Flink and why we have chosen not to do global ordering at the moment:
>
> - Individual streams are FIFO; that means if the sender emits in order, the receiver receives in order.
>
> - When streams are merged (think shuffle / partition-by), the streams are not merged; rather, buffers from the streams are taken as they come in.
>
> - We had a version that merged streams (for sort merging in batch programs, actually) long ago, and it performed either horribly or deadlocked. The reason is that all streams are always stalled if a buffer is missing from one stream, since the merge cannot continue in such a case (head-of-the-line waiting). That backpressures streams unnecessarily, slowing down computation. If the streams depend mutually on each other (think two partitioning steps), they frequently deadlock completely.
>
> - The only way to do that is by stalling/buffering/punctuating streams continuously, which is a lot of work to implement and will definitely cost performance.
>
> Therefore we have decided to go for a simpler model without global ordering for now. If we start seeing that this has severe limitations in practice, we may reconsider that.
>
> On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <cado...@informatik.hu-berlin.de> wrote:
>
> Hi Paris,
>
> what's the reason for not guaranteeing global ordering across partitions in the stream model? Is it the smaller overhead, or are there operations not computable in a distributed environment with global ordering?
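The punctuation-relaxed merge buffer that Matthias describes can be sketched roughly as follows. This is a hypothetical illustration, not Flink code: it assumes punctuations carry low-watermark timestamps per channel, buffers elements in a heap, and releases them in timestamp order only up to the minimum punctuation seen across all channels, so no single slow channel head-of-line blocks the merge indefinitely.

```python
import heapq

class PunctuatedMerger:
    """Sketch of an in-memory merge buffer relaxed by punctuations
    (all names and the interface are assumptions for illustration)."""

    def __init__(self, num_channels):
        self.heap = []                                  # (timestamp, seq, element)
        self.watermarks = [float('-inf')] * num_channels
        self.seq = 0                                    # tie-break for equal timestamps

    def add(self, timestamp, element):
        # buffer an incoming element; emit whatever has become safe
        heapq.heappush(self.heap, (timestamp, self.seq, element))
        self.seq += 1
        return self._drain()

    def punctuate(self, channel, watermark):
        # a punctuation promises that `channel` sends nothing earlier than it
        self.watermarks[channel] = max(self.watermarks[channel], watermark)
        return self._drain()

    def _drain(self):
        # safe to emit everything up to the slowest channel's promise
        ready = min(self.watermarks)
        out = []
        while self.heap and self.heap[0][0] <= ready:
            out.append(heapq.heappop(self.heap)[2])
        return out
```

As the sketch makes visible, latency is bounded by how often the slowest channel punctuates, which is exactly the latency/ordering trade-off being discussed.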
>
> In any case, I agree with Matthias that the user should choose. If operations were not computable with a global ordering, I would restrict the set of operations for that mode.
>
> Maybe it would also be helpful to collect use cases for each of the modes proposed by Matthias, to understand the requirements for both modes.
>
> Some (researchy) thoughts about indeterminism: How can the indeterminism of the current setting be quantified? How "large" can it grow with the current setting? Are there any limits that can be guaranteed?
>
> Cheers,
> Bruno
>
> On 07.04.2015 12:38, Paris Carbone wrote:
>>>> Hello Matthias,
>>>>
>>>> Sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back in TU Berlin. Bear in mind though that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. There are no *global* ordering guarantees made whatsoever in that model across partitions. Seen more generally, there are many "race conditions" in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing "old" tuples that iterate in the graph?). Btw, have you checked the Naiad paper [1]? Stephan cited it a while ago, and it is quite relevant to this discussion.
>>>>
>>>> Also, can you cite the paper with the joining semantics you are referring to? That would be of good help, I think.
>>>>
>>>> Paris
>>>>
>>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>
>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>>>>
>>>> Hi @all,
>>>>
>>>> please keep me in the loop for this work.
I am highly interested, and I want to help on it.
>>>>
>>>> My initial thoughts are as follows:
>>>>
>>>> 1) Currently, system timestamps are used, and the suggested approach can be seen as state-of-the-art (there is actually a research paper using the exact same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead in keeping track of the order of records, and the latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)
>>>>
>>>> 2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or timestamped at the sources). Because deterministic processing adds some overhead, the user should opt into it actively. In this case, the order must be preserved in each re-distribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there is a bunch of them). Even for single-stream windows it's a tricky problem; for join windows it's even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). The timestamping of (intermediate) result tuples is also an important question to be answered.
>>>>
>>>> -Matthias
>>>>
>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>> Hey,
>>>>
>>>> I agree with Kostas: if we define the exact semantics of how this works, this is not more ad-hoc than any other stateful operator with multiple inputs.
(And I don't think any other system supports something similar.)
>>>>
>>>> We need to make some design choices similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (global or local), because that affects what kinds of policies can be parallel, but I can work on these things.
>>>>
>>>> I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9 though.
>>>>
>>>> And thanks for helping write these down.
>>>>
>>>> Gyula
>>>>
>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>
>>>> Yes, we should write these semantics down. I volunteer to help.
>>>>
>>>> I don't think that this is very ad-hoc. The semantics are basically the following, assuming an arriving element from the left side: (1) we find the right-side matches, (2) we insert the left-side arrival into the left window, (3) we recompute the left window. We need to see whether right-window re-computation needs to be triggered as well. I think that this way of joining streams is also what the symmetric hash join algorithms were meant to support.
>>>>
>>>> Kostas
>>>>
>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>> Is the approach of joining an element at a time from one input against a window on the other input not a bit arbitrary?
>>>>
>>>> This just joins against whatever happens to be in the window by the time the single element arrives - that is a bit unpredictable, right?
>>>>
>>>> As a more general point: the whole semantics of windowing and when windows are triggered are a bit ad-hoc now. It would be really good to start formalizing that a bit and put it down somewhere. Users need to be able to clearly understand and predict the output.
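The three-step semantics Kostas sketches can be written down very compactly. The following is a hypothetical illustration in Python, not Flink code: one count-based window per side, where an arrival first probes the opposite window and then enters its own, in the spirit of symmetric hash joins. The class and parameter names are assumptions for the sketch.

```python
from collections import deque

class SymmetricWindowJoin:
    """Per-element join semantics: on an arrival from one side,
    (1) probe the opposite window for matches, then (2) insert the
    arrival into its own window, whose count-based eviction (3) is
    re-evaluated automatically by the bounded deque."""

    def __init__(self, key_fn, window_size):
        self.key_fn = key_fn
        # one count window per side; deque(maxlen=...) evicts the oldest
        # element when the window overflows
        self.windows = (deque(maxlen=window_size), deque(maxlen=window_size))

    def insert(self, side, element):
        key = self.key_fn(element)
        # (1) find matches in the opposite side's current window
        matches = [(element, o) if side == 0 else (o, element)
                   for o in self.windows[1 - side] if self.key_fn(o) == key]
        # (2) + (3) insert into this side's window; eviction is implicit
        self.windows[side].append(element)
        return matches
```

The sketch also makes Stephan's objection concrete: the output of `insert` depends on what happens to be in the opposite window at arrival time, which is exactly the non-determinism under discussion.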
>>>>
>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>
>>>> I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every", we would not join that stream one by one but every so many elements. The problem here is that the window and every in this case are very, very different from the normal windowing semantics: the window would define the join window for each element of the other stream, while every would define how often this stream is joined with the other one.
>>>>
>>>> We need to think about how to make this intuitive.
>>>>
>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>>
>>>> That would be really neat. The problem I see there is that we do not currently distinguish between dataStream.window() and dataStream.window().every(): they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, practically the trigger is always set to a count of one).
>>>>
>>>> But of course we could make it in a way that we check that the eviction is either null or a count of 1; in every other case we throw an exception while building the JobGraph.
>>>>
>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>> Or you could define it like this:
>>>>
>>>> stream_A = a.window(...)
>>>> stream_B = b.window(...)
>>>>
>>>> stream_A.join(stream_B).where().equals().with()
>>>>
>>>> So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.
>>>>
>>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>>
>>>> Big +1 for the proposal from Peter and Gyula.
I'm really for bringing the windowing and window join API in sync.
>>>>
>>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>
>>>> Hey guys,
>>>>
>>>> As Aljoscha highlighted earlier, the current window join semantics in the streaming API don't follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams. The reason for this is that we now take a window of each of the two streams and do joins over these pairs. This would be a blocking operation if the windows are not closed at exactly the same time (and since we don't want this, we only allow time windows).
>>>>
>>>> I talked with Peter, who came up with the initial idea of an alternative approach for stream joins, which works as follows:
>>>>
>>>> Instead of pairing windows for joins, we do element-against-window joins. This means that whenever we receive an element from one of the streams, we join this element with the current window (this window is constantly updated) of the other stream. This is non-blocking for any window definition, as we don't have to wait for windows to be completed, and we can use this with any of our predefined policies like Time.of(...), Count.of(...), Delta.of(...).
>>>>
>>>> Additionally, this also allows some very flexible ways of defining window joins. With this we could also define grouped windowing inside of a join. An example of this would be: join all elements of Stream1 with the last 5 elements by a given windowkey of Stream2 on some join key.
>>>>
>>>> This feature can be easily implemented over the current operators, so I already have a working prototype for the simple non-grouped case.
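Gyula's grouped example (join every Stream1 element against the last n Stream2 elements kept per window key) can be sketched as follows. This is a hypothetical Python illustration, not the prototype; all names, and the choice of a count window per group, are assumptions.

```python
from collections import defaultdict, deque

class GroupedStreamJoin:
    """Element-against-grouped-window join: each Stream1 arrival joins
    immediately against the last n Stream2 elements held per window key,
    matching on a separate join key."""

    def __init__(self, n, window_key, join_key):
        self.window_key = window_key
        self.join_key = join_key
        # one bounded count window of Stream2 elements per window key
        self.groups = defaultdict(lambda: deque(maxlen=n))

    def on_stream2(self, element):
        # maintain the grouped count window over Stream2
        self.groups[self.window_key(element)].append(element)

    def on_stream1(self, element):
        # non-blocking: join immediately against the current window state
        k = self.join_key(element)
        return [(element, o)
                for group in self.groups.values()
                for o in group
                if self.join_key(o) == k]
```

Because `on_stream1` never waits for any window to close, the sketch shows why the approach stays non-blocking regardless of the window policy used on Stream2.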
My only concern is the API; the best thing I could come up with is something like this:
>>>>
>>>> stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
>>>>
>>>> (The user can omit the "by" and "with" calls.)
>>>>
>>>> I think this new approach would be worthy of our "flexible windowing", in contrast with the current approach.
>>>>
>>>> Regards,
>>>> Gyula
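As a toy illustration of how the proposed call chain could compose, here is a Python mock over finite lists. Everything here is an assumption for the sketch: `with` is a reserved word in Python, so the final call is named `apply`; the `by` call is omitted; and window definitions are plain element counts, with only the B-side window used, to keep the sketch one-directional.

```python
class StreamJoinBuilder:
    """Mock of the proposed fluent chain, purely to show how the calls
    compose into one join definition."""

    def __init__(self, a, b):
        self._a, self._b = a, b

    def onWindow(self, win_a, win_b):
        self._win_a, self._win_b = win_a, win_b
        return self

    def where(self, key_a):
        self._key_a = key_a
        return self

    def equalTo(self, key_b):
        self._key_b = key_b
        return self

    def apply(self, f):
        # element-vs-window over finite inputs: join each element of A
        # against the last win_b elements of B
        window_b = self._b[-self._win_b:]
        return [f(x, y) for x in self._a for y in window_b
                if self._key_a(x) == self._key_b(y)]

def join(a, b):
    return StreamJoinBuilder(a, b)
```

A chained call then reads much like the proposed API:

```python
out = (join([1, 2, 3], [10, 21, 32, 43])
       .onWindow(3, 2)
       .where(lambda x: x % 10)
       .equalTo(lambda y: y % 10)
       .apply(lambda x, y: (x, y)))
```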