That would be really neat, the problem I see there, that we do not
distinguish between dataStream.window() and dataStream.window().every()
currently, they both return WindowedDataStreams and TriggerPolicies of the
every call do not make much sense in this setting (in fact practically the
trigger is always set to count of one).

But of course we could make it in a way, that we check that the eviction
should be either null or count of 1, in every other case we throw an
exception while building the JobGraph.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Or you could define it like this:
>
> stream_A = a.window(...)
> stream_B = b.window(...)
>
> stream_A.join(stream_B).where().equals().with()
>
> So a join would just be a join of two WindowedDataStreamS. This would
> neatly move the windowing stuff into one place.
>
> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
> > Big +1 for the proposal for Peter and Gyula. I'm really for bringing the
> > windowing and window join API in sync.
> >
> > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
> >
> >> Hey guys,
> >>
> >> As Aljoscha has highlighted earlier the current window join semantics in
> >> the streaming api doesn't follow the changes in the windowing api. More
> >> precisely, we currently only support joins over time windows of equal
> size
> >> on both streams. The reason for this is that we now take a window of
> each
> >> of the two streams and do joins over these pairs. This would be a
> blocking
> >> operation if the windows are not closed at exactly the same time (and
> since
> >> we dont want this we only allow time windows)
> >>
> >> I talked with Peter who came up with the initial idea of an alternative
> >> approach for stream joins which works as follows:
> >>
> >> Instead of pairing windows for joins, we do element against window
> joins.
> >> What this means is that whenever we receive an element from one of the
> >> streams, we join this element with the current window(this window is
> >> constantly updated) of the other stream. This is non-blocking on any
> window
> >> definitions as we dont have to wait for windows to be completed and we
> can
> >> use this with any of our predefined policies like Time.of(...),
> >> Count.of(...), Delta.of(....).
> >>
> >> Additionally this also allows some very flexible way of defining window
> >> joins. With this we could also define grouped windowing inside if a
> join.
> >> An example of this would be: Join all elements of Stream1 with the last
> 5
> >> elements by a given windowkey of Stream2 on some join key.
> >>
> >> This feature can be easily implemented over the current operators, so I
> >> already have a working prototype for the simple non-grouped case. My
> only
> >> concern is the API, the best thing I could come up with is something
> like
> >> this:
> >>
> >> stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1,
> >> windowKey2).where(...).equalTo(...).with(...)
> >>
> >> (the user can omit the "by" and "with" calls)
> >>
> >> I think this new approach would be worthy of our "flexible windowing" in
> >> contrast with the current approach.
> >>
> >> Regards,
> >> Gyula
> >>
>

Reply via email to