If I get it correctly this is the way how I can save to CSV:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
So my code is (read from file, save to file):
package flinkCSV;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableEnvironment;
public class flinkCSV {
public static void main(String[] args) throws Exception {
//register and create table
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();
final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
.execute()
.print();
tEnv.executeSql("CREATE TABLE fs_table ("
+ " column_nameA STRING, "
+ " column_nameB DOUBLE "
+ " ) WITH ( \n"
+ " 'connector'='filesystem', "
+ " 'path'='file:///C:/temp/test5.txt', "
+ " 'format'='csv', "
+ " 'sink.partition-commit.delay'='1 s', "
+ " 'sink.partition-commit.policy.kind'='success-file'"
+ " )");
tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
.execute()
.print();
}
}
+ " column_nameA STRING, "
+ " column_nameB DOUBLE "
+ " ) WITH ( \n"
+ " 'connector'='filesystem', "
+ " 'path'='file:///C:/temp/test5.txt', "
+ " 'format'='csv', "
+ " 'sink.partition-commit.delay'='1 s', "
+ " 'sink.partition-commit.policy.kind'='success-file'"
+ " )");
tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from Table1");
tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
.execute()
.print();
}
}
Source file (test4.txt) is:
aa; 23
bb; 657.9
cc; 55
bb; 657.9
cc; 55
test5.txt is not created, select from fs_table gives null