    +package org.apache.flink.streaming.api.functions;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.state.MapState;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import static 
    +// TODO: Make bucket granularity adaptable
    +/**
    + * A TwoInputStreamOperator to execute time-bounded stream inner joins.
    + * <p>
    + * By using a configurable lower and upper bound this operator will emit exactly those pairs
    + * (T1, T2) where T2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
    + * upper bound can be configured to be either inclusive or exclusive.
    + *
    + * @param <T1> The type of the elements in the left stream
    + * @param <T2> The type of the elements in the right stream
    + */
    +public class TimeBoundedStreamJoinOperator<T1, T2>
    +   extends AbstractStreamOperator<Tuple2<T1, T2>>
    +   implements TwoInputStreamOperator<T1, T2, Tuple2<T1, T2>> {
    +   private final long lowerBound;
    +   private final long upperBound;
    +   private final long inverseLowerBound;
    +   private final long inverseUpperBound;
    +   private final boolean lowerBoundInclusive;
    +   private final boolean upperBoundInclusive;
    +   private final long bucketGranularity = 1;
    +   private static final String LEFT_BUFFER = "LEFT_BUFFER";
    +   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    +   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
    +   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
    +   private transient ValueState<Long> lastCleanupRightBuffer;
    +   private transient ValueState<Long> lastCleanupLeftBuffer;
    +   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> 
    +   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> 
    +   private transient TimestampedCollector<Tuple2<T1, T2>> collector;
    +   /**
    +    * Creates a new TimeBoundedStreamJoinOperator
    +    *
    +    * <p>Stores the four arguments unchanged and additionally derives the mirrored bounds
    +    * {@code inverseLowerBound = -upperBound} and {@code inverseUpperBound = -lowerBound}.
    +    * No argument validation is performed here (e.g. nothing checks that
    +    * {@code lowerBound <= upperBound}).
    +    *
    +    * <p>NOTE(review): the mirrored bounds are presumably used when an element of the right
    +    * stream is matched against the left buffer; confirm against the rest of the class.
    +    *
    +    * @param lowerBound          The lower bound for evaluating if 
elements should be joined
    +    * @param upperBound          The upper bound for evaluating if 
elements should be joined
    +    * @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
    +    *                            the lower bound
    +    * @param upperBoundInclusive Whether or not to include elements where 
the timestamp matches
    +    *                            the upper bound
    +    */
    +   public TimeBoundedStreamJoinOperator(long lowerBound,
 long upperBound,
 boolean lowerBoundInclusive,
 boolean upperBoundInclusive) {
    +           this.lowerBound = lowerBound;
    +           this.upperBound = upperBound;
    +           this.inverseLowerBound = -1 * upperBound; // mirrored interval: negated upper bound becomes the lower bound
    +           this.inverseUpperBound = -1 * lowerBound; // mirrored interval: negated lower bound becomes the upper bound
    +           this.lowerBoundInclusive = lowerBoundInclusive;
    +           this.upperBoundInclusive = upperBoundInclusive;
    +   }
    +   @Override
    +   public void open() throws Exception { // creates the output collector and obtains the four state handles from the runtime context
    + ; // NOTE(review): the statement that preceded this ';' is missing from this excerpt (possibly a super.open() call) - confirm
    +           collector = new TimestampedCollector<>(output); // collector wrapping this operator's output
    +           this.leftBuffer = getRuntimeContext().getMapState(new  // NOTE(review): descriptor type after 'new' is missing from this excerpt (presumably MapStateDescriptor) - confirm
    +                   LEFT_BUFFER,
    +                   LONG_TYPE_INFO,
    +                   TypeInformation.of(new TypeHint<List<Tuple3<T1, Long, 
Boolean>>>() {
    +                   })
    +           ));
    +           this.rightBuffer = getRuntimeContext().getMapState(new  // same construction as leftBuffer, but with RIGHT_BUFFER and T2 elements
    +                   RIGHT_BUFFER,
    +                   LONG_TYPE_INFO,
    +                   TypeInformation.of(new TypeHint<List<Tuple3<T2, Long, 
Boolean>>>() {
    +                   })
    +           ));
    +           this.lastCleanupRightBuffer = getRuntimeContext().getState(new  // NOTE(review): descriptor type after 'new' is missing from this excerpt (presumably ValueStateDescriptor) - confirm
    +                   LAST_CLEANUP_RIGHT,
    +                   LONG_TYPE_INFO
    +           ));
    +           this.lastCleanupLeftBuffer = getRuntimeContext().getState(new  // same construction as lastCleanupRightBuffer, but with LAST_CLEANUP_LEFT
    +                   LAST_CLEANUP_LEFT,
    +                   LONG_TYPE_INFO
    +           ));
    +   }
    +   /**
    +    * Processes a {@link StreamRecord<T1>} from the left stream. Whenever a 
{@link StreamRecord<T1>}
    +    * arrives at the left stream, it will get added to the left buffer. 
Possible join candidates
    +    * for that element will be looked up from the right buffer and if the 
pair lies within the
    +    * user defined boundaries, it gets collected.
    +    *
    +    * @param record An incoming record to be joined
    +    * @throws Exception Can throw an Exception during state access
    +    */
    +   @Override
    +   public void processElement1(StreamRecord<T1> record) throws Exception {
    +           long leftTs = record.getTimestamp();
    +           T1 leftValue = record.getValue();
    +           addToLeftBuffer(leftValue, leftTs);
    +           getJoinCandidatesForLeftElement(leftTs)
    This code creates unnecessary copies: it first copies everything from 
the state into a list and then iterates over that list to check which elements 
can be joined. Why not keep the for-loop and write something like:
    for (long i = lowerBound; i <= upperBound; i++) {
                        List<Tuple3<T2, Long, Boolean>> fromBucket = 
rightBuffer.get(leftTs + i);
                        if (fromBucket != null) {
                                for (Tuple3<T2, Long, Boolean> candidate : 
fromBucket) {
                                        if (shouldBeJoined(leftTs, 
candidate.f1)) {
candidate.f0, leftTs);


