Hi Flavio, We would recommend you to use new table source & sink interfaces, which have different property keys compared to the old ones, e.g. 'connector' v.s. 'connector.type'.
You can follow the 1.12 doc [1] to define your csv table, everything should work just fine. *Flink SQL> set table.dml-sync=true;* *[INFO] Session property has been set.* *Flink SQL> select * from csv;* *+----------------------+----------------------+* *| id | name |* *+----------------------+----------------------+* *| 3 | c |* *+----------------------+----------------------+* *Received a total of 1 row* *Flink SQL> insert overwrite csv values(4, 'd');* *[INFO] Submitting SQL update statement to the cluster...* *[INFO] Execute statement in sync mode. Please wait for the execution finish...* *[INFO] Complete execution of the SQL update statement.* *Flink SQL> select * from csv;* *+----------------------+----------------------+* *| id | name |* *+----------------------+----------------------+* *| 4 | d |* *+----------------------+----------------------+* *Received a total of 1 row* [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html Best, Kurt On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it> wrote: > Hi Till, > since I was using the same WITH-clause both for reading and writing I > discovered that overwrite is actually supported in the Sinks, while in the > Sources an exception is thrown (I was thinking that those properties were > simply ignored). > However the quote-character is not supported in the sinks: is this a bug > or is it the intended behaviour?. > Here is a minimal example that reproduce the problem (put in the > /tmp/test.csv something like '1,hello' or '2,hi'). > > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.TableEnvironment; > > public class FlinkCsvTest { > public static void main(String[] args) throws Exception { > final EnvironmentSettings envSettings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > final TableEnvironment tableEnv = TableEnvironment.create(envSettings); > // ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); > final String tableInName = "testTableIn"; > final String createInTableDdl = getSourceDdl(tableInName, > "/tmp/test.csv"); // > > final String tableOutName = "testTableOut"; > final String createOutTableDdl = getSinkDdl(tableOutName, > "/tmp/test-out.csv"); // > tableEnv.executeSql(createInTableDdl); > tableEnv.executeSql(createOutTableDdl); > > Table tableIn = tableEnv.from(tableInName); > Table tableOut = tableEnv.from(tableOutName); > tableIn.insertInto(tableOutName); > // tableEnv.toDataSet(table, Row.class).print(); > tableEnv.execute("TEST read/write"); > > } > > private static String getSourceDdl(String tableName, String filePath) { > return "CREATE TABLE " + tableName + " (\n" + // > " `id` BIGINT,\n" + // > " `name` STRING) WITH (\n" + // > " 'connector.type' = 'filesystem',\n" + // > " 'connector.property-version' = '1',\n" + // > " 'connector.path' = '" + filePath + "',\n" + // > " 'format.type' = 'csv',\n" + // > " 'format.field-delimiter' = ',',\n" + // > // " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED > " 'format.property-version' = '1',\n" + // > " 'format.quote-character' = '\"',\n" + // > " 'format.ignore-first-line' = 'false'" + // > ")"; > } > > private static String getSinkDdl(String tableName, String filePath) { > return "CREATE TABLE " + tableName + " (\n" + // > " `id` BIGINT,\n" + // > " `name` STRING) WITH (\n" + // > " 'connector.type' = 'filesystem',\n" + // > " 'connector.property-version' = '1',\n" + // > " 'connector.path' = '" + filePath + "',\n" + // > " 'format.type' = 'csv',\n" + // > " 'format.field-delimiter' = ',',\n" + // > " 'format.num-files' = '1',\n" + // > " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only) > " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED > " 'format.property-version' = '1'\n" + // > ")"; > } > } > > Thanks for the support, > Flavio > > > On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org> wrote: > >> Hi Flavio, >> >> I tried to execute the code snippet you have provided and I could not >> reproduce the problem. >> >> Concretely I am running this code: >> >> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode() >> .build(); >> final TableEnvironment tableEnv = TableEnvironment.create(envSettings); >> >> tableEnv.fromValues("foobar").execute().await(); >> >> Am I missing something? Maybe you can share a minimal but fully working >> example where the problem occurs. Thanks a lot. >> >> Cheers, >> Till >> >> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it> >> wrote: >> >>> Any help here? Moreover if I use the DataStream APIs there's no >>> left/right outer join yet..are those meant to be added in Flink 1.13 or >>> 1.14? >>> >>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it> >>> wrote: >>> >>>> Hi to all, >>>> I'm testing writing to a CSV using Flink 1.13 and I get the following >>>> error: >>>> >>>> The matching candidates: >>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory >>>> Unsupported property keys: >>>> format.quote-character >>>> >>>> I create the table env using this: >>>> >>>> final EnvironmentSettings envSettings = >>>> EnvironmentSettings.newInstance()// >>>> .useBlinkPlanner()// >>>> // .inBatchMode()// >>>> .inStreamingMode()// >>>> .build(); >>>> final TableEnvironment tableEnv = >>>> TableEnvironment.create(envSettings); >>>> >>>> The error is the same both with inBatchMode and inStreamingMode. >>>> Is this really not supported or am I using the wrong API? >>>> >>>> Best, >>>> Flavio >>>> >>>