Is the approach of joining an element at a time from one input against a
window on the other input not a bit arbitrary?

This just joins against whatever happens to be in the window at the moment
the single element arrives, which is a bit unpredictable, right?
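
To make the concern concrete, here is a minimal sketch in plain Python. It is
not the Flink API: the function name element_window_join, the count-based
window, and the (key, payload) element shape are all assumptions made up for
illustration. It joins each arriving element against the current window of
the other stream, and shows that the same elements produce different results
depending on how the two streams happen to interleave.

```python
from collections import deque


def element_window_join(events, window_size):
    """Join each arriving element against the other stream's current window.

    events: list of (side, element) in arrival order, side is "A" or "B";
            each element is a (key, payload) tuple (hypothetical shape).
    window_size: count-based window keeping the last N elements per stream.
    Returns a list of (a_element, b_element) pairs with equal keys.
    """
    windows = {
        "A": deque(maxlen=window_size),
        "B": deque(maxlen=window_size),
    }
    joined = []
    for side, element in events:
        other = "B" if side == "A" else "A"
        # Join against whatever the other window contains right now.
        for candidate in windows[other]:
            if candidate[0] == element[0]:
                if side == "A":
                    joined.append((element, candidate))
                else:
                    joined.append((candidate, element))
        # Only afterwards does the element enter its own stream's window.
        windows[side].append(element)
    return joined


x1, x2 = ("k", "x1"), ("k", "x2")
y1, y2 = ("k", "y1"), ("k", "y2")

# Same elements on both streams, two different arrival orders.
interleaved = [("A", x1), ("B", y1), ("A", x2), ("B", y2)]
batched = [("A", x1), ("A", x2), ("B", y1), ("B", y2)]

result_interleaved = element_window_join(interleaved, window_size=1)
result_batched = element_window_join(batched, window_size=1)
```

With a window of one element, the interleaved order yields three pairs and
the batched order only two (x1 is evicted before any element of B arrives),
even though both streams carry exactly the same elements.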

As a more general point: the whole semantics of windows, and of when they
are triggered, are a bit ad hoc right now. It would be really good to start
formalizing them and to write them down somewhere. Users need to be able to
clearly understand the semantics and to predict the output.



On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> I think it should be possible to make this compatible with the
> .window().every() calls. Maybe if there is some trigger set in "every" we
> would not join that stream 1 by 1 but every so many elements. The problem
> here is that the window and every in this case are very different from
> the normal windowing semantics. The window would define the join window for
> each element of the other stream, while every would define how often I join
> this stream with the other one.
>
> We need to think about how to make this intuitive.
>
> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
> > That would be really neat. The problem I see there is that we do not
> > distinguish between dataStream.window() and dataStream.window().every()
> > currently: they both return WindowedDataStreams, and the TriggerPolicies
> > of the every call do not make much sense in this setting (in fact, in
> > practice the trigger is always set to a count of one).
> >
> > But of course we could make it in a way, that we check that the eviction
> > should be either null or count of 1, in every other case we throw an
> > exception while building the JobGraph.
> >
> > On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Or you could define it like this:
> > >
> > > stream_A = a.window(...)
> > > stream_B = b.window(...)
> > >
> > > stream_A.join(stream_B).where().equals().with()
> > >
> > > So a join would just be a join of two WindowedDataStreams. This would
> > > neatly move the windowing stuff into one place.
> > >
> > > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com>
> > > wrote:
> > > > Big +1 for the proposal of Peter and Gyula. I'm really for bringing
> > > > the windowing and window join API in sync.
> > > >
> > > > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
> > > >
> > > >> Hey guys,
> > > >>
> > > > >> As Aljoscha has highlighted earlier, the current window join semantics
> > > > >> in the streaming API don't follow the changes in the windowing API.
> > > > >> More precisely, we currently only support joins over time windows of
> > > > >> equal size on both streams. The reason for this is that we now take a
> > > > >> window of each of the two streams and do joins over these pairs. This
> > > > >> would be a blocking operation if the windows are not closed at exactly
> > > > >> the same time (and since we don't want this, we only allow time
> > > > >> windows).
> > > >>
> > > >> I talked with Peter who came up with the initial idea of an
> > alternative
> > > >> approach for stream joins which works as follows:
> > > >>
> > > > >> Instead of pairing windows for joins, we do element-against-window
> > > > >> joins. What this means is that whenever we receive an element from one
> > > > >> of the streams, we join this element with the current window (this
> > > > >> window is constantly updated) of the other stream. This is non-blocking
> > > > >> for any window definition, as we don't have to wait for windows to be
> > > > >> completed, and we can use this with any of our predefined policies like
> > > > >> Time.of(...), Count.of(...), Delta.of(...).
> > > >>
> > > > >> Additionally, this allows a very flexible way of defining window
> > > > >> joins. With this we could also define grouped windowing inside of a
> > > > >> join. An example of this would be: join all elements of Stream1 with
> > > > >> the last 5 elements by a given windowkey of Stream2 on some join key.
> > > >>
> > > > >> This feature can be easily implemented over the current operators, so
> > > > >> I already have a working prototype for the simple non-grouped case. My
> > > > >> only concern is the API; the best thing I could come up with is
> > > > >> something like this:
> > > >>
> > > > >> stream_A.join(stream_B).onWindow(windowDefA, windowDefB)
> > > > >>     .by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
> > > >>
> > > >> (the user can omit the "by" and "with" calls)
> > > >>
> > > > >> I think this new approach would be worthy of our "flexible windowing",
> > > > >> in contrast with the current approach.
> > > >>
> > > >> Regards,
> > > >> Gyula
> > > >>
> > >
> >
>
