Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5342#discussion_r163506497
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperatorTest.java
 ---
    @@ -0,0 +1,590 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.state.MapState;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import 
org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +import org.junit.runners.Parameterized.Parameters;
    +
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.stream.Collectors;
    +
    +// TODO: Parameterize to use different state backends --> This would 
require circular dependency on flink rocksdb
    +@RunWith(Parameterized.class)
    +public class TimeBoundedStreamJoinOperatorTest {
    +
    +   private final boolean lhsFasterThanRhs;
    +
    +   @Parameters(name = "lhs faster than rhs stream: {0}")
    +   public static Boolean[] data() {
    +           return new Boolean[]{true, false};
    +   }
    +
    +   public TimeBoundedStreamJoinOperatorTest(boolean lhsFasterThanRhs) {
    +           this.lhsFasterThanRhs = lhsFasterThanRhs;
    +   }
    +
    +   @Test // lhs - 2 <= rhs <= lhs - 1
    +   public void testNegativeInclusiveAndNegativeInclusive() throws 
Exception {
    +
    +           long lowerBound = -2;
    +           boolean lowerBoundInclusive = true;
    +
    +           long upperBound = -1;
    +           boolean upperBoundInclusive = true;
    +
    +           try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, 
TestElem, Tuple2<TestElem, TestElem>> testHarness
    +                            = createTestHarness(lowerBound, 
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
    +
    +
    +                   testHarness.setup();
    +                   testHarness.open();
    +
    +                   prepareTestHarness(testHarness);
    +
    +                   List<StreamRecord<Tuple2<TestElem, TestElem>>> 
expectedOutput = Lists.newArrayList(
    +                           streamRecordOf(2, 1),
    +                           streamRecordOf(3, 1),
    +                           streamRecordOf(3, 2),
    +                           streamRecordOf(4, 2),
    +                           streamRecordOf(4, 3)
    +                   );
    +
    +                   assertOutput(expectedOutput, testHarness.getOutput());
    +                   ensureNoLateData(testHarness.getOutput());
    +           }
    +   }
    +
    +   @Test // lhs - 1 <= rhs <= lhs + 1
    +   public void testNegativeInclusiveAndPositiveInclusive() throws 
Exception {
    +
    +           long lowerBound = -1;
    +           boolean lowerBoundInclusive = true;
    +
    +           long upperBound = 1;
    +           boolean upperBoundInclusive = true;
    +
    +           try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, 
TestElem, Tuple2<TestElem, TestElem>> testHarness
    +                            = createTestHarness(lowerBound, 
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
    +
    +                   testHarness.setup();
    +                   testHarness.open();
    +
    +                   prepareTestHarness(testHarness);
    +
    +                   List<StreamRecord<Tuple2<TestElem, TestElem>>> 
expectedOutput = Lists.newArrayList(
    +                           streamRecordOf(1, 1),
    +                           streamRecordOf(1, 2),
    +                           streamRecordOf(2, 1),
    +                           streamRecordOf(2, 2),
    +                           streamRecordOf(2, 3),
    +                           streamRecordOf(3, 2),
    +                           streamRecordOf(3, 3),
    +                           streamRecordOf(3, 4),
    +                           streamRecordOf(4, 3),
    +                           streamRecordOf(4, 4)
    +                   );
    +
    +                   ConcurrentLinkedQueue<Object> output = 
testHarness.getOutput();
    +
    +                   assertOutput(expectedOutput, testHarness.getOutput());
    +                   ensureNoLateData(output);
    +
    +           }
    +   }
    +
    +   @Test // lhs + 1 <= rhs <= lhs + 2
    +   public void testPositiveInclusiveAndPositiveInclusive() throws 
Exception {
    +           long lowerBound = 1;
    +           long upperBound = 2;
    +
    +           boolean lowerBoundInclusive = true;
    +           boolean upperBoundInclusive = true;
    +
    +           try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, 
TestElem, Tuple2<TestElem, TestElem>> testHarness
    +                            = createTestHarness(lowerBound, 
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
    +
    +                   testHarness.setup();
    +                   testHarness.open();
    +
    +                   prepareTestHarness(testHarness);
    +
    +                   List<StreamRecord<Tuple2<TestElem, TestElem>>> expected 
= Lists.newArrayList(
    +                           streamRecordOf(1, 2),
    +                           streamRecordOf(1, 3),
    +                           streamRecordOf(2, 3),
    +                           streamRecordOf(2, 4),
    +                           streamRecordOf(3, 4)
    +                   );
    +
    +                   assertOutput(expected, testHarness.getOutput());
    +                   ensureNoLateData(testHarness.getOutput());
    +           }
    +   }
    +
    +   @Test
    +   public void testNegativeExclusiveAndNegativeExlusive() throws Exception 
{
    +           long lowerBound = -3;
    +           boolean lowerBoundInclusive = false;
    +
    +           long upperBound = -1;
    +           boolean upperBoundInclusive = false;
    +
    +           try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, 
TestElem, Tuple2<TestElem, TestElem>> testHarness
    +                            = createTestHarness(lowerBound, 
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
    +
    +                   testHarness.setup();
    +                   testHarness.open();
    +                   prepareTestHarness(testHarness);
    +
    +                   List<StreamRecord<Tuple2<TestElem, TestElem>>> 
expectedOutput = Lists.newArrayList(
    +                           streamRecordOf(3, 1),
    +                           streamRecordOf(4, 2)
    +                   );
    +
    +                   ConcurrentLinkedQueue<Object> output = 
testHarness.getOutput();
    +
    +                   assertOutput(expectedOutput, testHarness.getOutput());
    +                   ensureNoLateData(output);
    +           }
    +   }
    +
    +   @Test
    +   public void testNegativeExclusiveAndPositiveExlusive() throws Exception 
{
    +           long lowerBound = -1;
    +           boolean lowerBoundInclusive = false;
    +
    +           long upperBound = 1;
    +           boolean upperBoundInclusive = false;
    +
    +           try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, 
TestElem, Tuple2<TestElem, TestElem>> testHarness
    +                            = createTestHarness(lowerBound, 
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
    +
    +                   testHarness.setup();
    +                   testHarness.open();
    +                   prepareTestHarness(testHarness);
    +
    +                   List<StreamRecord<Tuple2<TestElem, TestElem>>> 
expectedOutput = Lists.newArrayList(
    +                           streamRecordOf(1, 1),
    +                           streamRecordOf(2, 2),
    +                           streamRecordOf(3, 3),
    +                           streamRecordOf(4, 4)
    +                   );
    +
    +                   ConcurrentLinkedQueue<Object> output = 
testHarness.getOutput();
    +
    +                   assertOutput(expectedOutput, testHarness.getOutput());
    +                   ensureNoLateData(output);
    +           }
    +   }
    +
    +   @Test
    +   public void testPositiveExclusiveAndPositiveExlusive() throws Exception 
{
    +           long lowerBound = 1;
    +           boolean lowerBoundInclusive = false;
    +
    +           long upperBound = 3;
    +           boolean upperBoundInclusive = false;
    +
    +           try (KeyedTwoInputStreamOperatorTestHarness<String, TestElem, 
TestElem, Tuple2<TestElem, TestElem>> testHarness
    +                            = createTestHarness(lowerBound, 
lowerBoundInclusive, upperBound, upperBoundInclusive)) {
    +
    +                   testHarness.setup();
    +                   testHarness.open();
    +                   prepareTestHarness(testHarness);
    +
    +                   List<StreamRecord<Tuple2<TestElem, TestElem>>> 
expectedOutput = Lists.newArrayList(
    +                           streamRecordOf(1, 3),
    +                           streamRecordOf(2, 4)
    +                   );
    +
    +                   ConcurrentLinkedQueue<Object> output = 
testHarness.getOutput();
    +
    +                   assertOutput(expectedOutput, testHarness.getOutput());
    +                   ensureNoLateData(output);
    +
    +           }
    +   }
    +
    +   @Test
    +   public void stateGetsCleanedWhenNotNeeded() throws Exception {
    +
    +           long lowerBound = 1;
    +           boolean lowerBoundInclusive = true;
    +
    +           long upperBound = 2;
    +           boolean upperBoundInclusive = true;
    +
    +           TimeBoundedStreamJoinOperator<TestElem, TestElem> operator = 
new TimeBoundedStreamJoinOperator<>(
    +                   lowerBound,
    --- End diff --
    
    Here, and wherever we instantiate an operator, we should now add the 
serializers for the input elements. They can be created with 
`TypeInformation.of(new TypeHint<TestElem>() {}).createSerializer(new 
ExecutionConfig())`.


---

Reply via email to