wuchong commented on a change in pull request #11490: [FLINK-15579][table-planner-blink] UpsertStreamTableSink should work on batch mode URL: https://github.com/apache/flink/pull/11490#discussion_r396492527
########## File path: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java ########## @@ -210,4 +228,76 @@ public void testAppend() throws Exception { Row.of(20, 6, Timestamp.valueOf("1970-01-01 00:00:00.02")) }, DB_URL, OUTPUT_TABLE2, new String[]{"id", "num", "ts"}); } + + @Test + public void testBatchUpsert() throws Exception { + StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); + StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); + RowTypeInfo rt = (RowTypeInfo) Types.ROW_NAMED(new String[]{"NAME", "SCORE"}, Types.STRING, Types.LONG); + Table source = bsTableEnv.fromTableSource(new CollectionTableSource(generateRecords(2), rt)); + bsTableEnv.registerTable("sourceTable", source); + bsTableEnv.sqlUpdate( + "CREATE TABLE USER_RESULT(" + + "NAME VARCHAR," + + "SCORE BIGINT" + + ") WITH ( " + + "'connector.type' = 'jdbc'," + + "'connector.url'='" + DB_URL + "'," + + "'connector.table' = '" + OUTPUT_TABLE3 + "'" + + ")"); + + bsTableEnv.sqlUpdate("insert into USER_RESULT SELECT s.NAME, s.SCORE " + + "FROM sourceTable as s "); + bsTableEnv.execute("test"); + + check(new Row[] { + Row.of("a0", 0L), + Row.of("a1", 1L) + }, DB_URL, OUTPUT_TABLE3, new String[]{"NAME", "SCORE"}); + } + + private List<Row> generateRecords(int numRecords) { + int arity = 2; + List<Row> res = new ArrayList<>(numRecords); + for (long i = 0; i < numRecords; i++) { + Row row = new Row(arity); + row.setField(0, "a" + i); + row.setField(1, i); + res.add(row); + } + return res; + } + + private static class CollectionTableSource extends InputFormatTableSource<Row> { Review comment: We should avoid creating too many testing TableSource. You can use VALUES instead. ```sql INSERT INTO USER_RESULT SELECT user_name, score FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) AS UserCountTable(score, user_name) ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services