danny0405 commented on a change in pull request #11711: 
[FLINK-17103][table-planner-blink] Supports dynamic options for Blink…
URL: https://github.com/apache/flink/pull/11711#discussion_r408587966
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 ##########
 @@ -250,6 +254,71 @@ class CatalogTableITCase(isStreamingMode: Boolean) 
extends AbstractTestBase {
     assertEquals(expected, FileUtils.readFileUtf8(new File(new 
URI(sinkFilePath))))
   }
 
+  @Test
+  def testReadWriteCsvWithDynamicTableOptions(): Unit = {
+    val csvRecords = Seq(
+      "2.02,Euro,2019-12-12 00:00:01.001001",
+      "1.11,US Dollar,2019-12-12 00:00:02.002001",
+      "50,Yen,2019-12-12 00:00:04.004001",
+      "3.1,Euro,2019-12-12 00:00:05.005001",
+      "5.33,US Dollar,2019-12-12 00:00:06.006001"
+    )
+    val tempFilePath = createTempFile(
+      "csv-order-test",
+      csvRecords.mkString("#"))
+    val sourceDDL =
+      s"""
+         |CREATE TABLE T1 (
+         |  price DECIMAL(10, 2),
+         |  currency STRING,
+         |  ts6 TIMESTAMP(6),
+         |  ts AS CAST(ts6 AS TIMESTAMP(3)),
+         |  WATERMARK FOR ts AS ts
+         |) WITH (
+         |  'connector.type' = 'filesystem',
+         |  'connector.path' = '$tempFilePath',
+         |  'format.type' = 'csv',
+         |  'format.field-delimiter' = ','
+         |)
+     """.stripMargin
+    tableEnv.sqlUpdate(sourceDDL)
+
+    val sinkFilePath = getTempFilePath("csv-order-sink")
+    val sinkDDL =
+      s"""
+         |CREATE TABLE T2 (
+         |  window_end TIMESTAMP(3),
+         |  max_ts TIMESTAMP(6),
+         |  counter BIGINT,
+         |  total_price DECIMAL(10, 2)
+         |) with (
+         |  'connector.type' = 'filesystem',
+         |  'connector.path' = '$sinkFilePath',
+         |  'format.type' = 'csv'
+         |)
+      """.stripMargin
+    tableEnv.sqlUpdate(sinkDDL)
+
+    val query =
+      """
+        |INSERT INTO T2 /*+ OPTIONS('format.field-delimiter' = ',') */
 
 Review comment:
   Agree, thanks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to