That's absolutely useful. IMHO, join should also work without windows/triggers, and left/right outer joins should be easier, in order to really migrate legacy code. A reduceGroup equivalent would also help, but that is less urgent. I hope that my feedback as a Flink user is useful.

Best,
Flavio
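For readers outside the thread, the window requirement Flavio refers to looks like this today: a DataStream join can only be expressed per window, and there is no built-in left/right outer variant. A minimal sketch of the current API shape (element types, key fields and the window size are illustrative, and with such tiny bounded inputs the processing-time window is not guaranteed to emit output):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedJoinSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<Long, String>> left =
        env.fromElements(Tuple2.of(1L, "hello"), Tuple2.of(2L, "hi"));
    DataStream<Tuple2<Long, String>> right =
        env.fromElements(Tuple2.of(1L, "world"));

    // The join is evaluated per window: only elements that fall into the same
    // 10s window are matched, and unmatched elements are silently dropped --
    // there is no outer-join counterpart on this API.
    left.join(right)
        .where(new KeySelector<Tuple2<Long, String>, Long>() {
          @Override
          public Long getKey(Tuple2<Long, String> t) {
            return t.f0;
          }
        })
        .equalTo(new KeySelector<Tuple2<Long, String>, Long>() {
          @Override
          public Long getKey(Tuple2<Long, String> t) {
            return t.f0;
          }
        })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .apply(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
          @Override
          public String join(Tuple2<Long, String> l, Tuple2<Long, String> r) {
            return l.f1 + " " + r.f1;
          }
        })
        .print();

    env.execute("windowed join sketch");
  }
}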
On Fri, Apr 9, 2021 at 12:38 PM Kurt Young <ykt...@gmail.com> wrote:

> Converting from table to DataStream in batch mode is indeed a problem now.
> But I think this will be improved soon.
>
> Best,
> Kurt
>
> On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> In my real CSV I have LONG columns that can contain null values. In that
>> case I get a parse exception (and I would like to avoid reading them as
>> strings). The ',bye' line is just the way you can test that in my example
>> (add that line to the input CSV).
>> If I use 'csv.null-literal' = '' it seems to work, but is that a
>> workaround or the right solution?
>>
>> Another big problem I'm having with the new APIs is that if I use
>> TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>> then I can't convert a table to a DataStream. I need to use
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
>> but in that case I can't use inBatchMode.
>>
>> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> `format.ignore-first-line` is unfortunately a regression compared to the
>>> old connector. I've created a ticket [1] to track this but, according to
>>> the current design, it seems not easy to do.
>>>
>>> Regarding null values, I'm not sure I understand the issue you had.
>>> What do you mean by using ',bye' to test null Long values?
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>>
>>> Best,
>>> Kurt
>>>
>>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> And another thing: in my CSV I added ',bye' (to test null Long values)
>>>> but I get a parse error. If I add 'csv.null-literal' = '' it seems to
>>>> work. Is that the right way to solve this problem?
>>>>
>>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Thanks Kurt, now it works. However, I can't find a way to skip the CSV
>>>>> header: before there was "format.ignore-first-line", but now I can't
>>>>> find an equivalent option.
>>>>> I could set csv.ignore-parse-errors to true, but then I can't detect
>>>>> other parsing errors; otherwise I need to manually transform the header
>>>>> into a comment by adding the # character at the start of the line.
>>>>> How can I solve that?
>>>>>
>>>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> My DDL is:
>>>>>>
>>>>>> CREATE TABLE csv (
>>>>>>   id BIGINT,
>>>>>>   name STRING
>>>>>> ) WITH (
>>>>>>   'connector' = 'filesystem',
>>>>>>   'path' = '.....',
>>>>>>   'format' = 'csv'
>>>>>> );
>>>>>>
>>>>>> Best,
>>>>>> Kurt
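Building on Kurt's DDL, a sketch of how the null handling and header problems discussed above could be expressed with the new-style properties, assuming Flink 1.12. The path is illustrative; 'csv.null-literal' = '' maps empty fields to NULL instead of raising a parse error, and 'csv.allow-comments' ignores '#'-prefixed lines, matching the manual header-to-comment workaround:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CsvNullAndHeaderSketch {

  public static void main(String[] args) throws Exception {
    final TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // 'csv.null-literal' = '' turns empty fields into NULLs instead of parse
    // errors on the BIGINT column; 'csv.allow-comments' skips '#'-prefixed
    // lines, so a header neutralized with '#' is ignored without having to
    // enable 'csv.ignore-parse-errors'.
    tableEnv.executeSql(
        "CREATE TABLE csvIn (\n"
            + " `id` BIGINT,\n"
            + " `name` STRING) WITH (\n"
            + " 'connector' = 'filesystem',\n"
            + " 'path' = 'file:///tmp/test.csv',\n"
            + " 'format' = 'csv',\n"
            + " 'csv.field-delimiter' = ',',\n"
            + " 'csv.null-literal' = '',\n"
            + " 'csv.allow-comments' = 'true'\n"
            + ")");

    tableEnv.executeSql("SELECT * FROM csvIn").print();
  }
}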
>>>>>>
>>>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> We would recommend you to use the new table source & sink interfaces,
>>>>>>> which have different property keys compared to the old ones, e.g.
>>>>>>> 'connector' vs. 'connector.type'.
>>>>>>>
>>>>>>> You can follow the 1.12 doc [1] to define your csv table; everything
>>>>>>> should work just fine:
>>>>>>>
>>>>>>> Flink SQL> set table.dml-sync=true;
>>>>>>> [INFO] Session property has been set.
>>>>>>>
>>>>>>> Flink SQL> select * from csv;
>>>>>>> +----------------------+----------------------+
>>>>>>> |                   id |                 name |
>>>>>>> +----------------------+----------------------+
>>>>>>> |                    3 |                    c |
>>>>>>> +----------------------+----------------------+
>>>>>>> Received a total of 1 row
>>>>>>>
>>>>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>>>>> [INFO] Submitting SQL update statement to the cluster...
>>>>>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>>>>>> [INFO] Complete execution of the SQL update statement.
>>>>>>>
>>>>>>> Flink SQL> select * from csv;
>>>>>>> +----------------------+----------------------+
>>>>>>> |                   id |                 name |
>>>>>>> +----------------------+----------------------+
>>>>>>> |                    4 |                    d |
>>>>>>> +----------------------+----------------------+
>>>>>>> Received a total of 1 row
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
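The SQL-client session above can also be reproduced programmatically; a sketch of the equivalent calls through a batch TableEnvironment, assuming a 'csv' table like Kurt's (the path is illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InsertOverwriteSketch {

  public static void main(String[] args) throws Exception {
    // INSERT OVERWRITE on the filesystem connector requires batch mode.
    final TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    tableEnv.executeSql(
        "CREATE TABLE csv (id BIGINT, name STRING) WITH (\n"
            + " 'connector' = 'filesystem',\n"
            + " 'path' = 'file:///tmp/test.csv',\n"
            + " 'format' = 'csv')");

    // await() blocks until the job finishes, mirroring 'set table.dml-sync=true'.
    tableEnv.executeSql("INSERT OVERWRITE csv VALUES (4, 'd')").await();

    // print() fetches and displays the result, like 'select * from csv'.
    tableEnv.executeSql("SELECT * FROM csv").print();
  }
}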
>>>>>>>
>>>>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>> since I was using the same WITH-clause both for reading and writing,
>>>>>>>> I discovered that overwrite is actually supported in the sinks, while
>>>>>>>> in the sources an exception is thrown (I was thinking that those
>>>>>>>> properties were simply ignored).
>>>>>>>> However, the quote-character is not supported in the sinks: is this
>>>>>>>> a bug or is it the intended behaviour?
>>>>>>>> Here is a minimal example that reproduces the problem (put something
>>>>>>>> like '1,hello' or '2,hi' into /tmp/test.csv):
>>>>>>>>
>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>>>
>>>>>>>> public class FlinkCsvTest {
>>>>>>>>
>>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>>     final EnvironmentSettings envSettings =
>>>>>>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>     final String tableInName = "testTableIn";
>>>>>>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>>>>>>     final String tableOutName = "testTableOut";
>>>>>>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>>>>>>     tableEnv.executeSql(createInTableDdl);
>>>>>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>>>>>
>>>>>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>>>>>     tableIn.insertInto(tableOutName);
>>>>>>>>     tableEnv.execute("TEST read/write");
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>>>>>         + " `id` BIGINT,\n"
>>>>>>>>         + " `name` STRING) WITH (\n"
>>>>>>>>         + " 'connector.type' = 'filesystem',\n"
>>>>>>>>         + " 'connector.property-version' = '1',\n"
>>>>>>>>         + " 'connector.path' = '" + filePath + "',\n"
>>>>>>>>         + " 'format.type' = 'csv',\n"
>>>>>>>>         + " 'format.field-delimiter' = ',',\n"
>>>>>>>>         // + " 'format.write-mode' = 'OVERWRITE',\n" // NOT SUPPORTED in sources
>>>>>>>>         + " 'format.property-version' = '1',\n"
>>>>>>>>         + " 'format.quote-character' = '\"',\n"
>>>>>>>>         + " 'format.ignore-first-line' = 'false'"
>>>>>>>>         + ")";
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>>>>>     return "CREATE TABLE " + tableName + " (\n"
>>>>>>>>         + " `id` BIGINT,\n"
>>>>>>>>         + " `name` STRING) WITH (\n"
>>>>>>>>         + " 'connector.type' = 'filesystem',\n"
>>>>>>>>         + " 'connector.property-version' = '1',\n"
>>>>>>>>         + " 'connector.path' = '" + filePath + "',\n"
>>>>>>>>         + " 'format.type' = 'csv',\n"
>>>>>>>>         + " 'format.field-delimiter' = ',',\n"
>>>>>>>>         + " 'format.num-files' = '1',\n"
>>>>>>>>         + " 'format.write-mode' = 'OVERWRITE',\n" // SUPPORTED (sinks only)
>>>>>>>>         + " 'format.quote-character' = '\"',\n" // NOT SUPPORTED in sinks
>>>>>>>>         + " 'format.property-version' = '1'\n"
>>>>>>>>         + ")";
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Thanks for the support,
>>>>>>>> Flavio
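For comparison with the old-style DDLs in Flavio's example, a sketch of the same tables using the new-style keys Kurt recommends: quoting becomes the 'csv.quote-character' format option, and overwriting is requested per INSERT OVERWRITE statement (batch mode) rather than via a write-mode property. Paths are illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkCsvNewApiSketch {

  public static void main(String[] args) throws Exception {
    final TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    tableEnv.executeSql(newStyleDdl("testTableIn", "/tmp/test.csv"));
    tableEnv.executeSql(newStyleDdl("testTableOut", "/tmp/test-out.csv"));

    // Overwrite is a property of the statement, not of the table definition.
    tableEnv.executeSql("INSERT OVERWRITE testTableOut SELECT * FROM testTableIn").await();
  }

  private static String newStyleDdl(String tableName, String filePath) {
    return "CREATE TABLE " + tableName + " (\n"
        + " `id` BIGINT,\n"
        + " `name` STRING) WITH (\n"
        + " 'connector' = 'filesystem',\n"
        + " 'path' = '" + filePath + "',\n"
        + " 'format' = 'csv',\n"
        + " 'csv.field-delimiter' = ',',\n"
        + " 'csv.quote-character' = '\"'\n"
        + ")";
  }
}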
>>>>>>>>
>>>>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Flavio,
>>>>>>>>>
>>>>>>>>> I tried to execute the code snippet you have provided and I could not
>>>>>>>>> reproduce the problem.
>>>>>>>>>
>>>>>>>>> Concretely, I am running this code:
>>>>>>>>>
>>>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>>>>     .useBlinkPlanner()
>>>>>>>>>     .inStreamingMode()
>>>>>>>>>     .build();
>>>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>>
>>>>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>>>>
>>>>>>>>> Am I missing something? Maybe you can share a minimal but fully working
>>>>>>>>> example where the problem occurs. Thanks a lot.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Any help here? Moreover, if I use the DataStream APIs there's no
>>>>>>>>>> left/right outer join yet. Are those meant to be added in Flink 1.13
>>>>>>>>>> or 1.14?
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>>>>>>>>>>> error:
>>>>>>>>>>>
>>>>>>>>>>> The matching candidates:
>>>>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>>>>> Unsupported property keys:
>>>>>>>>>>> format.quote-character
>>>>>>>>>>>
>>>>>>>>>>> I create the table env using this:
>>>>>>>>>>>
>>>>>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>>>>>>     .useBlinkPlanner()
>>>>>>>>>>>     // .inBatchMode()
>>>>>>>>>>>     .inStreamingMode()
>>>>>>>>>>>     .build();
>>>>>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>>>>
>>>>>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>>>>>> Is this really not supported, or am I using the wrong API?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Flavio
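A note on the table-to-DataStream limitation raised earlier in the thread: in Flink 1.12/1.13 the conversion is only available through a streaming-mode StreamTableEnvironment, which is exactly the trade-off Flavio describes (conversion or inBatchMode, not both). A minimal sketch of the streaming route that does work; the inline fromValues table is illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.row;

public class TableToDataStreamSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    // StreamTableEnvironment only supports streaming mode.
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        streamEnv,
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    Table table = tableEnv.fromValues(row(1L, "hello"), row(2L, "hi"));

    // Convert the Table into a DataStream of Rows and print it.
    DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
    rows.print();

    streamEnv.execute("table to datastream sketch");
  }
}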