Thanks, Luke, for the info regarding GCS compose. Do you know if the GCS FileIO implementation supports the "compose" API that you mentioned?
Thanks,
Evan

On Fri, Jul 16, 2021 at 12:17 PM Luke Cwik <lc...@google.com> wrote:
>
> On Fri, Jul 16, 2021 at 6:53 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>
>> Hi there,
>>
>> I imagine the answer to this question might depend on the underlying
>> runner, but simply put: can I write files, temporarily, to disk? I'm
>> currently using the DataflowRunner if that's a helpful detail.
>>
>
> For all runners that use containers (e.g. Dataflow), you're limited to the
> amount of free space in the container (typically a few GBs). Dataflow also
> mounts, I believe, /tmp as a persistent directory from the VM (only valid
> for the lifetime of the VM), and then you're limited to the amount of disk
> space on the VM itself.
>
>> Relatedly, how does Beam handle large files? Say that my pipeline reads
>> files from a distributed file system like AWS S3 or GCP Cloud Storage. If
>> a file is 10 GB and I read its contents, those contents will be held in
>> memory, correct?
>>
>
> Beam processes ranges/partitions/... of sources at a time. Many file
> formats can be split at byte offsets, and each byte range is processed
> independently and in parallel. As records are processed they are typically
> output to something like a sink or a GroupByKey, so the only memory needed
> is the working set size of the elements currently being processed. Note
> that depending on the runner's implementation of GroupByKey, some runners
> may do the grouping operation in memory, in which case you might be
> limited by the amount of memory some set of workers have. Dataflow does
> its grouping operation in a separate service backed by durable storage, so
> the size of a GroupByKey isn't limited by the VM's available memory.
>
> To be concrete, if each of your files contains one record that is 10 GB in
> size, then yes, you will need 10 GB+ of memory, since that is the working
> set size. But if your files contain 10k records and each record is about
> 1 KB, then your working set size is much smaller.
>
>> As a somewhat contrived example, what would be the recommended approach
>> if I wanted to read a set of large files, tar them, and upload them
>> elsewhere?
>>
>
> Create.of(listOfFileNames) -> ParDo(TarFiles)
>
> In your TarFiles DoFn you would have:
>
> processElement(List<String> files) {
>   OutputStream tarOutput = // initialize output stream for tar file
>   for (String file : files) {
>     InputStream input = // open file
>     // copy input stream to output stream
>   }
> }
>
> This will consume memory relative to the buffer sizes used by the input
> and output streams, and it will process in parallel based on how many
> output tar files you want to create.
>
> Conveniently, tar is a format that can be concatenated fairly easily, so
> if the underlying file system has a concatenate operation you could design
> a better pipeline. For example, GCS allows you to compose [1] up to 32
> files, letting you write a pipeline that splits your list of files into 32
> partitions, has each partition write one tar file, and then runs a
> transform after all of those are complete which composes the 32 files
> together and deletes the 32 pieces.
>
>> Thanks!
>> Evan
>
> 1: https://cloud.google.com/storage/docs/json_api/v1/objects/compose
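
[Editor's note] A rough, runnable take on the TarFiles DoFn sketched above
might look like the following, using Beam's FileSystems API plus Apache
Commons Compress for the tar format. The class name, output path, and
one-archive-per-element structure below are illustrative, not something
stated in the thread:

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;

public class TarFilesFn extends DoFn<List<String>, String> {

  // Hypothetical destination; in a real pipeline this would likely be a
  // constructor argument or derived from the element/window.
  private static final String OUTPUT_SPEC = "gs://my-bucket/output/archive.tar";

  @ProcessElement
  public void processElement(@Element List<String> files, OutputReceiver<String> out)
      throws Exception {
    ResourceId dest = FileSystems.matchNewResource(OUTPUT_SPEC, /* isDirectory= */ false);
    try (OutputStream rawOut =
            Channels.newOutputStream(FileSystems.create(dest, "application/x-tar"));
        TarArchiveOutputStream tarOut = new TarArchiveOutputStream(rawOut)) {
      for (String file : files) {
        // Look up the file's size so the tar entry header can be written up front.
        MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(file);
        TarArchiveEntry entry = new TarArchiveEntry(metadata.resourceId().getFilename());
        entry.setSize(metadata.sizeBytes());
        tarOut.putArchiveEntry(entry);
        // Stream the file into the archive; memory use is bounded by the
        // copy buffer, not the file size.
        try (InputStream in = Channels.newInputStream(FileSystems.open(metadata.resourceId()))) {
          IOUtils.copy(in, tarOut);
        }
        tarOut.closeArchiveEntry();
      }
      tarOut.finish();
    }
    out.output(OUTPUT_SPEC);
  }
}

As Luke notes, parallelism comes from how many elements (output archives)
flow into this DoFn, and memory stays proportional to the copy buffers
rather than to the file sizes.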
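[Editor's note] On the FileIO question above: it isn't clear from the
thread whether Beam's GCS filesystem exposes the compose operation [1]
directly. As a fallback, the compose call can be made with the
google-cloud-storage Java client from within the pipeline. The helper
class, bucket, and object names below are hypothetical; a real DoFn would
typically create the Storage client once in @Setup rather than per call:

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.List;

public class ComposeTarParts {

  /** Composes up to 32 source objects in {@code bucket} into a single target object. */
  public static Blob compose(String bucket, List<String> sourceObjects, String targetObject) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    Storage.ComposeRequest request =
        Storage.ComposeRequest.newBuilder()
            // The compose API accepts at most 32 source components per call.
            .addSource(sourceObjects)
            .setTarget(
                BlobInfo.newBuilder(bucket, targetObject)
                    .setContentType("application/x-tar")
                    .build())
            .build();
    Blob composed = storage.compose(request);
    // Clean up the intermediate pieces, as described in the thread.
    for (String source : sourceObjects) {
      storage.delete(bucket, source);
    }
    return composed;
  }
}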