Converting from table to DataStream in batch mode is indeed a problem now.
But I think this will
be improved soon.

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> In my real CSV I have LONG columns that can contain null values. In that
> case I get a parse exception (and I would like to avoid to read it as a
> string).
> The ',bye' is just the way you can test that in my example (add that line
> to the input csv).
> If I use  'csv.null-literal' = '' it seems to work but, is it a workaround
> or it is the right solution?
>
> Another big problem I'm having with the new APIs is that if I use
>     TableEnvironment tableEnv = TableEnvironment.create(envSettings);
> then I can't convert a table to a datastream..I need to use
>     StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, envSettings);
> but in that case I can't use inBatchMode..
>
> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> `format.ignore-first-line` is unfortunately a regression compared to the
>> old one.
>> I've created a ticket [1] to track this but according to current design,
>> it seems not easy to do.
>>
>> Regarding null values, I'm not sure if I understand the issue you had.
>> What do you mean by
>> using ',bye' to test null Long values?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> And another thing: in my csv I added ',bye' (to test null Long values)
>>> but I get a parse error..if I add  'csv.null-literal' = '' it seems to
>>> work..is that the right way to solve this problem?
>>>
>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Thanks Kurt, now it works. However I can't find a way to skip the CSV
>>>> header..before there was  "format.ignore-first-line" but now I can't find
>>>> another way to skip it.
>>>> I could set csv.ignore-parse-errors to true but then I can't detect
>>>> other parsing errors, otherwise I need to manually transofrm the header
>>>> into a comment adding the # character at the start of the line..
>>>> How can I solve that?
>>>>
>>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> My DDL is:
>>>>>
>>>>> CREATE TABLE csv (
>>>>>        id BIGINT,
>>>>>        name STRING
>>>>> ) WITH (
>>>>>        'connector' = 'filesystem',
>>>>>        'path' = '.....',
>>>>>        'format' = 'csv'
>>>>> );
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> We would recommend you to use new table source & sink interfaces,
>>>>>> which have different
>>>>>> property keys compared to the old ones, e.g. 'connector' v.s.
>>>>>> 'connector.type'.
>>>>>>
>>>>>> You can follow the 1.12 doc [1] to define your csv table, everything
>>>>>> should work just fine.
>>>>>>
>>>>>> *Flink SQL> set table.dml-sync=true;*
>>>>>>
>>>>>> *[INFO] Session property has been set.*
>>>>>>
>>>>>>
>>>>>> *Flink SQL> select * from csv;*
>>>>>>
>>>>>> *+----------------------+----------------------+*
>>>>>>
>>>>>> *|                   id |                 name |*
>>>>>>
>>>>>> *+----------------------+----------------------+*
>>>>>>
>>>>>> *|                    3 |                    c |*
>>>>>>
>>>>>> *+----------------------+----------------------+*
>>>>>>
>>>>>> *Received a total of 1 row*
>>>>>>
>>>>>>
>>>>>> *Flink SQL> insert overwrite csv values(4, 'd');*
>>>>>>
>>>>>> *[INFO] Submitting SQL update statement to the cluster...*
>>>>>>
>>>>>> *[INFO] Execute statement in sync mode. Please wait for the execution
>>>>>> finish...*
>>>>>>
>>>>>> *[INFO] Complete execution of the SQL update statement.*
>>>>>>
>>>>>>
>>>>>> *Flink SQL> select * from csv;*
>>>>>>
>>>>>> *+----------------------+----------------------+*
>>>>>>
>>>>>> *|                   id |                 name |*
>>>>>>
>>>>>> *+----------------------+----------------------+*
>>>>>>
>>>>>> *|                    4 |                    d |*
>>>>>>
>>>>>> *+----------------------+----------------------+*
>>>>>>
>>>>>> *Received a total of 1 row*
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> since I was using the same WITH-clause both for reading and writing
>>>>>>> I discovered that overwrite is actually supported in the Sinks, while in
>>>>>>> the Sources an exception is thrown (I was thinking that those properties
>>>>>>> were simply ignored).
>>>>>>> However the quote-character is not supported in the sinks: is this a
>>>>>>> bug or is it the intended behaviour?.
>>>>>>> Here is a minimal example that reproduce the problem (put in the
>>>>>>> /tmp/test.csv something like '1,hello' or '2,hi').
>>>>>>>
>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>>
>>>>>>> public class FlinkCsvTest {
>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>     final EnvironmentSettings envSettings =
>>>>>>>
>>>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>>>     final TableEnvironment tableEnv =
>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>     // ExecutionEnvironment env =
>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>     // BatchTableEnvironment tableEnv =
>>>>>>> BatchTableEnvironment.create(env);
>>>>>>>     final String tableInName = "testTableIn";
>>>>>>>     final String createInTableDdl = getSourceDdl(tableInName,
>>>>>>> "/tmp/test.csv"); //
>>>>>>>
>>>>>>>     final String tableOutName = "testTableOut";
>>>>>>>     final String createOutTableDdl = getSinkDdl(tableOutName,
>>>>>>> "/tmp/test-out.csv"); //
>>>>>>>     tableEnv.executeSql(createInTableDdl);
>>>>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>>>>
>>>>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>>>>     Table tableOut = tableEnv.from(tableOutName);
>>>>>>>     tableIn.insertInto(tableOutName);
>>>>>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>>>>>     tableEnv.execute("TEST read/write");
>>>>>>>
>>>>>>>   }
>>>>>>>
>>>>>>>   private static String getSourceDdl(String tableName, String
>>>>>>> filePath) {
>>>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>>>         " `id` BIGINT,\n" + //
>>>>>>>         " `name` STRING) WITH (\n" + //
>>>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>>>  //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>>>>>>>         " 'format.property-version' = '1',\n" + //
>>>>>>>         " 'format.quote-character' = '\"',\n" + //
>>>>>>>         " 'format.ignore-first-line' = 'false'" + //
>>>>>>>         ")";
>>>>>>>   }
>>>>>>>
>>>>>>>   private static String getSinkDdl(String tableName, String
>>>>>>> filePath) {
>>>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>>>         " `id` BIGINT,\n" + //
>>>>>>>         " `name` STRING) WITH (\n" + //
>>>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>>>         " 'format.num-files' = '1',\n" + //
>>>>>>>         " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED
>>>>>>> (sinks only)
>>>>>>>         " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>>>>>>>         " 'format.property-version' = '1'\n" + //
>>>>>>>         ")";
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Thanks for the support,
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> I tried to execute the code snippet you have provided and I could
>>>>>>>> not reproduce the problem.
>>>>>>>>
>>>>>>>> Concretely I am running this code:
>>>>>>>>
>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>> EnvironmentSettings.newInstance()
>>>>>>>>     .useBlinkPlanner()
>>>>>>>>     .inStreamingMode()
>>>>>>>>     .build();
>>>>>>>> final TableEnvironment tableEnv =
>>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>>
>>>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>>>
>>>>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>>>>> working example where the problem occurs. Thanks a lot.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Any help here? Moreover if I use the DataStream APIs there's no
>>>>>>>>> left/right outer join yet..are those meant to be added in Flink 1.13 
>>>>>>>>> or
>>>>>>>>> 1.14?
>>>>>>>>>
>>>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>>>>> following error:
>>>>>>>>>>
>>>>>>>>>> The matching candidates:
>>>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>>>> Unsupported property keys:
>>>>>>>>>> format.quote-character
>>>>>>>>>>
>>>>>>>>>> I create the table env using this:
>>>>>>>>>>
>>>>>>>>>> final EnvironmentSettings envSettings =
>>>>>>>>>> EnvironmentSettings.newInstance()//
>>>>>>>>>>         .useBlinkPlanner()//
>>>>>>>>>>         // .inBatchMode()//
>>>>>>>>>>         .inStreamingMode()//
>>>>>>>>>>         .build();
>>>>>>>>>>     final TableEnvironment tableEnv =
>>>>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>>>>
>>>>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>

Reply via email to