docete commented on a change in pull request #14977: URL: https://github.com/apache/flink/pull/14977#discussion_r584449925
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala ########## @@ -207,4 +212,336 @@ class TableSinkITCase extends BatchTestBase { val expected = List("book,1,12", "book,4,11", "fruit,3,44") assertEquals(expected.sorted, result.sorted) } + + @Test + def testSinkWithPartitionAndComputedColumn(): Unit = { + tEnv.executeSql( + s""" + |CREATE TABLE testSink ( + | `a` INT, + | `b` AS `a` + 1, + | `c` STRING, + | `d` INT, + | `e` DOUBLE + |) + |PARTITIONED BY (`c`, `d`) + |WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) + |""".stripMargin) + + registerCollection("MyTable", simpleData2, simpleType2, "x, y", nullableOfSimpleData2) + + tEnv.executeSql( + s""" + |INSERT INTO testSink PARTITION(`c`='2021', `d`=1) + |SELECT x, sum(y) FROM MyTable GROUP BY x + |""".stripMargin).await() + val expected = List( + "1,2021,1,0.1", + "2,2021,1,0.4", + "3,2021,1,1.0", + "4,2021,1,2.2", + "5,2021,1,3.9") + val result = TestValuesTableFactory.getResults("testSink") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testPartialInsert(): Unit = { + tEnv.executeSql( + s""" + |CREATE TABLE testSink ( + | `a` INT, + | `b` DOUBLE + |) + |WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) + |""".stripMargin) + + registerCollection("MyTable", simpleData2, simpleType2, "x, y", nullableOfSimpleData2) + + tEnv.executeSql( + s""" + |INSERT INTO testSink (b) + |SELECT sum(y) FROM MyTable GROUP BY x + |""".stripMargin).await() + val expected = List( + "null,0.1", + "null,0.4", + "null,1.0", + "null,2.2", + "null,3.9") + val result = TestValuesTableFactory.getResults("testSink") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testPartialInsertWithNotNullColumn(): Unit = { + tEnv.executeSql( + s""" + |CREATE TABLE testSink ( + | `a` INT NOT NULL, + | `b` DOUBLE + |) + |WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) + |""".stripMargin) + + registerCollection("MyTable", simpleData2, simpleType2, "x, y", nullableOfSimpleData2) + + expectedEx.expect(classOf[ValidationException]) + expectedEx.expectMessage("Column 'a' has no default value and does not allow NULLs") + + tEnv.executeSql( + s""" + |INSERT INTO testSink (b) + |SELECT sum(y) FROM MyTable GROUP BY x + |""".stripMargin).await() + } + + @Test + def testPartialInsertWithPartitionAndComputedColumn(): Unit = { + tEnv.executeSql( + s""" + |CREATE TABLE testSink ( + | `a` INT, + | `b` AS `a` + 1, + | `c` STRING, + | `d` INT, + | `e` DOUBLE + |) + |PARTITIONED BY (`c`, `d`) + |WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) + |""".stripMargin) + + registerCollection("MyTable", simpleData2, simpleType2, "x, y", nullableOfSimpleData2) + + tEnv.executeSql( + s""" + |INSERT INTO testSink PARTITION(`c`='2021', `d`=1) (e) + |SELECT sum(y) FROM MyTable GROUP BY x + |""".stripMargin).await() + val expected = List( + "null,2021,1,0.1", + "null,2021,1,0.4", + "null,2021,1,1.0", + "null,2021,1,2.2", + "null,2021,1,3.9") + val result = TestValuesTableFactory.getResults("testSink") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testFullInsertWithPartitionAndComputedColumn(): Unit = { + tEnv.executeSql( + s""" + |CREATE TABLE testSink ( + | `a` INT, + | `b` AS `a` + 1, + | `c` STRING, + | `d` INT, + | `e` DOUBLE + |) + |PARTITIONED BY (`c`, `d`) + |WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) + |""".stripMargin) + + registerCollection("MyTable", simpleData2, simpleType2, "x, y", nullableOfSimpleData2) + + tEnv.executeSql( + s""" + |INSERT INTO testSink PARTITION(`c`='2021', `d`=1) (a, e) + |SELECT x, sum(y) FROM MyTable GROUP BY x + |""".stripMargin).await() + val expected = List( + "1,2021,1,0.1", + "2,2021,1,0.4", + "3,2021,1,1.0", + "4,2021,1,2.2", + "5,2021,1,3.9") + val result = TestValuesTableFactory.getResults("testSink") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testPartialInsertWithDynamicPartitionAndComputedColumn1(): Unit = { + tEnv.executeSql( + s""" + |CREATE TABLE testSink ( + | `a` INT, + | `b` AS `a` + 1, + | `c` STRING, + | `d` INT, + | `e` DOUBLE + |) + |PARTITIONED BY (`c`, `d`) + |WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) + |""".stripMargin) + + registerCollection("MyTable", simpleData2, simpleType2, "x, y", nullableOfSimpleData2) + + tEnv.executeSql( + s""" + |INSERT INTO testSink (e) + |SELECT sum(y) FROM MyTable GROUP BY x + |""".stripMargin).await() + val expected = List( + "null,null,null,0.1", + "null,null,null,0.4", + "null,null,null,1.0", + "null,null,null,2.2", + "null,null,null,3.9") + val result = TestValuesTableFactory.getResults("testSink") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testPartialInsertWithDynamicPartitionAndComputedColumn2(): Unit = { + tEnv.executeSql( + s""" + |CREATE TABLE testSink ( + | `a` INT, + | `b` AS `a` + 1, + | `c` STRING, + | `d` INT, + | `e` DOUBLE + |) + |PARTITIONED BY (`c`, `d`) + |WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) + |""".stripMargin) + + registerCollection("MyTable", simpleData2, simpleType2, "x, y", nullableOfSimpleData2) + + tEnv.executeSql( + s""" + |INSERT INTO testSink (c, d, e) Review comment: fixed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org