Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the
following. Assuming an arriving element from the left side:
(1) We find the right-side matches
(2) We insert the left-side arrival into the left window
(3) We recompute the left window
We need to see whether right window re-computation needs to be triggered as
well. I think that this way of joining streams is also what the symmetric
hash join algorithms were meant to support.
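
A minimal sketch of the three steps above, assuming count-based windows that
keep the last N elements and an equality join on a key. All names here
(SymmetricWindowJoin, on_left, on_right) are hypothetical and not part of the
streaming API; the right side simply mirrors the left side:

```python
from collections import deque


class SymmetricWindowJoin:
    """Element-against-window join with count-based windows (illustrative only)."""

    def __init__(self, left_size, right_size, left_key, right_key):
        # Assumption: each window keeps only the last N elements (count eviction).
        self.left_window = deque(maxlen=left_size)
        self.right_window = deque(maxlen=right_size)
        self.left_key = left_key
        self.right_key = right_key

    def on_left(self, element):
        # (1) Find the right-side matches in the current right window.
        key = self.left_key(element)
        matches = [(element, r) for r in self.right_window
                   if self.right_key(r) == key]
        # (2) + (3) Insert into the left window. deque(maxlen=...) drops the
        # oldest element, which stands in for "recomputing" the window here.
        self.left_window.append(element)
        return matches

    def on_right(self, element):
        # Mirror image of on_left for arrivals from the right side.
        key = self.right_key(element)
        matches = [(l, element) for l in self.left_window
                   if self.left_key(l) == key]
        self.right_window.append(element)
        return matches


join = SymmetricWindowJoin(2, 2, left_key=lambda e: e[0], right_key=lambda e: e[0])
out = []
out += join.on_left(("a", 1))    # right window is empty, so no output
out += join.on_right(("a", 10))  # joins with ("a", 1) from the left window
out += join.on_right(("b", 20))  # no left element with key "b"
out += join.on_left(("a", 2))    # joins with ("a", 10) from the right window
```

In this sketch the output depends on the interleaving of arrivals from the two
sides, which is the predictability question raised below.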

Kostas


On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:

> Is the approach of joining an element at a time from one input against a
> window on the other input not a bit arbitrary?
>
> This just joins whatever currently happens to be the window by the time the
> single element arrives - that is a bit non-predictable, right?
>
> As a more general point: The whole semantics of windowing and when windows
> are triggered are a bit ad-hoc now. It would be really good to start
> formalizing that a bit and put it down somewhere. Users need to be able to
> clearly understand and predict the output.
>
>
>
> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > I think it should be possible to make this compatible with the
> > .window().every() calls. Maybe if there is some trigger set in "every" we
> > would not join that stream 1 by 1 but every so many elements. The problem
> > here is that the window and every in this case are very different from
> > the normal windowing semantics. The window would define the join window
> > for each element of the other stream, while every would define how often
> > I join this stream with the other one.
> >
> > We need to think about how to make this intuitive.
> >
> > On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com>
> > wrote:
> >
> > > That would be really neat. The problem I see there is that we do not
> > > currently distinguish between dataStream.window() and
> > > dataStream.window().every(): they both return WindowedDataStreams, and
> > > the TriggerPolicies of the every call do not make much sense in this
> > > setting (in fact, the trigger is practically always set to a count of
> > > one).
> > >
> > > But of course we could do it in a way that we check that the eviction
> > > is either null or a count of 1, and in every other case we throw an
> > > exception while building the JobGraph.
> > >
> > > On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > Or you could define it like this:
> > > >
> > > > stream_A = a.window(...)
> > > > stream_B = b.window(...)
> > > >
> > > > stream_A.join(stream_B).where().equals().with()
> > > >
> > > > So a join would just be a join of two WindowedDataStreams. This would
> > > > neatly move the windowing stuff into one place.
> > > >
> > > > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com>
> > > > wrote:
> > > > > Big +1 for the proposal from Peter and Gyula. I'm really for bringing
> > > > > the windowing and window join API in sync.
> > > > >
> > > > > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
> > > > >
> > > > >> Hey guys,
> > > > >>
> > > > > >> As Aljoscha has highlighted earlier, the current window join
> > > > > >> semantics in the streaming API don't follow the changes in the
> > > > > >> windowing API. More precisely, we currently only support joins
> > > > > >> over time windows of equal size on both streams. The reason for
> > > > > >> this is that we now take a window of each of the two streams and
> > > > > >> do joins over these pairs. This would be a blocking operation if
> > > > > >> the windows were not closed at exactly the same time (and since
> > > > > >> we don't want this, we only allow time windows).
> > > > >>
> > > > > >> I talked with Peter, who came up with the initial idea of an
> > > > > >> alternative approach for stream joins, which works as follows:
> > > > >>
> > > > > >> Instead of pairing windows for joins, we do element-against-window
> > > > > >> joins. What this means is that whenever we receive an element from
> > > > > >> one of the streams, we join this element with the current window
> > > > > >> (this window is constantly updated) of the other stream. This is
> > > > > >> non-blocking on any window definitions, as we don't have to wait
> > > > > >> for windows to be completed, and we can use this with any of our
> > > > > >> predefined policies like Time.of(...), Count.of(...),
> > > > > >> Delta.of(....).
> > > > >>
> > > > > >> Additionally, this allows a very flexible way of defining window
> > > > > >> joins. With this we could also define grouped windowing inside of
> > > > > >> a join. An example of this would be: join all elements of Stream1
> > > > > >> with the last 5 elements, by a given window key, of Stream2 on
> > > > > >> some join key.
> > > > >>
> > > > > >> This feature can be easily implemented over the current operators,
> > > > > >> so I already have a working prototype for the simple non-grouped
> > > > > >> case. My only concern is the API; the best thing I could come up
> > > > > >> with is something like this:
> > > > >>
> > > > > >> stream_A.join(stream_B).onWindow(windowDefA, windowDefB)
> > > > > >>     .by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
> > > > >>
> > > > >> (the user can omit the "by" and "with" calls)
> > > > >>
> > > > > >> I think this new approach would be worthy of our "flexible
> > > > > >> windowing", in contrast with the current approach.
> > > > >>
> > > > >> Regards,
> > > > >> Gyula
> > > > >>
> > > >
> > >
> >
>
