I think it should be possible to make this compatible with the
.window().every() calls. Maybe if there is some trigger set in "every", we
would not join that stream element by element but only every so many
elements. The problem here is that the window and every in this case are
very different from the normal windowing semantics: the window would define
the join window for each element of the other stream, while every would
define how often we join this stream with the other one.
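
For example, something along these lines (purely a hypothetical sketch of
the API we are discussing here, none of this exists yet):

stream_A.window(Time.of(5, TimeUnit.SECONDS)).every(Count.of(100))
    .join(stream_B.window(Count.of(1000)))
    .where(...).equalTo(...).with(...)

Here window(...) on each side would define the join window kept for the
elements of the other stream, and every(Count.of(100)) would mean stream_A
is joined against stream_B only once per 100 elements instead of one by one.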

We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> That would be really neat. The problem I see there is that we do not
> currently distinguish between dataStream.window() and
> dataStream.window().every(): they both return WindowedDataStreams, and the
> TriggerPolicies of the every call do not make much sense in this setting
> (in fact the trigger is practically always set to a count of one).
>
> But of course we could make it so that we check that the eviction is
> either null or a count of 1, and in every other case we throw an
> exception while building the JobGraph.
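>
> For example, roughly something like this while building the JobGraph (the
> policy and helper names here are made up, just to illustrate the check):
>
> if (everyPolicy != null && !isCountOfOne(everyPolicy)) {
>     throw new UnsupportedOperationException(
>         "Window joins only support an every() policy of count 1");
> }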
>
> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Or you could define it like this:
> >
> > stream_A = a.window(...)
> > stream_B = b.window(...)
> >
> > stream_A.join(stream_B).where().equals().with()
> >
> > So a join would just be a join of two WindowedDataStreams. This would
> > neatly move the windowing stuff into one place.
> >
> > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com>
> > wrote:
> > > Big +1 for the proposal from Peter and Gyula. I'm really for bringing
> > > the windowing and window join API in sync.
> > >
> > > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
> > >
> > >> Hey guys,
> > >>
> > >> As Aljoscha highlighted earlier, the current window join semantics in
> > >> the streaming API don't follow the changes in the windowing API. More
> > >> precisely, we currently only support joins over time windows of equal
> > >> size on both streams. The reason for this is that we now take a window
> > >> of each of the two streams and do joins over these pairs. This would be
> > >> a blocking operation if the windows are not closed at exactly the same
> > >> time (and since we don't want this, we only allow time windows).
> > >>
> > >> I talked with Peter, who came up with the initial idea of an
> > >> alternative approach for stream joins, which works as follows:
> > >>
> > >> Instead of pairing windows for joins, we do element-against-window
> > >> joins. What this means is that whenever we receive an element from one
> > >> of the streams, we join this element with the current window (this
> > >> window is constantly updated) of the other stream. This is non-blocking
> > >> for any window definition, as we don't have to wait for windows to be
> > >> completed, and we can use this with any of our predefined policies like
> > >> Time.of(...), Count.of(...), Delta.of(...).
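> > >>
> > >> Roughly, the operator would do something like this for every incoming
> > >> element (a simplified sketch, not the actual prototype code; the helper
> > >> names are made up):
> > >>
> > >> List<A> windowA = new ArrayList<>(); // current window of stream_A elements
> > >> List<B> windowB = new ArrayList<>(); // current window of stream_B elements
> > >>
> > >> void receiveFromA(A a) {
> > >>     evictB(windowB);                 // bring B's window up to date first
> > >>     for (B b : windowB) {            // join the new element against B's current window
> > >>         if (keyOfA(a).equals(keyOfB(b))) {
> > >>             collect(joinFun.join(a, b));
> > >>         }
> > >>     }
> > >>     windowA.add(a);                  // a becomes part of A's window for future B elements
> > >> }
> > >> // receiveFromB(B b) is the mirror image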
> > >>
> > >> Additionally, this also allows a very flexible way of defining window
> > >> joins. With this we could also define grouped windowing inside of a
> > >> join. An example of this would be: join all elements of Stream1 with
> > >> the last 5 elements (per window key) of Stream2, on some join key.
> > >>
> > >> This feature can be easily implemented on top of the current
> > >> operators, so I already have a working prototype for the simple
> > >> non-grouped case. My only concern is the API; the best thing I could
> > >> come up with is something like this:
> > >>
> > >> stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1,
> > >> windowKey2).where(...).equalTo(...).with(...)
> > >>
> > >> (the user can omit the "by" and "with" calls)
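> > >>
> > >> For example, the grouped case from above could look roughly like this
> > >> (just a sketch of the proposed API, with hypothetical window policies
> > >> and key selectors plugged in):
> > >>
> > >> stream_A.join(stream_B)
> > >>     .onWindow(Time.of(10, TimeUnit.SECONDS), Count.of(5))
> > >>     .by(windowKey1, windowKey2)
> > >>     .where(joinKey1).equalTo(joinKey2)
> > >>     .with(joinFunction)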
> > >>
> > >> I think this new approach would be worthy of our "flexible windowing",
> > >> in contrast with the current approach.
> > >>
> > >> Regards,
> > >> Gyula
> > >>
> >
>
