matriv commented on a change in pull request #17811: URL: https://github.com/apache/flink/pull/17811#discussion_r756727589
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java ########## @@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied() { assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); } + @Test + public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"), + Row.of(2, "Apache", "SQL", 22, 222, "Flink"), + Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache Flink SQL"), + Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL")); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaForCharPrecisionEnforcer()) + .source(new TestSource(rows)) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + + // Default config - ignore (no trim) + TableResult result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expected = rows.stream().map(Row::toString).collect(Collectors.toList()); + final List<String> resultsAsString = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsString.add(r.toString())); + assertEquals(expected, resultsAsString); + + try { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name()); + + result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<String> expectedTrimmed = + Arrays.asList( + "+I[1, Apache F, SQL R, 11, 111, SQL]", + "+I[2, Apache, SQL, 22, 222, Flink]", + "+I[3, Apache, Flink, 33, 333, Apache]", + "+I[4, Flink Pr, SQL o, 44, 444, Apache]"); + final List<String> resultsAsStringStrimmed = new ArrayList<>(); + result.collect().forEachRemaining(r -> resultsAsStringStrimmed.add(r.toString())); + assertEquals(expectedTrimmed, resultsAsStringStrimmed); + + } finally { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), + ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name()); + } + } + + @Test + public void testNullEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of(1, "Apache", 11), + Row.of(2, null, 22), + Row.of(null, "Flink", 33), + Row.of(null, null, 44)); + + final SharedReference<List<RowData>> results = sharedObjects.add(new ArrayList<>()); + tableEnv.createTable( + "T1", + TableFactoryHarness.newBuilder() + .schema(schemaForNotNullEnforcer()) + .source(new TestSource(rows)) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public DynamicTableSink.SinkRuntimeProvider + getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return SinkProvider.of( + TestSink.newBuilder() + .setWriter(new TestNotNullWriter(results)) + .setCommittableSerializer( + TestSink.StringCommittableSerializer + .INSTANCE) + .build()); + } + }) + .build()); + + // Default config - ignore (no trim) + ExecutionException ee = Review comment: indeed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org