XComp commented on code in PR #21971: URL: https://github.com/apache/flink/pull/21971#discussion_r1118867104
########## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ########## @@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception { } @Test - void testLackStartForSequence() { + void testDefaultValueForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(0) + .describedAs("The default start value should be 0") + .isEqualTo(start); + Assertions.assertThat(Integer.MAX_VALUE) + .describedAs("The default start value should be Integer.MAX_VALUE") + .isEqualTo(end); + } + + @Test + void testStartEndForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + final int setupStart = 10; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.START, + setupStart); + final int setupEnd = 100; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." 
+ DataGenConnectorOptionsUtil.END, + setupEnd); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(setupStart) + .describedAs("The default start value should be " + setupStart) + .isEqualTo(start); + Assertions.assertThat(setupEnd) + .describedAs("The default start value should be " + setupEnd) + .isEqualTo(end); + } + + @Test + void testCornerCaseForSequence() { + // An example of testing that the end value is greater than the start value assertThatThrownBy( Review Comment: `assertThatThrownBy` should be used as specifically as possible: Theoretically, the `IllegalArgumentException` could have been thrown by any other code line in the passed block even though you're only interested in the `createTableSource` here. ########## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ########## @@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception { } @Test - void testLackStartForSequence() { + void testDefaultValueForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." 
+ DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(0) + .describedAs("The default start value should be 0") + .isEqualTo(start); + Assertions.assertThat(Integer.MAX_VALUE) + .describedAs("The default start value should be Integer.MAX_VALUE") + .isEqualTo(end); + } + + @Test + void testStartEndForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + final int setupStart = 10; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.START, + setupStart); + final int setupEnd = 100; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." 
+ DataGenConnectorOptionsUtil.END, + setupEnd); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(setupStart) + .describedAs("The default start value should be " + setupStart) + .isEqualTo(start); + Assertions.assertThat(setupEnd) + .describedAs("The default start value should be " + setupEnd) + .isEqualTo(end); + } + + @Test + void testCornerCaseForSequence() { + // An example of testing that the end value is greater than the start value Review Comment: We could make two test methods out of this one. A JUnit test should usually only test a single issue. On another note, usually, comments are not necessary but should be replaced by assert messages instead. That way, the comment also serves as a description for the assert in case the assert fails. ########## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ########## @@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception { } @Test - void testLackStartForSequence() { + void testDefaultValueForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." 
+ DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(0) + .describedAs("The default start value should be 0") + .isEqualTo(start); + Assertions.assertThat(Integer.MAX_VALUE) + .describedAs("The default start value should be Integer.MAX_VALUE") + .isEqualTo(end); + } + + @Test + void testStartEndForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + final int setupStart = 10; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.START, + setupStart); + final int setupEnd = 100; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." 
+ DataGenConnectorOptionsUtil.END, + setupEnd); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(setupStart) + .describedAs("The default start value should be " + setupStart) + .isEqualTo(start); + Assertions.assertThat(setupEnd) + .describedAs("The default start value should be " + setupEnd) + .isEqualTo(end); + } + + @Test + void testCornerCaseForSequence() { Review Comment: ```suggestion void testCornerCaseForSequence() { ``` that's bad naming. The test method should be more descriptive. Your test method could apply to any corner case scenario. ########## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ########## @@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception { } @Test - void testLackStartForSequence() { + void testDefaultValueForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." 
+ DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(0) + .describedAs("The default start value should be 0") + .isEqualTo(start); + Assertions.assertThat(Integer.MAX_VALUE) + .describedAs("The default start value should be Integer.MAX_VALUE") + .isEqualTo(end); + } + + @Test + void testStartEndForSequence() { + DescriptorProperties descriptor = new DescriptorProperties(); + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.SEQUENCE); + final int setupStart = 10; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.START, + setupStart); + final int setupEnd = 100; + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f0." 
+ DataGenConnectorOptionsUtil.END, + setupEnd); + + DataGenTableSource source = + (DataGenTableSource) + createTableSource( + ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), + descriptor.asMap()); + DataGenerator<?>[] fieldGenerators = source.getFieldGenerators(); + SequenceGenerator<?> fieldGenerator = (SequenceGenerator<?>) fieldGenerators[0]; + long start = fieldGenerator.getStart(); + long end = fieldGenerator.getEnd(); + + Assertions.assertThat(setupStart) + .describedAs("The default start value should be " + setupStart) + .isEqualTo(start); + Assertions.assertThat(setupEnd) + .describedAs("The default start value should be " + setupEnd) + .isEqualTo(end); + } + + @Test + void testCornerCaseForSequence() { Review Comment: You don't want to have code being covered that's not necessary for the unit test. You're trying to test functionality within `SequenceGenerator`. Wouldn't it be better to introduce a `SequenceGeneratorTest` that tests the implementation details of `SequenceGenerator`? WDYT? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -53,6 +54,8 @@ * @param end End of the range of numbers to emit. */ public SequenceGenerator(long start, long end) { + Preconditions.checkArgument( Review Comment: The requirements of this class should be reflected in the JavaDoc as well. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -83,9 +86,11 @@ public void open( final int taskIdx = runtimeContext.getIndexOfThisSubtask(); final long congruence = start + taskIdx; + Preconditions.checkArgument( Review Comment: The validation check should happen as early as possible (in the location where these parameters are set, i.e. the constructor). -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org