Do you have checkpointing enabled? Op za 30 jul. 2022 om 17:31 schreef <pod...@gmx.com>:
> Thanks David but there's no problem with that (probably ";" is default > separator). > I can read the file and insert into "Table1" (I said that in my mail). > Problem is to save to CSV. > > > *Sent:* Saturday, July 30, 2022 at 3:33 PM > *From:* "David Anderson" <dander...@apache.org> > *To:* pod...@gmx.com > *Cc:* "user" <user@flink.apache.org> > *Subject:* Re: Why this example does not save anything to file? > You need to add > > 'csv.field-delimiter'=';' > > to the definition of Table1 so that the input from test4.txt can be > correctly parsed: > > tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, > column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', > 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv', > 'csv.field-delimiter'=';')"); > > Cheers, > David > > On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote: > >> Hi, >> >> you mean adding: >> >> " 'csv.field-delimiter'=';', " >> >> like: >> >> tEnv.executeSql("CREATE TABLE fs_table (" >> + " column_nameA STRING, " >> + " column_nameB DOUBLE " >> + " ) WITH ( " >> + " 'connector'='filesystem', " >> + " 'path'='file:///C:/temp/test5.txt', " >> + " 'format'='csv', " >> + " 'csv.field-delimiter'=';', " >> + " 'sink.partition-commit.delay'='1 s', " >> + " 'sink.partition-commit.policy.kind'='success-file'" >> + " )"); >> >> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, >> column_name2 from Table1"); >> >> I did. Nothing new - still does not work. >> >> >> >> *Sent:* Tuesday, July 26, 2022 at 4:00 PM >> *From:* "Gil De Grove" <gil.degr...@euranova.eu> >> *To:* "Weihua Hu" <huweihua....@gmail.com> >> *Cc:* pod...@gmx.com, "user" <user@flink.apache.org> >> *Subject:* Re: Why this example does not save anything to file? >> Hello, >> >> I may be really wrong with this, but from what I get in the source file, >> you are using a semi-column to separate the value. >> This probably means that you should set the csv.field-delimiter to `;` to >> make your example work properly. >> >> Have you tried with that configuration in your create table csv connector >> option? >> >> Regards, >> Gil >> >> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote: >> >>> Hi, >>> >>> Can you see any exception logs? >>> Where is this code running? is it a standalone cluster with one >>> TaskManager? >>> >>> >>> Best, >>> Weihua >>> >>> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote: >>> >>>> If I get it correctly this is the way how I can save to CSV: >>>> >>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example >>>> >>>> So my code is (read from file, save to file): >>>> >>>> >>>> *package flinkCSV;* >>>> >>>> *import org.apache.flink.table.api.EnvironmentSettings; import >>>> org.apache.flink.table.api.TableEnvironment;* >>>> *public class flinkCSV {* >>>> * public static void main(String[] args) throws Exception {* >>>> >>>> >>>> >>>> >>>> >>>> >>>> * //register and create table >>>> EnvironmentSettings settings = EnvironmentSettings >>>> .newInstance() //.inStreamingMode() >>>> .inBatchMode() .build();* >>>> * final TableEnvironment tEnv = >>>> TableEnvironment.create(settings);* >>>> >>>> >>>> >>>> >>>> >>>> >>>> * tEnv.executeSql("CREATE TABLE Table1 (column_name1 >>>> STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', >>>> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')"); >>>> tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM >>>> Table1") .execute() .print(); * >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> * tEnv.executeSql("CREATE TABLE fs_table (" + >>>> " column_nameA STRING, " + " column_nameB DOUBLE " >>>> + " ) WITH ( \n" + " >>>> 'connector'='filesystem', " + " >>>> 'path'='file:///C:/temp/test5.txt', " + " >>>> 'format'='csv', " + " 'sink.partition-commit.delay'='1 >>>> s', " + " >>>> 'sink.partition-commit.policy.kind'='success-file'" + " >>>> )"); tEnv.executeSql("INSERT INTO fs_table SELECT >>>> column_name1, column_name2 from Table1"); >>>> tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table") >>>> .execute() .print(); } }* >>>> >>>> Source file (test4.txt) is: >>>> >>>> aa; 23 >>>> bb; 657.9 >>>> cc; 55 >>>> >>>> test5.txt is not created, select from fs_table gives null >>>> >>>> >>>