Hi Paris,

thanks for the pointer to the Naiad paper. That is quite interesting.

The paper I mentioned [1], does not describe the semantics in detail; it
is more about the implementation for the stream-joins. However, it uses
the same semantics (from my understanding) as proposed by Gyula.

-Matthias

[1] Kang, Naughton, Viglas. "Evaluationg Window Joins over Unbounded
Streams". VLDB 2002.



On 04/07/2015 12:38 PM, Paris Carbone wrote:
> Hello Matthias,
> 
> Sure, ordering guarantees are indeed a tricky thing, I recall having that 
> discussion back in TU Berlin. Bear in mind thought that DataStream, our 
> abstract data type, represents a *partitioned* unbounded sequence of events. 
> There are no *global* ordering guarantees made whatsoever in that model 
> across partitions. If you see it more generally there are many “race 
> conditions” in a distributed execution graph of vertices that process 
> multiple inputs asynchronously, especially when you add joins and iterations 
> into the mix (how do you deal with reprocessing “old” tuples that iterate in 
> the graph). Btw have you checked the Naiad paper [1]? Stephan cited a while 
> ago and it is quite relevant to that discussion.
> 
> Also, can you cite the paper with the joining semantics you are referring to? 
> That would be of good help I think.
> 
> Paris
> 
> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
> 
> <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf>
> 
> <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf>
> On 07 Apr 2015, at 11:50, Matthias J. Sax 
> <mj...@informatik.hu-berlin.de<mailto:mj...@informatik.hu-berlin.de>> wrote:
> 
> Hi @all,
> 
> please keep me in the loop for this work. I am highly interested and I
> want to help on it.
> 
> My initial thoughts are as follows:
> 
> 1) Currently, system timestamps are used and the suggested approach can
> be seen as state-of-the-art (there is actually a research paper using
> the exact same join semantic). Of course, the current approach is
> inherently non-deterministic. The advantage is, that there is no
> overhead in keeping track of the order of records and the latency should
> be very low. (Additionally, state-recovery is simplified. Because, the
> processing in inherently non-deterministic, recovery can be done with
> relaxed guarantees).
> 
>  2) The user should be able to "switch on" deterministic processing,
> ie, records are timestamped (either externally when generated, or
> timestamped at the sources). Because deterministic processing adds some
> overhead, the user should decide for it actively.
> In this case, the order must be preserved in each re-distribution step
> (merging is sufficient, if order is preserved within each incoming
> channel). Furthermore, deterministic processing can be achieved by sound
> window semantics (and there is a bunch of them). Even for
> single-stream-windows it's a tricky problem; for join-windows it's even
> harder. From my point of view, it is less important which semantics are
> chosen; however, the user must be aware how it works. The most tricky
> part for deterministic processing, is to deal with duplicate timestamps
> (which cannot be avoided). The timestamping for (intermediate) result
> tuples, is also an important question to be answered.
> 
> 
> -Matthias
> 
> 
> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
> Hey,
> 
> I agree with Kostas, if we define the exact semantics how this works, this
> is not more ad-hoc than any other stateful operator with multiple inputs.
> (And I don't think any other system support something similar)
> 
> We need to make some design choices that are similar to the issues we had
> for windowing. We need to chose how we want to evaluate the windowing
> policies (global or local) because that affects what kind of policies can
> be parallel, but I can work on these things.
> 
> I think this is an amazing feature, so I wouldn't necessarily rush the
> implementation for 0.9 though.
> 
> And thanks for helping writing these down.
> 
> Gyula
> 
> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas 
> <ktzou...@apache.org<mailto:ktzou...@apache.org>> wrote:
> 
> Yes, we should write these semantics down. I volunteer to help.
> 
> I don't think that this is very ad-hoc. The semantics are basically the
> following. Assuming an arriving element from the left side:
> (1) We find the right-side matches
> (2) We insert the left-side arrival into the left window
> (3) We recompute the left window
> We need to see whether right window re-computation needs to be triggered as
> well. I think that this way of joining streams is also what the symmetric
> hash join algorithms were meant to support.
> 
> Kostas
> 
> 
> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen 
> <se...@apache.org<mailto:se...@apache.org>> wrote:
> 
> Is the approach of joining an element at a time from one input against a
> window on the other input not a bit arbitrary?
> 
> This just joins whatever currently happens to be the window by the time
> the
> single element arrives - that is a bit non-predictable, right?
> 
> As a more general point: The whole semantics of windowing and when they
> are
> triggered are a bit ad-hoc now. It would be really good to start
> formalizing that a bit and
> put it down somewhere. Users need to be able to clearly understand and
> how
> to predict the output.
> 
> 
> 
> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra 
> <gyula.f...@gmail.com<mailto:gyula.f...@gmail.com>>
> wrote:
> 
> I think it should be possible to make this compatible with the
> .window().every() calls. Maybe if there is some trigger set in "every"
> we
> would not join that stream 1 by 1 but every so many elements. The
> problem
> here is that the window and every in this case are very-very different
> than
> the normal windowing semantics. The window would define the join window
> for
> each element of the other stream while every would define how often I
> join
> This stream with the other one.
> 
> We need to think to make this intuitive.
> 
> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>>
> wrote:
> 
> That would be really neat, the problem I see there, that we do not
> distinguish between dataStream.window() and
> dataStream.window().every()
> currently, they both return WindowedDataStreams and TriggerPolicies
> of
> the
> every call do not make much sense in this setting (in fact
> practically
> the
> trigger is always set to count of one).
> 
> But of course we could make it in a way, that we check that the
> eviction
> should be either null or count of 1, in every other case we throw an
> exception while building the JobGraph.
> 
> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
> aljos...@apache.org<mailto:aljos...@apache.org>>
> wrote:
> 
> Or you could define it like this:
> 
> stream_A = a.window(...)
> stream_B = b.window(...)
> 
> stream_A.join(stream_B).where().equals().with()
> 
> So a join would just be a join of two WindowedDataStreamS. This
> would
> neatly move the windowing stuff into one place.
> 
> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>
> 
> wrote:
> Big +1 for the proposal for Peter and Gyula. I'm really for
> bringing
> the
> windowing and window join API in sync.
> 
> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra 
> <gyf...@apache.org<mailto:gyf...@apache.org>>
> wrote:
> 
> Hey guys,
> 
> As Aljoscha has highlighted earlier the current window join
> semantics
> in
> the streaming api doesn't follow the changes in the windowing
> api.
> More
> precisely, we currently only support joins over time windows of
> equal
> size
> on both streams. The reason for this is that we now take a
> window
> of
> each
> of the two streams and do joins over these pairs. This would be
> a
> blocking
> operation if the windows are not closed at exactly the same time
> (and
> since
> we dont want this we only allow time windows)
> 
> I talked with Peter who came up with the initial idea of an
> alternative
> approach for stream joins which works as follows:
> 
> Instead of pairing windows for joins, we do element against
> window
> joins.
> What this means is that whenever we receive an element from one
> of
> the
> streams, we join this element with the current window(this
> window
> is
> constantly updated) of the other stream. This is non-blocking on
> any
> window
> definitions as we dont have to wait for windows to be completed
> and
> we
> can
> use this with any of our predefined policies like Time.of(...),
> Count.of(...), Delta.of(....).
> 
> Additionally this also allows some very flexible way of defining
> window
> joins. With this we could also define grouped windowing inside
> if
> a
> join.
> An example of this would be: Join all elements of Stream1 with
> the
> last
> 5
> elements by a given windowkey of Stream2 on some join key.
> 
> This feature can be easily implemented over the current
> operators,
> so
> I
> already have a working prototype for the simple non-grouped
> case.
> My
> only
> concern is the API, the best thing I could come up with is
> something
> like
> this:
> 
> stream_A.join(stream_B).onWindow(windowDefA,
> windowDefB).by(windowKey1,
> windowKey2).where(...).equalTo(...).with(...)
> 
> (the user can omit the "by" and "with" calls)
> 
> I think this new approach would be worthy of our "flexible
> windowing"
> in
> contrast with the current approach.
> 
> Regards,
> Gyula
> 
> 
> 
> 
> 
> 
> 
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to