-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi Stephan,

that sounds reasonable to me.

Cheers,
Bruno

On 08.04.2015 15:06, Stephan Ewen wrote:
> With the current network layer and the agenda we have for
> windowing, we should be able to support windows on event time
> in the near future. Inside the window, you can sort all records by
> time and have a full ordering. That is independent of the order of
> the stream.
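
A minimal sketch of the idea above, in plain Python rather than Flink API.
It assumes tumbling windows of a fixed size and that every window is
already complete; the function name and the (timestamp, value) record
layout are made up for illustration.

```python
from collections import defaultdict

def tumbling_windows(events, size):
    """Group (timestamp, value) pairs into event-time windows of `size`
    time units and sort the records inside each window by timestamp."""
    windows = defaultdict(list)
    for ts, value in events:
        window_start = ts // size * size
        windows[window_start].append((ts, value))
    # Sorting per window yields a full order inside the window, no matter
    # in which order the records arrived on the stream.
    return {start: sorted(records, key=lambda r: r[0])
            for start, records in sorted(windows.items())}

# Records arrive out of order with respect to their event time.
arrived = [(12, "c"), (3, "a"), (7, "b"), (10, "d")]
result = tumbling_windows(arrived, 10)
```

Here `result` maps window start 0 to the records with timestamps 3 and 7,
and window start 10 to those with timestamps 10 and 12, each in time order.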
> 
> How about this as a first goal?
> 
> On Wed, Apr 8, 2015 at 2:50 PM, Bruno Cadonna < 
> cado...@informatik.hu-berlin.de> wrote:
> 
> Hi Stephan,
> 
> how much of CEP depends on fully ordered streams depends on the 
> operators that you use in the pattern query. But in general, they
> need fully ordered events within a window or at least some
> strategies to deal with out-of-order events.
> 
> If I got it right, you propose windows that are built on the 
> occurrence time of the event, i.e., the time when the event
> occurred in the real world, and then to close a window when a
> punctuation says that the window is complete.
> 
> I agree with you that with such windows, you can do a lot and
> decrease overhead for maintenance. For example, some aggregations
> like sum, count, avg do not need ordered events in the window, they
> just need complete windows that are ordered with respect to each
> other to be deterministic. However, if the windows overlap, you again
> need the time order to evict the oldest events from a window.
> 
> Cheers, Bruno
> 
> 
> On 08.04.2015 13:30, Stephan Ewen wrote:
>>>> I agree, any ordering guarantees would need to be actively 
>>>> enabled.
>>>> 
>>>> How much of CEP depends on fully ordered streams? There is a
>>>> lot you can do with windows on event time, which are
>>>> triggered by punctuations.
>>>> 
>>>> This is like a "soft" variant of the ordered streams, where the
>>>> order relation holds only between windows, rather than
>>>> between all events. That makes it much cheaper to maintain.
>>>> 
>>>> On Wed, Apr 8, 2015 at 1:19 PM, Matthias J. Sax < 
>>>> mj...@informatik.hu-berlin.de> wrote:
>>>> 
>>>>> This reasoning makes absolute sense. That's why I
>>>>> suggested that the user should actively choose ordered
>>>>> data processing...
>>>>> 
>>>>> About deadlocks: Those can be avoided, if the buffers are 
>>>>> consumed continuously in an in-memory merge buffer (maybe
>>>>> with spilling to disk if necessary). Of course, latency
>>>>> might suffer and punctuations should be used to relax this
>>>>> problem.
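
A rough sketch of such a merge buffer, in plain Python rather than Flink
code. It assumes each channel is FIFO and in timestamp order, and that a
punctuation with timestamp t promises that no later record on that channel
is older than t. The class and method names are hypothetical, and spilling
to disk is left out.

```python
import heapq

class MergeBuffer:
    """Order-preserving merge that never stops reading its channels.

    Records are parked in a heap and released once every channel has
    delivered a record or punctuation with an equal or later timestamp.
    """

    def __init__(self, num_channels):
        # Latest timestamp seen on each channel (record or punctuation).
        self.progress = [float("-inf")] * num_channels
        self.heap = []
        self.seq = 0  # tie-breaker so payloads are never compared

    def on_record(self, channel, ts, value):
        heapq.heappush(self.heap, (ts, self.seq, value))
        self.seq += 1
        return self._advance(channel, ts)

    def on_punctuation(self, channel, ts):
        return self._advance(channel, ts)

    def _advance(self, channel, ts):
        self.progress[channel] = max(self.progress[channel], ts)
        safe = min(self.progress)  # no channel can still send older data
        released = []
        while self.heap and self.heap[0][0] <= safe:
            ts_out, _, value = heapq.heappop(self.heap)
            released.append((ts_out, value))
        return released

buf = MergeBuffer(2)
out = []
out += buf.on_record(0, 1, "a")   # channel 1 is silent: record is buffered
out += buf.on_record(0, 4, "b")   # still buffered, but channel 0 keeps flowing
out += buf.on_record(1, 2, "x")   # releases timestamps 1 and 2
out += buf.on_punctuation(1, 5)   # releases timestamp 4 without new data
```

The senders are never stalled, but output latency is bounded only by the
slowest channel, which is where the punctuations help.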
>>>>> 
>>>>> As far as I understood, CEP might be an interesting
>>>>> use-case for Flink, and it depends on ordered data streams.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> 
>>>>> On 04/08/2015 11:50 AM, Stephan Ewen wrote:
>>>>>> Here is the state in Flink and why we have chosen not to
>>>>>> do global ordering at the moment:
>>>>>> 
>>>>>> - Individual streams are FIFO, that means if the sender
>>>>>> emits in order, the receiver receives in order.
>>>>>> 
>>>>>> - When streams are merged (think shuffle / partition-by),
>>>>>> then the streams are not merged, but buffers from the
>>>>>> streams are taken as they come in.
>>>>>> 
>>>>>> - We had a version that merged streams (for sort merging
>>>>>> in batch programs, actually) long ago, and it either
>>>>>> performed horribly or deadlocked. The reason is that all
>>>>>> streams are always stalled if a buffer is missing from
>>>>>> one stream, since the merge cannot continue in such a
>>>>>> case (head-of-the-line waiting). That backpressures streams 
>>>>>> unnecessarily, slowing down computation. If the streams
>>>>>> depend mutually on each other (think two partitioning
>>>>>> steps), they frequently deadlock completely.
>>>>>> 
>>>>>> - The only way to do that is by
>>>>>> stalling/buffering/punctuating streams continuously,
>>>>>> which is a lot of work to implement and will definitely
>>>>>> cost performance.
>>>>>> 
>>>>>> Therefore we have decided to go for a simpler model
>>>>>> without global ordering for now. If we start seeing that
>>>>>> this has severe limitations in practice, we may reconsider
>>>>>> that.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna < 
>>>>>> cado...@informatik.hu-berlin.de> wrote:
>>>>>> 
>>>>>> Hi Paris,
>>>>>> 
>>>>>> what's the reason for not guaranteeing global ordering
>>>>>> across partitions in the stream model? Is it the smaller
>>>>>> overhead or are there any operations not computable in a
>>>>>> distributed environment with global ordering?
>>>>>> 
>>>>>> In any case, I agree with Matthias that the user should
>>>>>> choose. If operations were not computable with a global
>>>>>> ordering, I would restrict the set of operations for that
>>>>>> mode.
>>>>>> 
>>>>>> Maybe, it would also be helpful to collect use cases for
>>>>>> each of the modes proposed by Matthias to understand the 
>>>>>> requirements for both modes.
>>>>>> 
>>>>>> Some (researchy) thoughts about indeterminism: How can
>>>>>> the indeterminism of the current setting be quantified?
>>>>>> How "large" can it grow with the current setting? Are
>>>>>> there any limits that can be guaranteed?
>>>>>> 
>>>>>> Cheers, Bruno
>>>>>> 
>>>>>> On 07.04.2015 12:38, Paris Carbone wrote:
>>>>>>>>> Hello Matthias,
>>>>>>>>> 
>>>>>>>>> Sure, ordering guarantees are indeed a tricky
>>>>>>>>> thing, I recall having that discussion back in TU
>>>>>>>>> Berlin. Bear in mind though that DataStream, our
>>>>>>>>> abstract data type, represents a *partitioned*
>>>>>>>>> unbounded sequence of events. There are no *global*
>>>>>>>>> ordering guarantees made whatsoever in that model
>>>>>>>>> across partitions. If you see it more generally
>>>>>>>>> there are many “race conditions” in a distributed
>>>>>>>>> execution graph of vertices that process multiple
>>>>>>>>> inputs asynchronously, especially when you add 
>>>>>>>>> joins and iterations into the mix (how do you deal
>>>>>>>>> with reprocessing “old” tuples that iterate in the
>>>>>>>>> graph). Btw have you checked the Naiad paper [1]?
>>>>>>>>> Stephan cited it a while ago and it is quite relevant
>>>>>>>>> to that discussion.
>>>>>>>>> 
>>>>>>>>> Also, can you cite the paper with the joining
>>>>>>>>> semantics you are referring to? That would be of
>>>>>>>>> good help I think.
>>>>>>>>> 
>>>>>>>>> Paris
>>>>>>>>> 
>>>>>>>>> [1] 
>>>>>>>>> https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>>>>>> 
>>>>>>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax 
>>>>>>>>> <mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>> 
>>>>>>>>> Hi @all,
>>>>>>>>> 
>>>>>>>>> please keep me in the loop for this work. I am
>>>>>>>>> highly interested and I want to help on it.
>>>>>>>>> 
>>>>>>>>> My initial thoughts are as follows:
>>>>>>>>> 
>>>>>>>>> 1) Currently, system timestamps are used and the 
>>>>>>>>> suggested approach can be seen as state-of-the-art
>>>>>>>>> (there is actually a research paper using the exact
>>>>>>>>> same join semantics). Of course, the current
>>>>>>>>> approach is inherently non-deterministic. The
>>>>>>>>> advantage is that there is no overhead in keeping
>>>>>>>>> track of the order of records and the latency
>>>>>>>>> should be very low. (Additionally, state recovery 
>>>>>>>>> is simplified: because the processing is
>>>>>>>>> inherently non-deterministic, recovery can be done
>>>>>>>>> with relaxed guarantees.)
>>>>>>>>> 
>>>>>>>>> 2) The user should be able to "switch on"
>>>>>>>>> deterministic processing, i.e., records are
>>>>>>>>> timestamped (either externally when generated, or
>>>>>>>>> timestamped at the sources). Because deterministic
>>>>>>>>> processing adds some overhead, the user should
>>>>>>>>> decide for it actively. In this case, the order
>>>>>>>>> must be preserved in each re-distribution step
>>>>>>>>> (merging is sufficient, if order is preserved
>>>>>>>>> within each incoming channel). Furthermore,
>>>>>>>>> deterministic processing can be achieved by sound
>>>>>>>>> window semantics (and there is a bunch of them).
>>>>>>>>> Even for single-stream-windows it's a tricky
>>>>>>>>> problem; for join-windows it's even harder. From my
>>>>>>>>> point of view, it is less important which semantics
>>>>>>>>> are chosen; however, the user must be aware how it
>>>>>>>>> works. The trickiest part of deterministic 
>>>>>>>>> processing is to deal with duplicate timestamps
>>>>>>>>> (which cannot be avoided). The timestamping of
>>>>>>>>> (intermediate) result tuples is also an important
>>>>>>>>> question to be answered.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -Matthias
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>>>>>>> 
>>>>>>>>> Hey,
>>>>>>>>> 
>>>>>>>>> I agree with Kostas: if we define the exact
>>>>>>>>> semantics of how this works, this is not more ad-hoc
>>>>>>>>> than any other stateful operator with multiple
>>>>>>>>> inputs. (And I don't think any other system supports
>>>>>>>>> something similar.)
>>>>>>>>> 
>>>>>>>>> We need to make some design choices that are
>>>>>>>>> similar to the issues we had for windowing. We need
>>>>>>>>> to choose how we want to evaluate the windowing
>>>>>>>>> policies (global or local) because that affects
>>>>>>>>> what kind of policies can be parallel, but I can
>>>>>>>>> work on these things.
>>>>>>>>> 
>>>>>>>>> I think this is an amazing feature, so I wouldn't 
>>>>>>>>> necessarily rush the implementation for 0.9
>>>>>>>>> though.
>>>>>>>>> 
>>>>>>>>> And thanks for helping to write these down.
>>>>>>>>> 
>>>>>>>>> Gyula
>>>>>>>>> 
>>>>>>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas 
>>>>>>>>> <ktzou...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>> Yes, we should write these semantics down. I
>>>>>>>>> volunteer to help.
>>>>>>>>> 
>>>>>>>>> I don't think that this is very ad-hoc. The
>>>>>>>>> semantics are basically the following. Assuming an
>>>>>>>>> arriving element from the left side:
>>>>>>>>> (1) We find the right-side matches.
>>>>>>>>> (2) We insert the left-side arrival into the left window.
>>>>>>>>> (3) We recompute the left window.
>>>>>>>>> We need to see whether right window
>>>>>>>>> re-computation needs to be triggered as well. I
>>>>>>>>> think that this way of joining streams is also what
>>>>>>>>> the symmetric hash join algorithms were meant to 
>>>>>>>>> support.
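
The steps above can be sketched roughly as follows, in plain Python rather
than Flink API. The sketch assumes count-based windows on both sides and
uses a linear scan where a real symmetric hash join would probe a hash
table; the class name, the "left"/"right" labels and the tuple layout are
made up for illustration.

```python
from collections import deque

class ElementWindowJoin:
    """Joins each arriving element against the other side's current window."""

    def __init__(self, window_size, key):
        self.key = key
        # One count-based window per side; deque(maxlen=n) drops the oldest
        # element automatically once the window is full.
        self.windows = {"left": deque(maxlen=window_size),
                        "right": deque(maxlen=window_size)}

    def on_element(self, side, element):
        other = "right" if side == "left" else "left"
        k = self.key(element)
        # (1) Find the matches in the other side's current window.
        matches = [o for o in self.windows[other] if self.key(o) == k]
        # (2) Insert the arrival into its own window; (3) the window is
        # recomputed, which here just means evicting the oldest element.
        self.windows[side].append(element)
        # Emit pairs as (left element, right element).
        if side == "left":
            return [(element, m) for m in matches]
        return [(m, element) for m in matches]

join = ElementWindowJoin(2, key=lambda e: e[0])
out = []
out += join.on_element("left", ("k1", "L1"))   # right window empty: no output
out += join.on_element("right", ("k1", "R1"))  # matches L1
out += join.on_element("right", ("k2", "R2"))
out += join.on_element("right", ("k1", "R3"))  # matches L1, evicts R1
out += join.on_element("left", ("k1", "L2"))   # matches R3 only
```

L2 is not joined with R1 because R1 had already left the right window when
L2 arrived, so the result depends on how the two inputs interleave.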
>>>>>>>>> 
>>>>>>>>> Kostas
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen 
>>>>>>>>> <se...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>> Is the approach of joining an element at a time
>>>>>>>>> from one input against a window on the other input
>>>>>>>>> not a bit arbitrary?
>>>>>>>>> 
>>>>>>>>> This just joins whatever currently happens to be
>>>>>>>>> the window by the time the single element arrives -
>>>>>>>>> that is a bit non-predictable, right?
>>>>>>>>> 
>>>>>>>>> As a more general point: The whole semantics of
>>>>>>>>> windowing and when they are triggered are a bit
>>>>>>>>> ad-hoc now. It would be really good to start
>>>>>>>>> formalizing that a bit and put it down somewhere.
>>>>>>>>> Users need to be able to clearly understand and
>>>>>>>>> predict the output.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra 
>>>>>>>>> <gyula.f...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> I think it should be possible to make this
>>>>>>>>> compatible with the .window().every() calls. Maybe
>>>>>>>>> if there is some trigger set in "every" we would
>>>>>>>>> not join that stream 1 by 1 but every so many
>>>>>>>>> elements. The problem here is that the window and
>>>>>>>>> every in this case are very different from the
>>>>>>>>> normal windowing semantics. The window would define
>>>>>>>>> the join window for each element of the other 
>>>>>>>>> stream while every would define how often I join
>>>>>>>>> this stream with the other one.
>>>>>>>>> 
>>>>>>>>> We need to think about how to make this intuitive.
>>>>>>>>> 
>>>>>>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi 
>>>>>>>>> <balassi.mar...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> That would be really neat. The problem I see there is
>>>>>>>>> that we do not distinguish between
>>>>>>>>> dataStream.window() and dataStream.window().every()
>>>>>>>>> currently: they both return WindowedDataStreams, and
>>>>>>>>> TriggerPolicies of the every call do not make much
>>>>>>>>> sense in this setting (in fact practically the
>>>>>>>>> trigger is always set to count of one).
>>>>>>>>> 
>>>>>>>>> But of course we could implement it so that we
>>>>>>>>> check that the eviction is either null or a
>>>>>>>>> count of 1; in every other case we throw an
>>>>>>>>> exception while building the JobGraph.
>>>>>>>>> 
>>>>>>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek 
>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>> Or you could define it like this:
>>>>>>>>> 
>>>>>>>>> stream_A = a.window(...)
>>>>>>>>> stream_B = b.window(...)
>>>>>>>>> 
>>>>>>>>> stream_A.join(stream_B).where().equals().with()
>>>>>>>>> 
>>>>>>>>> So a join would just be a join of two 
>>>>>>>>> WindowedDataStreams. This would neatly move the
>>>>>>>>> windowing stuff into one place.
>>>>>>>>> 
>>>>>>>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi 
>>>>>>>>> <balassi.mar...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Big +1 for the proposal from Peter and Gyula. I'm really for
>>>>>>>>> bringing the windowing and window join API in
>>>>>>>>> sync.
>>>>>>>>> 
>>>>>>>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra 
>>>>>>>>> <gyf...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>> Hey guys,
>>>>>>>>> 
>>>>>>>>> As Aljoscha has highlighted earlier, the current
>>>>>>>>> window join semantics in the streaming API don't
>>>>>>>>> follow the changes in the windowing API. More
>>>>>>>>> precisely, we currently only support joins over
>>>>>>>>> time windows of equal size on both streams. The
>>>>>>>>> reason for this is that we now take a window of
>>>>>>>>> each of the two streams and do joins over these
>>>>>>>>> pairs. This would be a blocking operation if the
>>>>>>>>> windows are not closed at exactly the same time
>>>>>>>>> (and since we don't want this, we only allow time
>>>>>>>>> windows).
>>>>>>>>> 
>>>>>>>>> I talked with Peter who came up with the initial
>>>>>>>>> idea of an alternative approach for stream joins
>>>>>>>>> which works as follows:
>>>>>>>>> 
>>>>>>>>> Instead of pairing windows for joins, we do
>>>>>>>>> element-against-window joins. What this means is
>>>>>>>>> that whenever we receive an element from one of the
>>>>>>>>> streams, we join this element with the current
>>>>>>>>> window (this window is constantly updated) of the
>>>>>>>>> other stream. This is non-blocking on any window
>>>>>>>>> definitions as we don't have to wait for windows to 
>>>>>>>>> be completed, and we can use this with any of our 
>>>>>>>>> predefined policies like Time.of(...),
>>>>>>>>> Count.of(...), Delta.of(...).
>>>>>>>>> 
>>>>>>>>> Additionally, this also allows a very flexible
>>>>>>>>> way of defining window joins. With this we could
>>>>>>>>> also define grouped windowing inside of a join. An
>>>>>>>>> example of this would be: Join all elements of
>>>>>>>>> Stream1 with the last 5 elements by a given
>>>>>>>>> windowkey of Stream2 on some join key.
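
A small sketch of that grouped example, in plain Python rather than Flink
API. It only covers the direction Stream1 against the windows of Stream2,
and the class name, method names and the (sensor, room, seq) tuple layout
are hypothetical.

```python
from collections import defaultdict, deque

class GroupedWindowJoin:
    """Joins Stream1 elements with the last `size` Stream2 elements
    kept per window key."""

    def __init__(self, size, window_key, join_key):
        self.window_key = window_key
        self.join_key = join_key
        # One count window per window key of Stream2.
        self.groups = defaultdict(lambda: deque(maxlen=size))

    def on_stream2(self, element):
        self.groups[self.window_key(element)].append(element)

    def on_stream1(self, element):
        k = self.join_key(element)
        return [(element, other)
                for window in self.groups.values()
                for other in window
                if self.join_key(other) == k]

# Stream2 elements are (sensor, room, seq): windows are grouped by sensor,
# the join itself is on the room.
join = GroupedWindowJoin(5, window_key=lambda e: e[0], join_key=lambda e: e[1])
for seq in range(7):
    join.on_stream2(("s1", "A", seq))   # only seq 2..6 stay in the window
join.on_stream2(("s2", "A", 0))

pairs = join.on_stream1(("query", "A", None))
s1_seqs = [other[2] for _, other in pairs if other[0] == "s1"]
```

The Stream1 element is joined with the five most recent readings of sensor
s1 and with the single reading of sensor s2.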
>>>>>>>>> 
>>>>>>>>> This feature can be easily implemented over the
>>>>>>>>> current operators, so I already have a working
>>>>>>>>> prototype for the simple non-grouped case. My only
>>>>>>>>> concern is the API; the best thing I could come up
>>>>>>>>> with is something like this:
>>>>>>>>> 
>>>>>>>>> stream_A.join(stream_B).onWindow(windowDefA, 
>>>>>>>>> windowDefB).by(windowKey1, 
>>>>>>>>> windowKey2).where(...).equalTo(...).with(...)
>>>>>>>>> 
>>>>>>>>> (the user can omit the "by" and "with" calls)
>>>>>>>>> 
>>>>>>>>> I think this new approach would be worthy of our 
>>>>>>>>> "flexible windowing" in contrast with the current 
>>>>>>>>> approach.
>>>>>>>>> 
>>>>>>>>> Regards, Gyula
>>>>>>>>> 

- -- 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (GNU/Linux)

iQEcBAEBAgAGBQJVJSoUAAoJEKdCIJx7flKw2BwH/RyhPD7nXGLTwgPJvGBIaPKl
ghtTi1t7WutYiVRhHJmISK+kbNWxfXSioYhC8IzK2aqZZQ/j3hn3xVZIL4c+c+Oq
jpRGNhC7xFzqRCF8ki/saFpQO9vU2zLP70uk9bQGIna19z2Ye28Ofoc0d+Dv/qrV
acH5HX+s+rdmbt9mgcGeyA0/+Hh/HWPiic0auRhhX06zHuynrHBLDaS32RVAOT79
bZ91VIElyu0pDCafRDrWNOzVlop+wGLl6cvBGc/vznO501M1jjJvqS9npLsPKrpE
OrpBRHIrWT2PicZwLMaSzoZsjl15Sy0qsV8On4e6Ug5f6RhPlKH2IVmVVIDZZ1I=
=sd8+
-----END PGP SIGNATURE-----
