Github user florianschmidt1994 commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183727245 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** + * Specifies the time boundaries over which the join operation works, so that + * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre> + * By default both the lower and the upper bound are inclusive. This can be configured + * with {@link TimeBounded#lowerBoundExclusive(boolean)} and + * {@link TimeBounded#upperBoundExclusive(boolean)} + * + * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound + * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound + */ + public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); + } + + checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); + checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); + return new TimeBounded<>( + input1, + input2, + lowerBound.toMilliseconds(), + upperBound.toMilliseconds(), + true, + true, + keySelector1, + keySelector2 + ); + } + } + } + + /** + * Joined streams that have keys for both sides as well as the time boundaries over which + * elements should be joined defined. 
+ * + * @param <IN1> Input type of elements from the first stream + * @param <IN2> Input type of elements from the second stream + * @param <KEY> The type of the key + */ + public static class TimeBounded<IN1, IN2, KEY> { + + private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin"; + + private final DataStream<IN1> left; + private final DataStream<IN2> right; + + private final long lowerBound; + private final long upperBound; + + private final KeySelector<IN1, KEY> keySelector1; + private final KeySelector<IN2, KEY> keySelector2; + + private boolean lowerBoundInclusive; + private boolean upperBoundInclusive; + + public TimeBounded( + DataStream<IN1> left, + DataStream<IN2> right, + long lowerBound, + long upperBound, + boolean lowerBoundInclusive, + boolean upperBoundInclusive, + KeySelector<IN1, KEY> keySelector1, + KeySelector<IN2, KEY> keySelector2) { + + this.left = Preconditions.checkNotNull(left); + this.right = Preconditions.checkNotNull(right); + + this.lowerBound = lowerBound; + this.upperBound = upperBound; + + this.lowerBoundInclusive = lowerBoundInclusive; + this.upperBoundInclusive = upperBoundInclusive; + + this.keySelector1 = Preconditions.checkNotNull(keySelector1); + this.keySelector2 = Preconditions.checkNotNull(keySelector2); + } + + /** + * Configure whether the upper bound should be considered exclusive or inclusive. + */ + public TimeBounded<IN1, IN2, KEY> upperBoundExclusive(boolean exclusive) { + this.upperBoundInclusive = !exclusive; + return this; + } + + /** + * Configure whether the lower bound should be considered exclusive or inclusive. + */ + public TimeBounded<IN1, IN2, KEY> lowerBoundExclusive(boolean exclusive) { + this.lowerBoundInclusive = !exclusive; + return this; + } + + /** + * Completes the join operation with the user function that is executed for each joined pair + * of elements. 
+ * @param udf The user-defined function + * @param <OUT> The output type + * @return Returns a DataStream + */ + public <OUT> DataStream<OUT> process(TimeBoundedJoinFunction<IN1, IN2, OUT> udf) { + + ConnectedStreams<IN1, IN2> connected = left.connect(right); + + udf = left.getExecutionEnvironment().clean(udf); + + TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType( + udf, + TimeBoundedJoinFunction.class, // TimeBoundedJoinFunction<IN1, IN2, OUT> + 0, // 0 1 2 + 1, + 2, + new int[]{0}, // lambda input 1 type arg indices + new int[]{1}, // lambda input 1 type arg indices + TypeExtractor.NO_INDEX, // output arg indices + left.getType(), // input 1 type information + right.getType(), // input 1 type information + TIMEBOUNDED_JOIN_FUNC_NAME, + false + ); + + long bucketGranularity = Time.seconds(1).toMilliseconds(); --- End diff -- So I built a proof of concept (POC) that supports setting it like this ```java streamOne .join(streamTwo) .where(new Tuple2KeyExtractor()) .equalTo(new Tuple2KeyExtractor()) .between(Time.milliseconds(0), Time.milliseconds(2)) .withBucketGranularity(Time.hours(1)) // this one here .process(new CombineToStringJoinFunction()) .addSink(new ResultSink()); ``` but I'm really not sure about this. To me, it feels like the wrong level of abstraction at that point. Maybe @kl0u or @aljoscha have some ideas on where this would fit best? Also, putting it in `flink-conf.yaml` feels a little too low-level.
---