Hello Matthias,

Sure, ordering guarantees are indeed tricky; I recall having that
discussion back at TU Berlin. Bear in mind, though, that DataStream, our
abstract data type, represents a *partitioned* unbounded sequence of events.
That model makes no *global* ordering guarantees whatsoever across
partitions. More generally, there are many “race conditions” in a
distributed execution graph whose vertices process multiple inputs
asynchronously, especially when you add joins and iterations into the mix
(how do you deal with reprocessing “old” tuples that iterate in the graph?).
By the way, have you checked the Naiad paper [1]? Stephan cited it a while
ago and it is quite relevant to this discussion.

Also, can you cite the paper with the join semantics you are referring to?
That would be very helpful, I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax
<mj...@informatik.hu-berlin.de> wrote:

Hi @all,

please keep me in the loop for this work. I am highly interested and I
want to help with it.

My initial thoughts are as follows:

1) Currently, system timestamps are used and the suggested approach can
be seen as state-of-the-art (there is actually a research paper using
the exact same join semantics). Of course, the current approach is
inherently non-deterministic. The advantage is that there is no
overhead for keeping track of the order of records, so the latency should
be very low. (Additionally, state recovery is simplified: because the
processing is inherently non-deterministic, recovery can be done with
relaxed guarantees.)

2) The user should be able to "switch on" deterministic processing,
i.e., records are timestamped (either externally when generated, or
at the sources). Because deterministic processing adds some
overhead, the user should opt in to it explicitly.
In this case, the order must be preserved in each re-distribution step
(merging is sufficient if order is preserved within each incoming
channel). Furthermore, deterministic processing can be achieved by sound
window semantics (and there are a bunch of them). Even for
single-stream windows it's a tricky problem; for join windows it's even
harder. From my point of view, it is less important which semantics are
chosen; however, the user must be aware of how they work. The trickiest
part of deterministic processing is dealing with duplicate timestamps
(which cannot be avoided). How to timestamp (intermediate) result
tuples is also an important question to be answered.
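
To make the merging step concrete, here is a minimal Python sketch of an
order-preserving merge, assuming each incoming channel already delivers its
records in timestamp order. The names merge_channels and _tag are
hypothetical, and breaking duplicate timestamps by channel index is only one
assumed tie-breaking rule, not something agreed on in this thread. It does
not use the streaming API.

```python
import heapq


def _tag(channel_index, channel):
    """Attach the channel index to each (timestamp, payload) record."""
    for timestamp, payload in channel:
        yield timestamp, channel_index, payload


def merge_channels(channels):
    """Merge timestamp-ordered channels into one deterministic sequence.

    Each channel is an iterable of (timestamp, payload) pairs that is
    already sorted by timestamp. Records with equal timestamps are ordered
    by channel index (an assumed rule for this sketch).
    """
    tagged = [_tag(i, ch) for i, ch in enumerate(channels)]
    # The key compares only (timestamp, channel index), never the payloads.
    for timestamp, _, payload in heapq.merge(*tagged, key=lambda r: (r[0], r[1])):
        yield timestamp, payload


channel_a = [(1, "a1"), (3, "a2")]
channel_b = [(1, "b1"), (2, "b2")]
merged = list(merge_channels([channel_a, channel_b]))
```

With a fixed channel numbering the merged order is the same on every run,
including for the duplicate timestamp 1. A real operator would presumably
also have to hold back output until every channel has a pending record,
which this in-memory sketch sidesteps.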


-Matthias


On 04/07/2015 11:37 AM, Gyula Fóra wrote:
Hey,

I agree with Kostas: if we define the exact semantics of how this works, this
is no more ad hoc than any other stateful operator with multiple inputs.
(And I don't think any other system supports something similar.)

We need to make some design choices that are similar to the issues we had
for windowing. We need to choose how we want to evaluate the windowing
policies (global or local) because that affects what kind of policies can
be parallel, but I can work on these things.

I think this is an amazing feature, but I wouldn't necessarily rush the
implementation for 0.9.

And thanks for helping to write these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas
<ktzou...@apache.org> wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the
following. Assuming an arriving element from the left side:
(1) We find the right-side matches
(2) We insert the left-side arrival into the left window
(3) We recompute the left window
We need to see whether right window re-computation needs to be triggered as
well. I think that this way of joining streams is also what the symmetric
hash join algorithms were meant to support.
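
The three steps above can be sketched as follows. This is a minimal Python
illustration, assuming count-based windows on both sides; the class
ElementWindowJoin and its method names are hypothetical and not part of the
streaming API. It handles arrivals from either side symmetrically and does
not trigger any re-computation of the opposite window.

```python
from collections import deque


class ElementWindowJoin:
    """Element-against-window join over two count-based windows."""

    def __init__(self, left_size, right_size, left_key, right_key):
        # deque(maxlen=n) drops its oldest entry once n entries are held,
        # which stands in for recomputing a count-based window.
        self.windows = {
            "left": deque(maxlen=left_size),
            "right": deque(maxlen=right_size),
        }
        self.keys = {"left": left_key, "right": right_key}

    def on_element(self, side, element):
        other = "right" if side == "left" else "left"
        key = self.keys[side](element)
        # (1) Find the matches in the current window of the other side.
        matches = [m for m in self.windows[other] if self.keys[other](m) == key]
        # (2) + (3) Insert the arrival into its own window; eviction of the
        # oldest element happens as part of the append.
        self.windows[side].append(element)
        # Always emit pairs as (left, right), whichever side arrived.
        if side == "left":
            return [(element, m) for m in matches]
        return [(m, element) for m in matches]


join = ElementWindowJoin(2, 2, left_key=lambda l: l[0], right_key=lambda r: r[0])
first = join.on_element("left", ("a", 1))
second = join.on_element("right", ("a", "x"))
```

Note that the result depends on the interleaving of arrivals: the same two
inputs can produce different pairs if the elements arrive in a different
order, which is the non-determinism discussed elsewhere in this thread.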

Kostas


On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen
<se...@apache.org> wrote:

Is the approach of joining an element at a time from one input against a
window on the other input not a bit arbitrary?

This just joins whatever happens to be in the window at the time the
single element arrives; that is a bit unpredictable, right?

As a more general point: the whole semantics of windowing and when windows
are triggered are a bit ad hoc now. It would be really good to start
formalizing that a bit and put it down somewhere. Users need to be able to
clearly understand and predict the output.



On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra
<gyula.f...@gmail.com> wrote:

I think it should be possible to make this compatible with the
.window().every() calls. Maybe if there is some trigger set in "every", we
would not join that stream one element at a time but every so many elements.
The problem here is that the window and every in this case are very
different from the normal windowing semantics. The window would define the
join window for each element of the other stream, while every would define
how often I join this stream with the other one.

We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi
<balassi.mar...@gmail.com> wrote:

That would be really neat. The problem I see there is that we currently do
not distinguish between dataStream.window() and dataStream.window().every();
they both return WindowedDataStreams, and the TriggerPolicies of the every
call do not make much sense in this setting (in fact, the trigger is
practically always set to a count of one).

But of course we could handle this by checking that the eviction is either
null or a count of 1, and throwing an exception while building the JobGraph
in every other case.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek
<aljos...@apache.org> wrote:

Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equals().with()

So a join would just be a join of two WindowedDataStreams. This would
neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi
<balassi.mar...@gmail.com> wrote:

Big +1 for the proposal from Peter and Gyula. I'm really for bringing the
windowing and window join API in sync.

On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra
<gyf...@apache.org> wrote:

Hey guys,

As Aljoscha has highlighted earlier, the current window join semantics in
the streaming API don't follow the changes in the windowing API. More
precisely, we currently only support joins over time windows of equal size
on both streams. The reason for this is that we now take a window of each
of the two streams and do joins over these pairs. This would be a blocking
operation if the windows were not closed at exactly the same time (and since
we don't want this, we only allow time windows).

I talked with Peter, who came up with the initial idea of an alternative
approach for stream joins, which works as follows:

Instead of pairing windows for joins, we do element-against-window joins.
What this means is that whenever we receive an element from one of the
streams, we join this element with the current window (this window is
constantly updated) of the other stream. This is non-blocking for any
window definition, as we don't have to wait for windows to be completed, and
we can use it with any of our predefined policies like Time.of(...),
Count.of(...), Delta.of(...).

Additionally, this allows some very flexible ways of defining window
joins. With this we could also define grouped windowing inside of a join.
An example of this would be: join all elements of Stream1 with the last 5
elements, by a given window key, of Stream2 on some join key.
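
A rough Python sketch of that grouped example, assuming a count-based window
kept separately per window key. GroupedCountWindow and join_against are
hypothetical names chosen for illustration; they are not part of the
streaming API or of the prototype mentioned below.

```python
from collections import defaultdict, deque


class GroupedCountWindow:
    """Keeps the last `size` elements separately for each window key."""

    def __init__(self, size, window_key):
        self.window_key = window_key
        # One bounded deque per window key; the oldest element of a group
        # is dropped once that group holds `size` elements.
        self.groups = defaultdict(lambda: deque(maxlen=size))

    def add(self, element):
        self.groups[self.window_key(element)].append(element)

    def elements(self):
        for group in self.groups.values():
            yield from group


def join_against(element, window, left_join_key, right_join_key):
    """Join one arriving element with everything currently in the window."""
    key = left_join_key(element)
    return [(element, other) for other in window.elements()
            if right_join_key(other) == key]


# Stream2 records are (sensor, room, value): grouped by sensor, joined on room.
window = GroupedCountWindow(size=2, window_key=lambda e: e[0])
for record in [("s1", "r1", 1), ("s1", "r1", 2), ("s1", "r1", 3), ("s2", "r1", 9)]:
    window.add(record)
result = join_against(("r1", "left"), window, lambda l: l[0], lambda r: r[1])
```

Here the window key (sensor) decides which elements are retained, while the
join key (room) decides which retained elements match the arriving element.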

This feature can be easily implemented over the current operators, so I
already have a working prototype for the simple non-grouped case. My only
concern is the API; the best thing I could come up with is something like
this:

stream_A.join(stream_B)
    .onWindow(windowDefA, windowDefB)
    .by(windowKey1, windowKey2)
    .where(...).equalTo(...).with(...)

(the user can omit the "by" and "with" calls)

I think this new approach would be worthy of our "flexible windowing",
in contrast with the current approach.

Regards,
Gyula








