Doesn't it depend on 'sink.parallelism'? If I set 'sink.parallelism' = '2' I get two files; with 'sink.parallelism' = '1' I get just one file... But I think doing it like that reduces the number of tasks, so it will have a negative impact on performance :-(
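For reference, 'sink.parallelism' is an option of the filesystem SQL connector. It sets the parallelism of the file-writing (sink) operator, and each sink subtask writes its own part files, which matches the task-0/task-1 part-file names Mike reported. Setting it to '1' should leave the upstream operators at their own parallelism, so only the write stage runs as a single task. Larger outputs can still roll over into several part files. The sketch below is a stdlib-only Java helper (the class SinkDdl and its build method are hypothetical) that only assembles such a DDL string with the current option keys. It is meant to be passed to tEnv.executeSql(...) and does not call Flink itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class SinkDdl {

    /**
     * Hypothetical helper: builds a CREATE TABLE statement for the filesystem
     * connector with the current option keys ('connector', 'path', 'format').
     * The string is meant for TableEnvironment.executeSql(...).
     */
    static String build(String table, String path, int sinkParallelism) {
        if (sinkParallelism < 1) {
            // The connector rejects non-positive values, so fail early here too.
            throw new IllegalArgumentException("sink.parallelism must be > 0");
        }
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("connector", "filesystem");
        options.put("path", path);
        options.put("format", "csv");
        options.put("csv.field-delimiter", ";");
        // Number of sink subtasks; each subtask writes its own part files.
        options.put("sink.parallelism", Integer.toString(sinkParallelism));

        StringJoiner with = new StringJoiner(", ", " WITH (", ")");
        options.forEach((key, value) -> with.add("'" + key + "' = '" + value + "'"));
        return "CREATE TABLE " + table
                + " (column_nameA STRING, column_nameB DOUBLE)" + with;
    }

    public static void main(String[] args) {
        // Prints the DDL for a single-writer version of fs_table from the thread.
        System.out.println(build("fs_table", "file:///C:/temp/test5.txt", 1));
    }
}
```

Usage would be tEnv.executeSql(SinkDdl.build(...)) followed by the same INSERT INTO fs_table statement used in the thread.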
Sent: Tuesday, August 30, 2022 at 3:22 PM
From: "Martijn Visser" <mart...@immerok.com>
To: pod...@gmx.com, user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Hi Mike,

I think that's caused by you not having enabled checkpointing. If you enable it, I think it should be resolved.

Best regards,

Martijn

On Wed, Aug 3, 2022 at 9:01 PM <pod...@gmx.com> wrote:

Thank you very much, Martijn, for dedicating your productive time to help me! I'm a noob in this subject; I took that example from somewhere on the Internet. I see that the problem for people like me is that Flink syntax changes quite significantly from version to version, so here it is not 'connector.type' but 'connector', etc. An additional problem was that no error indicated that something was wrong, and in addition the 'select from' in the next lines displayed a result from the table...

Anyway, I was expecting a single file 'test5.txt' as a result but got a file for each row:

part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
...

Can it be just one file?

Best,
Mike

Sent: Wednesday, August 03, 2022 at 4:03 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

I've verified your code locally and it indeed doesn't work, at least not with the latest Flink version (I've tested it with Flink 1.15). There are a couple of reasons for that:

1. You've mentioned in this thread that there's no problem with 'csv.field-delimiter'. There actually is, because the default is ',' and not ';', as documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
2. When adding this option, Flink wouldn't compile because the SQL statement uses options that are different from those documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/. You have connector.type, connector.path and format.type listed. It should be connector, path and format.

In the end, I used the following code and the expected result was properly written:

tEnv.executeSql(
    "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");

Best regards,

Martijn

On Tue, Aug 2, 2022 at 00:42 <pod...@gmx.com> wrote:

No, I do not have it.

Sent: Monday, August 01, 2022 at 4:43 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

That's Flink's fault-tolerance mechanism, see https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/

On Mon, Aug 1, 2022 at 16:37 <pod...@gmx.com> wrote:

What's that?

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Do you have checkpointing enabled?

On Sat, Jul 30, 2022 at 17:31 <pod...@gmx.com> wrote:

Thanks David, but there's no problem with that (";" is probably the default separator). I can read the file and insert into "Table1" (I said that in my mail). The problem is saving to CSV.

Sent: Saturday, July 30, 2022 at 3:33 PM
From: "David Anderson" <dander...@apache.org>
To: pod...@gmx.com
Cc: "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?
You need to add 'csv.field-delimiter'=';' to the definition of Table1 so that the input from test4.txt can be parsed correctly:

tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");

Cheers,
David

On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:

Hi, you mean adding " 'csv.field-delimiter'=';', " like this:

tEnv.executeSql("CREATE TABLE fs_table ("
        + " column_nameA STRING, "
        + " column_nameB DOUBLE "
        + " ) WITH ( "
        + " 'connector'='filesystem', "
        + " 'path'='file:///C:/temp/test5.txt', "
        + " 'format'='csv', "
        + " 'csv.field-delimiter'=';', "
        + " 'sink.partition-commit.delay'='1 s', "
        + " 'sink.partition-commit.policy.kind'='success-file'"
        + " )");

tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

I did. Nothing new; it still does not work.

Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove" <gil.degr...@euranova.eu>
To: "Weihua Hu" <huweihua....@gmail.com>
Cc: pod...@gmx.com, "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?

Hello,

I may be really wrong with this, but from what I see in the source file, you are using a semicolon to separate the values. This probably means that you should set csv.field-delimiter to `;` to make your example work properly. Have you tried that configuration in your CREATE TABLE csv connector options?
Regards,
Gil

On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:

Hi,

Can you see any exception logs? Where is this code running? Is it a standalone cluster with one TaskManager?

Best,
Weihua

On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:

If I understand correctly, this is how I can save to CSV: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example

So my code is (read from file, save to file):

package flinkCSV;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkCSV {

    public static void main(String[] args) throws Exception {

        //register and create table
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();

        final TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");

        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
                .execute()
                .print();

        tEnv.executeSql("CREATE TABLE fs_table ("
                + " column_nameA STRING, "
                + " column_nameB DOUBLE "
                + " ) WITH ( \n"
                + " 'connector'='filesystem', "
                + " 'path'='file:///C:/temp/test5.txt', "
                + " 'format'='csv', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + " )");

        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

        tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
                .execute()
                .print();
    }
}

Source file (test4.txt) is:

aa; 23
bb; 657.9
cc; 55

test5.txt is not created, and a select from fs_table gives null.
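A stdlib-only Java illustration of the delimiter point raised by Gil, David and Martijn. It uses plain String.split, not Flink's CSV format, so it only shows that with the default ',' each row of test4.txt stays a single field and cannot fill the DOUBLE column, while ';' yields the two expected fields. The class name DelimiterDemo is hypothetical, and trimming the space after ';' is an assumption of this sketch; how Flink's CSV parser treats that space is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class DelimiterDemo {

    /** Splits one CSV line on a literal delimiter and trims each field (sketch only). */
    static List<String> fields(String line, char delimiter) {
        String regex = Pattern.quote(String.valueOf(delimiter)); // treat the delimiter literally
        return Arrays.stream(line.split(regex))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The three rows of test4.txt as posted in the thread.
        List<String> rows = List.of("aa; 23", "bb; 657.9", "cc; 55");
        for (String row : rows) {
            List<String> comma = fields(row, ',');     // Flink CSV default delimiter
            List<String> semicolon = fields(row, ';'); // 'csv.field-delimiter' = ';'
            double value = Double.parseDouble(semicolon.get(1));
            System.out.println(row + ": " + comma.size() + " field(s) with ','; "
                    + semicolon.size() + " with ';' -> " + semicolon.get(0) + ", " + value);
        }
    }
}
```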