No, I do not have it
Sent: Monday, August 01, 2022 at 4:43 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?
That's Flink's fault-tolerance mechanism; see https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/

On Mon, 1 Aug 2022 at 16:37, <pod...@gmx.com> wrote:

What's that?

Sent: Monday, August 01, 2022 at 2:49 PM
From: "Martijn Visser" <martijnvis...@apache.org>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Why this example does not save anything to file?

Do you have checkpointing enabled?

On Sat, 30 Jul 2022 at 17:31, <pod...@gmx.com> wrote:

Thanks David, but there's no problem with that (probably ";" is the default separator).
I can read the file and insert into "Table1" (I said that in my mail).
The problem is saving to CSV.

Sent: Saturday, July 30, 2022 at 3:33 PM
From: "David Anderson" <dander...@apache.org>
To: pod...@gmx.com
Cc: "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?

You need to add

'csv.field-delimiter'=';'

to the definition of Table1 so that the input from test4.txt can be correctly parsed:

tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')");

Cheers,
David

On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:

Hi,
you mean adding:
" 'csv.field-delimiter'=';', "
like:

tEnv.executeSql("CREATE TABLE fs_table ("
+ " column_nameA STRING, "
+ " column_nameB DOUBLE "
+ " ) WITH ( "
+ " 'connector'='filesystem', "
+ " 'path'='file:///C:/temp/test5.txt', "
+ " 'format'='csv', "
+ " 'csv.field-delimiter'=';', "
+ " 'sink.partition-commit.delay'='1 s', "
+ " 'sink.partition-commit.policy.kind'='success-file'"
+ " )");
tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");

I did. Nothing new - still does not work.

Sent: Tuesday, July 26, 2022 at 4:00 PM
From: "Gil De Grove" <gil.degr...@euranova.eu>
To: "Weihua Hu" <huweihua....@gmail.com>
Cc: pod...@gmx.com, "user" <user@flink.apache.org>
Subject: Re: Why this example does not save anything to file?

Hello,
I may be really wrong about this, but from what I see in the source file, you are using a semicolon to separate the values. This probably means that you should set csv.field-delimiter to `;` to make your example work properly.
Have you tried that configuration in the CSV connector options of your CREATE TABLE statement?

Regards,
Gil

On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:

Hi,
Can you see any exception logs?
Where is this code running? Is it a standalone cluster with one TaskManager?

Best,
Weihua

On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:

If I get it correctly this is the way how I can save to CSV:

So my code is (read from file, save to file):

package flinkCSV;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class flinkCSV {

    public static void main(String[] args) throws Exception {
        //register and create table
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();

        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
                .execute()
                .print();
        tEnv.executeSql("CREATE TABLE fs_table ("
                + " column_nameA STRING, "
                + " column_nameB DOUBLE "
                + " ) WITH ( \n"
                + " 'connector'='filesystem', "
                + " 'path'='file:///C:/temp/test5.txt', "
                + " 'format'='csv', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + " )");
        tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
        tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
                .execute()
                .print();
    }
}

Source file (test4.txt) is:

aa; 23
bb; 657.9
cc; 55

test5.txt is not created, select from fs_table gives null
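
The delimiter question raised in the replies can be checked outside Flink. The sketch below is plain Java and does not use Flink's CSV format; the class `DelimiterCheck` and the method `splitFields` are hypothetical names introduced only for illustration. It assumes the three lines of test4.txt exactly as quoted above and shows how many fields each line yields when split on ',' versus ';'. It says nothing about how Flink itself parses the file or about why test5.txt is not written.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterCheck {

    // Splits one line on a single-character delimiter and trims each field.
    // This is a simplified stand-in, not Flink's CSV parser (no quoting or escaping).
    static List<String> splitFields(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= line.length(); i++) {
            if (i == line.length() || line.charAt(i) == delimiter) {
                fields.add(line.substring(start, i).trim());
                start = i + 1;
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // The contents of test4.txt as quoted in the original mail.
        String[] lines = {"aa; 23", "bb; 657.9", "cc; 55"};
        for (String line : lines) {
            int withComma = splitFields(line, ',').size();
            int withSemicolon = splitFields(line, ';').size();
            System.out.println(line + " : ',' gives " + withComma
                    + " field(s), ';' gives " + withSemicolon + " field(s)");
        }
    }
}
```

With ';' each line splits into a string and a value that `Double.parseDouble` accepts, which matches the two-column schema (STRING, DOUBLE) declared for Table1; with ',' each line stays a single field.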