Airblader commented on a change in pull request #17811:
URL: https://github.com/apache/flink/pull/17811#discussion_r756662813



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -381,11 +394,44 @@
     // 
------------------------------------------------------------------------------------------
 
     /** The enforcer to guarantee NOT NULL column constraint when writing data 
into sink. */
-    public enum NotNullEnforcer {
-        /** Throws runtime exception when writing null values into NOT NULL 
column. */
-        ERROR,
-        /** Drop records when writing null values into NOT NULL column. */
-        DROP
+    public enum NotNullEnforcer implements DescribedEnum {
+        ERROR(text("Throws runtime exception when writing null values into NOT 
NULL column.")),
+        DROP(text("Drops records when writing null values into NOT NULL 
column."));

Review comment:
    ```suggestion
        DROP(text("Drop records silently if a null value would have to be inserted into a NOT NULL column."));
    ```

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -114,12 +116,23 @@
                     .enumType(NotNullEnforcer.class)
                     .defaultValue(NotNullEnforcer.ERROR)
                     .withDescription(
-                            "The NOT NULL column constraint on a table 
enforces that "
-                                    + "null values can't be inserted into the 
table. Flink supports "
-                                    + "'error' (default) and 'drop' 
enforcement behavior. By default, "
-                                    + "Flink will check values and throw 
runtime exception when null values writing "
-                                    + "into NOT NULL columns. Users can change 
the behavior to 'drop' to "
-                                    + "silently drop such records without 
throwing exception.");
+                            "Determines whether the NOT NULL column constraint 
on a table enforces that "
+                                    + "null values can't be inserted into the 
table. By default, "
+                                    + "Flink will check values and throw 
runtime exception when attempting "
+                                    + "to write null values writing into NOT 
NULL columns. Users can change "
+                                    + "the behavior to silently drop such 
records without throwing exception.");

Review comment:
       This entire paragraph just re-states the description of the enum. I 
think we can simplify this to something like
   
    ```suggestion
                            "Determines how Flink enforces NOT NULL column constraints when inserting null values.");
    ```

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -381,11 +394,44 @@
     // 
------------------------------------------------------------------------------------------
 
     /** The enforcer to guarantee NOT NULL column constraint when writing data 
into sink. */
-    public enum NotNullEnforcer {
-        /** Throws runtime exception when writing null values into NOT NULL 
column. */
-        ERROR,
-        /** Drop records when writing null values into NOT NULL column. */
-        DROP
+    public enum NotNullEnforcer implements DescribedEnum {
+        ERROR(text("Throws runtime exception when writing null values into NOT 
NULL column.")),
+        DROP(text("Drops records when writing null values into NOT NULL 
column."));
+
+        private final InlineElement description;
+
+        NotNullEnforcer(InlineElement description) {
+            this.description = description;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
+
+    /**
+     * The enforcer to guarantee that precision of CHAR/VARCHAR columns is 
respected when writing
+     * data into sink.
+     */
+    public enum CharPrecisionEnforcer implements DescribedEnum {
+        IGNORE(
+                text(
+                        "Doesn't apply any trimming, simply ignores the 
CHAR/VARCHAR precision directive.")),

Review comment:
       Just for consistent wording
   
    ```suggestion
                        "Don't apply any trimming, and instead ignore the CHAR/VARCHAR precision directive.")),
    ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() 
{
         assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
     }
 
+    @Test
+    public void testCharPrecisionEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+                        Row.of(2, "Apache", "SQL", 22, 222, "Flink"),
+                        Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache 
Flink SQL"),
+                        Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, 
"Apache Flink SQL"));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForCharPrecisionEnforcer())
+                        .source(new TestSource(rows))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+
+        // Default config - ignore (no trim)
+        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+        result.await();
+
+        final List<String> expected = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+        final List<String> resultsAsString = new ArrayList<>();
+        result.collect().forEachRemaining(r -> 
resultsAsString.add(r.toString()));
+        assertEquals(expected, resultsAsString);
+
+        try {

Review comment:
       You added a comment above marking the default-config test case, but there is no corresponding comment here.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -161,28 +167,47 @@ public DynamicTableSinkSpec getTableSinkSpec() {
     /**
      * Apply an operator to filter or report error to process not-null values 
for not-null fields.
      */
-    private Transformation<RowData> applyNotNullEnforcer(
+    private Transformation<RowData> applyConstraintValidations(
             Transformation<RowData> inputTransform, TableConfig config, 
RowType physicalRowType) {
-        final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer =
-                config.getConfiguration()
-                        
.get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
-        final int[] notNullFieldIndices = 
getNotNullFieldIndices(physicalRowType);
+        final ConstraintEnforcer.Builder validatorBuilder = 
ConstraintEnforcer.newBuilder();
         final String[] fieldNames = 
physicalRowType.getFieldNames().toArray(new String[0]);
 
+        // Build NOT NULL enforcer
+        final int[] notNullFieldIndices = 
getNotNullFieldIndices(physicalRowType);
         if (notNullFieldIndices.length > 0) {
-            final SinkNotNullEnforcer enforcer =
-                    new SinkNotNullEnforcer(notNullEnforcer, 
notNullFieldIndices, fieldNames);
+            final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer =
+                    config.getConfiguration()
+                            
.get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
             final List<String> notNullFieldNames =
                     Arrays.stream(notNullFieldIndices)
                             .mapToObj(idx -> fieldNames[idx])
                             .collect(Collectors.toList());
-            final String operatorName =
-                    String.format(
-                            "NotNullEnforcer(fields=[%s])", String.join(", ", 
notNullFieldNames));
+
+            validatorBuilder.addNotNullConstraint(
+                    notNullEnforcer, notNullFieldIndices, notNullFieldNames, 
fieldNames);
+        }
+
+        // Build CHAR/VARCHAR precision enforcer
+        final List<Tuple2<Integer, Integer>> charFields = 
getCharFieldIndices(physicalRowType);
+        if (charFields.size() > 0) {

Review comment:
       nit
   
   ```suggestion
           if (!charFields.isEmpty()) {
   ```

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -114,12 +116,23 @@
                     .enumType(NotNullEnforcer.class)
                     .defaultValue(NotNullEnforcer.ERROR)
                     .withDescription(
-                            "The NOT NULL column constraint on a table 
enforces that "
-                                    + "null values can't be inserted into the 
table. Flink supports "
-                                    + "'error' (default) and 'drop' 
enforcement behavior. By default, "
-                                    + "Flink will check values and throw 
runtime exception when null values writing "
-                                    + "into NOT NULL columns. Users can change 
the behavior to 'drop' to "
-                                    + "silently drop such records without 
throwing exception.");
+                            "Determines whether the NOT NULL column constraint 
on a table enforces that "
+                                    + "null values can't be inserted into the 
table. By default, "
+                                    + "Flink will check values and throw 
runtime exception when attempting "
+                                    + "to write null values writing into NOT 
NULL columns. Users can change "
+                                    + "the behavior to silently drop such 
records without throwing exception.");
+
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<CharPrecisionEnforcer>
+            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER =
+                    key("table.exec.sink.char-precision-enforcer")
+                            .enumType(CharPrecisionEnforcer.class)
+                            .defaultValue(CharPrecisionEnforcer.IGNORE)
+                            .withDescription(
+                                    "Determines whether string values for 
columns with CHAR(<precision>)/VARCHAR(<precision>) "
+                                            + "types will be trimmed, so that 
their length will match the one defined by the "
+                                            + "precision of their respective 
CHAR/VARCHAR column type. By default, no trimming "
+                                            + "is applied.");

Review comment:
       The default value is already documented.
   
    ```suggestion
                                            + "precision of their respective CHAR/VARCHAR column type.");
    ```

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -197,6 +222,23 @@ public DynamicTableSinkSpec getTableSinkSpec() {
                 .toArray();
     }
 
+    /**
+     * Returns a long[], each long element holds 2 integers, the char field 
idx and its precision.
+     */
+    private List<Tuple2<Integer, Integer>> getCharFieldIndices(RowType 
physicalType) {
+        ArrayList<Tuple2<Integer, Integer>> charFieldsAndLengths = new 
ArrayList<>();

Review comment:
       Use the interface (List) as the declared type
   
    ```suggestion
        final List<Tuple2<Integer, Integer>> charFieldsAndLengths = new ArrayList<>();
    ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() 
{
         assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
     }
 
+    @Test
+    public void testCharPrecisionEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+                        Row.of(2, "Apache", "SQL", 22, 222, "Flink"),
+                        Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache 
Flink SQL"),
+                        Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, 
"Apache Flink SQL"));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForCharPrecisionEnforcer())
+                        .source(new TestSource(rows))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+
+        // Default config - ignore (no trim)
+        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+        result.await();
+
+        final List<String> expected = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+        final List<String> resultsAsString = new ArrayList<>();
+        result.collect().forEachRemaining(r -> 
resultsAsString.add(r.toString()));
+        assertEquals(expected, resultsAsString);
+
+        try {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name());
+
+            result = tableEnv.executeSql("SELECT * FROM T1");
+            result.await();
+
+            final List<String> expectedTrimmed =
+                    Arrays.asList(
+                            "+I[1, Apache F, SQL R, 11, 111, SQL]",
+                            "+I[2, Apache, SQL, 22, 222, Flink]",
+                            "+I[3, Apache, Flink, 33, 333, Apache]",
+                            "+I[4, Flink Pr, SQL o, 44, 444, Apache]");
+            final List<String> resultsAsStringStrimmed = new ArrayList<>();
+            result.collect().forEachRemaining(r -> 
resultsAsStringStrimmed.add(r.toString()));
+            assertEquals(expectedTrimmed, resultsAsStringStrimmed);
+
+        } finally {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name());
+        }
+    }
+
+    @Test
+    public void testNullEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache", 11),
+                        Row.of(2, null, 22),
+                        Row.of(null, "Flink", 33),
+                        Row.of(null, null, 44));
+
+        final SharedReference<List<RowData>> results = sharedObjects.add(new 
ArrayList<>());
+        tableEnv.createTable(
+                "T1",
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForNotNullEnforcer())
+                        .source(new TestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DynamicTableSink.SinkRuntimeProvider
+                                            getSinkRuntimeProvider(
+                                                    DynamicTableSink.Context 
context) {
+                                        return SinkProvider.of(
+                                                TestSink.newBuilder()
+                                                        .setWriter(new 
TestNotNullWriter(results))
+                                                        
.setCommittableSerializer(
+                                                                
TestSink.StringCommittableSerializer
+                                                                        
.INSTANCE)
+                                                        .build());
+                                    }
+                                })
+                        .build());
+
+        // Default config - ignore (no trim)
+        ExecutionException ee =

Review comment:
       (This would/will be nicer with AssertJ… :upside_down_face:)
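   
   For illustration, something in this direction (just a sketch; the exact SQL and asserted exception details here are placeholders, not copied from the PR):
   
   ```java
   // Hypothetical AssertJ shape (assumes: import static org.assertj.core.api.Assertions.assertThatThrownBy;)
   assertThatThrownBy(
                   () -> tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await())
           .isInstanceOf(ExecutionException.class)
           .hasRootCauseInstanceOf(TableException.class);
   ```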

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -381,11 +394,44 @@
     // 
------------------------------------------------------------------------------------------
 
     /** The enforcer to guarantee NOT NULL column constraint when writing data 
into sink. */
-    public enum NotNullEnforcer {
-        /** Throws runtime exception when writing null values into NOT NULL 
column. */
-        ERROR,
-        /** Drop records when writing null values into NOT NULL column. */
-        DROP
+    public enum NotNullEnforcer implements DescribedEnum {
+        ERROR(text("Throws runtime exception when writing null values into NOT 
NULL column.")),
+        DROP(text("Drops records when writing null values into NOT NULL 
column."));
+
+        private final InlineElement description;
+
+        NotNullEnforcer(InlineElement description) {
+            this.description = description;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
+
+    /**
+     * The enforcer to guarantee that precision of CHAR/VARCHAR columns is 
respected when writing
+     * data into sink.
+     */
+    public enum CharPrecisionEnforcer implements DescribedEnum {
+        IGNORE(
+                text(
+                        "Doesn't apply any trimming, simply ignores the 
CHAR/VARCHAR precision directive.")),
+        TRIM(
+                text(
+                        "Trims string values to match the length defined as 
the CHAR/VARCHAR precision."));

Review comment:
       Just for consistent wording
   
    ```suggestion
                        "Trim string values to match the length defined by the CHAR/VARCHAR precision."));
    ```

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/UpdatableRowData.java
##########
@@ -17,23 +17,33 @@
 
 package org.apache.flink.table.data;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.binary.TypedSetters;
 import org.apache.flink.types.RowKind;
 
+import java.util.BitSet;
+
 /**
  * An implementation of {@link RowData} which is backed by a {@link RowData} 
and an updated Java
  * object array.
  */
+@Internal
 public final class UpdatableRowData implements RowData, TypedSetters {
 
     private RowData row;
     private final Object[] fields;
-    private final boolean[] updated;
+    private final BitSet updated;

Review comment:
       I wonder about the performance or memory impact of this. At least from a 
quick search it seems to be OK performance-wise and even beneficial memory-wise 
(https://www.baeldung.com/java-boolean-array-bitset-performance)
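   
   To make the trade-off concrete, a tiny stand-alone illustration (not a benchmark) of the two representations:
   
   ```java
   import java.util.BitSet;
   
   public class FlagStorageSketch {
       public static void main(String[] args) {
           // boolean[] stores at least one byte per flag; BitSet packs flags into long words.
           boolean[] flags = new boolean[1024]; // ~1 KiB of flag payload
           BitSet bits = new BitSet(1024);      // ~128 bytes of flag payload (16 longs)
   
           flags[3] = true;
           bits.set(3);
   
           // Reads and writes keep the same shape: flags[i] vs. bits.get(i)/bits.set(i)
           System.out.println(flags[3] == bits.get(3)); // true
       }
   }
   ```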

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -381,11 +394,44 @@
     // 
------------------------------------------------------------------------------------------
 
     /** The enforcer to guarantee NOT NULL column constraint when writing data 
into sink. */
-    public enum NotNullEnforcer {
-        /** Throws runtime exception when writing null values into NOT NULL 
column. */
-        ERROR,
-        /** Drop records when writing null values into NOT NULL column. */
-        DROP
+    public enum NotNullEnforcer implements DescribedEnum {
+        ERROR(text("Throws runtime exception when writing null values into NOT 
NULL column.")),

Review comment:
    ```suggestion
        ERROR(text("Throw a runtime exception when writing null values into a NOT NULL column.")),
    ```

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() 
{
         assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
     }
 
+    @Test
+    public void testCharPrecisionEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+                        Row.of(2, "Apache", "SQL", 22, 222, "Flink"),
+                        Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache 
Flink SQL"),
+                        Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, 
"Apache Flink SQL"));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForCharPrecisionEnforcer())
+                        .source(new TestSource(rows))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+
+        // Default config - ignore (no trim)
+        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+        result.await();
+
+        final List<String> expected = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+        final List<String> resultsAsString = new ArrayList<>();
+        result.collect().forEachRemaining(r -> 
resultsAsString.add(r.toString()));
+        assertEquals(expected, resultsAsString);
+
+        try {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name());
+
+            result = tableEnv.executeSql("SELECT * FROM T1");
+            result.await();
+
+            final List<String> expectedTrimmed =
+                    Arrays.asList(
+                            "+I[1, Apache F, SQL R, 11, 111, SQL]",
+                            "+I[2, Apache, SQL, 22, 222, Flink]",
+                            "+I[3, Apache, Flink, 33, 333, Apache]",
+                            "+I[4, Flink Pr, SQL o, 44, 444, Apache]");
+            final List<String> resultsAsStringStrimmed = new ArrayList<>();
+            result.collect().forEachRemaining(r -> 
resultsAsStringStrimmed.add(r.toString()));
+            assertEquals(expectedTrimmed, resultsAsStringStrimmed);
+
+        } finally {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name());
+        }

Review comment:
       This feels pretty unstable. :-( If we need to modify the table environment in a test case, it shouldn't be re-used across test cases, IMO; otherwise this can introduce hard-to-find test instabilities.
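   
   One possible shape (sketch only; assumes the test can simply create its own environment instead of mutating the shared one):
   
   ```java
   // Sketch: give the TRIM case its own environment so the setting cannot leak into other tests.
   final StreamTableEnvironment trimEnv = StreamTableEnvironment.create(env);
   trimEnv.getConfig()
           .getConfiguration()
           .set(
                   TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER,
                   ExecutionConfigOptions.CharPrecisionEnforcer.TRIM);
   trimEnv.createTable("T1", sourceDescriptor);
   // ... run the TRIM assertions against trimEnv; no finally-block reset needed.
   ```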

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() 
{
         assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
     }
 
+    @Test
+    public void testCharPrecisionEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+                        Row.of(2, "Apache", "SQL", 22, 222, "Flink"),
+                        Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache 
Flink SQL"),
+                        Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, 
"Apache Flink SQL"));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForCharPrecisionEnforcer())
+                        .source(new TestSource(rows))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+
+        // Default config - ignore (no trim)
+        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+        result.await();
+
+        final List<String> expected = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+        final List<String> resultsAsString = new ArrayList<>();
+        result.collect().forEachRemaining(r -> 
resultsAsString.add(r.toString()));
+        assertEquals(expected, resultsAsString);
+
+        try {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name());
+
+            result = tableEnv.executeSql("SELECT * FROM T1");
+            result.await();
+
+            final List<String> expectedTrimmed =
+                    Arrays.asList(
+                            "+I[1, Apache F, SQL R, 11, 111, SQL]",
+                            "+I[2, Apache, SQL, 22, 222, Flink]",
+                            "+I[3, Apache, Flink, 33, 333, Apache]",
+                            "+I[4, Flink Pr, SQL o, 44, 444, Apache]");
+            final List<String> resultsAsStringStrimmed = new ArrayList<>();
+            result.collect().forEachRemaining(r -> 
resultsAsStringStrimmed.add(r.toString()));
+            assertEquals(expectedTrimmed, resultsAsStringStrimmed);
+
+        } finally {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name());
+        }
+    }
+
+    @Test
+    public void testNullEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache", 11),
+                        Row.of(2, null, 22),
+                        Row.of(null, "Flink", 33),
+                        Row.of(null, null, 44));
+
+        final SharedReference<List<RowData>> results = sharedObjects.add(new 
ArrayList<>());
+        tableEnv.createTable(
+                "T1",
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForNotNullEnforcer())
+                        .source(new TestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DynamicTableSink.SinkRuntimeProvider
+                                            getSinkRuntimeProvider(
+                                                    DynamicTableSink.Context 
context) {
+                                        return SinkProvider.of(
+                                                TestSink.newBuilder()
+                                                        .setWriter(new 
TestNotNullWriter(results))
+                                                        
.setCommittableSerializer(
+                                                                
TestSink.StringCommittableSerializer
+                                                                        
.INSTANCE)
+                                                        .build());
+                                    }
+                                })
+                        .build());

Review comment:
       It'd be nice to extract some parts here to avoid the pyramid of doom.
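   
   For example, pulling the sink into a small factory method would already help (sketch; the helper name is made up):
   
   ```java
   // Hypothetical helper that mirrors the anonymous sink above.
   private static TableFactoryHarness.SinkBase recordingSink(final SharedReference<List<RowData>> results) {
       return new TableFactoryHarness.SinkBase() {
           @Override
           public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(
                   DynamicTableSink.Context context) {
               return SinkProvider.of(
                       TestSink.newBuilder()
                               .setWriter(new TestNotNullWriter(results))
                               .setCommittableSerializer(
                                       TestSink.StringCommittableSerializer.INSTANCE)
                               .build());
           }
       };
   }
   ```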

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() 
{
         assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
     }
 
+    @Test
+    public void testCharPrecisionEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+                        Row.of(2, "Apache", "SQL", 22, 222, "Flink"),
+                        Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache 
Flink SQL"),
+                        Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, 
"Apache Flink SQL"));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForCharPrecisionEnforcer())
+                        .source(new TestSource(rows))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+
+        // Default config - ignore (no trim)
+        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+        result.await();
+
+        final List<String> expected = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+        final List<String> resultsAsString = new ArrayList<>();
+        result.collect().forEachRemaining(r -> 
resultsAsString.add(r.toString()));
+        assertEquals(expected, resultsAsString);

Review comment:
       We should compare this regardless of order, for stability. Also, I wonder if we could extract this entire block into a reusable piece.
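   
   e.g. (sketch; assumes a static import of org.assertj.core.api.Assertions.assertThat):
   
   ```java
   // Order-independent comparison of the collected rows.
   final List<String> actual = new ArrayList<>();
   result.collect().forEachRemaining(r -> actual.add(r.toString()));
   assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
   ```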

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableException;
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.CharPrecisionEnforcer;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Processes {@link RowData} to enforce the following constraints:
+ *
+ * <ul>
+ *   <li>{@code NOT NULL} column constraint of a sink table
+ *   <li>{@code CHAR(precision)}/@{code VARCHAR(precision)}: trim string 
values to comply with the
+ *       {@code precision} defined in their corresponding types.
+ * </ul>
+ */
+@Internal
+public class ConstraintEnforcer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final NotNullEnforcer notNullEnforcer;
+    private final int[] notNullFieldIndices;
+    private final String[] allFieldNames;
+
+    private final CharPrecisionEnforcer charPrecisionEnforcer;
+    private final int[] charFieldIndices;
+    private final int[] charFieldPrecisions;
+
+    private final String operatorName;
+
+    private transient UpdatableRowData reusableRowData;
+    private transient StreamRecord<RowData> reusableStreamRecord;
+
+    private ConstraintEnforcer(
+            NotNullEnforcer notNullEnforcer,
+            int[] notNullFieldIndices,
+            CharPrecisionEnforcer charPrecisionEnforcer,
+            int[] charFieldIndices,
+            int[] charFieldPrecisions,
+            String[] allFieldNames,
+            String operatorName) {
+        this.notNullEnforcer = notNullEnforcer;
+        this.notNullFieldIndices = notNullFieldIndices;
+        this.charPrecisionEnforcer = charPrecisionEnforcer;
+        this.charFieldIndices = charFieldIndices;
+        this.charFieldPrecisions = charFieldPrecisions;
+        this.allFieldNames = allFieldNames;
+        this.operatorName = operatorName;
+    }
+
+    @Override
+    public String getOperatorName() {
+        return operatorName;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        reusableRowData = new UpdatableRowData(null, allFieldNames.length);
+        reusableStreamRecord = new StreamRecord<>(null);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Helper builder, so that the {@link ConstraintEnforcer} can be 
instantiated with only the NOT
+     * NULL constraint validation, only the CHAR/VARCHAR precision validation, 
or both.
+     */
+    public static class Builder {
+
+        private NotNullEnforcer notNullEnforcer;
+        private int[] notNullFieldIndices;
+
+        private CharPrecisionEnforcer charPrecisionEnforcer;
+        private List<Tuple2<Integer, Integer>> charFields;
+        private String[] allFieldNames;
+
+        private final List<String> operatorNames = new ArrayList<>();
+
+        private boolean isConfigured = false;
+
+        public void addNotNullConstraint(
+                NotNullEnforcer notNullEnforcer,
+                int[] notNullFieldIndices,
+                List<String> notNullFieldNames,
+                String[] allFieldNames) {
+            checkArgument(
+                    notNullFieldIndices.length > 0,
+                    "ConstraintValidator requires that there are not-null 
fields.");
+            this.notNullFieldIndices = notNullFieldIndices;
+            this.notNullEnforcer = notNullEnforcer;
+            this.allFieldNames = allFieldNames;
+            if (notNullEnforcer != null) {
+                operatorNames.add(
+                        String.format(
+                                "NotNullEnforcer(fields=[%s])",
+                                String.join(", ", notNullFieldNames)));
+                this.isConfigured = true;
+            }
+        }
+
+        public void addCharPrecisionConstraint(
+                CharPrecisionEnforcer charPrecisionEnforcer,
+                List<Tuple2<Integer, Integer>> charFields,
+                List<String> charFieldNames,
+                String[] allFieldNames) {
+            this.charPrecisionEnforcer = charPrecisionEnforcer;
+            if (this.charPrecisionEnforcer == CharPrecisionEnforcer.TRIM) {
+                checkArgument(
+                        charFields.size() > 0,
+                        "ConstraintValidator requires that there are 
CHAR/VARCHAR fields.");
+                this.charFields = charFields;
+                this.allFieldNames = allFieldNames;
+
+                operatorNames.add(
+                        String.format(
+                                "CharPrecisionEnforcer(fields=[%s])",
+                                String.join(", ", charFieldNames)));
+                this.isConfigured = true;
+            }
+        }
+
+        /**
+         * If neither of NOT NULL or CHAR/VARCHAR precision enforcers are 
configured, null is
+         * returned.
+         */
+        public ConstraintEnforcer build() {
+            if (isConfigured) {
+                String operatorName =
+                        "ConstraintEnforcer[" + String.join(", ", 
operatorNames) + "]";
+                return new ConstraintEnforcer(
+                        notNullEnforcer,
+                        notNullFieldIndices,
+                        charPrecisionEnforcer,
+                        charFields != null
+                                ? charFields.stream().mapToInt(t -> 
t.f0).toArray()
+                                : null,
+                        charFields != null
+                                ? charFields.stream().mapToInt(t -> 
t.f1).toArray()
+                                : null,
+                        allFieldNames,
+                        operatorName);
+            }
+            return null;
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        StreamRecord<RowData> processedElement = 
processNotNullConstraint(element);
+        if (processedElement != null) {
+            processedElement = processCharConstraint(processedElement);
+            output.collect(processedElement);
+        }
+    }
+
+    private StreamRecord<RowData> 
processNotNullConstraint(StreamRecord<RowData> element) {

Review comment:
       nit:
   
    ```suggestion
        private @Nullable StreamRecord<RowData> processNotNullConstraint(StreamRecord<RowData> element) {
    ```

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableException;
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.CharPrecisionEnforcer;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Processes {@link RowData} to enforce the following constraints:
+ *
+ * <ul>
+ *   <li>{@code NOT NULL} column constraint of a sink table
+ *   <li>{@code CHAR(precision)}/@{code VARCHAR(precision)}: trim string 
values to comply with the
+ *       {@code precision} defined in their corresponding types.
+ * </ul>
+ */
+@Internal
+public class ConstraintEnforcer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final NotNullEnforcer notNullEnforcer;
+    private final int[] notNullFieldIndices;
+    private final String[] allFieldNames;
+
+    private final CharPrecisionEnforcer charPrecisionEnforcer;
+    private final int[] charFieldIndices;
+    private final int[] charFieldPrecisions;
+
+    private final String operatorName;
+
+    private transient UpdatableRowData reusableRowData;
+    private transient StreamRecord<RowData> reusableStreamRecord;
+
+    private ConstraintEnforcer(
+            NotNullEnforcer notNullEnforcer,
+            int[] notNullFieldIndices,
+            CharPrecisionEnforcer charPrecisionEnforcer,
+            int[] charFieldIndices,
+            int[] charFieldPrecisions,
+            String[] allFieldNames,
+            String operatorName) {
+        this.notNullEnforcer = notNullEnforcer;
+        this.notNullFieldIndices = notNullFieldIndices;
+        this.charPrecisionEnforcer = charPrecisionEnforcer;
+        this.charFieldIndices = charFieldIndices;
+        this.charFieldPrecisions = charFieldPrecisions;
+        this.allFieldNames = allFieldNames;
+        this.operatorName = operatorName;
+    }
+
+    @Override
+    public String getOperatorName() {
+        return operatorName;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        reusableRowData = new UpdatableRowData(null, allFieldNames.length);
+        reusableStreamRecord = new StreamRecord<>(null);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Helper builder, so that the {@link ConstraintEnforcer} can be 
instantiated with only the NOT
+     * NULL constraint validation, only the CHAR/VARCHAR precision validation, 
or both.
+     */
+    public static class Builder {
+
+        private NotNullEnforcer notNullEnforcer;
+        private int[] notNullFieldIndices;
+
+        private CharPrecisionEnforcer charPrecisionEnforcer;
+        private List<Tuple2<Integer, Integer>> charFields;
+        private String[] allFieldNames;
+
+        private final List<String> operatorNames = new ArrayList<>();
+
+        private boolean isConfigured = false;
+
+        public void addNotNullConstraint(
+                NotNullEnforcer notNullEnforcer,
+                int[] notNullFieldIndices,
+                List<String> notNullFieldNames,
+                String[] allFieldNames) {
+            checkArgument(
+                    notNullFieldIndices.length > 0,
+                    "ConstraintValidator requires that there are not-null 
fields.");
+            this.notNullFieldIndices = notNullFieldIndices;
+            this.notNullEnforcer = notNullEnforcer;
+            this.allFieldNames = allFieldNames;
+            if (notNullEnforcer != null) {
+                operatorNames.add(
+                        String.format(
+                                "NotNullEnforcer(fields=[%s])",
+                                String.join(", ", notNullFieldNames)));
+                this.isConfigured = true;
+            }
+        }
+
+        public void addCharPrecisionConstraint(
+                CharPrecisionEnforcer charPrecisionEnforcer,
+                List<Tuple2<Integer, Integer>> charFields,
+                List<String> charFieldNames,
+                String[] allFieldNames) {
+            this.charPrecisionEnforcer = charPrecisionEnforcer;
+            if (this.charPrecisionEnforcer == CharPrecisionEnforcer.TRIM) {
+                checkArgument(
+                        charFields.size() > 0,
+                        "ConstraintValidator requires that there are 
CHAR/VARCHAR fields.");
+                this.charFields = charFields;
+                this.allFieldNames = allFieldNames;
+
+                operatorNames.add(
+                        String.format(
+                                "CharPrecisionEnforcer(fields=[%s])",
+                                String.join(", ", charFieldNames)));
+                this.isConfigured = true;
+            }
+        }
+
+        /**
+         * If neither of NOT NULL or CHAR/VARCHAR precision enforcers are 
configured, null is
+         * returned.
+         */
+        public ConstraintEnforcer build() {
+            if (isConfigured) {
+                String operatorName =
+                        "ConstraintEnforcer[" + String.join(", ", 
operatorNames) + "]";
+                return new ConstraintEnforcer(
+                        notNullEnforcer,
+                        notNullFieldIndices,
+                        charPrecisionEnforcer,
+                        charFields != null
+                                ? charFields.stream().mapToInt(t -> 
t.f0).toArray()
+                                : null,
+                        charFields != null
+                                ? charFields.stream().mapToInt(t -> 
t.f1).toArray()
+                                : null,
+                        allFieldNames,
+                        operatorName);
+            }
+            return null;
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        StreamRecord<RowData> processedElement = 
processNotNullConstraint(element);
+        if (processedElement != null) {
+            processedElement = processCharConstraint(processedElement);
+            output.collect(processedElement);
+        }
+    }
+
+    private StreamRecord<RowData> 
processNotNullConstraint(StreamRecord<RowData> element) {
+        if (notNullEnforcer == null) {
+            return element;
+        }
+
+        final RowData rowData = element.getValue();
+
+        for (int index : notNullFieldIndices) {
+            if (rowData.isNullAt(index)) {
+                switch (notNullEnforcer) {
+                    case ERROR:
+                        throw new TableException(
+                                String.format(
+                                        "Column '%s' is NOT NULL, however, a 
null value is being written into it. "
+                                                + "You can set job 
configuration '%s'='drop' "

Review comment:
       nit: also format the value to set here using the enum, rather than hard-coding 'drop'.
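   
   i.e. something along these lines (sketch; the elided tail of the message stays as-is):
   
   ```java
   throw new TableException(
           String.format(
                   "Column '%s' is NOT NULL, however, a null value is being written into it. "
                           + "You can set job configuration '%s'='%s' ...", // rest of the message unchanged
                   allFieldNames[index],
                   TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(),
                   NotNullEnforcer.DROP.name().toLowerCase()));
   ```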



