matriv commented on a change in pull request #17811:
URL: https://github.com/apache/flink/pull/17811#discussion_r756727589



##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() {
         assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
     }
 
+    @Test
+    public void testCharPrecisionEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+                        Row.of(2, "Apache", "SQL", 22, 222, "Flink"),
+                        Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache 
Flink SQL"),
+                        Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, 
"Apache Flink SQL"));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForCharPrecisionEnforcer())
+                        .source(new TestSource(rows))
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+
+        // Default config - ignore (no trim)
+        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+        result.await();
+
+        final List<String> expected = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+        final List<String> resultsAsString = new ArrayList<>();
+        result.collect().forEachRemaining(r -> 
resultsAsString.add(r.toString()));
+        assertEquals(expected, resultsAsString);
+
+        try {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name());
+
+            result = tableEnv.executeSql("SELECT * FROM T1");
+            result.await();
+
+            final List<String> expectedTrimmed =
+                    Arrays.asList(
+                            "+I[1, Apache F, SQL R, 11, 111, SQL]",
+                            "+I[2, Apache, SQL, 22, 222, Flink]",
+                            "+I[3, Apache, Flink, 33, 333, Apache]",
+                            "+I[4, Flink Pr, SQL o, 44, 444, Apache]");
+            final List<String> resultsAsStringStrimmed = new ArrayList<>();
+            result.collect().forEachRemaining(r -> 
resultsAsStringStrimmed.add(r.toString()));
+            assertEquals(expectedTrimmed, resultsAsStringStrimmed);
+
+        } finally {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .setString(
+                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name());
+        }
+    }
+
+    @Test
+    public void testNullEnforcer() throws ExecutionException, 
InterruptedException {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "Apache", 11),
+                        Row.of(2, null, 22),
+                        Row.of(null, "Flink", 33),
+                        Row.of(null, null, 44));
+
+        final SharedReference<List<RowData>> results = sharedObjects.add(new 
ArrayList<>());
+        tableEnv.createTable(
+                "T1",
+                TableFactoryHarness.newBuilder()
+                        .schema(schemaForNotNullEnforcer())
+                        .source(new TestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DynamicTableSink.SinkRuntimeProvider
+                                            getSinkRuntimeProvider(
+                                                    DynamicTableSink.Context 
context) {
+                                        return SinkProvider.of(
+                                                TestSink.newBuilder()
+                                                        .setWriter(new 
TestNotNullWriter(results))
+                                                        
.setCommittableSerializer(
+                                                                
TestSink.StringCommittableSerializer
+                                                                        
.INSTANCE)
+                                                        .build());
+                                    }
+                                })
+                        .build());
+
+        // Default config - ignore (no trim)
+        ExecutionException ee =

Review comment:
       indeed!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to