Doesn't it depend on 'sink.parallelism'?
If I set 'sink.parallelism' = '2' I get two files; with 'sink.parallelism' = '1' 
I get just one file...
 
But I think that doing it that way reduces the number of tasks, so it will have 
a negative impact on performance :-(
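
For reference, a minimal sketch of where that option goes, assuming the fs_table sink definition used elsewhere in this thread. The SinkDdl class and its build method are hypothetical names for illustration only and are not part of Flink; the string they return is what would be passed to tEnv.executeSql(...). As far as I understand the connector documentation, 'sink.parallelism' applies to the file-writing operator only, so the performance impact would need to be measured rather than assumed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Flink): builds the DDL for the fs_table
// sink from this thread with an explicit 'sink.parallelism'.
final class SinkDdl {

    static String build(int sinkParallelism) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", "file:///C:/temp/test5.txt");
        options.put("format", "csv");
        options.put("csv.field-delimiter", ";");
        options.put("sink.parallelism", Integer.toString(sinkParallelism));

        // Render each option as 'key' = 'value', separated by commas.
        String with = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", "));

        return "CREATE TABLE fs_table (column_nameA STRING, column_nameB DOUBLE) "
                + "WITH (" + with + ")";
    }

    public static void main(String[] args) {
        // In the real job this string would be passed to tEnv.executeSql(...).
        System.out.println(build(1));
    }
}
```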
 
 

Sent: Tuesday, August 30, 2022 at 3:22 PM
From: "Martijn Visser" <mart...@immerok.com>
To: pod...@gmx.com, user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Hi Mike,
 
I think that's caused by you not having enabled checkpointing. If you enable 
that, it should be resolved I think.
 
Best regards,
 
Martijn 

On Wed, Aug 3, 2022 at 9:01 PM <pod...@gmx.com> wrote:
Thank you very much, Martijn, for dedicating your productive time to help me!
I'm a newbie in this subject - I took that example from somewhere on the Internet. I 
see that the problem for guys like me is that Flink syntax changes from version to 
version quite significantly. So here it is not 'connector.type' but 'connector', etc.
 
An additional problem was that there was no error saying that something was wrong, and in 
addition the 'select from' in the next lines displayed the result from the table...
 
Anyway, I was expecting a single file 'test5.txt' as a result, but got a file for 
each row.
 
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
...
 

Can it be just one file?
Best,
 
Mike
 

Sent: Wednesday, August 03, 2022 at 4:03 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

I've verified your code locally and it indeed doesn't work, at least not with 
the latest Flink version (I've tested it with Flink 1.15). There are a couple 
of reasons for that:
 
1. You've mentioned in this thread that there's no problem with the 
'csv.field-delimiter'. There actually is, because the default is a , and not a 
; as documented at 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. When adding this option, Flink wouldn't compile because the SQL statement 
uses options that are different than those documented at 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/.
 You have connector.type, connector.path and format.type listed. It should be 
connector, path and format. 
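
The renaming in point 2 can be summarized as a small lookup. This is only an illustrative sketch: the OptionKeys class and its modernize method are hypothetical and not part of Flink, and the mapping covers only the three keys named above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Flink): rewrites the three legacy option
// keys named above to the keys used in the filesystem connector documentation.
final class OptionKeys {

    private static final Map<String, String> LEGACY_TO_CURRENT = Map.of(
            "connector.type", "connector",
            "connector.path", "path",
            "format.type", "format");

    static Map<String, String> modernize(Map<String, String> options) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            // Keys that are not in the mapping are copied unchanged.
            String key = LEGACY_TO_CURRENT.getOrDefault(e.getKey(), e.getKey());
            result.put(key, e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("connector.type", "filesystem");
        legacy.put("connector.path", "file:///C:/temp/test4.txt");
        legacy.put("format.type", "csv");
        legacy.put("csv.field-delimiter", ";");
        System.out.println(modernize(legacy));
    }
}
```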
 
In the end, I used the following code and the expected result was properly 
written:

        tEnv.executeSql(
                "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) "
                        + "WITH ('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', "
                        + "'format' = 'csv', 'csv.field-delimiter' = ';')");
 
Best regards,
 
Martijn 

On Tue, Aug 2, 2022 at 00:42, <pod...@gmx.com> wrote:

No, I do not have it
 
 

Sent: Monday, August 01, 2022 at 4:43 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

That's Flink's fault-tolerance mechanism, see 
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
 

On Mon, Aug 1, 2022 at 16:37, <pod...@gmx.com> wrote:

What's that?
 
 

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Do you have checkpointing enabled? 
 

On Sat, Jul 30, 2022 at 17:31, <pod...@gmx.com> wrote:

Thanks David, but there's no problem with that (";" is probably the default 
separator).
I can read the file and insert into "Table1" (I said that in my mail).
The problem is saving to CSV.
 
 

Sent: Saturday, July 30, 2022 at 3:33 PM
From: "David Anderson" <dander...@apache.org>
To: pod...@gmx.com
Cc: "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?

You need to add
 
'csv.field-delimiter'=';'
 
to the definition of Table1 so that the input from test4.txt can be correctly 
parsed:
         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, "
                 + "column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = "
                 + "'file:///C:/temp/test4.txt', 'format.type' = 'csv', "
                 + "'csv.field-delimiter'=';')");
 
Cheers,
David 

On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:

Hi,
 
you mean adding:
 
" 'csv.field-delimiter'=';', "
 
like:
 
        tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_nameA STRING, "
                + "    column_nameB DOUBLE "
                + "    ) WITH ( "
                + "    'connector'='filesystem', "
                + "    'path'='file:///C:/temp/test5.txt', "
                + "    'format'='csv', "
                + " 'csv.field-delimiter'=';', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + "    )");
        
        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 "
                + "from Table1");
 
I did. Nothing new - still does not work.
 
 
 

Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove" <gil.degr...@euranova.eu>
To: "Weihua Hu" <huweihua....@gmail.com>
Cc: pod...@gmx.com, "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?

Hello,
 
I may be really wrong about this, but from what I see in the source file, you 
are using a semicolon to separate the values.
This probably means that you should set csv.field-delimiter to `;` to make 
your example work properly.
 
Have you tried that configuration in the CSV connector options of your CREATE 
TABLE statement?
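
To illustrate why the delimiter matters, here is a plain-Java sketch (no Flink involved, and not a description of how Flink's CSV format is implemented) that splits the first line of test4.txt on each candidate delimiter. The DelimiterDemo class is a hypothetical name. Split on a comma, the line stays a single field and cannot fill a two-column table; split on a semicolon, it yields two fields.

```java
import java.util.regex.Pattern;

// Hypothetical demo class: shows how one line of test4.txt splits
// under the two delimiters discussed in this thread.
final class DelimiterDemo {

    // Splits one line on a literal delimiter and trims each field.
    static String[] split(String line, String delimiter) {
        String[] fields = line.split(Pattern.quote(delimiter), -1);
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].trim();
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "aa; 23"; // first line of test4.txt from this thread
        System.out.println("fields with ',': " + split(line, ",").length);
        System.out.println("fields with ';': " + split(line, ";").length);
    }
}
```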
 
Regards,
Gil 

On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
Hi,

 
Can you see any exception logs? 
Where is this code running? Is it a standalone cluster with one TaskManager?
 
 

Best,
Weihua 

On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:

If I understand it correctly, this is how I can save to CSV:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
 
So my code is (read from file, save to file):
 
 

package flinkCSV;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class flinkCSV {
    public static void main(String[] args) throws Exception {
        
        //register and create table
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        
        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 "
                + "DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = "
                + "'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
        
        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
        .execute()
        .print();
        
        tEnv.executeSql("CREATE TABLE fs_table ("
                + "    column_nameA STRING, "
                + "    column_nameB DOUBLE "
                + "    ) WITH ( \n"
                + "    'connector'='filesystem', "
                + "    'path'='file:///C:/temp/test5.txt', "
                + "    'format'='csv', "
                + "  'sink.partition-commit.delay'='1 s', "
                + "  'sink.partition-commit.policy.kind'='success-file'"
                + "    )");
        
        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 "
                + "from Table1");
        
        tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
        .execute()
        .print();
        
     }
}
 
Source file (test4.txt) is:
 
aa; 23
bb; 657.9
cc; 55
 
test5.txt is not created, and the select from fs_table gives null.
 
