GitHub user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5482#discussion_r183745456
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---
    @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
                        public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
                        }
    +
    +                   /**
    +                    * Specifies the time boundaries over which the join operation works, so that
    +                    * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
    +                    * By default both the lower and the upper bound are inclusive. This can be configured
    +                    * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
    +                    * {@link TimeBounded#upperBoundExclusive(boolean)}.
    +                    *
    +                    * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
    +                    * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
    +                    */
    +                   public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
    +
    +                           TimeCharacteristic timeCharacteristic =
    +                                   input1.getExecutionEnvironment().getStreamTimeCharacteristic();
    +
    +                           if (timeCharacteristic != TimeCharacteristic.EventTime) {
    +                                   throw new RuntimeException("Time-bounded stream joins are only supported in event time");
    +                           }
    +
    +                           checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
    +                           checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
    +                           return new TimeBounded<>(
    +                                   input1,
    +                                   input2,
    +                                   lowerBound.toMilliseconds(),
    +                                   upperBound.toMilliseconds(),
    +                                   true,
    +                                   true,
    +                                   keySelector1,
    +                                   keySelector2
    +                           );
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Joined streams that have keys defined for both sides, as well as the time boundaries
    +    * over which elements should be joined.
    +    *
    +    * @param <IN1> Input type of elements from the first stream
    +    * @param <IN2> Input type of elements from the second stream
    +    * @param <KEY> The type of the key
    +    */
    +   public static class TimeBounded<IN1, IN2, KEY> {
    +
    +           private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin";
    +
    +           private final DataStream<IN1> left;
    +           private final DataStream<IN2> right;
    +
    +           private final long lowerBound;
    +           private final long upperBound;
    +
    +           private final KeySelector<IN1, KEY> keySelector1;
    +           private final KeySelector<IN2, KEY> keySelector2;
    +
    +           private boolean lowerBoundInclusive;
    +           private boolean upperBoundInclusive;
    +
    +           public TimeBounded(
    +                   DataStream<IN1> left,
    +                   DataStream<IN2> right,
    +                   long lowerBound,
    +                   long upperBound,
    +                   boolean lowerBoundInclusive,
    +                   boolean upperBoundInclusive,
    +                   KeySelector<IN1, KEY> keySelector1,
    +                   KeySelector<IN2, KEY> keySelector2) {
    +
    +                   this.left = Preconditions.checkNotNull(left);
    +                   this.right = Preconditions.checkNotNull(right);
    +
    +                   this.lowerBound = lowerBound;
    +                   this.upperBound = upperBound;
    +
    +                   this.lowerBoundInclusive = lowerBoundInclusive;
    +                   this.upperBoundInclusive = upperBoundInclusive;
    +
    +                   this.keySelector1 = Preconditions.checkNotNull(keySelector1);
    +                   this.keySelector2 = Preconditions.checkNotNull(keySelector2);
    +           }
    +
    +           /**
    +            * Configure whether the upper bound should be considered exclusive or inclusive.
    +            */
    +           public TimeBounded<IN1, IN2, KEY> upperBoundExclusive(boolean exclusive) {
    +                   this.upperBoundInclusive = !exclusive;
    +                   return this;
    +           }
    +
    +           /**
    +            * Configure whether the lower bound should be considered exclusive or inclusive.
    +            */
    +           public TimeBounded<IN1, IN2, KEY> lowerBoundExclusive(boolean exclusive) {
    +                   this.lowerBoundInclusive = !exclusive;
    +                   return this;
    +           }
    +
    +           /**
    +            * Completes the join operation with the user function that is executed for each joined pair
    +            * of elements.
    +            *
    +            * @param udf The user-defined function
    +            * @param <OUT> The output type
    +            * @return A DataStream with the output of the user function
    +            */
    +           public <OUT> DataStream<OUT> process(TimeBoundedJoinFunction<IN1, IN2, OUT> udf) {
    +
    +                   ConnectedStreams<IN1, IN2> connected = left.connect(right);
    +
    +                   udf = left.getExecutionEnvironment().clean(udf);
    +
    +                   TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
    +                           udf,
    +                           TimeBoundedJoinFunction.class,    // TimeBoundedJoinFunction<IN1, IN2, OUT>
    +                           0,                                //                         0    1    2
    +                           1,
    +                           2,
    +                           new int[]{0},                   // lambda input 1 type arg indices
    +                           new int[]{1},                   // lambda input 2 type arg indices
    +                           TypeExtractor.NO_INDEX,         // output arg indices
    +                           left.getType(),                 // input 1 type information
    +                           right.getType(),                // input 2 type information
    +                           TIMEBOUNDED_JOIN_FUNC_NAME,
    +                           false
    +                   );
    +
    +                   long bucketGranularity = Time.seconds(1).toMilliseconds();
    --- End diff --
    
    I would prefer not to expose this parameter to users. It is an internal detail of the current implementation, and in the future we may not even use buckets for the joins.
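To make the point concrete: in a bucketed design the granularity only decides which buffered elements get scanned, while the pairs that are emitted are decided by the boundary condition from the `between()` Javadoc. The plain-Java sketch below is hypothetical (the class `BucketedIntervalJoinSketch` and its methods are illustrative, not the operator in this PR) and assumes millisecond timestamps and `lowerBound <= upperBound`; it produces the same pairs for two different granularities.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink's operator: buckets right-side timestamps and
// evaluates the between() boundary condition for each candidate pair.
public class BucketedIntervalJoinSketch {

    // leftTs + lowerBound <= rightTs <= leftTs + upperBound, where each side
    // can be switched between inclusive and exclusive.
    static boolean inBounds(long leftTs, long rightTs, long lowerBound, long upperBound,
                            boolean lowerInclusive, boolean upperInclusive) {
        long lo = leftTs + lowerBound;
        long hi = leftTs + upperBound;
        boolean aboveLower = lowerInclusive ? rightTs >= lo : rightTs > lo;
        boolean belowUpper = upperInclusive ? rightTs <= hi : rightTs < hi;
        return aboveLower && belowUpper;
    }

    // Returns matches as "leftTs:rightTs". granularityMs only changes how many
    // candidates are scanned, never which pairs are emitted.
    static List<String> join(long[] leftTs, long[] rightTs, long lowerBound, long upperBound,
                             boolean lowerInclusive, boolean upperInclusive, long granularityMs) {
        if (lowerBound > upperBound || granularityMs <= 0) {
            throw new IllegalArgumentException("need lowerBound <= upperBound and granularityMs > 0");
        }
        // Bucket index = floor(timestamp / granularity).
        TreeMap<Long, List<Long>> buckets = new TreeMap<>();
        for (long r : rightTs) {
            buckets.computeIfAbsent(Math.floorDiv(r, granularityMs), k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (long l : leftTs) {
            // Only buckets overlapping [l + lowerBound, l + upperBound] can contain matches.
            long first = Math.floorDiv(l + lowerBound, granularityMs);
            long last = Math.floorDiv(l + upperBound, granularityMs);
            for (Map.Entry<Long, List<Long>> bucket : buckets.subMap(first, true, last, true).entrySet()) {
                for (long r : bucket.getValue()) {
                    if (inBounds(l, r, lowerBound, upperBound, lowerInclusive, upperInclusive)) {
                        out.add(l + ":" + r);
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] left = {1_000, 5_000};
        long[] right = {500, 1_000, 2_999, 3_000, 7_000};
        // Same bounds, two different bucket sizes.
        System.out.println(join(left, right, -500, 2_000, true, true, 1_000));
        System.out.println(join(left, right, -500, 2_000, true, true, 250));
    }
}
```

Both calls in `main` print `[1000:500, 1000:1000, 1000:2999, 1000:3000, 5000:7000]`. Because `floorDiv` is monotonic, every timestamp inside the interval falls into one of the scanned buckets, so the granularity can only affect performance, not the join result.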
    
    In general, exposing internal parameters to users is not a good idea. We can always add this parameter later if we see that it makes a difference in practice, whereas removing it if we change our mind in the future is always trickier.
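A hypothetical, Flink-free sketch of the builder with this suggestion applied: the bounds and their inclusiveness stay public, while the granularity is a private constant that only same-package code can read. The class name `TimeBoundedSpec`, the `bucketGranularityMs()` accessor, and the bounds check (taken from the `between()` Javadoc requirement) are illustrative assumptions, not part of the PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the TimeBounded builder in the diff:
// bounds and inclusiveness are public, the bucket granularity is not.
public final class TimeBoundedSpec {

    // Internal tuning knob: callers cannot see or set it, so it can be
    // changed or dropped later without breaking user code.
    private static final long BUCKET_GRANULARITY_MS = TimeUnit.SECONDS.toMillis(1);

    private final long lowerBoundMs;
    private final long upperBoundMs;
    private boolean lowerBoundInclusive = true; // both bounds inclusive by default
    private boolean upperBoundInclusive = true;

    public TimeBoundedSpec(long lowerBoundMs, long upperBoundMs) {
        if (lowerBoundMs > upperBoundMs) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        this.lowerBoundMs = lowerBoundMs;
        this.upperBoundMs = upperBoundMs;
    }

    public TimeBoundedSpec lowerBoundExclusive(boolean exclusive) {
        this.lowerBoundInclusive = !exclusive;
        return this;
    }

    public TimeBoundedSpec upperBoundExclusive(boolean exclusive) {
        this.upperBoundInclusive = !exclusive;
        return this;
    }

    public long getLowerBoundMs() { return lowerBoundMs; }
    public long getUpperBoundMs() { return upperBoundMs; }
    public boolean isLowerBoundInclusive() { return lowerBoundInclusive; }
    public boolean isUpperBoundInclusive() { return upperBoundInclusive; }

    // Package-private: only operator code in the same package reads it.
    static long bucketGranularityMs() { return BUCKET_GRANULARITY_MS; }

    public static void main(String[] args) {
        TimeBoundedSpec spec = new TimeBoundedSpec(-500, 2_000).upperBoundExclusive(true);
        System.out.println(spec.isLowerBoundInclusive() + " " + spec.isUpperBoundInclusive());
    }
}
```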

