Airblader commented on a change in pull request #17811: URL: https://github.com/apache/flink/pull/17811#discussion_r756662813
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ########## @@ -381,11 +394,44 @@ // ------------------------------------------------------------------------------------------ /** The enforcer to guarantee NOT NULL column constraint when writing data into sink. */ - public enum NotNullEnforcer { - /** Throws runtime exception when writing null values into NOT NULL column. */ - ERROR, - /** Drop records when writing null values into NOT NULL column. */ - DROP + public enum NotNullEnforcer implements DescribedEnum { + ERROR(text("Throws runtime exception when writing null values into NOT NULL column.")), + DROP(text("Drops records when writing null values into NOT NULL column.")); Review comment: ```suggestion DROP(text("Drop records silently if a null value would have to be inserted into a NOT NULL column.")); ``` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ########## @@ -114,12 +116,23 @@ .enumType(NotNullEnforcer.class) .defaultValue(NotNullEnforcer.ERROR) .withDescription( - "The NOT NULL column constraint on a table enforces that " - + "null values can't be inserted into the table. Flink supports " - + "'error' (default) and 'drop' enforcement behavior. By default, " - + "Flink will check values and throw runtime exception when null values writing " - + "into NOT NULL columns. Users can change the behavior to 'drop' to " - + "silently drop such records without throwing exception."); + "Determines whether the NOT NULL column constraint on a table enforces that " + + "null values can't be inserted into the table. By default, " + + "Flink will check values and throw runtime exception when attempting " + + "to write null values writing into NOT NULL columns. Users can change " + + "the behavior to silently drop such records without throwing exception."); Review comment: This entire paragraph just re-states the description of the enum. I think we can simplify this to something like ```suggestion "Determines how Flink enforces NOT NULL column constraints when inserting null values."); ``` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ########## @@ -381,11 +394,44 @@ // ------------------------------------------------------------------------------------------ /** The enforcer to guarantee NOT NULL column constraint when writing data into sink. */ - public enum NotNullEnforcer { - /** Throws runtime exception when writing null values into NOT NULL column. */ - ERROR, - /** Drop records when writing null values into NOT NULL column. */ - DROP + public enum NotNullEnforcer implements DescribedEnum { + ERROR(text("Throws runtime exception when writing null values into NOT NULL column.")), + DROP(text("Drops records when writing null values into NOT NULL column.")); + + private final InlineElement description; + + NotNullEnforcer(InlineElement description) { + this.description = description; + } + + @Override + public InlineElement getDescription() { + return description; + } + } + + /** + * The enforcer to guarantee that precision of CHAR/VARCHAR columns is respected when writing + * data into sink. 
+ */ + public enum CharPrecisionEnforcer implements DescribedEnum { + IGNORE( + text( + "Doesn't apply any trimming, simply ignores the CHAR/VARCHAR precision directive.")), Review comment: Just for consistent wording ```suggestion "Don't apply any trimming, and instead ignore the CHAR/VARCHAR precision directive.")), ``` ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java ########## @@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() { assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); } + @Test + public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"), + Row.of(2, "Apache", "SQL", 22, 222, "Flink"), + Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache Flink SQL"), + Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL")); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaForCharPrecisionEnforcer()) + .source(new TestSource(rows)) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + + // Default config - ignore (no trim) + TableResult result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expected = rows.stream().map(Row::toString).collect(Collectors.toList()); + final List<String> resultsAsString = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsString.add(r.toString())); + assertEquals(expected, resultsAsString); + + try { Review comment: You had a comment above to specify that it's the default config test case, but no comment here. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ########## @@ -161,28 +167,47 @@ public DynamicTableSinkSpec getTableSinkSpec() { /** * Apply an operator to filter or report error to process not-null values for not-null fields. 
*/ - private Transformation<RowData> applyNotNullEnforcer( + private Transformation<RowData> applyConstraintValidations( Transformation<RowData> inputTransform, TableConfig config, RowType physicalRowType) { - final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = - config.getConfiguration() - .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER); - final int[] notNullFieldIndices = getNotNullFieldIndices(physicalRowType); + final ConstraintEnforcer.Builder validatorBuilder = ConstraintEnforcer.newBuilder(); final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]); + // Build NOT NULL enforcer + final int[] notNullFieldIndices = getNotNullFieldIndices(physicalRowType); if (notNullFieldIndices.length > 0) { - final SinkNotNullEnforcer enforcer = - new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames); + final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = + config.getConfiguration() + .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER); final List<String> notNullFieldNames = Arrays.stream(notNullFieldIndices) .mapToObj(idx -> fieldNames[idx]) .collect(Collectors.toList()); - final String operatorName = - String.format( - "NotNullEnforcer(fields=[%s])", String.join(", ", notNullFieldNames)); + + validatorBuilder.addNotNullConstraint( + notNullEnforcer, notNullFieldIndices, notNullFieldNames, fieldNames); + } + + // Build CHAR/VARCHAR precision enforcer + final List<Tuple2<Integer, Integer>> charFields = getCharFieldIndices(physicalRowType); + if (charFields.size() > 0) { Review comment: nit ```suggestion if (!charFields.isEmpty()) { ``` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ########## @@ -114,12 +116,23 @@ .enumType(NotNullEnforcer.class) .defaultValue(NotNullEnforcer.ERROR) .withDescription( - "The NOT NULL column constraint on a table enforces that " - + "null values can't be inserted into the table. Flink supports " - + "'error' (default) and 'drop' enforcement behavior. By default, " - + "Flink will check values and throw runtime exception when null values writing " - + "into NOT NULL columns. Users can change the behavior to 'drop' to " - + "silently drop such records without throwing exception."); + "Determines whether the NOT NULL column constraint on a table enforces that " + + "null values can't be inserted into the table. By default, " + + "Flink will check values and throw runtime exception when attempting " + + "to write null values writing into NOT NULL columns. Users can change " + + "the behavior to silently drop such records without throwing exception."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<CharPrecisionEnforcer> + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER = + key("table.exec.sink.char-precision-enforcer") + .enumType(CharPrecisionEnforcer.class) + .defaultValue(CharPrecisionEnforcer.IGNORE) + .withDescription( + "Determines whether string values for columns with CHAR(<precision>)/VARCHAR(<precision>) " + + "types will be trimmed, so that their length will match the one defined by the " + + "precision of their respective CHAR/VARCHAR column type. By default, no trimming " + + "is applied."); Review comment: The default value is already documented. 
```suggestion + "precision of their respective CHAR/VARCHAR column type."); ``` ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ########## @@ -197,6 +222,23 @@ public DynamicTableSinkSpec getTableSinkSpec() { .toArray(); } + /** + * Returns a long[], each long element holds 2 integers, the char field idx and its precision. + */ + private List<Tuple2<Integer, Integer>> getCharFieldIndices(RowType physicalType) { + ArrayList<Tuple2<Integer, Integer>> charFieldsAndLengths = new ArrayList<>(); Review comment: Use interface for the type ```suggestion final List<Tuple2<Integer, Integer>> charFieldsAndLengths = new ArrayList<>(); ``` ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java ########## @@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() { assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); } + @Test + public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"), + Row.of(2, "Apache", "SQL", 22, 222, "Flink"), + Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache Flink SQL"), + Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL")); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaForCharPrecisionEnforcer()) + .source(new TestSource(rows)) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + + // Default config - ignore (no trim) + TableResult result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expected = rows.stream().map(Row::toString).collect(Collectors.toList()); + final List<String> resultsAsString = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsString.add(r.toString())); + assertEquals(expected, resultsAsString); + + try { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name()); + + result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expectedTrimmed = + Arrays.asList( + "+I[1, Apache F, SQL R, 11, 111, SQL]", + "+I[2, Apache, SQL, 22, 222, Flink]", + "+I[3, Apache, Flink, 33, 333, Apache]", + "+I[4, Flink Pr, SQL o, 44, 444, Apache]"); + final List<String> resultsAsStringStrimmed = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsStringStrimmed.add(r.toString())); + assertEquals(expectedTrimmed, resultsAsStringStrimmed); + + } finally { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name()); + } + } + + @Test + public void testNullEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache", 11), + Row.of(2, null, 22), + Row.of(null, "Flink", 33), + Row.of(null, null, 44)); + + final SharedReference<List<RowData>> results = sharedObjects.add(new ArrayList<>()); + tableEnv.createTable( + "T1", + TableFactoryHarness.newBuilder() + .schema(schemaForNotNullEnforcer()) + .source(new TestSource(rows)) + 
.sink( + new TableFactoryHarness.SinkBase() { + @Override + public DynamicTableSink.SinkRuntimeProvider + getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return SinkProvider.of( + TestSink.newBuilder() + .setWriter(new TestNotNullWriter(results)) + .setCommittableSerializer( + TestSink.StringCommittableSerializer + .INSTANCE) + .build()); + } + }) + .build()); + + // Default config - ignore (no trim) + ExecutionException ee = Review comment: (This would / will be nicer with AssertJā¦ :upside_down_face: ) ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ########## @@ -381,11 +394,44 @@ // ------------------------------------------------------------------------------------------ /** The enforcer to guarantee NOT NULL column constraint when writing data into sink. */ - public enum NotNullEnforcer { - /** Throws runtime exception when writing null values into NOT NULL column. */ - ERROR, - /** Drop records when writing null values into NOT NULL column. */ - DROP + public enum NotNullEnforcer implements DescribedEnum { + ERROR(text("Throws runtime exception when writing null values into NOT NULL column.")), + DROP(text("Drops records when writing null values into NOT NULL column.")); + + private final InlineElement description; + + NotNullEnforcer(InlineElement description) { + this.description = description; + } + + @Override + public InlineElement getDescription() { + return description; + } + } + + /** + * The enforcer to guarantee that precision of CHAR/VARCHAR columns is respected when writing + * data into sink. + */ + public enum CharPrecisionEnforcer implements DescribedEnum { + IGNORE( + text( + "Doesn't apply any trimming, simply ignores the CHAR/VARCHAR precision directive.")), + TRIM( + text( + "Trims string values to match the length defined as the CHAR/VARCHAR precision.")); Review comment: Just for consistent wording ```suggestion "Trim string values to match the length defined by the CHAR/VARCHAR precision.")); ``` ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/UpdatableRowData.java ########## @@ -17,23 +17,33 @@ package org.apache.flink.table.data; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.data.binary.TypedSetters; import org.apache.flink.types.RowKind; +import java.util.BitSet; + /** * An implementation of {@link RowData} which is backed by a {@link RowData} and an updated Java * object array. */ +@Internal public final class UpdatableRowData implements RowData, TypedSetters { private RowData row; private final Object[] fields; - private final boolean[] updated; + private final BitSet updated; Review comment: I wonder about the performance or memory impact of this. At least from a quick search it seems to be OK performance-wise and even beneficial memory-wise (https://www.baeldung.com/java-boolean-array-bitset-performance) ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ########## @@ -381,11 +394,44 @@ // ------------------------------------------------------------------------------------------ /** The enforcer to guarantee NOT NULL column constraint when writing data into sink. */ - public enum NotNullEnforcer { - /** Throws runtime exception when writing null values into NOT NULL column. */ - ERROR, - /** Drop records when writing null values into NOT NULL column. 
*/ - DROP + public enum NotNullEnforcer implements DescribedEnum { + ERROR(text("Throws runtime exception when writing null values into NOT NULL column.")), Review comment: ```suggestion ERROR(text("Throw a runtime exception when writing null values into a NOT NULL column.")), ``` ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java ########## @@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() { assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); } + @Test + public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"), + Row.of(2, "Apache", "SQL", 22, 222, "Flink"), + Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache Flink SQL"), + Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL")); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaForCharPrecisionEnforcer()) + .source(new TestSource(rows)) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + + // Default config - ignore (no trim) + TableResult result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expected = rows.stream().map(Row::toString).collect(Collectors.toList()); + final List<String> resultsAsString = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsString.add(r.toString())); + assertEquals(expected, resultsAsString); + + try { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name()); + + result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expectedTrimmed = + Arrays.asList( + "+I[1, Apache F, SQL R, 11, 111, SQL]", + "+I[2, Apache, SQL, 22, 222, Flink]", + "+I[3, Apache, Flink, 33, 333, Apache]", + "+I[4, Flink Pr, SQL o, 44, 444, Apache]"); + final List<String> resultsAsStringStrimmed = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsStringStrimmed.add(r.toString())); + assertEquals(expectedTrimmed, resultsAsStringStrimmed); + + } finally { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name()); + } Review comment: This feels pretty unstable. :-( If we need to modify the table environment in a test case, it shouldn't be re-used across test cases, IMO, otherwise this can introduce hard to find test instabilities. 
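As a rough sketch of one way around this: give the TRIM case its own environment (or its own test method) instead of mutating and restoring the shared one, so no `finally` restore is needed. The helper name `createTrimEnabledEnv` is invented for illustration and assumes it can reuse the test class's `env` field and the same source table setup shown above:
```java
// Hypothetical helper (name invented for illustration): builds a fresh table environment
// with CHAR/VARCHAR trimming enabled, so the shared configuration is never mutated.
private StreamTableEnvironment createTrimEnabledEnv() {
    final StreamTableEnvironment localTableEnv = StreamTableEnvironment.create(env);
    localTableEnv
            .getConfig()
            .getConfiguration()
            .setString(
                    TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
                    ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name());
    return localTableEnv;
}
```
Each case would then register its source table on the environment it created itself, and the `try`/`finally` reset to IGNORE disappears.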
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java ########## @@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() { assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); } + @Test + public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"), + Row.of(2, "Apache", "SQL", 22, 222, "Flink"), + Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache Flink SQL"), + Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL")); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaForCharPrecisionEnforcer()) + .source(new TestSource(rows)) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + + // Default config - ignore (no trim) + TableResult result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expected = rows.stream().map(Row::toString).collect(Collectors.toList()); + final List<String> resultsAsString = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsString.add(r.toString())); + assertEquals(expected, resultsAsString); + + try { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name()); + + result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expectedTrimmed = + Arrays.asList( + "+I[1, Apache F, SQL R, 11, 111, SQL]", + "+I[2, Apache, SQL, 22, 222, Flink]", + "+I[3, Apache, Flink, 33, 333, Apache]", + "+I[4, Flink Pr, SQL o, 44, 444, Apache]"); + final List<String> resultsAsStringStrimmed = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsStringStrimmed.add(r.toString())); + assertEquals(expectedTrimmed, resultsAsStringStrimmed); + + } finally { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name()); + } + } + + @Test + public void testNullEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache", 11), + Row.of(2, null, 22), + Row.of(null, "Flink", 33), + Row.of(null, null, 44)); + + final SharedReference<List<RowData>> results = sharedObjects.add(new ArrayList<>()); + tableEnv.createTable( + "T1", + TableFactoryHarness.newBuilder() + .schema(schemaForNotNullEnforcer()) + .source(new TestSource(rows)) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public DynamicTableSink.SinkRuntimeProvider + getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return SinkProvider.of( + TestSink.newBuilder() + .setWriter(new TestNotNullWriter(results)) + .setCommittableSerializer( + TestSink.StringCommittableSerializer + .INSTANCE) + .build()); + } + }) + .build()); Review comment: It'd be nice to extract some parts here to avoid the pyramid of doom. 
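As one possible shape (a sketch only; the method name `buildRecordingSink` is invented for illustration), the anonymous sink could be pulled into a small factory so the `createTable` call stays flat:
```java
// Hypothetical extraction: same anonymous SinkBase as in the test above,
// just moved out of the table definition to avoid the deep nesting.
private static TableFactoryHarness.SinkBase buildRecordingSink(
        SharedReference<List<RowData>> results) {
    return new TableFactoryHarness.SinkBase() {
        @Override
        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(
                DynamicTableSink.Context context) {
            return SinkProvider.of(
                    TestSink.newBuilder()
                            .setWriter(new TestNotNullWriter(results))
                            .setCommittableSerializer(
                                    TestSink.StringCommittableSerializer.INSTANCE)
                            .build());
        }
    };
}
```
The table definition then reduces to `.sink(buildRecordingSink(results))`.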
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java ########## @@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() { assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); } + @Test + public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"), + Row.of(2, "Apache", "SQL", 22, 222, "Flink"), + Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache Flink SQL"), + Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL")); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaForCharPrecisionEnforcer()) + .source(new TestSource(rows)) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + + // Default config - ignore (no trim) + TableResult result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expected = rows.stream().map(Row::toString).collect(Collectors.toList()); + final List<String> resultsAsString = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsString.add(r.toString())); + assertEquals(expected, resultsAsString); Review comment: We should compare this regardless of order for stability. Also, I wonder if we can't extract this entire block into a reusable piece? ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.UpdatableRowData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharPrecisionEnforcer; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Processes {@link RowData} to enforce the following constraints: + * + * <ul> + * <li>{@code NOT NULL} column constraint of a sink table + * <li>{@code CHAR(precision)}/@{code VARCHAR(precision)}: trim string values to comply with the + * {@code precision} defined in their corresponding types. + * </ul> + */ +@Internal +public class ConstraintEnforcer extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private final NotNullEnforcer notNullEnforcer; + private final int[] notNullFieldIndices; + private final String[] allFieldNames; + + private final CharPrecisionEnforcer charPrecisionEnforcer; + private final int[] charFieldIndices; + private final int[] charFieldPrecisions; + + private final String operatorName; + + private transient UpdatableRowData reusableRowData; + private transient StreamRecord<RowData> reusableStreamRecord; + + private ConstraintEnforcer( + NotNullEnforcer notNullEnforcer, + int[] notNullFieldIndices, + CharPrecisionEnforcer charPrecisionEnforcer, + int[] charFieldIndices, + int[] charFieldPrecisions, + String[] allFieldNames, + String operatorName) { + this.notNullEnforcer = notNullEnforcer; + this.notNullFieldIndices = notNullFieldIndices; + this.charPrecisionEnforcer = charPrecisionEnforcer; + this.charFieldIndices = charFieldIndices; + this.charFieldPrecisions = charFieldPrecisions; + this.allFieldNames = allFieldNames; + this.operatorName = operatorName; + } + + @Override + public String getOperatorName() { + return operatorName; + } + + @Override + public void open() throws Exception { + super.open(); + reusableRowData = new UpdatableRowData(null, allFieldNames.length); + reusableStreamRecord = new StreamRecord<>(null); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Helper builder, so that the {@link ConstraintEnforcer} can be instantiated with only the NOT + * NULL constraint validation, only the CHAR/VARCHAR precision validation, or both. 
+ */ + public static class Builder { + + private NotNullEnforcer notNullEnforcer; + private int[] notNullFieldIndices; + + private CharPrecisionEnforcer charPrecisionEnforcer; + private List<Tuple2<Integer, Integer>> charFields; + private String[] allFieldNames; + + private final List<String> operatorNames = new ArrayList<>(); + + private boolean isConfigured = false; + + public void addNotNullConstraint( + NotNullEnforcer notNullEnforcer, + int[] notNullFieldIndices, + List<String> notNullFieldNames, + String[] allFieldNames) { + checkArgument( + notNullFieldIndices.length > 0, + "ConstraintValidator requires that there are not-null fields."); + this.notNullFieldIndices = notNullFieldIndices; + this.notNullEnforcer = notNullEnforcer; + this.allFieldNames = allFieldNames; + if (notNullEnforcer != null) { + operatorNames.add( + String.format( + "NotNullEnforcer(fields=[%s])", + String.join(", ", notNullFieldNames))); + this.isConfigured = true; + } + } + + public void addCharPrecisionConstraint( + CharPrecisionEnforcer charPrecisionEnforcer, + List<Tuple2<Integer, Integer>> charFields, + List<String> charFieldNames, + String[] allFieldNames) { + this.charPrecisionEnforcer = charPrecisionEnforcer; + if (this.charPrecisionEnforcer == CharPrecisionEnforcer.TRIM) { + checkArgument( + charFields.size() > 0, + "ConstraintValidator requires that there are CHAR/VARCHAR fields."); + this.charFields = charFields; + this.allFieldNames = allFieldNames; + + operatorNames.add( + String.format( + "CharPrecisionEnforcer(fields=[%s])", + String.join(", ", charFieldNames))); + this.isConfigured = true; + } + } + + /** + * If neither of NOT NULL or CHAR/VARCHAR precision enforcers are configured, null is + * returned. + */ + public ConstraintEnforcer build() { + if (isConfigured) { + String operatorName = + "ConstraintEnforcer[" + String.join(", ", operatorNames) + "]"; + return new ConstraintEnforcer( + notNullEnforcer, + notNullFieldIndices, + charPrecisionEnforcer, + charFields != null + ? charFields.stream().mapToInt(t -> t.f0).toArray() + : null, + charFields != null + ? charFields.stream().mapToInt(t -> t.f1).toArray() + : null, + allFieldNames, + operatorName); + } + return null; + } + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + StreamRecord<RowData> processedElement = processNotNullConstraint(element); + if (processedElement != null) { + processedElement = processCharConstraint(processedElement); + output.collect(processedElement); + } + } + + private StreamRecord<RowData> processNotNullConstraint(StreamRecord<RowData> element) { Review comment: nit: ```suggestion private @Nullable StreamRecord<RowData> processNotNullConstraint(StreamRecord<RowData> element) { ``` ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.UpdatableRowData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharPrecisionEnforcer; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Processes {@link RowData} to enforce the following constraints: + * + * <ul> + * <li>{@code NOT NULL} column constraint of a sink table + * <li>{@code CHAR(precision)}/@{code VARCHAR(precision)}: trim string values to comply with the + * {@code precision} defined in their corresponding types. + * </ul> + */ +@Internal +public class ConstraintEnforcer extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private final NotNullEnforcer notNullEnforcer; + private final int[] notNullFieldIndices; + private final String[] allFieldNames; + + private final CharPrecisionEnforcer charPrecisionEnforcer; + private final int[] charFieldIndices; + private final int[] charFieldPrecisions; + + private final String operatorName; + + private transient UpdatableRowData reusableRowData; + private transient StreamRecord<RowData> reusableStreamRecord; + + private ConstraintEnforcer( + NotNullEnforcer notNullEnforcer, + int[] notNullFieldIndices, + CharPrecisionEnforcer charPrecisionEnforcer, + int[] charFieldIndices, + int[] charFieldPrecisions, + String[] allFieldNames, + String operatorName) { + this.notNullEnforcer = notNullEnforcer; + this.notNullFieldIndices = notNullFieldIndices; + this.charPrecisionEnforcer = charPrecisionEnforcer; + this.charFieldIndices = charFieldIndices; + this.charFieldPrecisions = charFieldPrecisions; + this.allFieldNames = allFieldNames; + this.operatorName = operatorName; + } + + @Override + public String getOperatorName() { + return operatorName; + } + + @Override + public void open() throws Exception { + super.open(); + reusableRowData = new UpdatableRowData(null, allFieldNames.length); + reusableStreamRecord = new StreamRecord<>(null); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Helper builder, so that the {@link ConstraintEnforcer} can be instantiated with only the NOT + * NULL constraint validation, only the CHAR/VARCHAR precision validation, or both. 
+ */ + public static class Builder { + + private NotNullEnforcer notNullEnforcer; + private int[] notNullFieldIndices; + + private CharPrecisionEnforcer charPrecisionEnforcer; + private List<Tuple2<Integer, Integer>> charFields; + private String[] allFieldNames; + + private final List<String> operatorNames = new ArrayList<>(); + + private boolean isConfigured = false; + + public void addNotNullConstraint( + NotNullEnforcer notNullEnforcer, + int[] notNullFieldIndices, + List<String> notNullFieldNames, + String[] allFieldNames) { + checkArgument( + notNullFieldIndices.length > 0, + "ConstraintValidator requires that there are not-null fields."); + this.notNullFieldIndices = notNullFieldIndices; + this.notNullEnforcer = notNullEnforcer; + this.allFieldNames = allFieldNames; + if (notNullEnforcer != null) { + operatorNames.add( + String.format( + "NotNullEnforcer(fields=[%s])", + String.join(", ", notNullFieldNames))); + this.isConfigured = true; + } + } + + public void addCharPrecisionConstraint( + CharPrecisionEnforcer charPrecisionEnforcer, + List<Tuple2<Integer, Integer>> charFields, + List<String> charFieldNames, + String[] allFieldNames) { + this.charPrecisionEnforcer = charPrecisionEnforcer; + if (this.charPrecisionEnforcer == CharPrecisionEnforcer.TRIM) { + checkArgument( + charFields.size() > 0, + "ConstraintValidator requires that there are CHAR/VARCHAR fields."); + this.charFields = charFields; + this.allFieldNames = allFieldNames; + + operatorNames.add( + String.format( + "CharPrecisionEnforcer(fields=[%s])", + String.join(", ", charFieldNames))); + this.isConfigured = true; + } + } + + /** + * If neither of NOT NULL or CHAR/VARCHAR precision enforcers are configured, null is + * returned. + */ + public ConstraintEnforcer build() { + if (isConfigured) { + String operatorName = + "ConstraintEnforcer[" + String.join(", ", operatorNames) + "]"; + return new ConstraintEnforcer( + notNullEnforcer, + notNullFieldIndices, + charPrecisionEnforcer, + charFields != null + ? charFields.stream().mapToInt(t -> t.f0).toArray() + : null, + charFields != null + ? charFields.stream().mapToInt(t -> t.f1).toArray() + : null, + allFieldNames, + operatorName); + } + return null; + } + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + StreamRecord<RowData> processedElement = processNotNullConstraint(element); + if (processedElement != null) { + processedElement = processCharConstraint(processedElement); + output.collect(processedElement); + } + } + + private StreamRecord<RowData> processNotNullConstraint(StreamRecord<RowData> element) { + if (notNullEnforcer == null) { + return element; + } + + final RowData rowData = element.getValue(); + + for (int index : notNullFieldIndices) { + if (rowData.isNullAt(index)) { + switch (notNullEnforcer) { + case ERROR: + throw new TableException( + String.format( + "Column '%s' is NOT NULL, however, a null value is being written into it. " + + "You can set job configuration '%s'='drop' " Review comment: nit: also format the value to set into here using the enum -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org