Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the
following. Assuming an arriving element from the left side:

(1) We find the right-side matches.
(2) We insert the left-side arrival into the left window.
(3) We recompute the left window.

We need to see whether right window re-computation needs to be triggered as
well. I think that this way of joining streams is also what the symmetric
hash join algorithms were meant to support.
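To make the three steps concrete, here is a minimal sketch in plain Python. It is not the streaming API and not the prototype discussed in this thread. It assumes count-based windows that keep the last N elements, and the names SymmetricWindowJoin and on_element are made up for illustration.

```python
from collections import deque


class SymmetricWindowJoin:
    """Toy element-against-window join (hypothetical, not an existing API)."""

    def __init__(self, left_size, right_size, left_key, right_key):
        # Assumption: each side keeps a count-based window of its last N elements.
        self.windows = {
            "left": deque(maxlen=left_size),
            "right": deque(maxlen=right_size),
        }
        self.keys = {"left": left_key, "right": right_key}

    def on_element(self, side, element):
        other = "right" if side == "left" else "left"
        key = self.keys[side](element)
        # (1) Find the matches in the current window of the other side.
        matches = [o for o in self.windows[other] if self.keys[other](o) == key]
        # (2) Insert the arrival into its own window.
        # (3) "Recomputing" the window is just eviction here: deque(maxlen=N)
        #     drops the oldest element once the window is full.
        self.windows[side].append(element)
        # Always emit pairs in (left, right) order.
        if side == "left":
            return [(element, m) for m in matches]
        return [(m, element) for m in matches]


join = SymmetricWindowJoin(2, 2, lambda e: e[0], lambda e: e[0])
print(join.on_element("right", ("a", 1)))    # no left elements yet
print(join.on_element("left", ("a", "x")))   # joins with ("a", 1)
```

Because the arrival is matched before it is inserted, the output depends only on what the other window holds at arrival time. That is exactly the dependency on arrival order that a written-down semantics would have to spell out.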
Kostas

On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:

> Is the approach of joining an element at a time from one input against a
> window on the other input not a bit arbitrary?
>
> This just joins whatever currently happens to be the window by the time the
> single element arrives - that is a bit non-predictable, right?
>
> As a more general point: The whole semantics of windowing and when they are
> triggered are a bit ad-hoc now. It would be really good to start
> formalizing that a bit and put it down somewhere. Users need to be able to
> clearly understand and predict the output.
>
> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > I think it should be possible to make this compatible with the
> > .window().every() calls. Maybe if there is some trigger set in "every" we
> > would not join that stream 1 by 1 but every so many elements. The problem
> > here is that the window and every in this case are very, very different
> > from the normal windowing semantics. The window would define the join
> > window for each element of the other stream, while every would define how
> > often I join this stream with the other one.
> >
> > We need to think about how to make this intuitive.
> >
> > On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi
> > <balassi.mar...@gmail.com> wrote:
> >
> > > That would be really neat. The problem I see there is that we do not
> > > distinguish between dataStream.window() and dataStream.window().every()
> > > currently; they both return WindowedDataStreams, and TriggerPolicies of
> > > the every call do not make much sense in this setting (in fact
> > > practically the trigger is always set to count of one).
> > >
> > > But of course we could make it in a way that we check that the eviction
> > > should be either null or count of 1; in every other case we throw an
> > > exception while building the JobGraph.
> > >
> > > On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > Or you could define it like this:
> > > >
> > > > stream_A = a.window(...)
> > > > stream_B = b.window(...)
> > > >
> > > > stream_A.join(stream_B).where().equals().with()
> > > >
> > > > So a join would just be a join of two WindowedDataStreamS. This would
> > > > neatly move the windowing stuff into one place.
> > > >
> > > > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi
> > > > <balassi.mar...@gmail.com> wrote:
> > > > > Big +1 for the proposal of Peter and Gyula. I'm really for bringing
> > > > > the windowing and window join API in sync.
> > > > >
> > > > > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Hey guys,
> > > > >>
> > > > >> As Aljoscha has highlighted earlier, the current window join
> > > > >> semantics in the streaming API don't follow the changes in the
> > > > >> windowing API. More precisely, we currently only support joins
> > > > >> over time windows of equal size on both streams. The reason for
> > > > >> this is that we now take a window of each of the two streams and
> > > > >> do joins over these pairs. This would be a blocking operation if
> > > > >> the windows are not closed at exactly the same time (and since we
> > > > >> don't want this we only allow time windows).
> > > > >>
> > > > >> I talked with Peter, who came up with the initial idea of an
> > > > >> alternative approach for stream joins which works as follows:
> > > > >>
> > > > >> Instead of pairing windows for joins, we do element against window
> > > > >> joins. What this means is that whenever we receive an element from
> > > > >> one of the streams, we join this element with the current window
> > > > >> (this window is constantly updated) of the other stream. This is
> > > > >> non-blocking on any window definitions as we don't have to wait
> > > > >> for windows to be completed, and we can use this with any of our
> > > > >> predefined policies like Time.of(...), Count.of(...),
> > > > >> Delta.of(....).
> > > > >>
> > > > >> Additionally this also allows some very flexible way of defining
> > > > >> window joins. With this we could also define grouped windowing
> > > > >> inside of a join. An example of this would be: Join all elements
> > > > >> of Stream1 with the last 5 elements by a given windowkey of
> > > > >> Stream2 on some join key.
> > > > >>
> > > > >> This feature can be easily implemented over the current operators,
> > > > >> so I already have a working prototype for the simple non-grouped
> > > > >> case. My only concern is the API; the best thing I could come up
> > > > >> with is something like this:
> > > > >>
> > > > >> stream_A.join(stream_B).onWindow(windowDefA, windowDefB)
> > > > >>     .by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
> > > > >>
> > > > >> (the user can omit the "by" and "with" calls)
> > > > >>
> > > > >> I think this new approach would be worthy of our "flexible
> > > > >> windowing" in contrast with the current approach.
> > > > >>
> > > > >> Regards,
> > > > >> Gyula