I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every", we would not join that stream element by element but every so many elements. The problem here is that window and every would mean something quite different from the normal windowing semantics: window would define the join window for each element of the other stream, while every would define how often this stream is joined with the other one.
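To make the idea concrete, here is a minimal, self-contained sketch of the semantics I have in mind (plain Java, not the actual streaming API; the class, field and constant names are made up for illustration):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative only: "window" = the join window kept for the other stream,
// "every" = how many elements of this stream we buffer before joining them.
public class WindowEveryJoinSketch {

    static final int JOIN_WINDOW_OF_B = 5; // stands in for window(...) on stream B
    static final int EVERY_N_OF_A = 3;     // stands in for every(...) on stream A

    private final Deque<String> windowB = new ArrayDeque<>(); // current join window of B
    private final List<Integer> bufferA = new ArrayList<>();  // A elements waiting for the trigger

    // Stream B element arrives: update its join window (count-based eviction here).
    public void onB(String b) {
        windowB.addLast(b);
        if (windowB.size() > JOIN_WINDOW_OF_B) {
            windowB.removeFirst();
        }
    }

    // Stream A element arrives: buffer it; every EVERY_N_OF_A elements,
    // join the whole batch against the current window of B.
    public void onA(int a) {
        bufferA.add(a);
        if (bufferA.size() >= EVERY_N_OF_A) {
            for (Integer x : bufferA) {
                for (String y : windowB) {
                    System.out.println("joined (" + x + ", " + y + ")");
                }
            }
            bufferA.clear();
        }
    }
}

In the real API the hard-coded constants would of course come from the window(...) and every(...) policies instead.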
We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:

> That would be really neat. The problem I see there is that we do not
> distinguish between dataStream.window() and dataStream.window().every()
> currently; they both return WindowedDataStreams, and the TriggerPolicies of
> the every call do not make much sense in this setting (in fact, practically
> the trigger is always set to a count of one).
>
> But of course we could make it in a way that we check that the eviction
> should be either null or a count of 1, and in every other case we throw an
> exception while building the JobGraph.
>
> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Or you could define it like this:
> >
> > stream_A = a.window(...)
> > stream_B = b.window(...)
> >
> > stream_A.join(stream_B).where().equals().with()
> >
> > So a join would just be a join of two WindowedDataStreams. This would
> > neatly move the windowing stuff into one place.
> >
> > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> >
> > > Big +1 for the proposal from Peter and Gyula. I'm really for bringing
> > > the windowing and window join API in sync.
> > >
> > > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
> > >
> > >> Hey guys,
> > >>
> > >> As Aljoscha has highlighted earlier, the current window join semantics
> > >> in the streaming API don't follow the changes in the windowing API.
> > >> More precisely, we currently only support joins over time windows of
> > >> equal size on both streams. The reason for this is that we now take a
> > >> window of each of the two streams and do joins over these pairs. This
> > >> would be a blocking operation if the windows are not closed at exactly
> > >> the same time (and since we don't want that, we only allow time
> > >> windows).
> > >>
> > >> I talked with Peter, who came up with the initial idea of an
> > >> alternative approach for stream joins, which works as follows:
> > >>
> > >> Instead of pairing windows for joins, we do element-against-window
> > >> joins. What this means is that whenever we receive an element from one
> > >> of the streams, we join this element with the current window (this
> > >> window is constantly updated) of the other stream. This is non-blocking
> > >> for any window definition, as we don't have to wait for windows to be
> > >> completed, and we can use it with any of our predefined policies like
> > >> Time.of(...), Count.of(...), Delta.of(...).
> > >>
> > >> Additionally, this also allows some very flexible ways of defining
> > >> window joins. With this we could also define grouped windowing inside
> > >> of a join. An example of this would be: join all elements of Stream1
> > >> with the last 5 elements, by a given window key of Stream2, on some
> > >> join key.
> > >>
> > >> This feature can be easily implemented over the current operators, so I
> > >> already have a working prototype for the simple non-grouped case. My
> > >> only concern is the API; the best thing I could come up with is
> > >> something like this:
> > >>
> > >> stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1,
> > >> windowKey2).where(...).equalTo(...).with(...)
> > >>
> > >> (the user can omit the "by" and "with" calls)
> > >>
> > >> I think this new approach would be worthy of our "flexible windowing",
> > >> in contrast with the current approach.
> > >>
> > >> Regards,
> > >> Gyula
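For reference, here is a rough, self-contained sketch of the element-against-window join described above, including the grouped case (plain Java, not the actual prototype mentioned in the thread; Event, LAST_N and the other names are purely illustrative):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative only, not the prototype: joins each incoming Stream1 element
// against the last 5 elements of Stream2, kept per window key, matching on a join key.
public class ElementVsWindowJoinSketch {

    static class Event {
        final String windowKey; // grouping key for the window of Stream2
        final String joinKey;   // key used for the join ("where ... equalTo ...")
        final String payload;

        Event(String windowKey, String joinKey, String payload) {
            this.windowKey = windowKey;
            this.joinKey = joinKey;
            this.payload = payload;
        }
    }

    static final int LAST_N = 5;

    // Current (constantly updated) windows of Stream2, grouped by window key.
    private final Map<String, Deque<Event>> windowsOfStream2 = new HashMap<>();

    // Stream2 element arrives: update its grouped window; no window ever has to "close".
    public void onStream2(Event e) {
        Deque<Event> w = windowsOfStream2.get(e.windowKey);
        if (w == null) {
            w = new ArrayDeque<>();
            windowsOfStream2.put(e.windowKey, w);
        }
        w.addLast(e);
        if (w.size() > LAST_N) {
            w.removeFirst();
        }
    }

    // Stream1 element arrives: join it immediately against the current windows of Stream2.
    public void onStream1(String joinKey, String payload) {
        for (Deque<Event> w : windowsOfStream2.values()) {
            for (Event e : w) {
                if (e.joinKey.equals(joinKey)) {
                    System.out.println("joined " + payload + " with " + e.payload);
                }
            }
        }
    }
}

With a time or delta policy the count-based eviction in onStream2 would simply be replaced by the corresponding policy; the join itself stays non-blocking because it never has to wait for a window to complete.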