On Tue, Oct 7, 2025 at 12:12 PM Joey Tran <[email protected]> wrote:
> FWIW these serialization costs are also incurred a couple of times at
> execution time as well. When the workers/bundle processors create the
> ParDo operations [1], they deserialize the dofn, run DoFn.setup(), and
> then serialize the set-up dofn. Then, when the worker actually executes
> the dofn, the dofn gets deserialized for the final time, I think.

Actually, although we do roundtrip the dofn a few times, looking at the
code again we don't set it up first before serializing after all. I got a
little confused reading the code :-)

> I'm not sure why exactly we do this serialization roundtrip to set up
> the dofn, as opposed to just waiting to set up the dofn until we
> actually execute it and keeping that dofn in memory.
>
>> We won't deprecate 3.13 (and below) for quite some time though...
>
> Is there any reason why we couldn't add zstd as a dependency for Python
> versions <3.14?
>
> [1]
> https://github.com/apache/beam/blob/9944acf243e4c0b53a0adbaec1b1579b8eccba0b/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1872
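To make the zstd idea concrete, something like the following is what I
have in mind -- an untested sketch, where compress/decompress are
hypothetical helper names, and zstandard is the PyPI backport that would
only be a dependency on Python <3.14 (e.g. declared with the environment
marker `zstandard; python_version < "3.14"`):

import sys

if sys.version_info >= (3, 14):
    # Stdlib zstd support, added in Python 3.14 (PEP 784).
    from compression import zstd

    def compress(data: bytes) -> bytes:
        return zstd.compress(data)

    def decompress(data: bytes) -> bytes:
        return zstd.decompress(data)
else:
    # Third-party backport; a dependency only for Python <3.14.
    import zstandard

    def compress(data: bytes) -> bytes:
        return zstandard.ZstdCompressor().compress(data)

    def decompress(data: bytes) -> bytes:
        return zstandard.ZstdDecompressor().decompress(data)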
> On Thu, Oct 2, 2025 at 9:16 AM Robert Bradshaw <[email protected]> wrote:
>
>> +1 to Claudius's comments that this compressed cloudpickle is only used
>> for pickling elements of the pipeline graph itself, not data. This is
>> probably where your overheads come from. (Our direct runner still does
>> all the serialization and deserialization that would be required for
>> remote, distributed execution, for more fidelity to that case.) For
>> these cases, we have found that compression can be quite useful (e.g.
>> often the pickled blobs contain long, common module paths and such).
>>
>> That being said, 60s is certainly excessive (and far more than my
>> typical experience; e.g. our unit tests are significantly faster than
>> that). Are you perchance trying to pickle a (large?) main session? Or
>> do your DoFns hold onto large state? (How large are these pickles that
>> you're generating?)
>>
>> I would not be opposed to trying other compression libraries like LZ4
>> if it's available (though I don't know if the savings are significant
>> enough to merit a new dependency). The base64 step can almost certainly
>> be elided now that we've moved to the unified worker, and would be a
>> good savings regardless:
>> https://github.com/apache/beam/pull/36359
>>
>> On Thu, Oct 2, 2025 at 8:49 AM Claudius van der Merwe
>> <[email protected]> wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks for raising this issue. Can you help me understand why
>>> cloudpickle is pickling such large amounts of data in this case?
>>>
>>> The only cases where I would expect cloudpickle to be used for
>>> pickling are:
>>> 1. Serializing DoFns, coders, etc. for the pipeline proto
>>> 2. Encoding the type (not the data contained by the type) for GBK
>>> operations where the type is considered "special", e.g. NamedTuple,
>>> Proto, or dataclass types (see code)
>>>
>>>> We use domain-specific data structures (xarray.Dataset, wrapping
>>>> NumPy arrays) inside our Beam pipelines, which means data in our
>>>> pipelines is serialized with pickle.
>>>
>>> In this case the coder should fall back to the regular PickleCoder,
>>> which uses the regular fast Python pickler.
>>>
>>> Can you give instructions to reproduce the slowness? Or do you know
>>> what is causing so much data to be pickled by cloudpickle?
>>>
>>> Best,
>>> Claude
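(Side note on Claude's point about coders: if it helps with debugging,
you can ask Beam's coder registry what it would pick for a given type.
A minimal sketch, assuming a recent Beam SDK; on my reading of the code,
unknown types fall back to a pickle-based coder rather than cloudpickle:)

import numpy as np
import xarray as xr
from apache_beam.coders.typecoders import registry

# Ask the registry which coder Beam would use for each type. Types it
# doesn't know about fall back to a pickle-based coder
# (FastPrimitivesCoder, in my reading).
print(registry.get_coder(np.ndarray))
print(registry.get_coder(xr.Dataset))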
>>> On Thu, Oct 2, 2025 at 12:37 AM Stephan Hoyer via dev
>>> <[email protected]> wrote:
>>>
>>>> I maintain a project, Xarray-Beam, which we use for analytics on
>>>> large-scale scientific datasets, including multi-PB climate datasets.
>>>>
>>>> We use domain-specific data structures (xarray.Dataset, wrapping
>>>> NumPy arrays) inside our Beam pipelines, which means data in our
>>>> pipelines is serialized with pickle. My naive expectation was that
>>>> this would still yield reasonable performance, because NumPy arrays
>>>> can be pickled and unpickled very quickly in modern versions of
>>>> pickle.
>>>>
>>>> Unfortunately, I noticed that one of the demos in our documentation
>>>> (writing a mere 47 MB to disk) is painfully slow when written in
>>>> Beam, taking 60s to run using Beam's DirectRunner for an operation
>>>> that Dask completes in 127 ms. The main culprit is Beam's pickling
>>>> utilities, which apply Bzip2 compression and base64 encoding. If I
>>>> remove these compression steps, Beam runs in 1.3s -- still slow, but
>>>> at least at interactive speeds. In my microbenchmark (see below),
>>>> pickling and unpickling a mere 8 MB of binary data takes about 2
>>>> seconds with Beam, which is far slower than network speeds on most
>>>> machines. I also ran a more realistic end-to-end workload with
>>>> Google's internal Beam runner, which resulted in a 2-fold savings in
>>>> CPU usage.
>>>>
>>>> Would it be possible to improve this situation for all Beam users?
>>>> 1. Using the slow Bzip2 compression feels very inappropriate. There
>>>> are far faster modern compression libraries (e.g., LZ4), but given
>>>> that pickle is a binary format already, I would not expect large
>>>> savings from compression in most cases. (Dask uses the fall-back
>>>> approach of trying LZ4, and only using it if it results in
>>>> significant savings.)
>>>> 2. Base64 encoding also feels quite wasteful. Comments in the code
>>>> suggest that it exists to allow for serialization of pickled values
>>>> to JSON, but surely data written as JSON is a small minority of the
>>>> data transferred in Beam pipelines, and encoding into base64 could be
>>>> done only when actually necessary.
>>>>
>>>> Would love to hear from experienced Beam maintainers what you think
>>>> about this! Hopefully these are not hard changes to land.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>> ------------------
>>>>
>>>> from apache_beam.internal import cloudpickle_pickler
>>>> import pickle
>>>> import numpy as np
>>>> import bz2
>>>> import base64
>>>>
>>>> x = np.random.randn(1_000_000).astype(dtype=np.float64)  # 8 MB
>>>>
>>>> %timeit pickle.loads(pickle.dumps(x))
>>>> # 10.8 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
>>>>
>>>> %timeit cloudpickle_pickler.loads(cloudpickle_pickler.dumps(x))
>>>> # 2.27 s ± 387 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>>>>
>>>> pickled = pickle.dumps(x)
>>>> compressed = bz2.compress(pickled, compresslevel=9)
>>>> b64ed = base64.b64encode(compressed)
>>>>
>>>> %timeit bz2.compress(pickled, compresslevel=9)
>>>> # 1.2 s ± 8.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>>>>
>>>> %timeit base64.b64encode(compressed)
>>>> # 58.9 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
>>>>
>>>> %timeit base64.b64decode(b64ed)
>>>> # 35.3 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>>>>
>>>> %timeit bz2.decompress(compressed)
>>>> # 708 ms ± 53.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
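FWIW, the Dask-style fallback Stephan describes in point 1 is pretty
small to implement. A rough sketch of the idea -- untested; the function
names, the one-byte header scheme, and the 0.9 ratio threshold are all
made up for illustration, and lz4 here is the optional lz4 PyPI package
Robert mentioned being open to:

try:
    import lz4.frame  # optional dependency
except ImportError:
    lz4 = None

# A one-byte header tells maybe_decompress how to undo maybe_compress.
_RAW, _LZ4 = b"\x00", b"\x01"

def maybe_compress(payload: bytes) -> bytes:
    if lz4 is not None:
        compressed = lz4.frame.compress(payload)
        # Keep the compressed form only if it actually pays for itself.
        if len(compressed) < 0.9 * len(payload):
            return _LZ4 + compressed
    return _RAW + payload

def maybe_decompress(data: bytes) -> bytes:
    if data[:1] == _LZ4:
        return lz4.frame.decompress(data[1:])
    return data[1:]

And per point 2, the base64 step could then be applied only at the JSON
boundary (i.e. b64encode right before a pickled blob is embedded in
JSON) rather than unconditionally inside dumps().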
