Hey guys,

As Aljoscha highlighted earlier, the current window join semantics in the streaming API don't follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams. The reason is that we take one window from each of the two streams and join these pairs. This would be a blocking operation if the windows did not close at exactly the same time, and since we don't want that, we only allow time windows.
I talked with Peter, who came up with the initial idea for an alternative approach to stream joins. It works as follows: instead of pairing windows, we do element-against-window joins. Whenever we receive an element from one of the streams, we join that element with the current window of the other stream (this window is constantly updated). This is non-blocking for any window definition, since we don't have to wait for windows to complete, and we can use it with any of our predefined policies, like Time.of(...), Count.of(...) and Delta.of(...).

Additionally, this gives us a very flexible way of defining window joins. With it we could also define grouped windowing inside a join. An example: join all elements of Stream1 with the last 5 elements of Stream2 per window key, on some join key.

This feature can be easily implemented on top of the current operators, and I already have a working prototype for the simple non-grouped case. My only concern is the API. The best thing I could come up with is something like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)

(the user can omit the "by" and "with" calls)

I think this new approach would be worthy of our "flexible windowing", in contrast with the current approach.

Regards,
Gyula
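To make the element-against-window idea concrete, here is a minimal plain-Java sketch of the non-grouped case. It assumes count-based windows on both sides (the equivalent of Count.of(n)), and it is not Flink API or the prototype mentioned above. ElementWindowJoin, onElementA and onElementB are hypothetical names. The key extractors and the joiner only loosely mirror where(...), equalTo(...) and with(...).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch, not the Flink API: each side keeps a count-based
// window (like Count.of(n)) holding only its last n elements.
class ElementWindowJoin<A, B, K, R> {
    private final Deque<A> windowA = new ArrayDeque<>();
    private final Deque<B> windowB = new ArrayDeque<>();
    private final int sizeA;
    private final int sizeB;
    private final Function<A, K> keyA;        // roughly where(...)
    private final Function<B, K> keyB;        // roughly equalTo(...)
    private final BiFunction<A, B, R> joiner; // roughly with(...)

    ElementWindowJoin(int sizeA, int sizeB, Function<A, K> keyA,
                      Function<B, K> keyB, BiFunction<A, B, R> joiner) {
        this.sizeA = sizeA;
        this.sizeB = sizeB;
        this.keyA = keyA;
        this.keyB = keyB;
        this.joiner = joiner;
    }

    // An element of stream A is joined against B's current window right away
    // (no waiting for a window to close), then added to A's own window.
    List<R> onElementA(A a) {
        List<R> out = new ArrayList<>();
        K k = keyA.apply(a);
        for (B b : windowB) {
            if (Objects.equals(k, keyB.apply(b))) {
                out.add(joiner.apply(a, b));
            }
        }
        push(windowA, a, sizeA);
        return out;
    }

    // Symmetric case for an element of stream B.
    List<R> onElementB(B b) {
        List<R> out = new ArrayList<>();
        K k = keyB.apply(b);
        for (A a : windowA) {
            if (Objects.equals(keyA.apply(a), k)) {
                out.add(joiner.apply(a, b));
            }
        }
        push(windowB, b, sizeB);
        return out;
    }

    // Append to a window and evict the oldest element once it exceeds its size.
    private static <T> void push(Deque<T> window, T element, int size) {
        window.addLast(element);
        if (window.size() > size) {
            window.removeFirst();
        }
    }

    public static void main(String[] args) {
        // Join on the first character, windows of size 2 on both sides.
        ElementWindowJoin<String, String, Character, String> join =
            new ElementWindowJoin<>(2, 2, s -> s.charAt(0), s -> s.charAt(0),
                                    (a, b) -> a + "-" + b);
        System.out.println(join.onElementB("x1")); // []
        System.out.println(join.onElementB("y1")); // []
        System.out.println(join.onElementA("x2")); // [x2-x1]
        System.out.println(join.onElementB("x3")); // [x2-x3], evicts x1
        System.out.println(join.onElementA("x5")); // [x5-x3]
    }
}
```

In this sketch, each matching pair is emitted exactly once, by whichever of the two elements arrives later, as long as the earlier one is still in the other stream's window. Swapping the count-based eviction in push for a time- or delta-based one would not change the join logic itself.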