One more thing: in my CSV I added a ',bye' line (to test null Long values), but
I get a parse error. If I add 'csv.null-literal' = '' it seems to work. Is
that the right way to solve this problem?
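
For reference, a minimal sketch of the DDL being discussed, built in plain Java with the new-style option keys. The class and method names (CsvDdlSketch, buildCsvDdl) are hypothetical. 'csv.null-literal' and 'csv.allow-comments' are options listed in the 1.12 CSV format docs, but whether an empty null literal is the recommended way to handle empty BIGINT fields is not confirmed in this thread, and 'csv.allow-comments' only helps with the header if it has been turned into a '#' comment line.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CsvDdlSketch {

    /**
     * Builds a CREATE TABLE statement for a filesystem CSV table using the
     * new-style option keys ('connector', 'path', 'format').
     * Assumption: filePath contains no single quotes (no escaping is done here).
     */
    public static String buildCsvDdl(String tableName, String filePath) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", filePath);
        options.put("format", "csv");
        // Assumption: treat an empty field as NULL (the workaround tried above).
        options.put("csv.null-literal", "");
        // Assumption: skip lines starting with '#', e.g. a commented-out header.
        options.put("csv.allow-comments", "true");

        StringBuilder ddl = new StringBuilder();
        ddl.append("CREATE TABLE ").append(tableName).append(" (\n")
           .append("  `id` BIGINT,\n")
           .append("  `name` STRING\n")
           .append(") WITH (\n");

        int i = 0;
        for (Map.Entry<String, String> e : options.entrySet()) {
            ddl.append("  '").append(e.getKey()).append("' = '")
               .append(e.getValue()).append("'");
            // Comma after every option except the last one.
            ddl.append(++i < options.size() ? ",\n" : "\n");
        }
        ddl.append(")");
        return ddl.toString();
    }

    public static void main(String[] args) {
        // Print the statement; in a Flink job it would be passed to tableEnv.executeSql(...).
        System.out.println(buildCsvDdl("testTableIn", "/tmp/test.csv"));
    }
}
```

The sketch has no Flink dependency, so it only shows the shape of the statement; it does not validate the options against a running Flink version.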

On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Thanks Kurt, now it works. However, I can't find a way to skip the CSV
> header. Before there was "format.ignore-first-line", but I can't find an
> equivalent option now.
> I could set csv.ignore-parse-errors to true, but then I can't detect other
> parsing errors. Otherwise I need to manually transform the header into a
> comment by adding the # character at the start of the line.
> How can I solve that?
>
> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> My DDL is:
>>
>> CREATE TABLE csv (
>>        id BIGINT,
>>        name STRING
>> ) WITH (
>>        'connector' = 'filesystem',
>>        'path' = '.....',
>>        'format' = 'csv'
>> );
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> We recommend using the new table source & sink interfaces, which
>>> have different property keys from the old ones, e.g. 'connector' vs.
>>> 'connector.type'.
>>>
>>> You can follow the 1.12 doc [1] to define your CSV table; everything
>>> should work just fine.
>>>
>>> Flink SQL> set table.dml-sync=true;
>>> [INFO] Session property has been set.
>>>
>>> Flink SQL> select * from csv;
>>> +----------------------+----------------------+
>>> |                   id |                 name |
>>> +----------------------+----------------------+
>>> |                    3 |                    c |
>>> +----------------------+----------------------+
>>> Received a total of 1 row
>>>
>>> Flink SQL> insert overwrite csv values(4, 'd');
>>> [INFO] Submitting SQL update statement to the cluster...
>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>> [INFO] Complete execution of the SQL update statement.
>>>
>>> Flink SQL> select * from csv;
>>> +----------------------+----------------------+
>>> |                   id |                 name |
>>> +----------------------+----------------------+
>>> |                    4 |                    d |
>>> +----------------------+----------------------+
>>> Received a total of 1 row
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi Till,
>>>> since I was using the same WITH clause for both reading and writing, I
>>>> discovered that overwrite is actually supported in the sinks, while in the
>>>> sources an exception is thrown (I thought those properties were
>>>> simply ignored).
>>>> However, the quote-character is not supported in the sinks: is this a
>>>> bug or is it the intended behaviour?
>>>> Here is a minimal example that reproduces the problem (put something
>>>> like '1,hello' or '2,hi' in /tmp/test.csv).
>>>>
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>
>>>> public class FlinkCsvTest {
>>>>   public static void main(String[] args) throws Exception {
>>>>     final EnvironmentSettings envSettings =
>>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>>>>     final String tableInName = "testTableIn";
>>>>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //
>>>>
>>>>     final String tableOutName = "testTableOut";
>>>>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
>>>>     tableEnv.executeSql(createInTableDdl);
>>>>     tableEnv.executeSql(createOutTableDdl);
>>>>
>>>>     Table tableIn = tableEnv.from(tableInName);
>>>>     Table tableOut = tableEnv.from(tableOutName);
>>>>     tableIn.insertInto(tableOutName);
>>>>     // tableEnv.toDataSet(table, Row.class).print();
>>>>     tableEnv.execute("TEST read/write");
>>>>
>>>>   }
>>>>
>>>>   private static String getSourceDdl(String tableName, String filePath) {
>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>         " `id` BIGINT,\n" + //
>>>>         " `name` STRING) WITH (\n" + //
>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>         " 'connector.property-version' = '1',\n" + //
>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>         " 'format.type' = 'csv',\n" + //
>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>  //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>>>>         " 'format.property-version' = '1',\n" + //
>>>>         " 'format.quote-character' = '\"',\n" + //
>>>>         " 'format.ignore-first-line' = 'false'" + //
>>>>         ")";
>>>>   }
>>>>
>>>>   private static String getSinkDdl(String tableName, String filePath) {
>>>>     return "CREATE TABLE " + tableName + " (\n" + //
>>>>         " `id` BIGINT,\n" + //
>>>>         " `name` STRING) WITH (\n" + //
>>>>         " 'connector.type' = 'filesystem',\n" + //
>>>>         " 'connector.property-version' = '1',\n" + //
>>>>         " 'connector.path' = '" + filePath + "',\n" + //
>>>>         " 'format.type' = 'csv',\n" + //
>>>>         " 'format.field-delimiter' = ',',\n" + //
>>>>         " 'format.num-files' = '1',\n" + //
>>>>         " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
>>>>         " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>>>>         " 'format.property-version' = '1'\n" + //
>>>>         ")";
>>>>   }
>>>> }
>>>>
>>>> Thanks for the support,
>>>> Flavio
>>>>
>>>>
>>>> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> I tried to execute the code snippet you have provided and I could not
>>>>> reproduce the problem.
>>>>>
>>>>> Concretely I am running this code:
>>>>>
>>>>> final EnvironmentSettings envSettings =
>>>>> EnvironmentSettings.newInstance()
>>>>>     .useBlinkPlanner()
>>>>>     .inStreamingMode()
>>>>>     .build();
>>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>>
>>>>> tableEnv.fromValues("foobar").execute().await();
>>>>>
>>>>> Am I missing something? Maybe you can share a minimal but fully
>>>>> working example where the problem occurs. Thanks a lot.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Any help here? Moreover, if I use the DataStream API there's no
>>>>>> left/right outer join yet. Are those meant to be added in Flink 1.13 or
>>>>>> 1.14?
>>>>>>
>>>>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Hi to all,
>>>>>>> I'm testing writing to a CSV using Flink 1.13 and I get the
>>>>>>> following error:
>>>>>>>
>>>>>>> The matching candidates:
>>>>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>>>>> Unsupported property keys:
>>>>>>> format.quote-character
>>>>>>>
>>>>>>> I create the table env using this:
>>>>>>>
>>>>>>> final EnvironmentSettings envSettings =
>>>>>>> EnvironmentSettings.newInstance()//
>>>>>>>         .useBlinkPlanner()//
>>>>>>>         // .inBatchMode()//
>>>>>>>         .inStreamingMode()//
>>>>>>>         .build();
>>>>>>>     final TableEnvironment tableEnv =
>>>>>>> TableEnvironment.create(envSettings);
>>>>>>>
>>>>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>>>>> Is this really not supported or am I using the wrong API?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>
