wuchong commented on a change in pull request #11490: 
[FLINK-15579][table-planner-blink] UpsertStreamTableSink should work on batch 
mode
URL: https://github.com/apache/flink/pull/11490#discussion_r396492527
 
 

 ##########
 File path: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
 ##########
 @@ -210,4 +228,76 @@ public void testAppend() throws Exception {
                                Row.of(20, 6, Timestamp.valueOf("1970-01-01 
00:00:00.02"))
                }, DB_URL, OUTPUT_TABLE2, new String[]{"id", "num", "ts"});
        }
+
+       @Test
+       public void testBatchUpsert() throws Exception {
+               StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+               StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);
+               RowTypeInfo rt = (RowTypeInfo) Types.ROW_NAMED(new 
String[]{"NAME", "SCORE"}, Types.STRING, Types.LONG);
+               Table source = bsTableEnv.fromTableSource(new 
CollectionTableSource(generateRecords(2), rt));
+               bsTableEnv.registerTable("sourceTable", source);
+               bsTableEnv.sqlUpdate(
+                       "CREATE TABLE USER_RESULT(" +
+                               "NAME VARCHAR," +
+                               "SCORE BIGINT" +
+                               ") WITH ( " +
+                               "'connector.type' = 'jdbc'," +
+                               "'connector.url'='" + DB_URL + "'," +
+                               "'connector.table' = '" + OUTPUT_TABLE3 + "'" +
+                               ")");
+
+               bsTableEnv.sqlUpdate("insert into USER_RESULT SELECT s.NAME, 
s.SCORE " +
+                       "FROM sourceTable as s ");
+               bsTableEnv.execute("test");
+
+               check(new Row[] {
+                       Row.of("a0", 0L),
+                       Row.of("a1", 1L)
+               }, DB_URL, OUTPUT_TABLE3, new String[]{"NAME", "SCORE"});
+       }
+
+       private List<Row> generateRecords(int numRecords) {
+               int arity = 2;
+               List<Row> res = new ArrayList<>(numRecords);
+               for (long i = 0; i < numRecords; i++) {
+                       Row row = new Row(arity);
+                       row.setField(0, "a" + i);
+                       row.setField(1, i);
+                       res.add(row);
+               }
+               return res;
+       }
+
+       private static class CollectionTableSource extends 
InputFormatTableSource<Row> {
 
 Review comment:
   We should avoid creating too many testing TableSource. You can use VALUES 
instead. 
   
   ```sql
   INSERT INTO USER_RESULT
     SELECT user_name, score
     FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 
'Kim'), (1, 'Bob'))
       AS UserCountTable(score, user_name)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to