Thanks Reuven. Do you know how bundle size is determined (e.g. on the Spark runner)? If we don't specify a shard number, will the number of files be total_size / bundle_size?
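For illustration, a minimal batch sketch (Beam Java SDK; the Create source and the /tmp/out/part output prefix are placeholders) of a write that leaves sharding to the runner. With no withNumShards(int), each runner bundle becomes one output file, so the file count follows however the runner bundles the PCollection rather than any fixed total_size / bundle_size formula:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class RunnerDeterminedShards {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("a", "b", "c"))
         // No withNumShards(): sharding is runner-determined, so the
         // number of output files follows the runner's bundling.
         .apply(TextIO.write().to("/tmp/out/part"));
        p.run().waitUntilFinish();
      }
    }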
From: Reuven Lax <re...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, May 21, 2021 at 4:46 PM
To: user <user@beam.apache.org>
Cc: Lian Jiang <li...@zillowgroup.com>
Subject: Re: Why is GroupBy involved in the file save operation?

On Fri, May 21, 2021 at 4:35 PM Tao Li <t...@zillow.com> wrote:

> Reuven, thanks for your response. GroupBy is not involved if we are not specifying a fixed number of files, correct?

Correct.

> And what's the implication of not specifying the shard number? Is the parallelism determined by the number of Spark executors that hold data to save? This is assuming we are using the Spark runner.

Each bundle becomes a file. I'm not entirely sure how the Spark runner determines what the bundles should be.

> What would be the best practice? Specifying a fixed shard number or asking Beam to figure it out for us?

From: Reuven Lax <re...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, May 21, 2021 at 4:27 PM
To: user <user@beam.apache.org>
Cc: Lian Jiang <li...@zillowgroup.com>
Subject: Re: Why is GroupBy involved in the file save operation?

What you describe is what happens (at least in the Dataflow runner) if auto sharding is specified in batch. This mechanism tries to split the PCollection to fully utilize every worker, so it is not appropriate when a fixed number of shards is desired. A GroupByKey is also necessary in streaming in order to split an unbounded PCollection using windows/triggers, as windows and triggers are applied during GroupByKey.
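To make the streaming point concrete, here is a sketch of a windowed write on an unbounded source (assuming the Beam Java SDK; GenerateSequence, the 5-minute window, the output prefix, and the shard count of 10 are all illustrative). The window is applied during the GroupByKey that WriteFiles inserts, and the fixed withNumShards(int) is what introduces that GroupByKey:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    public class WindowedWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(GenerateSequence.from(0).withRate(100, Duration.standardSeconds(1))) // unbounded source
         .apply(MapElements.into(TypeDescriptors.strings()).via(n -> "record-" + n))
         // Windows/triggers are applied during GroupByKey, which is why
         // WriteFiles needs one to split an unbounded PCollection.
         .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
         .apply(TextIO.write()
             .to("/tmp/out/streamed")   // placeholder output prefix
             .withWindowedWrites()      // required for unbounded input
             .withNumShards(10));       // fixed shards -> adds a GroupByKey
        p.run();
      }
    }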
On Fri, May 21, 2021 at 4:16 PM Tao Li <t...@zillow.com> wrote:

> Hi Beam community,
>
> I wonder why a GroupBy operation is involved in WriteFiles: https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/WriteFiles.html
>
> The doc mentions: "The exact parallelism of the write stage can be controlled using withNumShards(int) <https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/WriteFiles.html#withNumShards-int->, typically used to control how many files are produced or to globally limit the number of workers connecting to an external service. However, this option can often hurt performance: it adds an additional GroupByKey <https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/GroupByKey.html> to the pipeline."
>
> When we are saving a PCollection into multiple files, why can't we simply split the PCollection without a key and save each split as a file? Thanks!
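As to why a key is needed at all: in the Beam model, regrouping elements across workers is expressed through GroupByKey, so producing exactly N output groups means assigning each element a shard key and grouping on it. A simplified conceptual sketch (not WriteFiles' actual implementation, which is more careful about retries and empty shards; assignToShards is a hypothetical helper):

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class ShardSketch {
      // Assign each element a random shard key, then regroup. This
      // GroupByKey is the "additional GroupByKey" the WriteFiles javadoc
      // warns about; without it there is no way to guarantee exactly
      // numShards output groups (and hence files).
      static PCollection<KV<Integer, Iterable<String>>> assignToShards(
          PCollection<String> input, int numShards) {
        return input
            .apply(MapElements
                .into(TypeDescriptors.kvs(
                    TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via(s -> KV.of(ThreadLocalRandom.current().nextInt(numShards), s)))
            .apply(GroupByKey.create());
      }
    }

Each resulting KV group can then be written out as one file, which is why fixing the shard count necessarily pulls a shuffle into the pipeline.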