Hi, I have a PCollection of objects that I want to write to disk. I want to shard the objects; each shard in the output should contain roughly 80 MB of data.
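As a point of reference, the desired behaviour in a single process (no Beam involved) could be sketched as below. This is only an illustration under assumptions: `batch_by_size` and `size_fn` are hypothetical names, and the size function would have to return whatever byte estimate is appropriate for the real records.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")

TARGET_SHARD_BYTES = 80 * 1024 * 1024  # roughly 80 MB per shard


def batch_by_size(
    records: Iterable[T],
    size_fn: Callable[[T], int],
    target_bytes: int = TARGET_SHARD_BYTES,
) -> Iterator[List[T]]:
    """Yield lists of records whose summed size stays within target_bytes.

    A single record larger than target_bytes is emitted in a batch of its own.
    """
    batch: List[T] = []
    batch_bytes = 0
    for record in records:
        record_bytes = size_fn(record)
        # Close the current batch if adding this record would overshoot.
        if batch and batch_bytes + record_bytes > target_bytes:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += record_bytes
    # Emit whatever is left over as the final (possibly small) batch.
    if batch:
        yield batch
```

The difficulty is getting this behaviour across a distributed PCollection, where no single worker sees all elements in order.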
I have written the code to write the objects to disk, but I cannot find an efficient way to control the shard size. I have tried some options:

1. Calling a ParDo directly on the PCollection and batching with start_bundle() and finish_bundle(). This does not work well because you cannot control the size of each bundle.
2. Using the BatchElements <https://beam.apache.org/documentation/patterns/batch-elements> primitive. It seems to support batching across bundles only in the streaming setting, via `max_batch_duration_secs`. It also transforms the PCollection into a windowed PCollection, which does not work well with downstream operations.
3. Using GroupIntoBatches or something similar. This is not doable either, because you can only specify the number of elements, not the batch size in megabytes. And since it requires an int input, not a PValue, I cannot precompute and pass an 'average batch size'.
4. Using WriteToFiles or FileBasedSink. These are not the right abstractions because they don't expose any way to control shard size. They also might not work well with my actual data format [0].

So what is the right way to do this? I could do something like this:

1. Compute the average bytes per element and determine the total number of shards.
2. Assign the key random.randint(0, num_shards - 1) to each element.
3. Group by key.
4. Create a shard from each key's list[elements].

However, this involves writing a lot of custom logic, a GroupByKey (which effectively doubles the data we pass around and isn't particularly efficient), etc.

Is there a better way to achieve this that I am missing? I would appreciate any help. Thank you!

[0] My data format is a MosaicML shard. You write it like this:

    with MDSWriter(...) as writer:
        for record in records:
            writer.write(record)

The output contains one or more `.mds` binary files and one `index.json` file, which gives you information about the size and location of each shard. I don't think this plays well with `FileBasedSink`, does it?

Best,
Antonio
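For reference, the shard-count arithmetic and random key assignment in the workaround described above could be sketched in plain Python as follows. The function names are hypothetical, and the in-memory dict merely stands in for what a GroupByKey would produce in a real pipeline; this is a sketch of the idea, not a Beam implementation.

```python
import math
import random
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, TypeVar

T = TypeVar("T")

TARGET_SHARD_BYTES = 80 * 1024 * 1024  # roughly 80 MB per shard


def plan_num_shards(sizes: Sequence[int], target_bytes: int = TARGET_SHARD_BYTES) -> int:
    """Step 1: derive the shard count from the average element size."""
    if not sizes:
        return 1
    avg_bytes = sum(sizes) / len(sizes)
    return max(1, math.ceil(len(sizes) * avg_bytes / target_bytes))


def assign_to_shards(
    records: Sequence[T], num_shards: int, seed: Optional[int] = None
) -> Dict[int, List[T]]:
    """Steps 2 and 3: give each record a random key in [0, num_shards), then group."""
    rng = random.Random(seed)
    shards: Dict[int, List[T]] = defaultdict(list)
    for record in records:
        # randrange excludes the upper bound, so exactly num_shards keys are possible.
        shards[rng.randrange(num_shards)].append(record)
    return dict(shards)
```

Because the keys are random, shard sizes only match the target on average; individual shards can come out noticeably larger or smaller.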
