Hi Flavio,

We recommend using the new table source and sink interfaces, which use
different property keys from the old ones, e.g. 'connector' vs.
'connector.type'.

You can follow the 1.12 docs [1] to define your CSV table; everything should
work fine, as the session below shows:

Flink SQL> set table.dml-sync=true;
[INFO] Session property has been set.

Flink SQL> select * from csv;
+----------------------+----------------------+
|                   id |                 name |
+----------------------+----------------------+
|                    3 |                    c |
+----------------------+----------------------+
Received a total of 1 row

Flink SQL> insert overwrite csv values(4, 'd');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Execute statement in sync mode. Please wait for the execution finish...
[INFO] Complete execution of the SQL update statement.

Flink SQL> select * from csv;
+----------------------+----------------------+
|                   id |                 name |
+----------------------+----------------------+
|                    4 |                    d |
+----------------------+----------------------+
Received a total of 1 row
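
For comparison with the getSourceDdl/getSinkDdl helpers quoted below, here is
a rough sketch of how the same DDL might look with the new-style keys. The
class and method names (CsvTableDdl, newStyleCsvDdl) are made up for
illustration, and the option keys are written from my reading of the 1.12
filesystem connector and CSV format docs, so please check them against [1]
before relying on them.

```java
// Illustrative sketch only: builds a CREATE TABLE statement with new-style
// option keys. CsvTableDdl and newStyleCsvDdl are hypothetical names.
final class CsvTableDdl {

    /** Returns the DDL for a CSV-backed filesystem table. */
    static String newStyleCsvDdl(String tableName, String filePath) {
        return "CREATE TABLE " + tableName + " (\n"
            + "  `id` BIGINT,\n"
            + "  `name` STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"   // old key: 'connector.type'
            + "  'path' = '" + filePath + "',\n"  // old key: 'connector.path'
            + "  'format' = 'csv',\n"             // old key: 'format.type'
            + "  'csv.field-delimiter' = ',',\n"  // old key: 'format.field-delimiter'
            + "  'csv.quote-character' = '\"'\n"  // old key: 'format.quote-character'
            + ")";
    }

    public static void main(String[] args) {
        // Print the statement; it would then be passed to tableEnv.executeSql(...).
        System.out.println(newStyleCsvDdl("csv", "/tmp/test.csv"));
    }
}
```

The resulting string can be passed to tableEnv.executeSql(...) in the same
way as the DDL strings in the quoted example.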

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html

Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Till,
> since I was using the same WITH-clause both for reading and writing I
> discovered that overwrite is actually supported in the Sinks, while in the
> Sources an exception is thrown (I was thinking that those properties were
> simply ignored).
> However, the quote-character is not supported in the sinks: is this a bug
> or is it the intended behaviour?
> Here is a minimal example that reproduces the problem (put something like
> '1,hello' or '2,hi' in /tmp/test.csv).
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class FlinkCsvTest {
>   public static void main(String[] args) throws Exception {
>     final EnvironmentSettings envSettings =
>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>     // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>     final String tableInName = "testTableIn";
>     final String createInTableDdl = getSourceDdl(tableInName, "/tmp/test.csv"); //
>
>     final String tableOutName = "testTableOut";
>     final String createOutTableDdl = getSinkDdl(tableOutName, "/tmp/test-out.csv"); //
>     tableEnv.executeSql(createInTableDdl);
>     tableEnv.executeSql(createOutTableDdl);
>
>     Table tableIn = tableEnv.from(tableInName);
>     Table tableOut = tableEnv.from(tableOutName);
>     tableIn.insertInto(tableOutName);
>     // tableEnv.toDataSet(table, Row.class).print();
>     tableEnv.execute("TEST read/write");
>
>   }
>
>   private static String getSourceDdl(String tableName, String filePath) {
>     return "CREATE TABLE " + tableName + " (\n" + //
>         " `id` BIGINT,\n" + //
>         " `name` STRING) WITH (\n" + //
>         " 'connector.type' = 'filesystem',\n" + //
>         " 'connector.property-version' = '1',\n" + //
>         " 'connector.path' = '" + filePath + "',\n" + //
>         " 'format.type' = 'csv',\n" + //
>         " 'format.field-delimiter' = ',',\n" + //
>  //       " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>         " 'format.property-version' = '1',\n" + //
>         " 'format.quote-character' = '\"',\n" + //
>         " 'format.ignore-first-line' = 'false'" + //
>         ")";
>   }
>
>   private static String getSinkDdl(String tableName, String filePath) {
>     return "CREATE TABLE " + tableName + " (\n" + //
>         " `id` BIGINT,\n" + //
>         " `name` STRING) WITH (\n" + //
>         " 'connector.type' = 'filesystem',\n" + //
>         " 'connector.property-version' = '1',\n" + //
>         " 'connector.path' = '" + filePath + "',\n" + //
>         " 'format.type' = 'csv',\n" + //
>         " 'format.field-delimiter' = ',',\n" + //
>         " 'format.num-files' = '1',\n" + //
>         " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
>         " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
>         " 'format.property-version' = '1'\n" + //
>         ")";
>   }
> }
>
> Thanks for the support,
> Flavio
>
>
> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> I tried to execute the code snippet you have provided and I could not
>> reproduce the problem.
>>
>> Concretely I am running this code:
>>
>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>     .useBlinkPlanner()
>>     .inStreamingMode()
>>     .build();
>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>
>> tableEnv.fromValues("foobar").execute().await();
>>
>> Am I missing something? Maybe you can share a minimal but fully working
>> example where the problem occurs. Thanks a lot.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Any help here? Moreover, if I use the DataStream APIs there's no
>>> left/right outer join yet. Are those meant to be added in Flink 1.13 or
>>> 1.14?
>>>
>>> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi to all,
>>>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>>>> error:
>>>>
>>>> The matching candidates:
>>>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>>>> Unsupported property keys:
>>>> format.quote-character
>>>>
>>>> I create the table env using this:
>>>>
>>>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()//
>>>>     .useBlinkPlanner()//
>>>>     // .inBatchMode()//
>>>>     .inStreamingMode()//
>>>>     .build();
>>>> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>
>>>> The error is the same both with inBatchMode and inStreamingMode.
>>>> Is this really not supported or am I using the wrong API?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
