Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183745456 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** + * Specifies the time boundaries over which the join operation works, so that + * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre> + * By default both the lower and the upper bound are inclusive. This can be configured + * with {@link TimeBounded#lowerBoundExclusive(boolean)} and + * {@link TimeBounded#upperBoundExclusive(boolean)} + * + * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound + * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound + */ + public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); + } + + checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); + checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); + return new TimeBounded<>( + input1, + input2, + lowerBound.toMilliseconds(), + upperBound.toMilliseconds(), + true, + true, + keySelector1, + keySelector2 + ); + } + } + } + + /** + * Joined streams that have keys for both sides as well as the time boundaries over which + * elements should be joined defined. 
+ * + * @param <IN1> Input type of elements from the first stream + * @param <IN2> Input type of elements from the second stream + * @param <KEY> The type of the key + */ + public static class TimeBounded<IN1, IN2, KEY> { + + private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin"; + + private final DataStream<IN1> left; + private final DataStream<IN2> right; + + private final long lowerBound; + private final long upperBound; + + private final KeySelector<IN1, KEY> keySelector1; + private final KeySelector<IN2, KEY> keySelector2; + + private boolean lowerBoundInclusive; + private boolean upperBoundInclusive; + + public TimeBounded( + DataStream<IN1> left, + DataStream<IN2> right, + long lowerBound, + long upperBound, + boolean lowerBoundInclusive, + boolean upperBoundInclusive, + KeySelector<IN1, KEY> keySelector1, + KeySelector<IN2, KEY> keySelector2) { + + this.left = Preconditions.checkNotNull(left); + this.right = Preconditions.checkNotNull(right); + + this.lowerBound = lowerBound; + this.upperBound = upperBound; + + this.lowerBoundInclusive = lowerBoundInclusive; + this.upperBoundInclusive = upperBoundInclusive; + + this.keySelector1 = Preconditions.checkNotNull(keySelector1); + this.keySelector2 = Preconditions.checkNotNull(keySelector2); + } + + /** + * Configure whether the upper bound should be considered exclusive or inclusive. + */ + public TimeBounded<IN1, IN2, KEY> upperBoundExclusive(boolean exclusive) { + this.upperBoundInclusive = !exclusive; + return this; + } + + /** + * Configure whether the lower bound should be considered exclusive or inclusive. + */ + public TimeBounded<IN1, IN2, KEY> lowerBoundExclusive(boolean exclusive) { + this.lowerBoundInclusive = !exclusive; + return this; + } + + /** + * Completes the join operation with the user function that is executed for each joined pair + * of elements. 
+ * @param udf The user-defined function + * @param <OUT> The output type + * @return Returns a DataStream + */ + public <OUT> DataStream<OUT> process(TimeBoundedJoinFunction<IN1, IN2, OUT> udf) { + + ConnectedStreams<IN1, IN2> connected = left.connect(right); + + udf = left.getExecutionEnvironment().clean(udf); + + TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType( + udf, + TimeBoundedJoinFunction.class, // TimeBoundedJoinFunction<IN1, IN2, OUT> + 0, // 0 1 2 + 1, + 2, + new int[]{0}, // lambda input 1 type arg indices + new int[]{1}, // lambda input 1 type arg indices + TypeExtractor.NO_INDEX, // output arg indices + left.getType(), // input 1 type information + right.getType(), // input 1 type information + TIMEBOUNDED_JOIN_FUNC_NAME, + false + ); + + long bucketGranularity = Time.seconds(1).toMilliseconds(); --- End diff -- I would prefer not to expose this parameter to users. The bucket granularity is an internal detail of the current implementation, and in the future we may not even use buckets for the joins. Exposing it would leak an implementation detail into the public API, which is normally not a good idea. We can always add this parameter later if we see that it makes a difference in practice; removing it after we change our mind is always trickier.
---