Hello Community,

I’ve recently been working on adding support for outer joins [1] and timestamp 
assignment [2] to the IntervalJoin in the DataStream API.
Since this is a public API that should be simple and understandable for users,
I would like to gather feedback on a few variations that I drafted:

1. Add outer joins

Approach A

keyedStreamA.intervalJoin(keyedStreamB)
                .leftOuter() // or .rightOuter(), .fullOuter()
                .between(<Time>, <Time>)
                .process(new ProcessJoinFunction() { /* … */ });

Approach B

// alternatively: intervalRightJoin, intervalFullOuterJoin
keyedStreamA.intervalLeftJoin(keyedStreamB)
                .between(<Time>, <Time>)
                .process(new ProcessJoinFunction() { /* … */ });

Approach C

keyedStreamA.intervalJoin(keyedStreamB)
                // Reuse the existing (internally used) JoinType
                .joinType(JoinType.INNER)


Personally, I feel that C is the cleanest approach, but it has the drawback
that invalid combinations of timestamp strategy and join type can only be
detected at runtime, whereas A and B would allow us to express the valid
combinations through the type system.

2. Assign timestamps to the joined pairs

When two elements are joined together, this will add support for specifying
which of the elements' timestamps should be assigned as the result's timestamp.
The four options are MIN, MAX, LEFT and RIGHT, where MIN selects the minimum of
the two elements' timestamps, MAX the maximum, LEFT the left element's
timestamp and RIGHT the right element's timestamp.
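
To make the four options concrete, here is a minimal sketch of how such a
strategy could pick the result timestamp. The enum name follows the
TimestampStrategy used in Approach B below; the select method and its
parameter names are hypothetical and only serve as illustration, they are not
taken from the actual implementation.

```java
import java.util.function.LongBinaryOperator;

/** Illustrative sketch only; select() is a hypothetical helper. */
enum TimestampStrategy {
    MIN(Math::min),                  // smaller of the two timestamps
    MAX(Math::max),                  // larger of the two timestamps
    LEFT((left, right) -> left),     // always the left element's timestamp
    RIGHT((left, right) -> right);   // always the right element's timestamp

    private final LongBinaryOperator selector;

    TimestampStrategy(LongBinaryOperator selector) {
        this.selector = selector;
    }

    /** Returns the timestamp to assign to the joined pair. */
    long select(long leftTimestamp, long rightTimestamp) {
        return selector.applyAsLong(leftTimestamp, rightTimestamp);
    }
}
```

For a left element with timestamp 3 and a right element with timestamp 7, MIN
and LEFT would both pick 3, while MAX and RIGHT would both pick 7.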

Approach A

keyedStreamA.intervalJoin(keyedStreamB)
                .between(<Time>, <Time>)
                // or assignRightTimestamp(), assignMinTimestamp(),
                // assignMaxTimestamp()
                .assignLeftTimestamp()
                .process(new ProcessJoinFunction() { /* … */ });

Approach B

keyedStreamA.intervalJoin(keyedStreamB)
                .between(<Time>, <Time>)
                .assignTimestamp(TimestampStrategy.LEFT) // .RIGHT, .MIN, .MAX

Again, I feel that B is the cleanest approach, but it has the same caveat
regarding runtime vs. type system checks as the approach above. This could be
especially relevant when it comes to combinations of join types and timestamp
assignments, where a few combinations will not be possible.
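
The trade-off between the two kinds of checks could be sketched roughly as
follows. All types here are reduced stand-ins written for this mail, not
Flink's actual classes, and the rule that a left outer join only permits the
LEFT timestamp (because the right element may be absent) is an assumption made
purely for illustration; the real set of invalid combinations would have to be
taken from the design doc.

```java
import java.util.EnumSet;
import java.util.Set;

// Reduced stand-ins for illustration; not Flink's actual types.
enum JoinType { INNER, LEFT_OUTER }
enum TimestampStrategy { MIN, MAX, LEFT, RIGHT }

/** Runtime check, as an enum-based API would need it. */
final class JoinConfigValidator {
    // ASSUMPTION: a left outer join may only use LEFT; inner joins allow all.
    static Set<TimestampStrategy> allowedFor(JoinType joinType) {
        switch (joinType) {
            case LEFT_OUTER:
                return EnumSet.of(TimestampStrategy.LEFT);
            default:
                return EnumSet.allOf(TimestampStrategy.class);
        }
    }

    /** Fails only when the job is built, not when the code is compiled. */
    static void validate(JoinType joinType, TimestampStrategy strategy) {
        if (!allowedFor(joinType).contains(strategy)) {
            throw new IllegalArgumentException(
                strategy + " timestamps cannot be used with a " + joinType + " join");
        }
    }
}

/** Result of choosing a timestamp strategy (hypothetical stage type). */
final class TimestampedStage {
    final TimestampStrategy strategy;

    TimestampedStage(TimestampStrategy strategy) {
        this.strategy = strategy;
    }
}

/** Compile-time check: an inner join stage offers every assignment method. */
final class InnerJoinStage {
    TimestampedStage assignLeftTimestamp() { return new TimestampedStage(TimestampStrategy.LEFT); }
    TimestampedStage assignRightTimestamp() { return new TimestampedStage(TimestampStrategy.RIGHT); }
    TimestampedStage assignMinTimestamp() { return new TimestampedStage(TimestampStrategy.MIN); }
    TimestampedStage assignMaxTimestamp() { return new TimestampedStage(TimestampStrategy.MAX); }
}

/** A left outer stage offers only the method assumed to be valid for it. */
final class LeftOuterJoinStage {
    TimestampedStage assignLeftTimestamp() { return new TimestampedStage(TimestampStrategy.LEFT); }
    // No assignRightTimestamp(): calling it would be a compile error.
}
```

With the enum-based variant, validate(JoinType.LEFT_OUTER,
TimestampStrategy.RIGHT) throws only once the job is being constructed, while
with the method-based variant the same mistake would not compile in the first
place, at the cost of more types and methods in the public API.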

Any feedback would be greatly appreciated. I also updated the design doc at [3] 
if anyone wants to hop in on further discussions!

Florian

[1] https://issues.apache.org/jira/browse/FLINK-8483
[2] https://issues.apache.org/jira/browse/FLINK-8482
[3] https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c
