Do you have checkpointing enabled?

On Sat, 30 Jul 2022 at 17:31, <pod...@gmx.com> wrote:

> Thanks David but there's no problem with that (probably ";" is default
> separator).
> I can read the file and insert into "Table1" (I said that in my mail).
> Problem is to save to CSV.
>
>
> *Sent:* Saturday, July 30, 2022 at 3:33 PM
> *From:* "David Anderson" <dander...@apache.org>
> *To:* pod...@gmx.com
> *Cc:* "user" <user@flink.apache.org>
> *Subject:* Re: Why this example does not save anything to file?
> You need to add
>
> 'csv.field-delimiter'=';'
>
> to the definition of Table1 so that the input from test4.txt can be
> correctly parsed:
>
>         tEnv.executeSql("CREATE TABLE Table1 ("
>                 + "column_name1 STRING, column_name2 DOUBLE) WITH ("
>                 + "'connector.type' = 'filesystem', "
>                 + "'connector.path' = 'file:///C:/temp/test4.txt', "
>                 + "'format.type' = 'csv', "
>                 + "'csv.field-delimiter'=';')");
>
> Cheers,
> David
>
> On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:
>
>> Hi,
>>
>> you mean adding:
>>
>> " 'csv.field-delimiter'=';', "
>>
>> like:
>>
>>         tEnv.executeSql("CREATE TABLE fs_table ("
>>                 + "    column_nameA STRING, "
>>                 + "    column_nameB DOUBLE "
>>                 + "    ) WITH ( "
>>                 + "    'connector'='filesystem', "
>>                 + "    'path'='file:///C:/temp/test5.txt', "
>>                 + "    'format'='csv', "
>>                 + " 'csv.field-delimiter'=';', "
>>                 + " 'sink.partition-commit.delay'='1 s', "
>>                 + " 'sink.partition-commit.policy.kind'='success-file'"
>>                 + "    )");
>>
>>         tEnv.executeSql("INSERT INTO fs_table "
>>                 + "SELECT column_name1, column_name2 from Table1");
>>
>> I did. Nothing new - still does not work.
>>
>>
>>
>> *Sent:* Tuesday, July 26, 2022 at 4:00 PM
>> *From:* "Gil De Grove" <gil.degr...@euranova.eu>
>> *To:* "Weihua Hu" <huweihua....@gmail.com>
>> *Cc:* pod...@gmx.com, "user" <user@flink.apache.org>
>> *Subject:* Re: Why this example does not save anything to file?
>> Hello,
>>
>> I may be wrong about this, but from what I see in the source file, you
>> are using a semicolon to separate the values.
>> This probably means that you should set csv.field-delimiter to `;` to
>> make your example work properly.
>>
>> Have you tried that setting in the CSV connector options of your
>> CREATE TABLE statement?
>>
>> Regards,
>> Gil
>>
>> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Can you see any exception logs?
>>> Where is this code running? Is it a standalone cluster with one
>>> TaskManager?
>>>
>>>
>>> Best,
>>> Weihua
>>>
>>> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
>>>
>>>> If I understand correctly, this is how I can save to CSV:
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>>>>
>>>> So my code is (read from file, save to file):
>>>>
>>>>
>>>> package flinkCSV;
>>>>
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>
>>>> public class flinkCSV {
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         //register and create table
>>>>         EnvironmentSettings settings = EnvironmentSettings
>>>>                 .newInstance()
>>>>                 //.inStreamingMode()
>>>>                 .inBatchMode()
>>>>                 .build();
>>>>
>>>>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>>>>
>>>>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) "
>>>>                 + "WITH ('connector.type' = 'filesystem', "
>>>>                 + "'connector.path' = 'file:///C:/temp/test4.txt', "
>>>>                 + "'format.type' = 'csv')");
>>>>
>>>>         tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
>>>>                 .execute()
>>>>                 .print();
>>>>
>>>>         tEnv.executeSql("CREATE TABLE fs_table ("
>>>>                 + "    column_nameA STRING, "
>>>>                 + "    column_nameB DOUBLE "
>>>>                 + "    ) WITH ( \n"
>>>>                 + "    'connector'='filesystem', "
>>>>                 + "    'path'='file:///C:/temp/test5.txt', "
>>>>                 + "    'format'='csv', "
>>>>                 + "  'sink.partition-commit.delay'='1 s', "
>>>>                 + "  'sink.partition-commit.policy.kind'='success-file'"
>>>>                 + "    )");
>>>>
>>>>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
>>>>
>>>>         tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>>>>                 .execute()
>>>>                 .print();
>>>>     }
>>>> }
>>>>
>>>> Source file (test4.txt) is:
>>>>
>>>> aa; 23
>>>> bb; 657.9
>>>> cc; 55
>>>>
>>>> test5.txt is not created, and the SELECT from fs_table returns null.
>>>>
>>>>
>>>
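
The delimiter question raised in the thread can be checked outside Flink. The sketch below is plain Java, not Flink's CSV parser: the class name DelimiterCheck and the helper splitFields are hypothetical names introduced only for illustration. It splits the three sample lines from test4.txt on ',' (the default documented for 'csv.field-delimiter') and on ';', to show why a two-column schema can only line up with this file when the delimiter is ';'.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class for illustration; it is not part of Flink.
public class DelimiterCheck {

    /** Splits one line on the given delimiter and trims each field. */
    static List<String> splitFields(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= line.length(); i++) {
            // Close the current field at a delimiter or at the end of the line.
            if (i == line.length() || line.charAt(i) == delimiter) {
                fields.add(line.substring(start, i).trim());
                start = i + 1;
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Sample rows copied from test4.txt in the original message.
        String[] lines = {"aa; 23", "bb; 657.9", "cc; 55"};
        for (String line : lines) {
            int byComma = splitFields(line, ',').size();
            int bySemicolon = splitFields(line, ';').size();
            System.out.println(line + " -> comma: " + byComma
                    + " field(s), semicolon: " + bySemicolon + " field(s)");
        }
    }
}
```

With ',' every row stays a single field, so column_name2 has nothing to bind to. With ';' each row yields two fields. This only illustrates the input side; it does not by itself explain why test5.txt is not written.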
