With the parallelism set to 2, you will get 2 files. Actually, you can get
more than 2: with hourly buckets, for example, you'll get 2 files per hour.
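Here is a rough model of the count David describes. It assumes that every sink subtask receives rows for every bucket and writes exactly one part file per bucket. In streaming mode, rolling on checkpoints can add more files. Under those assumptions, the number of files is the sink parallelism times the number of buckets. The class and method names below are hypothetical:

```java
/**
 * Back-of-the-envelope model (an assumption, not Flink code): each parallel
 * sink subtask writes one part file per bucket it receives rows for.
 */
public class PartFileEstimate {

    /** Files produced, assuming every subtask sees every bucket and nothing rolls early. */
    static int estimatePartFiles(int sinkParallelism, int buckets) {
        if (sinkParallelism < 1 || buckets < 0) {
            throw new IllegalArgumentException("parallelism must be >= 1, buckets >= 0");
        }
        return sinkParallelism * buckets;
    }

    public static void main(String[] args) {
        // Unpartitioned table: treated here as a single bucket.
        System.out.println(estimatePartFiles(2, 1));
        // Hourly buckets over one day.
        System.out.println(estimatePartFiles(2, 24));
    }
}
```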

If the I/O bandwidth of a single instance is sufficient to handle your
expected throughput, then you can set the parallelism of the sink (or of the
entire pipeline) to 1. If it isn't, you'll be glad to have multiple
instances, each handling a slice of the job and writing to independent
filesystems.
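Below is a minimal sketch of the single-writer setup built around the 'sink.parallelism' option discussed in this thread. It assumes the current filesystem connector option keys. The hypothetical helper renders an ordered option map into a CREATE TABLE statement and adds 'sink.parallelism' = '1' to the sink table. The resulting string would be passed to tEnv.executeSql(...). Values are not escaped, so they must not contain single quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that renders a CREATE TABLE statement from ordered connector options. */
public class SinkDdl {

    static String createTable(String name, String columns, Map<String, String> options) {
        // Quote every key and value the same way; values are NOT escaped (sketch only).
        String with = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + name + " (" + columns + ") WITH (" + with + ")";
    }

    static Map<String, String> singleFileSinkOptions(String path) {
        Map<String, String> opts = new LinkedHashMap<>(); // keeps insertion order in the DDL
        opts.put("connector", "filesystem");
        opts.put("path", path);
        opts.put("format", "csv");
        opts.put("csv.field-delimiter", ";");
        opts.put("sink.parallelism", "1"); // one writer: fewer files, less write throughput
        return opts;
    }

    public static void main(String[] args) {
        String ddl = createTable("fs_table", "column_nameA STRING, column_nameB DOUBLE",
                singleFileSinkOptions("file:///C:/temp/test5.txt"));
        System.out.println(ddl);
    }
}
```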

David

On Wed, Aug 31, 2022 at 8:44 AM <pod...@gmx.com> wrote:

>
> Doesn't it depend on 'sink.parallelism'?
> If I set 'sink.parallelism' = '2', I get two files; with 'sink.parallelism'
> = '1', just one file...
>
> But I think that by doing it like that I reduce the number of tasks, so it
> will have a negative impact on performance :-(
>
>
>
> Sent: Tuesday, August 30, 2022 at 3:22 PM
> From: "Martijn Visser" <mart...@immerok.com>
> To: pod...@gmx.com, user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> Hi Mike,
>
> I think that's caused by you not having enabled checkpointing. If you
> enable it, I think that should resolve the issue.
>
> Best regards,
>
> Martijn
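This toy model illustrates why a streaming job without checkpointing can leave no visible output. It is not Flink's implementation, and the method names only echo Flink's checkpoint callbacks. In the model, rows go to an in-progress part file. That file becomes pending at a checkpoint and finished only when the checkpoint completes. In a real job, the checkpoint interval is configured with, for example, the execution.checkpointing.interval option or StreamExecutionEnvironment#enableCheckpointing.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: part files are only committed (finished) when a checkpoint completes. */
public class CheckpointCommitModel {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        PartFile(String name) { this.name = name; }
    }

    private final List<PartFile> files = new ArrayList<>();
    private PartFile current;
    private int counter = 0;

    /** Appends a row to the current in-progress file, opening one if needed. */
    void write(String row) {
        if (current == null) {
            current = new PartFile("part-file-" + counter++);
            files.add(current);
        }
        // The row contents themselves are irrelevant to this model.
    }

    /** Checkpoint taken: close the current file; it now waits for the checkpoint to complete. */
    void snapshotState() {
        if (current != null) {
            current.state = State.PENDING;
            current = null;
        }
    }

    /** Checkpoint completed: every pending file is committed and becomes visible. */
    void notifyCheckpointComplete() {
        for (PartFile f : files) {
            if (f.state == State.PENDING) {
                f.state = State.FINISHED;
            }
        }
    }

    long finishedFiles() {
        return files.stream().filter(f -> f.state == State.FINISHED).count();
    }

    public static void main(String[] args) {
        CheckpointCommitModel noCheckpoints = new CheckpointCommitModel();
        noCheckpoints.write("aa;23");
        noCheckpoints.write("bb;657.9");
        System.out.println("finished without checkpoints: " + noCheckpoints.finishedFiles());

        CheckpointCommitModel withCheckpoints = new CheckpointCommitModel();
        withCheckpoints.write("aa;23");
        withCheckpoints.snapshotState();
        withCheckpoints.notifyCheckpointComplete();
        System.out.println("finished after one checkpoint: " + withCheckpoints.finishedFiles());
    }
}
```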
>
> On Wed, Aug 3, 2022 at 9:01 PM <pod...@gmx.com> wrote:
> Thank you very much, Martijn, for dedicating your productive time to
> helping me! I'm a newbie in this subject; I took that example from
> somewhere on the Internet. The problem I see for people like me is that
> Flink syntax changes quite significantly from version to version. Here, for
> instance, it's not 'connector.type' but 'connector', etc.
>
> An additional problem was that there was no error saying something was
> wrong, and the 'select from' in the following lines even displayed results
> from the table...
>
> Anyway, I was expecting a single file, 'test5.txt', as the result but got a
> file for each row.
>
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0
> part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0
> ...
>
>
> Can it be just one file?
> Best,
>
> Mike
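The names in the listing above end in -task-0 and -task-1. Suppose that index identifies the sink subtask; the pattern here is inferred from this listing, not taken from Flink's documentation. Grouping the names by that index then shows whether the files follow the sink parallelism rather than the row count. Here is a small hypothetical helper:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Counts part files per task index, for names like part-<uuid>-task-<i>-file-<n>. */
public class PartFileNames {

    // Pattern inferred from the file names quoted in this thread.
    private static final Pattern NAME =
            Pattern.compile("part-([0-9a-f-]{36})-task-(\\d+)-file-(\\d+)");

    static Map<Integer, Integer> filesPerTask(List<String> names) {
        Map<Integer, Integer> counts = new TreeMap<>(); // sorted by task index
        for (String name : names) {
            Matcher m = NAME.matcher(name);
            if (m.matches()) {
                counts.merge(Integer.parseInt(m.group(2)), 1, Integer::sum);
            }
            // Names that don't match the pattern are ignored.
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> listing = List.of(
                "part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-0-file-0",
                "part-6624d8af-6638-444e-b53a-bca9d5aa175e-task-1-file-0");
        System.out.println(filesPerTask(listing));
    }
}
```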
>
>
> Sent: Wednesday, August 03, 2022 at 4:03 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> I've verified your code locally, and it indeed doesn't work, at least not
> with the latest Flink version (I've tested it with Flink 1.15). There are a
> couple of reasons for that:
>
> 1. You've mentioned in this thread that there's no problem with the
> 'csv.field-delimiter'. There actually is, because the default is ',' and
> not ';', as documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/#format-options
> 2. After adding this option, Flink wouldn't compile because the SQL
> statement uses options that differ from those documented at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/.
> You have connector.type, connector.path and format.type listed; they should
> be connector, path and format.
>
> In the end, I used the following code, and the expected result was
> properly written:
>
>         tEnv.executeSql(
>                 "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) "
>                 + "WITH ('connector' = 'filesystem', 'path' = "
>                 + "'file:///C:/temp/test4.txt', 'format' = 'csv', "
>                 + "'csv.field-delimiter' = ';')");
>
> Best regards,
>
> Martijn
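The option renaming in point 2 is mechanical. The sketch below assumes only the three legacy keys named there need translating. A hypothetical helper could rewrite an option map before the DDL string is assembled; any other key passes through unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps legacy connector option keys to the current ones. */
public class OptionKeyMigration {

    // Only the three renamings listed in this thread.
    private static final Map<String, String> LEGACY_TO_CURRENT = Map.of(
            "connector.type", "connector",
            "connector.path", "path",
            "format.type", "format");

    static Map<String, String> migrate(Map<String, String> legacy) {
        Map<String, String> migrated = new LinkedHashMap<>(); // preserve original order
        legacy.forEach((key, value) ->
                migrated.put(LEGACY_TO_CURRENT.getOrDefault(key, key), value));
        return migrated;
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("connector.type", "filesystem");
        legacy.put("connector.path", "file:///C:/temp/test4.txt");
        legacy.put("format.type", "csv");
        legacy.put("csv.field-delimiter", ";");
        System.out.println(migrate(legacy));
    }
}
```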
>
> On Tue, Aug 2, 2022 at 00:42, <pod...@gmx.com> wrote:
>
> No, I do not have it
>
>
>
> Sent: Monday, August 01, 2022 at 4:43 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> That's Flink's fault-tolerance mechanism; see
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
>
>
> On Mon, Aug 1, 2022 at 16:37, <pod...@gmx.com> wrote:
>
> What's that?
>
>
>
> Sent: Monday, August 01, 2022 at 2:49 PM
> From: "Martijn Visser" <martijnvis...@apache.org>
> To: pod...@gmx.com
> Cc: user@flink.apache.org
> Subject: Re: Why this example does not save anything to file?
>
> Do you have checkpointing enabled?
>
>
> On Sat, Jul 30, 2022 at 17:31, <pod...@gmx.com> wrote:
>
> Thanks David, but there's no problem with that (probably ';' is the default
> separator).
> I can read the file and insert into "Table1" (I said that in my mail).
> The problem is saving to CSV.
>
>
>
> Sent: Saturday, July 30, 2022 at 3:33 PM
> From: "David Anderson" <dander...@apache.org>
> To: pod...@gmx.com
> Cc: "user" <user@flink.apache.org>
> Subject: Re: Why this example does not save anything to file?
>
> You need to add
>
> 'csv.field-delimiter'=';'
>
> to the definition of Table1 so that the input from test4.txt can be
> correctly parsed:
>          tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, "
>                 + "column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', "
>                 + "'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = "
>                 + "'csv', 'csv.field-delimiter'=';')");
>
> Cheers,
> David
>
> On Fri, Jul 29, 2022 at 4:15 PM <pod...@gmx.com> wrote:
>
> Hi,
>
> you mean adding:
>
> " 'csv.field-delimiter'=';', "
>
> like:
>
>         tEnv.executeSql("CREATE TABLE fs_table ("
>                 + "    column_nameA STRING, "
>                 + "    column_nameB DOUBLE "
>                 + "    ) WITH ( "
>                 + "    'connector'='filesystem', "
>                 + "    'path'='file:///C:/temp/test5.txt', "
>                 + "    'format'='csv', "
>                 + " 'csv.field-delimiter'=';', "
>                 + " 'sink.partition-commit.delay'='1 s', "
>                 + " 'sink.partition-commit.policy.kind'='success-file'"
>                 + "    )");
>
>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, "
>                 + "column_name2 from Table1");
>
> I did. Nothing changed; it still doesn't work.
>
>
>
>
> Sent: Tuesday, July 26, 2022 at 4:00 PM
> From: "Gil De Grove" <gil.degr...@euranova.eu>
> To: "Weihua Hu" <huweihua....@gmail.com>
> Cc: pod...@gmx.com, "user" <user@flink.apache.org>
> Subject: Re: Why this example does not save anything to file?
>
> Hello,
>
> I may be completely wrong about this, but from what I see in the source
> file, you are using a semicolon to separate the values.
> This probably means that you should set csv.field-delimiter to ';' to
> make your example work properly.
>
> Have you tried adding that option to the connector options in your
> CREATE TABLE statement?
>
> Regards,
> Gil
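Gil's point about the delimiter can be seen in a simplified illustration that uses String.split rather than Flink's CSV format. With ',' as the delimiter, each line of test4.txt yields a single field, so it cannot fill a (STRING, DOUBLE) row. With ';', it yields two fields. The class name is hypothetical. The trim() on the second field is an assumption of this sketch, not a statement about how Flink treats the space after ';'.

```java
import java.util.regex.Pattern;

/** Simplified delimiter check (plain String.split, not Flink's CSV parser). */
public class DelimiterCheck {

    /** Returns the DOUBLE column, or null if the line does not split into exactly two fields. */
    static Double secondColumn(String line, char delimiter) {
        // Pattern.quote makes split() match the delimiter literally.
        String[] fields = line.split(Pattern.quote(String.valueOf(delimiter)), -1);
        if (fields.length != 2) {
            return null; // wrong number of fields for a (STRING, DOUBLE) row
        }
        return Double.parseDouble(fields[1].trim()); // trim the space after ';'
    }

    public static void main(String[] args) {
        String[] lines = {"aa; 23", "bb; 657.9", "cc; 55"}; // contents of test4.txt
        for (String line : lines) {
            System.out.println(line + "  with ',': " + secondColumn(line, ',')
                    + "  with ';': " + secondColumn(line, ';'));
        }
    }
}
```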
>
> On Tue, 26 Jul 2022 at 15:40, Weihua Hu <huweihua....@gmail.com> wrote:
> Hi,
>
>
> Can you see any exception logs?
> Where is this code running? Is it a standalone cluster with one
> TaskManager?
>
>
>
> Best,
> Weihua
>
> On Tue, Jul 26, 2022 at 4:18 AM <pod...@gmx.com> wrote:
>
> If I understand correctly, this is how I can save to CSV:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>
> So my code is (read from file, save to file):
>
>
>
> package flinkCSV;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> public class flinkCSV {
>     public static void main(String[] args) throws Exception {
>
>         //register and create table
>         EnvironmentSettings settings = EnvironmentSettings
>                 .newInstance()
>                 //.inStreamingMode()
>                 .inBatchMode()
>                 .build();
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING, "
>                 + "column_name2 DOUBLE) WITH ('connector.type' = 'filesystem', "
>                 + "'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM Table1")
>         .execute()
>         .print();
>
>         tEnv.executeSql("CREATE TABLE fs_table ("
>                 + "    column_nameA STRING, "
>                 + "    column_nameB DOUBLE "
>                 + "    ) WITH ( \n"
>                 + "    'connector'='filesystem', "
>                 + "    'path'='file:///C:/temp/test5.txt', "
>                 + "    'format'='csv', "
>                 + "  'sink.partition-commit.delay'='1 s', "
>                 + "  'sink.partition-commit.policy.kind'='success-file'"
>                 + "    )");
>
>         tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, "
>                 + "column_name2 from Table1");
>
>         tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>         .execute()
>         .print();
>
>      }
> }
>
> Source file (test4.txt) is:
>
> aa; 23
> bb; 657.9
> cc; 55
>
> test5.txt is not created, and selecting from fs_table gives null.
>
>
