    @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> 
keySelector)  {
                        public <W extends Window> WithWindow<T1, T2, KEY, W> 
window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                                return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
    +                   /**
    +                    * Specifies the time boundaries over which the join 
operation works, so that
    +                    * <pre>leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
    +                    * By default both the lower and the upper bound are 
inclusive. This can be configured
    +                    * with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
    +                    * {@link TimeBounded#upperBoundExclusive(boolean)}
    +                    *
    +                    * @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
    +                    * @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
    +                    */
    +                   public TimeBounded<T1, T2, KEY> between(Time 
lowerBound, Time upperBound) {
    +                           TimeCharacteristic timeCharacteristic =
    +                           if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
    +                                   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
    +                           }
    +                           checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
    +                           checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
    +                           return new TimeBounded<>(
    +                                   input1,
    +                                   input2,
    +                                   lowerBound.toMilliseconds(),
    +                                   upperBound.toMilliseconds(),
    +                                   true,
    +                                   true,
    +                                   keySelector1,
    +                                   keySelector2
    +                           );
    +                   }
    +           }
    +   }
    +   /**
    +    * Joined streams that have keys for both sides as well as the time 
boundaries over which
    +    * elements should be joined defined.
    +    *
    +    * @param <IN1> Input type of elements from the first stream
    +    * @param <IN2> Input type of elements from the second stream
    +    * @param <KEY> The type of the key
    +    */
    +   public static class TimeBounded<IN1, IN2, KEY> {
    +           private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
    --- End diff --
    hmm... this might be not very relevant, but I'd prefer a single config 
class that holds all function's names, rather than having them scattered all 
over the code base. 


