I started working on an in-memory merge on a record-timestamp attribute for totally ordered streams, but I got distracted by the Storm compatibility layer... I will continue working on it when I find some extra time ;)
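For context, such a timestamp merge could be sketched roughly as below. This is a minimal illustration, not the actual work in progress: `Record` and `TimestampMerge` are made-up names, and it assumes each input channel already delivers records in timestamp order (per-channel FIFO).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Minimal sketch of an in-memory merge on a record-timestamp attribute.
// Assumes each input channel is already FIFO-ordered by timestamp.
// `Record` and `TimestampMerge` are illustrative names, not Flink classes.
public class TimestampMerge {

    static final class Record {
        final long timestamp;
        final String payload;
        Record(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // K-way merge: a priority queue holds one cursor {channel, index} per
    // channel, ordered by the timestamp of the record the cursor points at.
    // Note the merge can only proceed while every channel has data buffered;
    // this is the head-of-the-line waiting discussed later in the thread.
    static List<Record> merge(List<List<Record>> channels) {
        PriorityQueue<int[]> cursors = new PriorityQueue<>(
                Comparator.comparingLong((int[] c) -> channels.get(c[0]).get(c[1]).timestamp));
        for (int i = 0; i < channels.size(); i++) {
            if (!channels.get(i).isEmpty()) cursors.add(new int[]{i, 0});
        }
        List<Record> out = new ArrayList<>();
        while (!cursors.isEmpty()) {
            int[] c = cursors.poll();
            List<Record> channel = channels.get(c[0]);
            out.add(channel.get(c[1]));                    // emit smallest timestamp
            if (c[1] + 1 < channel.size()) cursors.add(new int[]{c[0], c[1] + 1});
        }
        return out;
    }

    public static void main(String[] args) {
        List<Record> a = Arrays.asList(new Record(1, "a1"), new Record(4, "a4"));
        List<Record> b = Arrays.asList(new Record(2, "b2"), new Record(3, "b3"));
        List<Record> merged = merge(Arrays.asList(a, b));
        for (int i = 1; i < merged.size(); i++) {
            assert merged.get(i - 1).timestamp <= merged.get(i).timestamp;
        }
        System.out.println(merged.size());
    }
}
```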
On 04/08/2015 03:18 PM, Márton Balassi wrote:

+1 for Stephan's suggestion.

If we would like to support event time and also sorting inside a window, we should carefully consider where to actually put the timestamp of the records. If the timestamp is part of the record, it is more straightforward; but if we assign the timestamps in our sources, the initial idea was to keep them hidden from the user and only use them in the network layer.

The more extreme solution with total ordering has a JIRA, but it has been a bit silent lately. [1]

[1] https://issues.apache.org/jira/browse/FLINK-1493

On Wed, Apr 8, 2015 at 3:06 PM, Stephan Ewen <se...@apache.org> wrote:

With the current network layer and the agenda we have for windowing, we should be able to support windows on event time in the near future. Inside a window, you can sort all records by time and have a full ordering. That is independent of the order of the stream.

How about this as a first goal?

On Wed, Apr 8, 2015 at 2:50 PM, Bruno Cadonna <cado...@informatik.hu-berlin.de> wrote:

Hi Stephan,

How much of CEP depends on fully ordered streams depends on the operators that you use in the pattern query. But in general, they need fully ordered events within a window, or at least some strategy to deal with out-of-order events.

If I got it right, you propose windows that are built on the occurrence time of the event, i.e., the time when the event occurred in the real world, and then to close a window when a punctuation says that the window is complete.

I agree with you that with such windows you can do a lot and decrease maintenance overhead. For example, some aggregations like sum, count, and avg do not need ordered events in the window; they just need complete windows that are ordered with respect to each other to be deterministic.
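Bruno's point about order-insensitive aggregates can be illustrated with a tiny sketch (hypothetical standalone code, not Flink API): over a *complete* window, such an aggregate is deterministic regardless of the order in which events arrived.

```java
import java.util.Arrays;
import java.util.List;

// Illustration (hypothetical standalone code, not Flink API): aggregates
// such as sum/count/avg are order-insensitive, so a complete window yields
// a deterministic result even if its events arrive unordered.
public class OrderInsensitiveAgg {

    // Sum over the contents of a window; addition is commutative and
    // associative, so arrival order inside the window does not matter.
    static long sum(List<Long> window) {
        long s = 0;
        for (long v : window) s += v;
        return s;
    }

    public static void main(String[] args) {
        // The same window contents in two different arrival orders.
        List<Long> order1 = Arrays.asList(3L, 1L, 2L);
        List<Long> order2 = Arrays.asList(2L, 3L, 1L);
        assert sum(order1) == sum(order2); // identical result either way
        System.out.println(sum(order1));
    }
}
```

Overlapping windows are the exception Bruno raises next: eviction of the oldest events needs the time order back.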
However, if the windows overlap, you again need the time order to evict the oldest events from a window.

Cheers,
Bruno

On 08.04.2015 13:30, Stephan Ewen wrote:

I agree, any ordering guarantees would need to be actively enabled.

How much of CEP depends on fully ordered streams? There is a lot you can do with windows on event time, which are triggered by punctuations.

This is like a "soft" variant of the ordered streams, where an order relation exists only between windows, rather than between all events. That makes it much cheaper to maintain.

On Wed, Apr 8, 2015 at 1:19 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

This reasoning makes absolute sense. That's why I suggested that the user should actively choose ordered data processing...

About deadlocks: those can be avoided if the buffers are consumed continuously in an in-memory merge buffer (maybe with spilling to disk if necessary). Of course, latency might suffer, and punctuations should be used to relax this problem.

As far as I understood, CEP might be an interesting use case for Flink, and it depends on ordered data streams.

-Matthias

On 04/08/2015 11:50 AM, Stephan Ewen wrote:

Here is the state in Flink, and why we have chosen not to do global ordering at the moment:

- Individual streams are FIFO: if the sender emits in order, the receiver receives in order.

- When streams are merged (think shuffle / partition-by), the streams are not merged; buffers from the streams are taken as they come in.
- We had a version that merged streams (for sort-merging in batch programs, actually) long ago, and it performed either horribly or deadlocked. The reason is that all streams are stalled whenever a buffer is missing from one stream, since the merge cannot continue in such a case (head-of-the-line waiting). That backpressures streams unnecessarily, slowing down computation. If the streams depend mutually on each other (think two partitioning steps), they frequently deadlock completely.

- The only way to do this is by stalling/buffering/punctuating streams continuously, which is a lot of work to implement and will definitely cost performance.

Therefore we have decided to go with a simpler model without global ordering for now. If we start seeing that this has severe limitations in practice, we may reconsider.

On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <cado...@informatik.hu-berlin.de> wrote:

Hi Paris,

What's the reason for not guaranteeing global ordering across partitions in the stream model? Is it the smaller overhead, or are there operations that are not computable in a distributed environment with global ordering?

In any case, I agree with Matthias that the user should choose. If some operations were not computable with global ordering, I would restrict the set of operations for that mode.

Maybe it would also be helpful to collect use cases for each of the modes proposed by Matthias, to understand the requirements for both modes.

Some (researchy) thoughts about indeterminism: how can the indeterminism of the current setting be quantified?
How "large" can it grow with the current setting? Are there any limits that can be guaranteed?

Cheers, Bruno

On 07.04.2015 12:38, Paris Carbone wrote:

Hello Matthias,

Sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back in TU Berlin. Bear in mind though that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. There are no *global* ordering guarantees made whatsoever in that model across partitions. More generally, there are many "race conditions" in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing "old" tuples that iterate in the graph?). By the way, have you checked the Naiad paper [1]? Stephan cited it a while ago, and it is quite relevant to this discussion.

Also, can you cite the paper with the joining semantics you are referring to? That would be of good help, I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi @all,

Please keep me in the loop for this work. I am highly interested, and I want to help with it.
My initial thoughts are as follows:

1) Currently, system timestamps are used, and the suggested approach can be seen as state of the art (there is actually a research paper using the exact same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead in keeping track of the order of records, and the latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)

2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or at the sources). Because deterministic processing adds some overhead, the user should actively opt in to it. In this case, the order must be preserved in each redistribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there is a bunch of them). Even for single-stream windows it's a tricky problem; for join windows it's even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). The timestamping of (intermediate) result tuples is also an important question to be answered.
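The second point above (sort a window once it is complete, and make duplicate timestamps deterministic) could be sketched roughly as follows. All class and method names here are illustrative assumptions, not actual Flink API; ties are broken with a secondary key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of a punctuation-closed event-time window that sorts its contents
// once complete, breaking duplicate timestamps with a secondary key so the
// output is deterministic. All names are illustrative, not Flink API.
public class EventTimeWindow {

    static final class Event {
        final long ts;
        final String id;
        Event(long ts, String id) { this.ts = ts; this.id = id; }
    }

    private final long windowEnd;            // events with ts < windowEnd belong here
    private final List<Event> buffer = new ArrayList<>();

    EventTimeWindow(long windowEnd) { this.windowEnd = windowEnd; }

    void add(Event e) {
        if (e.ts < windowEnd) buffer.add(e); // buffer unordered arrivals
    }

    // A punctuation at or past the window end marks the window complete:
    // sort by timestamp, then by id to resolve duplicate timestamps.
    // Returns null while the window is still open.
    List<Event> onPunctuation(long punctuation) {
        if (punctuation < windowEnd) return null;
        buffer.sort(Comparator.comparingLong((Event e) -> e.ts)
                              .thenComparing((Event e) -> e.id));
        return buffer;
    }

    public static void main(String[] args) {
        EventTimeWindow w = new EventTimeWindow(10);
        w.add(new Event(5, "b"));
        w.add(new Event(3, "x"));
        w.add(new Event(5, "a"));            // duplicate timestamp
        assert w.onPunctuation(9) == null;   // window not complete yet
        List<Event> out = w.onPunctuation(10);
        assert out.get(0).ts == 3;
        assert out.get(1).id.equals("a");    // tie broken deterministically
    }
}
```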
-Matthias

On 04/07/2015 11:37 AM, Gyula Fóra wrote:

Hey,

I agree with Kostas: if we define the exact semantics of how this works, it is no more ad hoc than any other stateful operator with multiple inputs. (And I don't think any other system supports something similar.)

We need to make some design choices that are similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (global or local), because that affects what kinds of policies can be parallel. I can work on these things.

I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9.

And thanks for helping write these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think this is very ad hoc. The semantics are basically the following, assuming an arriving element from the left side: (1) we find the right-side matches, (2) we insert the left-side arrival into the left window, (3) we recompute the left window. We need to see whether right-window recomputation needs to be triggered as well. I think this way of joining streams is also what the symmetric hash join algorithms were meant to support.
Kostas

On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:

Is the approach of joining an element at a time from one input against a window on the other input not a bit arbitrary?

This just joins against whatever currently happens to be the window by the time the single element arrives; that is a bit unpredictable, right?

As a more general point: the whole semantics of windows and when they are triggered are a bit ad hoc now. It would be really good to start formalizing that a bit and put it down somewhere. Users need to be able to clearly understand and predict the output.

On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every", we would not join that stream one element at a time, but every so many elements. The problem here is that the window and every in this case are very, very different from the normal windowing semantics: the window would define the join window for each element of the other stream, while every would define how often I join this stream with the other one.

We need to think about how to make this intuitive.
On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:

That would be really neat. The problem I see there is that we currently do not distinguish between dataStream.window() and dataStream.window().every(): they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, practically, the trigger is always set to a count of one).

But of course we could make it so that we check that the eviction is either null or a count of 1, and in every other case throw an exception while building the JobGraph.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equals().with()

So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

Big +1 for the proposal from Peter and Gyula. I'm really for bringing the windowing and window join APIs in sync.

On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:

Hey guys,

As Aljoscha highlighted earlier, the current window join semantics in the streaming API don't follow the changes in the windowing API.
More precisely, we currently only support joins over time windows of equal size on both streams. The reason is that we now take a window of each of the two streams and join over these pairs. This would be a blocking operation if the windows were not closed at exactly the same time (and since we don't want that, we only allow time windows).

I talked with Peter, who came up with the initial idea of an alternative approach to stream joins, which works as follows:

Instead of pairing windows for joins, we do element-against-window joins. This means that whenever we receive an element from one of the streams, we join this element with the current window (which is constantly updated) of the other stream. This is non-blocking for any window definition, as we don't have to wait for windows to be completed, and we can use it with any of our predefined policies like Time.of(...), Count.of(...), Delta.of(...).

Additionally, this allows some very flexible ways of defining window joins. With this we could also define grouped windowing inside of a join. An example would be: join all elements of Stream1 with the last 5 elements, by a given windowkey of Stream2, on some join key.

This feature can easily be implemented over the current operators, so I already have a working prototype for the simple non-grouped case. My only concern is the API; the best thing I could come up with is something like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
(the user can omit the "by" and "with" calls)

I think this new approach would be worthy of our "flexible windowing", in contrast with the current approach.

Regards, Gyula
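The element-against-window join discussed in this thread (probe the other side's current window, insert the arrival into its own window, evict expired entries) can be sketched as a symmetric, count-windowed join. This is not the prototype's actual code; all names are hypothetical, and a count-based window stands in for the general `Time.of`/`Count.of`/`Delta.of` policies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of an element-against-window join: on each arrival from one side,
// (1) probe the other side's current window for key matches,
// (2) insert the arrival into its own window,
// (3) evict expired entries (here: count-based, like Count.of(n)).
// This mirrors the symmetric hash join idea; names are hypothetical.
public class ElementVsWindowJoin {

    static final class Elem {
        final int key;
        final String val;
        Elem(int key, String val) { this.key = key; this.val = val; }
    }

    private final Deque<Elem> leftWindow = new ArrayDeque<>();
    private final Deque<Elem> rightWindow = new ArrayDeque<>();
    private final int windowSize; // count-based eviction policy

    ElementVsWindowJoin(int windowSize) { this.windowSize = windowSize; }

    List<String> onLeft(Elem e)  { return join(e, rightWindow, leftWindow); }
    List<String> onRight(Elem e) { return join(e, leftWindow, rightWindow); }

    private List<String> join(Elem arrival, Deque<Elem> probe, Deque<Elem> own) {
        List<String> results = new ArrayList<>();
        for (Elem other : probe)                            // (1) probe other window
            if (other.key == arrival.key)
                results.add(arrival.val + "|" + other.val);
        own.addLast(arrival);                               // (2) insert into own window
        while (own.size() > windowSize) own.removeFirst();  // (3) evict oldest
        return results;
    }

    public static void main(String[] args) {
        ElementVsWindowJoin j = new ElementVsWindowJoin(2);
        assert j.onLeft(new Elem(1, "l1")).isEmpty();  // right window still empty
        assert j.onRight(new Elem(1, "r1")).size() == 1;
        assert j.onLeft(new Elem(1, "l2")).size() == 1;
        System.out.println("ok");
    }
}
```

Note how the non-blocking property falls out of the structure: neither side ever waits for the other side's window to close before producing results.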