And another thing: in my CSV I added a line ',bye' (to test null Long values) but I get a parse error. If I add 'csv.null-literal' = '' it seems to work. Is that the right way to solve this problem?
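For context, a minimal sketch of the kind of DDL in question, using the 1.12-style options: 'csv.null-literal' is a documented option of the csv format, while the table name and path below are made up for illustration.

CREATE TABLE csv_with_nulls (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/test.csv',  -- hypothetical path
  'format' = 'csv',
  -- treat the empty string as NULL, so a line like ',bye'
  -- parses as (NULL, 'bye') instead of failing on the BIGINT column
  'csv.null-literal' = ''
);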
On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Thanks Kurt, now it works. However, I can't find a way to skip the CSV
> header. Before there was "format.ignore-first-line", but now I can't find
> another way to skip it.
> I could set csv.ignore-parse-errors to true, but then I can't detect other
> parsing errors; otherwise I need to manually transform the header into a
> comment by adding the # character at the start of the line.
> How can I solve that?
>
> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> My DDL is:
>>
>> CREATE TABLE csv (
>>   id BIGINT,
>>   name STRING
>> ) WITH (
>>   'connector' = 'filesystem',
>>   'path' = '.....',
>>   'format' = 'csv'
>> );
>>
>> Best,
>> Kurt
>>
>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> We would recommend using the new table source & sink interfaces, which
>>> have different property keys compared to the old ones, e.g. 'connector'
>>> vs. 'connector.type'.
>>>
>>> You can follow the 1.12 docs [1] to define your csv table; everything
>>> should work just fine.
>>>
>>> Flink SQL> set table.dml-sync=true;
>>> [INFO] Session property has been set.
>>>
>>> Flink SQL> select * from csv;
>>> +----------------------+----------------------+
>>> |                   id |                 name |
>>> +----------------------+----------------------+
>>> |                    3 |                    c |
>>> +----------------------+----------------------+
>>> Received a total of 1 row
>>>
>>> Flink SQL> insert overwrite csv values(4, 'd');
>>> [INFO] Submitting SQL update statement to the cluster...
>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>> [INFO] Complete execution of the SQL update statement.
>>>
>>> Flink SQL> select * from csv;
>>> +----------------------+----------------------+
>>> |                   id |                 name |
>>> +----------------------+----------------------+
>>> |                    4 |                    d |
>>> +----------------------+----------------------+
>>> Received a total of 1 row
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>
>>> Best,
>>> Kurt
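Regarding the header question above, a hedged sketch assuming the same 1.12-style options Kurt points to: the csv format documents a 'csv.allow-comments' option, which would fit the workaround of rewriting the header as a '#'-prefixed comment line. The table name and path below are made up for illustration.

CREATE TABLE csv_no_header (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/test.csv',  -- hypothetical path
  'format' = 'csv',
  -- lines starting with '#' are skipped, so a header rewritten
  -- as '#id,name' would be ignored by the parser
  'csv.allow-comments' = 'true'
);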
>>>
>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Hi Till,
>>>> since I was using the same WITH clause both for reading and writing, I
>>>> discovered that overwrite is actually supported in the sinks, while in
>>>> the sources an exception is thrown (I was thinking that those properties
>>>> were simply ignored).
>>>> However, the quote-character is not supported in the sinks: is this a
>>>> bug or is it the intended behaviour?
>>>> Here is a minimal example that reproduces the problem (put in
>>>> /tmp/test.csv something like '1,hello' or '2,hi'):
>>>>
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>
>>>> public class FlinkCsvTest {
>>>>
>>>>   public static void main(String[] args) throws Exception {
>>>>     final EnvironmentSettings envSettings =
>>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>
>>>>     final String tableInName = "testTableIn";
>>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>>     final String tableOutName = "testTableOut";
>>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>>     tableEnv.executeSql(createInTableDdl);
>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>
>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>     tableIn.insertInto(tableOutName);
>>>>     tableEnv.execute("TEST read/write");
>>>>   }
>>>>
>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>         + "  `id` BIGINT,\n"
>>>>         + "  `name` STRING) WITH (\n"
>>>>         + "  'connector.type' = 'filesystem',\n"
>>>>         + "  'connector.property-version' = '1',\n"
>>>>         + "  'connector.path' = '" + filePath + "',\n"
>>>>         + "  'format.type' = 'csv',\n"
>>>>         + "  'format.field-delimiter' = ',',\n"
>>>>         // + "  'format.write-mode' = 'OVERWRITE',\n" // NOT SUPPORTED in sources
>>>>         + "  'format.property-version' = '1',\n"
>>>>         + "  'format.quote-character' = '\"',\n"
>>>>         + "  'format.ignore-first-line' = 'false'"
>>>>         + ")";
>>>>   }
>>>>
>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>         + "  `id` BIGINT,\n"
>>>>         + "  `name` STRING) WITH (\n"
>>>>         + "  'connector.type' = 'filesystem',\n"
>>>>         + "  'connector.property-version' = '1',\n"
>>>>         + "  'connector.path' = '" + filePath + "',\n"
>>>>         + "  'format.type' = 'csv',\n"
>>>>         + "  'format.field-delimiter' = ',',\n"
>>>>         + "  'format.num-files' = '1',\n"
>>>>         + "  'format.write-mode' = 'OVERWRITE',\n" // SUPPORTED (sinks only)
>>>>         + "  'format.quote-character' = '\"',\n" // NOT SUPPORTED
>>>>         + "  'format.property-version' = '1'\n"
>>>>         + ")";
>>>>   }
>>>> }
>>>>
>>>> Thanks for the support,
>>>> Flavio
>>>>
>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> I tried to execute the code snippet you have provided and I could not
>>>>> reproduce the problem.
>>>>>
>>>>> Concretely, I am running this code:
>>>>>
>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>     .useBlinkPlanner()
>>>>>     .inStreamingMode()
>>>>>     .build();
>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>
>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>
>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>> working example where the problem occurs. Thanks a lot.
>>>>>
>>>>> Cheers,
>>>>> Till
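For comparison with the FlinkCsvTest program above, a hedged sketch of the same read-then-write pipeline against the new property keys Kurt recommends later in this thread. 'csv.field-delimiter' and 'csv.quote-character' are documented options of the new csv format; whether the quote character is honoured on the sink side is exactly what the thread is probing, so treat this as a sketch rather than a verified fix. Paths and table names are illustrative.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvNewPropertiesTest {

  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

    // New-style keys: 'connector' instead of 'connector.type',
    // 'path' instead of 'connector.path', 'format' instead of 'format.type'.
    tableEnv.executeSql(
        "CREATE TABLE testTableIn (\n"
            + "  `id` BIGINT,\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '/tmp/test.csv',\n"
            + "  'format' = 'csv',\n"
            + "  'csv.field-delimiter' = ',',\n"
            + "  'csv.quote-character' = '\"'\n"
            + ")");
    tableEnv.executeSql(
        "CREATE TABLE testTableOut (\n"
            + "  `id` BIGINT,\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '/tmp/test-out.csv',\n"
            + "  'format' = 'csv',\n"
            + "  'csv.field-delimiter' = ',',\n"
            + "  'csv.quote-character' = '\"'\n"
            + ")");

    // executeInsert() submits the INSERT job and returns a TableResult;
    // await() blocks until the job finishes, replacing the deprecated
    // insertInto()/execute() pair used in FlinkCsvTest.
    tableEnv.from("testTableIn").executeInsert("testTableOut").await();
  }
}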
>>>>>
>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Any help here? Moreover, if I use the DataStream APIs there's no
>>>>>> left/right outer join yet. Are those meant to be added in Flink 1.13
>>>>>> or 1.14?
>>>>>>
>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Hi to all,
>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>> following error:
>>>>>>>
>>>>>>> The matching candidates:
>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>> Unsupported property keys:
>>>>>>> format.quote-character
>>>>>>>
>>>>>>> I create the table env using this:
>>>>>>>
>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>>     .useBlinkPlanner()
>>>>>>>     // .inBatchMode()
>>>>>>>     .inStreamingMode()
>>>>>>>     .build();
>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>
>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
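A closing note on the error above: as the replies in this thread establish, 'format.quote-character' is rejected because the old property keys route the table through the legacy CsvBatchTableSinkFactory. A hedged sketch of the old-to-new key mapping, based on the options used in this thread and the 1.12 filesystem connector docs; the table name and path are illustrative.

-- old (legacy factory) key           ->  new (1.12-style) key
-- 'connector.type' = 'filesystem'    ->  'connector' = 'filesystem'
-- 'connector.path' = '...'           ->  'path' = '...'
-- 'format.type' = 'csv'              ->  'format' = 'csv'
-- 'format.field-delimiter' = ','     ->  'csv.field-delimiter' = ','
-- 'format.quote-character' = '"'     ->  'csv.quote-character' = '"'

CREATE TABLE csv_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/test-out.csv',  -- hypothetical path
  'format' = 'csv',
  'csv.quote-character' = '"'
);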