Hello, I may be really wrong with this, but from what I get in the source file, you are using a semi-column to separate the value. This probably means that you should set the csv.field-delimiter to `;` to make your example work properly.
Have you tried with that configuration in your create table csv connector option? Regards, Gil On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote: > Hi, > > Can you see any exception logs? > Where is this code running? is it a standalone cluster with one > TaskManager? > > > Best, > Weihua > > > On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote: > >> If I get it correctly this is the way how I can save to CSV: >> >> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example >> >> So my code is (read from file, save to file): >> >> >> *package flinkCSV;* >> >> *import org.apache.flink.table.api.EnvironmentSettings; import >> org.apache.flink.table.api.TableEnvironment;* >> *public class flinkCSV {* >> * public static void main(String[] args) throws Exception {* >> >> >> >> >> >> >> * //register and create table EnvironmentSettings >> settings = EnvironmentSettings .newInstance() >> //.inStreamingMode() .inBatchMode() >> .build();* >> * final TableEnvironment tEnv = TableEnvironment.create(settings);* >> >> >> >> >> >> >> * tEnv.executeSql("CREATE TABLE Table1 (column_name1 >> STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', >> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')"); >> tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM >> Table1") .execute() .print(); * >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> * tEnv.executeSql("CREATE TABLE fs_table (" + " >> column_nameA STRING, " + " column_nameB DOUBLE " >> + " ) WITH ( \n" + " >> 'connector'='filesystem', " + " >> 'path'='file:///C:/temp/test5.txt', " + " >> 'format'='csv', " + " 'sink.partition-commit.delay'='1 >> s', " + " >> 'sink.partition-commit.policy.kind'='success-file'" + " >> )"); tEnv.executeSql("INSERT INTO fs_table SELECT >> column_name1, column_name2 from Table1"); >> tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table") >> .execute() .print(); } }* >> >> Source file (test4.txt) is: >> >> aa; 23 >> bb; 657.9 >> cc; 55 >> >> test5.txt is not created, select from fs_table gives null >> >> >