In my real CSV I have LONG columns that can contain null values. In that
case I get a parse exception (and I would like to avoid reading them as
strings).
The ',bye' line is just a way to test that in my example (add that line
to the input CSV).
If I use 'csv.null-literal' = '' it seems to work, but is that a workaround
or the right solution?
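
For context, this is roughly the DDL I'm using with that option (table name
and path are just placeholders; the only relevant part is 'csv.null-literal'):

tableEnv.executeSql(
    "CREATE TABLE csvWithNulls (\n"
    + " `id` BIGINT,\n"                      // LONG column that may be empty in the file
    + " `name` STRING\n"
    + ") WITH (\n"
    + " 'connector' = 'filesystem',\n"
    + " 'path' = '/tmp/test.csv',\n"         // placeholder path
    + " 'format' = 'csv',\n"
    + " 'csv.null-literal' = ''\n"           // empty fields become NULL instead of a parse error
    + ")");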

Another big problem I'm having with the new APIs is that if I use
    TableEnvironment tableEnv = TableEnvironment.create(envSettings);
then I can't convert a Table to a DataStream... I need to use
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
but in that case I can't use inBatchMode().
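
To make this concrete, here is a minimal sketch of the two setups I'm
comparing (class and method names as I understand them from the 1.12/1.13
Table API; "myTable" is just a placeholder):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BatchVsStreamTableEnv {
  public static void main(String[] args) {
    // Option 1: plain TableEnvironment accepts inBatchMode(),
    // but offers no Table -> DataStream conversion.
    EnvironmentSettings batchSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

    // Option 2: StreamTableEnvironment can convert a Table to a DataStream,
    // but (as far as I can tell) only works with streaming-mode settings.
    StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings streamSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment streamTableEnv =
        StreamTableEnvironment.create(streamEnv, streamSettings);
    DataStream<Row> rows =
        streamTableEnv.toAppendStream(streamTableEnv.from("myTable"), Row.class);
  }
}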

On Fri, Apr 9, 2021 at 11:44 AM Kurt Young <ykt...@gmail.com> wrote:

> `format.ignore-first-line` is unfortunately a regression compared to the
> old connector.
> I've created a ticket [1] to track this, but given the current design it
> does not seem easy to support.
>
> Regarding null values, I'm not sure if I understand the issue you had.
> What do you mean by
> using ',bye' to test null Long values?
>
> [1] https://issues.apache.org/jira/browse/FLINK-22178
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> And another thing: in my CSV I added ',bye' (to test null Long values),
>> but I get a parse error... if I add 'csv.null-literal' = '' it seems to
>> work... is that the right way to solve this problem?
>>
>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Thanks Kurt, now it works. However, I can't find a way to skip the CSV
>>> header... before there was "format.ignore-first-line", but now I can't find
>>> an equivalent option.
>>> I could set csv.ignore-parse-errors to true, but then I can't detect
>>> other parsing errors; otherwise I need to manually transform the header
>>> into a comment by adding the # character at the start of the line.
>>> How can I solve that?
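>>>
>>> For reference, the workaround I mean would look roughly like this
>>> (untested sketch, assuming the new 'csv' format options; the path is a
>>> placeholder, and 'csv.allow-comments' only helps after I rewrite the
>>> header line to start with '#'):
>>>
>>> tableEnv.executeSql(
>>>     "CREATE TABLE csvNoHeader (`id` BIGINT, `name` STRING) WITH (\n"
>>>     + " 'connector' = 'filesystem',\n"
>>>     + " 'path' = '/tmp/test.csv',\n"           // placeholder path
>>>     + " 'format' = 'csv',\n"
>>>     + " 'csv.allow-comments' = 'true'\n"       // lines starting with '#' are skipped
>>>     + ")");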
>>>
>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> My DDL is:
>>>>
>>>> CREATE TABLE csv (
>>>>        id BIGINT,
>>>>        name STRING
>>>> ) WITH (
>>>>        'connector' = 'filesystem',
>>>>        'path' = '.....',
>>>>        'format' = 'csv'
>>>> );
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> We would recommend you to use new table source & sink interfaces,
>>>>> which have different
>>>>> property keys compared to the old ones, e.g. 'connector' v.s.
>>>>> 'connector.type'.
>>>>>
>>>>> You can follow the 1.12 doc [1] to define your csv table, everything
>>>>> should work just fine.
>>>>>
>>>>> Flink SQL> set table.dml-sync=true;
>>>>> [INFO] Session property has been set.
>>>>>
>>>>> Flink SQL> select * from csv;
>>>>> +----------------------+----------------------+
>>>>> |                   id |                 name |
>>>>> +----------------------+----------------------+
>>>>> |                    3 |                    c |
>>>>> +----------------------+----------------------+
>>>>> Received a total of 1 row
>>>>>
>>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>>> [INFO] Submitting SQL update statement to the cluster...
>>>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>>>> [INFO] Complete execution of the SQL update statement.
>>>>>
>>>>> Flink SQL> select * from csv;
>>>>> +----------------------+----------------------+
>>>>> |                   id |                 name |
>>>>> +----------------------+----------------------+
>>>>> |                    4 |                    d |
>>>>> +----------------------+----------------------+
>>>>> Received a total of 1 row
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>> since I was using the same WITH-clause both for reading and writing, I
>>>>>> discovered that overwrite is actually supported in the sinks, while in the
>>>>>> sources an exception is thrown (I was thinking that those properties were
>>>>>> simply ignored).
>>>>>> However, the quote-character is not supported in the sinks: is this a
>>>>>> bug or is it the intended behaviour?
>>>>>> Here is a minimal example that reproduces the problem (put something like
>>>>>> '1,hello' or '2,hi' into /tmp/test.csv).
>>>>>>
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.Table;
>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>
>>>>>> public class FlinkCsvTest {
>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>     final EnvironmentSettings envSettings =
>>>>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>>>>>     final String tableInName = "testTableIn";
>>>>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv");
>>>>>>     final String tableOutName = "testTableOut";
>>>>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv");
>>>>>>     tableEnv.executeSql(createInTableDdl);
>>>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>>>
>>>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>>>     Table tableOut = tableEnv.from(tableOutName);
>>>>>>     tableIn.insertInto(tableOutName);
>>>>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>>>>     tableEnv.execute("TEST read/write");
>>>>>>   }
>>>>>>
>>>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>>         " `id` BIGINT,\n" + //
>>>>>>         " `name` STRING) WITH (\n" + //
>>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>>         // " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>>>>>>         " 'format.property-version' = '1',\n" + //
>>>>>>         " 'format.quote-character' = '\"',\n" + //
>>>>>>         " 'format.ignore-first-line' = 'false'" + //
>>>>>>         ")";
>>>>>>   }
>>>>>>
>>>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>>>         " `id` BIGINT,\n" + //
>>>>>>         " `name` STRING) WITH (\n" + //
>>>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>>>         " 'connector.property-version' = '1',\n" + //
>>>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>>>         " 'format.type' = 'csv',\n" + //
>>>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>>>         " 'format.num-files' = '1',\n" + //
>>>>>>         " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
>>>>>>         " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>>>>>>         " 'format.property-version' = '1'\n" + //
>>>>>>         ")";
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Thanks for the support,
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> I tried to execute the code snippet you have provided and I could
>>>>>>> not reproduce the problem.
>>>>>>>
>>>>>>> Concretely I am running this code:
>>>>>>>
>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>>>>     .useBlinkPlanner()
>>>>>>>     .inStreamingMode()
>>>>>>>     .build();
>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>
>>>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>>>
>>>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>>>> working example where the problem occurs. Thanks a lot.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Any help here? Moreover, if I use the DataStream APIs there's no
>>>>>>>> left/right outer join yet... are those meant to be added in Flink 1.13 or
>>>>>>>> 1.14?
>>>>>>>>
>>>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>>>> following error:
>>>>>>>>>
>>>>>>>>> The matching candidates:
>>>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>>>> Unsupported property keys:
>>>>>>>>> format.quote-character
>>>>>>>>>
>>>>>>>>> I create the table env using this:
>>>>>>>>>
>>>>>>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
>>>>>>>>>         .useBlinkPlanner()//
>>>>>>>>>         // .inBatchMode()//
>>>>>>>>>         .inStreamingMode()//
>>>>>>>>>         .build();
>>>>>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>>>>>
>>>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
