Converting from a Table to a DataStream in batch mode is indeed a problem right now, but I think this will be improved soon.
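For reference, the conversion currently has to go through the StreamTableEnvironment bridge, which only accepts streaming mode. A minimal sketch of that route against the 1.12/1.13 Java bridge API (the class name and the fromValues source are only placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableToStreamSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // StreamTableEnvironment.create(...) currently rejects inBatchMode(),
    // so the bridge is tied to streaming mode.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

    Table table = tableEnv.fromValues(1L, 2L, 3L);

    // Table -> DataStream conversion is only available on the bridge environment,
    // not on a plain TableEnvironment.
    DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
    stream.print();

    streamEnv.execute("table-to-stream sketch");
  }
}

A plain TableEnvironment created with inBatchMode() has no such conversion method, which is exactly the gap described below.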
Best,
Kurt

On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

In my real CSV I have LONG columns that can contain null values. In that case I get a parse exception (and I would like to avoid reading them as strings). The ',bye' line is just the way you can test that in my example (add that line to the input CSV). If I use 'csv.null-literal' = '' it seems to work, but is it a workaround or is it the right solution?

Another big problem I'm having with the new APIs is that if I use

TableEnvironment tableEnv = TableEnvironment.create(envSettings);

then I can't convert a table to a DataStream. I need to use

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);

but in that case I can't use inBatchMode.

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:

`format.ignore-first-line` is unfortunately a regression compared to the old one. I've created a ticket [1] to track this, but according to the current design it seems not easy to do.

Regarding null values, I'm not sure I understand the issue you had. What do you mean by using ',bye' to test null Long values?

[1] https://issues.apache.org/jira/browse/FLINK-22178

Best,
Kurt

On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

And another thing: in my CSV I added ',bye' (to test null Long values) but I get a parse error. If I add 'csv.null-literal' = '' it seems to work. Is that the right way to solve this problem?

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Thanks Kurt, now it works. However, I can't find a way to skip the CSV header: before there was "format.ignore-first-line" but now I can't find another way to skip it. I could set csv.ignore-parse-errors to true, but then I can't detect other parsing errors; otherwise I need to manually transform the header into a comment by adding the # character at the start of the line. How can I solve that?
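To make it concrete, the two workarounds I have in mind look like this (just a sketch with a placeholder path and table name; 'csv.ignore-parse-errors' and 'csv.allow-comments' are the csv format options I'm referring to):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CsvHeaderWorkaroundSketch {

  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    tableEnv.executeSql(
        "CREATE TABLE testTable (\n"
            + "  `id` BIGINT,\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '/tmp/test.csv',\n"
            + "  'format' = 'csv',\n"
            // Workaround 1: swallow the unparsable header line, together with any other bad record.
            + "  'csv.ignore-parse-errors' = 'true',\n"
            // Workaround 2: pre-process the file so the header starts with '#' and let the
            // format skip it as a comment line.
            + "  'csv.allow-comments' = 'true'\n"
            + ")");

    tableEnv.executeSql("SELECT * FROM testTable").print();
  }
}

Neither is a real replacement for format.ignore-first-line, of course.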
On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:

My DDL is:

CREATE TABLE csv (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '.....',
  'format' = 'csv'
);

Best,
Kurt

On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:

Hi Flavio,

We would recommend you to use the new table source & sink interfaces, which have different property keys compared to the old ones, e.g. 'connector' vs. 'connector.type'.

You can follow the 1.12 doc [1] to define your csv table, everything should work just fine.

Flink SQL> set table.dml-sync=true;
[INFO] Session property has been set.

Flink SQL> select * from csv;
+----------------------+----------------------+
|                   id |                 name |
+----------------------+----------------------+
|                    3 |                    c |
+----------------------+----------------------+
Received a total of 1 row

Flink SQL> insert overwrite csv values(4, 'd');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Execute statement in sync mode. Please wait for the execution finish...
[INFO] Complete execution of the SQL update statement.

Flink SQL> select * from csv;
+----------------------+----------------------+
|                   id |                 name |
+----------------------+----------------------+
|                    4 |                    d |
+----------------------+----------------------+
Received a total of 1 row

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
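Applied to the source and sink tables from your example, the definitions would look roughly like this (a sketch reusing your placeholder paths; 'csv.quote-character' is the new-style counterpart of 'format.quote-character', and overwriting is expressed with an INSERT OVERWRITE statement instead of a 'format.write-mode' property):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvNewPropertiesTest {

  public static void main(String[] args) throws Exception {
    // A plain TableEnvironment is enough here because nothing is converted to a DataStream.
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // Source table with the new-style property keys ('connector', 'path', 'format', 'csv.*').
    tableEnv.executeSql(
        "CREATE TABLE testTableIn (\n"
            + "  `id` BIGINT,\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '/tmp/test.csv',\n"
            + "  'format' = 'csv',\n"
            + "  'csv.field-delimiter' = ',',\n"
            + "  'csv.quote-character' = '\"'\n"
            + ")");

    // Sink table with the same keys; there is no 'format.write-mode' property anymore.
    tableEnv.executeSql(
        "CREATE TABLE testTableOut (\n"
            + "  `id` BIGINT,\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'path' = '/tmp/test-out.csv',\n"
            + "  'format' = 'csv',\n"
            + "  'csv.field-delimiter' = ',',\n"
            + "  'csv.quote-character' = '\"'\n"
            + ")");

    // Overwriting the output directory is expressed in the statement itself.
    tableEnv.executeSql("INSERT OVERWRITE testTableOut SELECT * FROM testTableIn").await();
  }
}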
Best,
Kurt

On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi Till,
since I was using the same WITH-clause both for reading and writing, I discovered that overwrite is actually supported in the sinks, while in the sources an exception is thrown (I was thinking that those properties were simply ignored). However, the quote-character is not supported in the sinks: is this a bug or is it the intended behaviour?
Here is a minimal example that reproduces the problem (put in the /tmp/test.csv something like '1,hello' or '2,hi').

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvTest {

  public static void main(String[] args) throws Exception {
    final EnvironmentSettings envSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
    // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    final String tableInName = "testTableIn";
    final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //

    final String tableOutName = "testTableOut";
    final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
    tableEnv.executeSql(createInTableDdl);
    tableEnv.executeSql(createOutTableDdl);

    Table tableIn = tableEnv.from(tableInName);
    Table tableOut = tableEnv.from(tableOutName);
    tableIn.insertInto(tableOutName);
    // tableEnv.toDataSet(table, Row.class).print();
    tableEnv.execute("TEST read/write");
  }

  private static String getSourceDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        "  `id` BIGINT,\n" + //
        "  `name` STRING) WITH (\n" + //
        "  'connector.type' = 'filesystem',\n" + //
        "  'connector.property-version' = '1',\n" + //
        "  'connector.path' = '" + filePath + "',\n" + //
        "  'format.type' = 'csv',\n" + //
        "  'format.field-delimiter' = ',',\n" + //
        // "  'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
        "  'format.property-version' = '1',\n" + //
        "  'format.quote-character' = '\"',\n" + //
        "  'format.ignore-first-line' = 'false'" + //
        ")";
  }

  private static String getSinkDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n" + //
        "  `id` BIGINT,\n" + //
        "  `name` STRING) WITH (\n" + //
        "  'connector.type' = 'filesystem',\n" + //
        "  'connector.property-version' = '1',\n" + //
        "  'connector.path' = '" + filePath + "',\n" + //
        "  'format.type' = 'csv',\n" + //
        "  'format.field-delimiter' = ',',\n" + //
        "  'format.num-files' = '1',\n" + //
        "  'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
        "  'format.quote-character' = '\"',\n" + // NOT SUPPORTED
        "  'format.property-version' = '1'\n" + //
        ")";
  }
}

Thanks for the support,
Flavio

On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi Flavio,

I tried to execute the code snippet you have provided and I could not reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Any help here? Moreover, if I use the DataStream APIs there's no left/right outer join yet. Are those meant to be added in Flink 1.13 or 1.14?

On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
I'm testing writing to a CSV using Flink 1.13 and I get the following error:

The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character

I create the table env using this:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance() //
    .useBlinkPlanner() //
    // .inBatchMode() //
    .inStreamingMode() //
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

The error is the same both with inBatchMode and inStreamingMode. Is this really not supported or am I using the wrong API?

Best,
Flavio