OK, this is very reassuring! I'm glad I was mistaken here. I still think there are likely significant optimization opportunities here, but the situation is certainly much better than I feared.
The example in question here does indeed include a large object as part of the pipeline definition, a 47 MB NumPy array assigned as an attribution to a PTransform, which I'm running locally via DirectRunner. Fortunately, the objects we pass in as transform arguments are typically much smaller in realistic use cases, because they point to arrays lazily loaded from disk rather than in-memory NumPy arrays (which are nice mostly for pedagogical examples). For this example, it takes about 10s to round-trip this object through Beam's pickling machinery, including the bz2 compression. So it appears that Beam is pickling/unpickling this argument about 5-6 times in my 60s pipeline construction. I did a test replacing calls like pickle.loads(pickle.dumps(...)) with a fast-path that avoids compression, and it cut down the runtime with DirectRunner to about 25s. So it looks like there are about 2x pickle round-trips just involved in setting up DirectRunner. Ideally, we could remove these, too, because although pickle can be flaky, compression is not something that we need to test. One very simple improvement would be to switch the default compression from bz2 to gzip, which is ~5x faster for my example. Zstd would also be a natural choice to consider, because it will be part of the Python standard library in 3.14 (to be released later this month). On Thu, Oct 2, 2025 at 9:16 AM Robert Bradshaw <[email protected]> wrote: > +1 to Claudius's comments that this compressed cloudpickle is only > used for pickling elements of the pipeline graph itself, not data. > This is probably where your overheads come from. (Our direct runner > still does all the serialization and deserialization that would be > required for remote, distributed execution for more fidelity to that > case). For these cases, we have found that compression can be quite > useful (e.g. often the pickled blobs contain long, common module paths > and such). > > That being said, 60s is certainly excessive (and far more than my > typical experience, e.g. our unit tests are significantly faster than > that). Are you perchance trying to pickle a (large?) main session? Or > do your DoFns hold onto large state? (How large are these pickles that > you're generating?) > > I would not be opposed to trying other compression libraries like LZ4 > if it's available (though I don't know if the savings are significant > enough to merit a new dependency). The base64 step can almost > certainly be elided now that we've moved to the unified worker and > would be a good savings regardless: > https://github.com/apache/beam/pull/36359 > > On Thu, Oct 2, 2025 at 8:49 AM Claudius van der Merwe > <[email protected]> wrote: > > > > Hi Stephan, > > > > Thanks for raising this issue. Can you help me understand why > cloudpickle tis pickling such large amounts of data in this case? > > > > The only cases I would expect cloudpickle to be used for pickling is > > 1. Serializing DoFns, coders, etc.. for the pipeline proto > > 2. Encoding the type (not the data contained by the type) for GBK > operations where the type is considered "special" e.g. NamedTuple, Protos, > dataclasses type (see code) > > > > > We use domain-specific data structures (xarray.Dataset, wrapping NumPy > arrays) inside our Beam pipelines, which means data in our pipelines is > serialized with pickle. > > In this case the coder should fall back to regular PickleCoder which > uses the regular fast python pickler. > > > > Can you give instructions to reproduce the slowness? Or do you know what > is causing so much data to be pickled by cloudpickle? > > > > Best, > > Claude > > > > On Thu, Oct 2, 2025 at 12:37 AM Stephan Hoyer via dev < > [email protected]> wrote: > >> > >> I maintain a project, Xarray-Beam, which we use for analytics on > large-scale scientific datasets, including multi-PB climate datasets. > >> > >> We use domain-specific data structures (xarray.Dataset, wrapping NumPy > arrays) inside our Beam pipelines, which means data in our pipelines is > serialized with pickle. My naive expectation was that this would still > yield reasonable performance, because NumPy arrays can be pickled and > unpickled very quickly in modern versions of pickle. > >> > >> Unfortunately, I noticed that one of the demos in our documentation > (writing a mere 47 MB to disk), is painfully slow when written in Beam, > taking 60s to run using Beam's DirectRunner for an operation that Dask > completes in 127 ms. The main culprit is Beam's pickling utilities, which > apply Bzip2 compression and base64 encoding. If I remove these compression > steps, Beam runs in 1.3s -- still slow, but at least at interactive speeds. > In my microbenchmark (see below), pickling and unpickling a mere 8 MB of > binary data takes about 2 seconds with Beam, which is far slower than > network speeds on most machines. I also ran a more realistic end-to-end > workload with Google's internal Beam runner, which resulted in a 2-fold > savings in CPU usage. > >> > >> Would it be possible to improve this situation for all Beam users? > >> 1. Using the slow Bzip2 compression feels very inappropriate. There are > far faster modern compression libraries (e.g., LZ4), but given pickle is a > binary format already, I would not expect large savings from compressions > in most cases. (Dask uses the fall-back approach of trying LZ4, and only > using it if it results in significant saving.) > >> 2. Base 64 encoding also feels quite wasteful. Comments in the code > suggest that this exists to allow for serialization of pickled values to > JSON, but surely data written in JSON is a small minority of data > transferred in Beam pipelines, and protective coding into base64 could be > done only when actually necessary. > >> > >> Would love to hear from experienced Beam maintainers what you think > about this! Hopefully these are not hard changes to land. > >> > >> Best, > >> Stephan > >> > >> ------------------ > >> > >> from apache_beam.internal import cloudpickle_pickler > >> import pickle > >> import numpy as np > >> import bz2 > >> import base64 > >> > >> x = np.random.randn(1_000_000).astype(dtype=np.float64) # 8 MB > >> > >> %timeit pickle.loads(pickle.dumps(x)) > >> # 10.8 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 100 loops > each) > >> > >> %timeit cloudpickle_pickler.loads(cloudpickle_pickler.dumps(x)) > >> # 2.27 s ± 387 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) > >> > >> pickled = pickle.dumps(x) > >> compressed = bz2.compress(pickled, compresslevel=9) > >> b64ed = base64.b64encode(compressed) > >> > >> %timeit bz2.compress(pickled, compresslevel=9) > >> # 1.2 s ± 8.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) > >> > >> %timeit base64.b64encode(compressed) > >> # 58.9 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) > >> > >> %timeit base64.b64decode(b64ed) > >> # 35.3 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) > >> > >> %timeit bz2.decompress(compressed) > >> # 708 ms ± 53.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) >
