XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1118867104


##########
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##########
@@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception {
     }
 
     @Test
-    void testLackStartForSequence() {
+    void testDefaultValueForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(0)
+                .describedAs("The default start value should be 0")
+                .isEqualTo(start);
+        Assertions.assertThat(Integer.MAX_VALUE)
+                .describedAs("The default start value should be 
Integer.MAX_VALUE")
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testStartEndForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+        final int setupStart = 10;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.START,
+                setupStart);
+        final int setupEnd = 100;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.END,
+                setupEnd);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(setupStart)
+                .describedAs("The default start value should be " + setupStart)
+                .isEqualTo(start);
+        Assertions.assertThat(setupEnd)
+                .describedAs("The default start value should be " + setupEnd)
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testCornerCaseForSequence() {
+        // An example of testing that the end value is greater than the start 
value
         assertThatThrownBy(

Review Comment:
   `assertThatThrownBy` should be used as specifically as possible: 
Theoretically, the `IllegalArgumentException` could have been thrown by any 
other code line in the passed block even though you're only interested in the 
`createTableSource` here.



##########
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##########
@@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception {
     }
 
     @Test
-    void testLackStartForSequence() {
+    void testDefaultValueForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(0)
+                .describedAs("The default start value should be 0")
+                .isEqualTo(start);
+        Assertions.assertThat(Integer.MAX_VALUE)
+                .describedAs("The default start value should be 
Integer.MAX_VALUE")
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testStartEndForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+        final int setupStart = 10;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.START,
+                setupStart);
+        final int setupEnd = 100;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.END,
+                setupEnd);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(setupStart)
+                .describedAs("The default start value should be " + setupStart)
+                .isEqualTo(start);
+        Assertions.assertThat(setupEnd)
+                .describedAs("The default start value should be " + setupEnd)
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testCornerCaseForSequence() {
+        // An example of testing that the end value is greater than the start 
value

Review Comment:
   We could make two test methods out of this one. A Junit test should usually 
only test a single issue. On another not, usually, comments are not necessary 
but should be replaced by assert messages instead. That way, the comment also 
serves as a description for the assert in case the assert fails.



##########
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##########
@@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception {
     }
 
     @Test
-    void testLackStartForSequence() {
+    void testDefaultValueForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(0)
+                .describedAs("The default start value should be 0")
+                .isEqualTo(start);
+        Assertions.assertThat(Integer.MAX_VALUE)
+                .describedAs("The default start value should be 
Integer.MAX_VALUE")
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testStartEndForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+        final int setupStart = 10;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.START,
+                setupStart);
+        final int setupEnd = 100;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.END,
+                setupEnd);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(setupStart)
+                .describedAs("The default start value should be " + setupStart)
+                .isEqualTo(start);
+        Assertions.assertThat(setupEnd)
+                .describedAs("The default start value should be " + setupEnd)
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testCornerCaseForSequence() {

Review Comment:
   ```suggestion
       void testCornerCaseForSequence() {
   ```
   that's bad naming. The test method should be more descriptive. Your test 
method could apply to any corner case scenario.



##########
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##########
@@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception {
     }
 
     @Test
-    void testLackStartForSequence() {
+    void testDefaultValueForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(0)
+                .describedAs("The default start value should be 0")
+                .isEqualTo(start);
+        Assertions.assertThat(Integer.MAX_VALUE)
+                .describedAs("The default start value should be 
Integer.MAX_VALUE")
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testStartEndForSequence() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+        final int setupStart = 10;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.START,
+                setupStart);
+        final int setupEnd = 100;
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.END,
+                setupEnd);
+
+        DataGenTableSource source =
+                (DataGenTableSource)
+                        createTableSource(
+                                ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+                                descriptor.asMap());
+        DataGenerator<?>[] fieldGenerators = source.getFieldGenerators();
+        SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) 
fieldGenerators[0];
+        long start = fieldGenerator.getStart();
+        long end = fieldGenerator.getEnd();
+
+        Assertions.assertThat(setupStart)
+                .describedAs("The default start value should be " + setupStart)
+                .isEqualTo(start);
+        Assertions.assertThat(setupEnd)
+                .describedAs("The default start value should be " + setupEnd)
+                .isEqualTo(end);
+    }
+
+    @Test
+    void testCornerCaseForSequence() {

Review Comment:
   You don't want to have code being covered that's not necessary for the unit 
test. You're trying to test functionality within `SequenceGneerator`. Wouldn't 
it be better to introduce a `SequenceGeneratorTest` that tests the 
implementation details of `SequenceGenerator`? WDYT?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -53,6 +54,8 @@
      * @param end End of the range of numbers to emit.
      */
     public SequenceGenerator(long start, long end) {
+        Preconditions.checkArgument(

Review Comment:
   The requirements of this class should be reflected in the JavaDoc as well.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -83,9 +86,11 @@ public void open(
             final int taskIdx = runtimeContext.getIndexOfThisSubtask();
             final long congruence = start + taskIdx;
 
+            Preconditions.checkArgument(

Review Comment:
   The validation check should happen as early as possible (in the location 
where these parameters are set, i.e. the constructor).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to