Hi Stephan,

that sounds reasonable to me.

Cheers,
Bruno

On 08.04.2015 15:06, Stephan Ewen wrote:

With the current network layer and the agenda we have for windowing, we should be able to support windows on event time in the near future. Inside the window, you can sort all records by time and have a full ordering. That is independent of the order of the stream.

How about this as a first goal?

On Wed, Apr 8, 2015 at 2:50 PM, Bruno Cadonna <cado...@informatik.hu-berlin.de> wrote:

Hi Stephan,

how much of CEP depends on fully ordered streams depends on the operators used in the pattern query. In general, though, they need fully ordered events within a window, or at least some strategy to deal with out-of-order events.

If I got it right, you propose windows that are built on the occurrence time of the event, i.e., the time when the event occurred in the real world, and then to close a window when a punctuation says that the window is complete.

I agree with you that with such windows you can do a lot and decrease maintenance overhead. For example, some aggregations like sum, count, and avg do not need ordered events within the window; they just need complete windows that are ordered with respect to each other to be deterministic. However, if the windows overlap, you again need the time order to evict the oldest events from a window.

Cheers,
Bruno

On 08.04.2015 13:30, Stephan Ewen wrote:

I agree, any ordering guarantees would need to be actively enabled.

How much of CEP depends on fully ordered streams? There is a lot you can do with windows on event time, which are triggered by punctuations.

This is like a "soft" variant of the ordered streams, where the order relation holds only between windows, rather than between all events. That makes it much cheaper to maintain.

On Wed, Apr 8, 2015 at 1:19 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

This reasoning makes perfect sense. That's why I suggested that the user should actively choose ordered data processing...

About deadlocks: those can be avoided if the buffers are consumed continuously into an in-memory merge buffer (maybe with spilling to disk if necessary). Of course, latency might suffer, and punctuations should be used to relax this problem.

As far as I understood, CEP might be an interesting use case for Flink, and it depends on ordered data streams.

-Matthias

On 04/08/2015 11:50 AM, Stephan Ewen wrote:

Here is the state in Flink, and why we have chosen not to do global ordering at the moment:

- Individual streams are FIFO: if the sender emits in order, the receiver receives in order.

- When streams are merged (think shuffle / partition-by), the streams are not actually merged; buffers from the streams are taken as they come in.

- We had a version that merged streams (for sort merging in batch programs, actually) long ago, and it performed either horribly or deadlocked. The reason is that all streams are stalled whenever a buffer is missing from one stream, since the merge cannot continue in such a case (head-of-line waiting). That backpressures streams unnecessarily, slowing down computation. If the streams depend mutually on each other (think two partitioning steps), they frequently deadlock completely.

- The only way to do that is by stalling/buffering/punctuating streams continuously, which is a lot of work to implement and will definitely cost performance.
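The head-of-line waiting that Stephan describes can be sketched in a few lines (a minimal illustration, not actual Flink code; all names are hypothetical). A merge that preserves global timestamp order can only emit once every input channel has a buffered element, because the globally smallest timestamp might still arrive on an empty channel, so one slow channel stalls the whole merge:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal sketch (not Flink code): a k-way merge that preserves global
// timestamp order across input channels. It can only emit an element
// when EVERY channel has at least one buffered element; otherwise the
// globally smallest timestamp might still arrive on the empty channel.
// One slow or empty channel therefore stalls the whole merge
// (head-of-line waiting), which is what caused the deadlocks described above.
public class OrderedMerge {
    private final List<Deque<Long>> channels; // buffered timestamps per channel

    public OrderedMerge(int numChannels) {
        channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayDeque<>());
        }
    }

    public void receive(int channel, long timestamp) {
        channels.get(channel).addLast(timestamp);
    }

    /** Emits the next element in global order, or null if the merge is stalled. */
    public Long pollNext() {
        Deque<Long> minChannel = null;
        for (Deque<Long> ch : channels) {
            if (ch.isEmpty()) {
                return null; // head-of-line waiting: one empty channel blocks all
            }
            if (minChannel == null || ch.peekFirst() < minChannel.peekFirst()) {
                minChannel = ch;
            }
        }
        return minChannel.pollFirst();
    }

    public static void main(String[] args) {
        OrderedMerge merge = new OrderedMerge(2);
        merge.receive(0, 1L);
        merge.receive(0, 3L);
        System.out.println(merge.pollNext()); // null: channel 1 is empty, merge stalls
        merge.receive(1, 2L);
        System.out.println(merge.pollNext()); // 1
        System.out.println(merge.pollNext()); // 2
    }
}
```

The sketch also shows why the non-merging approach avoids backpressure: dropping the `isEmpty()` check lets the operator consume whatever is available, at the cost of global order.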
Therefore, we have decided to go with a simpler model without global ordering for now. If we start seeing that this has severe limitations in practice, we may reconsider.

On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <cado...@informatik.hu-berlin.de> wrote:

Hi Paris,

what is the reason for not guaranteeing global ordering across partitions in the stream model? Is it the smaller overhead, or are there operations that are not computable in a distributed environment with global ordering?

In any case, I agree with Matthias that the user should choose. If some operations were not computable with global ordering, I would restrict the set of operations for that mode.

Maybe it would also be helpful to collect use cases for each of the modes proposed by Matthias, to understand the requirements of both modes.

Some (researchy) thoughts about indeterminism: how can the indeterminism of the current setting be quantified? How "large" can it grow with the current setting? Are there any limits that can be guaranteed?

Cheers,
Bruno

On 07.04.2015 12:38, Paris Carbone wrote:

Hello Matthias,

sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back in TU Berlin. Bear in mind though that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. There are no *global* ordering guarantees made whatsoever in that model across partitions. If you see it more generally, there are many "race conditions" in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing "old" tuples that iterate in the graph?). By the way, have you checked the Naiad paper [1]? Stephan cited it a while ago, and it is quite relevant to this discussion.

Also, can you cite the paper with the joining semantics you are referring to? That would be of good help, I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi @all,

please keep me in the loop for this work. I am highly interested, and I want to help with it.

My initial thoughts are as follows:

1) Currently, system timestamps are used, and the suggested approach can be seen as state of the art (there is actually a research paper using the exact same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead for keeping track of the order of records, and the latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)

2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or at the sources). Because deterministic processing adds some overhead, the user should actively decide for it. In this case, the order must be preserved in each re-distribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there is a bunch of them). Even for single-stream windows it is a tricky problem; for join windows it is even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The most tricky part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). The timestamping of (intermediate) result tuples is also an important question to be answered.

-Matthias

On 04/07/2015 11:37 AM, Gyula Fóra wrote:

Hey,

I agree with Kostas: if we define the exact semantics of how this works, it is not more ad-hoc than any other stateful operator with multiple inputs. (And I don't think any other system supports something similar.)

We need to make some design choices that are similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (globally or locally), because that affects what kinds of policies can be parallel, but I can work on these things.
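Matthias's point about duplicate timestamps above can be made concrete with a small sketch (hypothetical, not Flink code): a deterministic total order needs a secondary tie-breaker, for instance the channel index plus the sequence number within the channel, so that records with equal timestamps are processed in the same order on every run:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch (not Flink code): deterministic ordering of
// timestamped records. Duplicate timestamps cannot be avoided, so a
// secondary, deterministic tie-breaker (here: source channel index,
// then sequence number within the channel) is needed to make the
// total order, and thus the processing, reproducible across runs.
public class TimestampedRecord {
    final long timestamp; // event time, assigned at the source
    final int channel;    // index of the incoming channel
    final long seqNo;     // position within that channel (FIFO order)

    TimestampedRecord(long timestamp, int channel, long seqNo) {
        this.timestamp = timestamp;
        this.channel = channel;
        this.seqNo = seqNo;
    }

    // Total order: timestamp first, then deterministic tie-breakers.
    static final Comparator<TimestampedRecord> DETERMINISTIC_ORDER =
        Comparator.<TimestampedRecord>comparingLong(r -> r.timestamp)
                  .thenComparingInt(r -> r.channel)
                  .thenComparingLong(r -> r.seqNo);

    public static void main(String[] args) {
        List<TimestampedRecord> records = new ArrayList<>(Arrays.asList(
            new TimestampedRecord(5L, 1, 0),
            new TimestampedRecord(5L, 0, 0), // duplicate timestamp
            new TimestampedRecord(3L, 1, 1)));
        records.sort(DETERMINISTIC_ORDER);
        // Order is now reproducible: (3, ch1), (5, ch0), (5, ch1)
        for (TimestampedRecord r : records) {
            System.out.println(r.timestamp + " ch" + r.channel);
        }
    }
}
```

The tie-breaker only yields a deterministic result if each channel is itself FIFO, which matches the per-channel order-preservation requirement stated above.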
I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9, though.

And thanks for helping to write these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the following, assuming an arriving element from the left side: (1) we find the right-side matches, (2) we insert the left-side arrival into the left window, (3) we recompute the left window. We need to see whether right-window re-computation needs to be triggered as well. I think that this way of joining streams is also what the symmetric hash join algorithms were meant to support.

Kostas

On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:

Is the approach of joining an element at a time from one input against a window on the other input not a bit arbitrary?

This just joins whatever currently happens to be the window by the time the single element arrives; that is a bit unpredictable, right?

As a more general point: the whole semantics of windowing, and when windows are triggered, are a bit ad-hoc right now. It would be really good to start formalizing that and put it down somewhere. Users need to be able to clearly understand the semantics and predict the output.
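Kostas's three steps above can be sketched for one join direction (a minimal illustration under assumed semantics, not the actual prototype; a count-based eviction stands in for an arbitrary eviction policy, and the right side would mirror the same steps):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

// Minimal sketch (not the actual prototype) of element-against-window
// join semantics for one direction: when an element arrives on the left,
// (1) it is joined against the current right window, (2) it is inserted
// into the left window, (3) the left window is re-evaluated (here a
// simple count-based eviction stands in for an arbitrary policy).
public class ElementVsWindowJoin<L, R> {
    private final Deque<L> leftWindow = new ArrayDeque<>();
    private final Deque<R> rightWindow = new ArrayDeque<>();
    private final int leftWindowSize; // stand-in for a real eviction policy
    private final BiConsumer<L, R> joinFunction;

    public ElementVsWindowJoin(int leftWindowSize, BiConsumer<L, R> joinFunction) {
        this.leftWindowSize = leftWindowSize;
        this.joinFunction = joinFunction;
    }

    public void onLeftElement(L element) {
        // (1) join the arriving element with the current right window
        for (R r : rightWindow) {
            joinFunction.accept(element, r);
        }
        // (2) insert the arrival into the left window
        leftWindow.addLast(element);
        // (3) recompute the left window: evict the oldest elements
        while (leftWindow.size() > leftWindowSize) {
            leftWindow.removeFirst();
        }
    }

    public void onRightElement(R element) {
        rightWindow.addLast(element); // symmetric join/eviction omitted for brevity
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        ElementVsWindowJoin<Integer, String> join =
            new ElementVsWindowJoin<>(5, (l, r) -> results.add(l + "-" + r));
        join.onRightElement("a");
        join.onRightElement("b");
        join.onLeftElement(1);        // joins 1 against the current right window
        System.out.println(results); // [1-a, 1-b]
    }
}
```

Note that the output depends on what the right window happens to contain when the left element arrives, which is exactly the predictability concern Stephan raises.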
On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every", we would not join that stream one element at a time, but every so many elements. The problem here is that window and every in this case are very, very different from the normal windowing semantics: the window would define the join window for each element of the other stream, while every would define how often this stream is joined with the other one.

We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:

That would be really neat. The problem I see there is that we currently do not distinguish between dataStream.window() and dataStream.window().every(): they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, practically, the trigger is always set to a count of one).

But of course we could make it so that we check that the eviction is either null or a count of one, and in every other case throw an exception while building the JobGraph.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equals().with()

So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

Big +1 for the proposal from Peter and Gyula. I'm really for bringing the windowing and window join APIs in sync.

On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:

Hey guys,

As Aljoscha highlighted earlier, the current window join semantics in the streaming API do not follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams. The reason for this is that we now take a window of each of the two streams and join over these pairs. This would be a blocking operation if the windows were not closed at exactly the same time (and since we don't want that, we only allow time windows).

I talked with Peter, who came up with the initial idea of an alternative approach for stream joins, which works as follows:

Instead of pairing windows for joins, we do element-against-window joins. What this means is that whenever we receive an element from one of the streams, we join this element with the current window (this window is constantly updated) of the other stream.
This is non-blocking with any window definition, as we don't have to wait for windows to be completed, and we can use it with any of our predefined policies like Time.of(...), Count.of(...), or Delta.of(...).

Additionally, this allows a very flexible way of defining window joins. With this we could also define grouped windowing inside of a join. An example of this would be: join all elements of Stream1 with the last 5 elements, by a given window key, of Stream2, on some join key.

This feature can easily be implemented on top of the current operators, so I already have a working prototype for the simple non-grouped case. My only concern is the API; the best thing I could come up with is something like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)

(The user can omit the "by" and "with" calls.)

I think this new approach would be worthy of our "flexible windowing", in contrast with the current approach.

Regards,
Gyula

--
Dr.
Bruno Cadonna
Postdoctoral Researcher
Databases and Information Systems
Department of Computer Science
Humboldt-Universität zu Berlin
http://www.informatik.hu-berlin.de/~cadonnab