In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid reading them as strings). The ',bye' line is just a way to test that in my example (add that line to the input CSV). If I use 'csv.null-literal' = '' it seems to work, but is that a workaround or the right solution?
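To make it concrete, this is roughly the source table I'm testing with (only a sketch: the table name and path are invented, and I'm using the new 1.12-style property keys):

CREATE TABLE testTableIn (
  id BIGINT,   -- nullable: the input CSV contains lines like ',bye'
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/test.csv',
  'format' = 'csv',
  'csv.null-literal' = ''  -- interpret empty fields as NULL instead of failing the parse
);

With 'csv.null-literal' = '' the empty id field parses as NULL; the alternative, 'csv.ignore-parse-errors' = 'true', would also mask genuine parse errors, which is what I want to avoid.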
Another big problem I'm having with the new APIs is that if I use TableEnvironment tableEnv = TableEnvironment.create(envSettings); then I can't convert a table to a DataStream. I need to use StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings); but in that case I can't use inBatchMode.
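A sketch of the conflict (Flink 1.12/1.13 APIs; the class and variable names here are only for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EnvModeConflict {

  public static void main(String[] args) {
    // Unified TableEnvironment: batch mode is supported, but this
    // interface has no Table <-> DataStream conversion methods.
    final TableEnvironment batchEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // StreamTableEnvironment: provides toAppendStream()/toRetractStream(),
    // but create() only accepts streaming-mode settings; passing
    // inBatchMode() settings throws a TableException.
    final StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(
        streamEnv,
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
  }
}

So with these two entry points, batch execution and DataStream conversion seem mutually exclusive.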
On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:

> `format.ignore-first-line` is unfortunately a regression compared to the
> old one. I've created a ticket [1] to track this, but according to the
> current design it seems not easy to do.
>
> Regarding null values, I'm not sure I understand the issue you had. What
> do you mean by using ',bye' to test null Long values?
>
> [1] https://issues.apache.org/jira/browse/FLINK-22178
>
> Best,
> Kurt
>
> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> And another thing: in my CSV I added ',bye' (to test null Long values)
>> but I get a parse error. If I add 'csv.null-literal' = '' it seems to
>> work. Is that the right way to solve this problem?
>>
>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Thanks Kurt, now it works. However, I can't find a way to skip the CSV
>>> header: before there was "format.ignore-first-line", but now I can't
>>> find an equivalent option.
>>> I could set csv.ignore-parse-errors to true, but then I can't detect
>>> other parsing errors; otherwise I need to manually transform the header
>>> into a comment by adding the # character at the start of the line.
>>> How can I solve that?
>>>
>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> My DDL is:
>>>>
>>>> CREATE TABLE csv (
>>>>   id BIGINT,
>>>>   name STRING
>>>> ) WITH (
>>>>   'connector' = 'filesystem',
>>>>   'path' = '.....',
>>>>   'format' = 'csv'
>>>> );
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> We would recommend using the new table source & sink interfaces, which
>>>>> have different property keys compared to the old ones, e.g. 'connector'
>>>>> vs. 'connector.type'.
>>>>>
>>>>> You can follow the 1.12 doc [1] to define your CSV table; everything
>>>>> should work just fine:
>>>>>
>>>>> Flink SQL> set table.dml-sync=true;
>>>>> [INFO] Session property has been set.
>>>>>
>>>>> Flink SQL> select * from csv;
>>>>> +----------------------+----------------------+
>>>>> |                   id |                 name |
>>>>> +----------------------+----------------------+
>>>>> |                    3 |                    c |
>>>>> +----------------------+----------------------+
>>>>> Received a total of 1 row
>>>>>
>>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>>> [INFO] Submitting SQL update statement to the cluster...
>>>>> [INFO] Execute statement in sync mode. Please wait for the execution
>>>>> finish...
>>>>> [INFO] Complete execution of the SQL update statement.
>>>>>
>>>>> Flink SQL> select * from csv;
>>>>> +----------------------+----------------------+
>>>>> |                   id |                 name |
>>>>> +----------------------+----------------------+
>>>>> |                    4 |                    d |
>>>>> +----------------------+----------------------+
>>>>> Received a total of 1 row
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>> since I was using the same WITH-clause both for reading and writing,
>>>>>> I discovered that overwrite is actually supported in the sinks, while
>>>>>> in the sources an exception is thrown (I was thinking that those
>>>>>> properties were simply ignored).
>>>>>> However, the quote-character is not supported in the sinks: is this a
>>>>>> bug or is it the intended behaviour?
>>>>>> Here is a minimal example that reproduces the problem (put something
>>>>>> like '1,hello' or '2,hi' into /tmp/test.csv).
>>>>>>
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.Table;
>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>
>>>>>> public class FlinkCsvTest {
>>>>>>
>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>     final EnvironmentSettings envSettings = EnvironmentSettings
>>>>>>         .newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>>>>>     final String tableInName = "testTableIn";
>>>>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>>>>     final String tableOutName = "testTableOut";
>>>>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>>>>     tableEnv.executeSql(createInTableDdl);
>>>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>>>
>>>>>>     final Table tableIn = tableEnv.from(tableInName);
>>>>>>     // executeInsert() both triggers the INSERT and submits the job,
>>>>>>     // replacing the deprecated insertInto() + execute() pair
>>>>>>     tableIn.executeInsert(tableOutName);
>>>>>>   }
>>>>>>
>>>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>>>         + "  `id` BIGINT,\n"
>>>>>>         + "  `name` STRING) WITH (\n"
>>>>>>         + "  'connector.type' = 'filesystem',\n"
>>>>>>         + "  'connector.property-version' = '1',\n"
>>>>>>         + "  'connector.path' = '" + filePath + "',\n"
>>>>>>         + "  'format.type' = 'csv',\n"
>>>>>>         + "  'format.field-delimiter' = ',',\n"
>>>>>>         // + "  'format.write-mode' = 'OVERWRITE',\n" // NOT SUPPORTED
>>>>>>         + "  'format.property-version' = '1',\n"
>>>>>>         + "  'format.quote-character' = '\"',\n"
>>>>>>         + "  'format.ignore-first-line' = 'false'\n"
>>>>>>         + ")";
>>>>>>   }
>>>>>>
>>>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>>>         + "  `id` BIGINT,\n"
>>>>>>         + "  `name` STRING) WITH (\n"
>>>>>>         + "  'connector.type' = 'filesystem',\n"
>>>>>>         + "  'connector.property-version' = '1',\n"
>>>>>>         + "  'connector.path' = '" + filePath + "',\n"
>>>>>>         + "  'format.type' = 'csv',\n"
>>>>>>         + "  'format.field-delimiter' = ',',\n"
>>>>>>         + "  'format.num-files' = '1',\n"
>>>>>>         + "  'format.write-mode' = 'OVERWRITE',\n" // SUPPORTED (sinks only)
>>>>>>         + "  'format.quote-character' = '\"',\n" // NOT SUPPORTED
>>>>>>         + "  'format.property-version' = '1'\n"
>>>>>>         + ")";
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Thanks for the support,
>>>>>> Flavio
>>>>>>
>>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> I tried to execute the code snippet you have provided and I could
>>>>>>> not reproduce the problem.
>>>>>>>
>>>>>>> Concretely I am running this code:
>>>>>>>
>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>>     .useBlinkPlanner()
>>>>>>>     .inStreamingMode()
>>>>>>>     .build();
>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>
>>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>>
>>>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>>>> working example where the problem occurs. Thanks a lot.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Any help here? Moreover, if I use the DataStream APIs there's no
>>>>>>>> left/right outer join yet. Are those meant to be added in Flink 1.13
>>>>>>>> or 1.14?
>>>>>>>>
>>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>>>> following error:
>>>>>>>>>
>>>>>>>>> The matching candidates:
>>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>>> Unsupported property keys:
>>>>>>>>> format.quote-character
>>>>>>>>>
>>>>>>>>> I create the table env using this:
>>>>>>>>>
>>>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>>>>     .useBlinkPlanner()
>>>>>>>>>     // .inBatchMode()
>>>>>>>>>     .inStreamingMode()
>>>>>>>>>     .build();
>>>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>>
>>>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>>>> Is this really not supported, or am I using the wrong API?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio