Hi Mike, I think that's caused by checkpointing not being enabled. If you enable it, that should be resolved.

Best regards,

Martijn

On Wed, Aug 3, 2022 at 9:01 PM <pod...@gmx.com> wrote:

> Thank you very much, Martijn, for dedicating your productive time to help me!
> I'm a noob in this subject - I took that example from somewhere on the
> Internet. The problem I see for guys like me is that Flink syntax changes
> quite significantly from version to version. So here it is not
> 'connector.type' but 'connector', etc.
>
> An additional problem was that there was no error saying that something was
> wrong, and in addition the 'select from' in the next lines displayed a result
> from the table...
>
> Anyway, I was expecting a single file 'test5.txt' as a result, but got a file
> for each row:
>
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
> ...
>
> Can it be just one file?
>
> Best,
> Mike
>
> Sent: Wednesday, August 03, 2022 at 4:03 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> I've verified your code locally and it indeed doesn't work, at least not
> with the latest Flink version (I've tested it with Flink 1.15). There are a
> couple of reasons for that:
>
> 1. You've mentioned in this thread that there's no problem with the
> 'csv.field-delimiter'. There actually is, because the default is a , and
> not a ; as documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
> 2. When adding this option, Flink wouldn't compile because the SQL
> statement uses options that are different than those documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/
> You have connector.type, connector.path and format.type listed. It should
> be connector, path and format.
>
> In the end, I used the following code and the expected result was properly
> written:
>
> tEnv.executeSql(
>     "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) "
>         + "WITH ('connector' = 'filesystem', "
>         + "'path' = 'file:///C:/temp/test4.txt', "
>         + "'format' = 'csv', 'csv.field-delimiter' = ';')");
>
> Best regards,
>
> Martijn
>
> On Tue, Aug 2, 2022 at 00:42, <pod...@gmx.com> wrote:
>
> No, I do not have it
>
> Sent: Monday, August 01, 2022 at 4:43 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> That's Flink's fault-tolerance mechanism, see
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
>
> On Mon, Aug 1, 2022 at 16:37, <pod...@gmx.com> wrote:
>
> What's that?
>
> Sent: Monday, August 01, 2022 at 2:49 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> Do you have checkpointing enabled?
>
> On Sat, Jul 30, 2022 at 17:31, <pod...@gmx.com> wrote:
>
> Thanks David, but there's no problem with that (probably ";" is the default
> separator).
> I can read the file and insert into "Table1" (I said that in my mail).
> The problem is saving to CSV.
>
> Sent: Saturday, July 30, 2022 at 3:33 PM
> From: "David Anderson" <dander...@apache.org>
> To: pod...@gmx.com
> Cc: "user" <user@flink.apache.org>
> Subject: Re: Why this example does not save anything to file?
>
> You need to add
>
> 'csv.field-delimiter'=';'
>
> to the definition of Table1 so that the input from test4.txt can be
> correctly parsed:
>
> tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, "
>     + "column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', "
>     + "'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = "
>     + "'csv', 'csv.field-delimiter'=';')");
>
> Cheers,
> David
>
> On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:
>
> Hi,
>
> you mean adding:
>
> " 'csv.field-delimiter'=';', "
>
> like:
>
> tEnv.executeSql("CREATE TABLE fs_table ("
>     + " column_nameA STRING, "
>     + " column_nameB DOUBLE "
>     + " ) WITH ( "
>     + " 'connector'='filesystem', "
>     + " 'path'='file:///C:/temp/test5.txt', "
>     + " 'format'='csv', "
>     + " 'csv.field-delimiter'=';', "
>     + " 'sink.partition-commit.delay'='1 s', "
>     + " 'sink.partition-commit.policy.kind'='success-file'"
>     + " )");
>
> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, "
>     + "column_name2 from Table1");
>
> I did. Nothing new - it still does not work.
>
> Sent: Tuesday, July 26, 2022 at 4:00 PM
> From: "Gil De Grove" <gil.degr...@euranova.eu>
> To: "Weihua Hu" <huweihua....@gmail.com>
> Cc: pod...@gmx.com, "user" <user@flink.apache.org>
> Subject: Re: Why this example does not save anything to file?
>
> Hello,
>
> I may be really wrong with this, but from what I see in the source file,
> you are using a semicolon to separate the values.
> This probably means that you should set the csv.field-delimiter to `;` to
> make your example work properly.
>
> Have you tried that configuration in the CSV connector options of your
> create table statement?
>
> Regards,
> Gil
>
> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
>
> Hi,
>
> Can you see any exception logs?
> Where is this code running? Is it a standalone cluster with one
> TaskManager?
>
> Best,
> Weihua
>
> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
>
> If I get it correctly, this is the way I can save to CSV:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>
> So my code is (read from file, save to file):
>
> package flinkCSV;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class flinkCSV {
>     public static void main(String[] args) throws Exception {
>
>         //register and create table
>         EnvironmentSettings settings = EnvironmentSettings
>             .newInstance()
>             //.inStreamingMode()
>             .inBatchMode()
>             .build();
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, "
>             + "column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', "
>             + "'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
>             .execute()
>             .print();
>
>         tEnv.executeSql("CREATE TABLE fs_table ("
>             + " column_nameA STRING, "
>             + " column_nameB DOUBLE "
>             + " ) WITH ( \n"
>             + " 'connector'='filesystem', "
>             + " 'path'='file:///C:/temp/test5.txt', "
>             + " 'format'='csv', "
>             + " 'sink.partition-commit.delay'='1 s', "
>             + " 'sink.partition-commit.policy.kind'='success-file'"
>             + " )");
>
>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, "
>             + "column_name2 from Table1");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>             .execute()
>             .print();
>
>     }
> }
>
> Source file (test4.txt) is:
>
> aa; 23
> bb; 657.9
> cc; 55
>
> test5.txt is not created, and select from fs_table gives null
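
For anyone landing on this thread with the same old-style example: the sketch below only illustrates the option renaming discussed above ('connector.type' to 'connector', 'connector.path' to 'path', 'format.type' to 'format') plus the explicit ';' delimiter. It is plain Java without any Flink dependency; the class and method names (FilesystemTableDdl, normalizeKey, createTable) are made up for this illustration and are not part of Flink. It only builds the statement text; passing the result to tEnv.executeSql is left to the reader, and option values containing single quotes are not escaped.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not a Flink API: rewrites the legacy option keys from the
// original post into the keys named in the thread and builds the CREATE TABLE text.
public class FilesystemTableDdl {

    // Legacy key -> current key, exactly the three renames listed in the thread.
    static final Map<String, String> LEGACY_TO_CURRENT = Map.of(
            "connector.type", "connector",
            "connector.path", "path",
            "format.type", "format");

    // Returns the current key for a legacy key; any other key is left unchanged.
    static String normalizeKey(String key) {
        return LEGACY_TO_CURRENT.getOrDefault(key, key);
    }

    // Builds "CREATE TABLE <name> (<columns>) WITH ('k' = 'v', ...)".
    // Assumption: values contain no single quotes (no escaping is done here).
    static String createTable(String name, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(", ", " WITH (", ")");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add("'" + normalizeKey(e.getKey()) + "' = '" + e.getValue() + "'");
        }
        return "CREATE TABLE " + name + " (" + columns + ")" + with;
    }

    public static void main(String[] args) {
        // Options as written in the original post, plus the ';' delimiter.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector.type", "filesystem");
        options.put("connector.path", "file:///C:/temp/test4.txt");
        options.put("format.type", "csv");
        options.put("csv.field-delimiter", ";");

        System.out.println(createTable(
                "Table1", "column_name1 STRING, column_name2 DOUBLE", options));
    }
}
```

The printed statement uses the same option keys and values as the CREATE TABLE statement Martijn reports as working with Flink 1.15. A LinkedHashMap is used so the options appear in the order they were added.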