You need to add 'csv.field-delimiter'=';'
to the definition of Table1 so that the input from test4.txt can be correctly parsed: tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', 'csv.field-delimiter'=';')"); Cheers, David On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote: > Hi, > > you mean adding: > > " 'csv.field-delimiter'=';', " > > like: > > tEnv.executeSql("CREATE TABLE fs_table (" > + " column_nameA STRING, " > + " column_nameB DOUBLE " > + " ) WITH ( " > + " 'connector'='filesystem', " > + " 'path'='file:///C:/temp/test5.txt', " > + " 'format'='csv', " > + " 'csv.field-delimiter'=';', " > + " 'sink.partition-commit.delay'='1 s', " > + " 'sink.partition-commit.policy.kind'='success-file'" > + " )"); > > tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, > column_name2 from Table1"); > > I did. Nothing new - still does not work. > > > > *Sent:* Tuesday, July 26, 2022 at 4:00 PM > *From:* "Gil De Grove" <gil.degr...@euranova.eu> > *To:* "Weihua Hu" <huweihua....@gmail.com> > *Cc:* pod...@gmx.com, "user" <user@flink.apache.org> > *Subject:* Re: Why this example does not save anything to file? > Hello, > > I may be really wrong with this, but from what I get in the source file, > you are using a semi-column to separate the value. > This probably means that you should set the csv.field-delimiter to `;` to > make your example work properly. > > Have you tried with that configuration in your create table csv connector > option? > > Regards, > Gil > > On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote: > >> Hi, >> >> Can you see any exception logs? >> Where is this code running? is it a standalone cluster with one >> TaskManager? >> >> >> Best, >> Weihua >> >> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote: >> >>> If I get it correctly this is the way how I can save to CSV: >>> >>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example >>> >>> So my code is (read from file, save to file): >>> >>> >>> *package flinkCSV;* >>> >>> *import org.apache.flink.table.api.EnvironmentSettings; import >>> org.apache.flink.table.api.TableEnvironment;* >>> *public class flinkCSV {* >>> * public static void main(String[] args) throws Exception {* >>> >>> >>> >>> >>> >>> >>> * //register and create table >>> EnvironmentSettings settings = EnvironmentSettings >>> .newInstance() //.inStreamingMode() >>> .inBatchMode() .build();* >>> * final TableEnvironment tEnv = >>> TableEnvironment.create(settings);* >>> >>> >>> >>> >>> >>> >>> * tEnv.executeSql("CREATE TABLE Table1 (column_name1 >>> STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', >>> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')"); >>> tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM >>> Table1") .execute() .print(); * >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> * tEnv.executeSql("CREATE TABLE fs_table (" + " >>> column_nameA STRING, " + " column_nameB DOUBLE " >>> + " ) WITH ( \n" + " >>> 'connector'='filesystem', " + " >>> 'path'='file:///C:/temp/test5.txt', " + " >>> 'format'='csv', " + " 'sink.partition-commit.delay'='1 >>> s', " + " >>> 'sink.partition-commit.policy.kind'='success-file'" + " >>> )"); tEnv.executeSql("INSERT INTO fs_table SELECT >>> column_name1, column_name2 from Table1"); >>> tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table") >>> .execute() .print(); } }* >>> >>> Source file (test4.txt) is: >>> >>> aa; 23 >>> bb; 657.9 >>> cc; 55 >>> >>> test5.txt is not created, select from fs_table gives null >>> >>> >>