On Tue, Oct 7, 2025 at 12:12 PM Joey Tran <[email protected]> wrote:

> FWIW these serialization costs are also incurred a couple of times at
> execution time. When the workers/bundle processors create the ParDo
> operations[1], they deserialize the dofn, run DoFn.setup(), and then
> serialize the set-up dofn. Then when the worker actually executes the
> dofn, it gets deserialized for the final time, I think.
>
Actually, although we do roundtrip the dofn a few times, looking at the
code again, we don't set it up before serializing after all. I got a
little confused reading the code :-)
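
To make this concrete, here is a rough sketch of the roundtrips as I now
read them (simplified and hypothetical, not the actual bundle_processor
code):

# Rough sketch only, not the actual Beam code: the dofn payload is
# deserialized when the operation is created, re-serialized, and then
# deserialized one final time at execution.
import pickle

def create_operation(serialized_dofn):
    dofn = pickle.loads(serialized_dofn)  # deserialize at operation creation
    # (per the correction above, setup() is *not* run here)
    return pickle.dumps(dofn)             # ...then re-serialize

def execute_operation(serialized_dofn):
    dofn = pickle.loads(serialized_dofn)  # final deserialization
    dofn.setup()                          # set up just before execution
    return dofn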

> I'm not sure why exactly we do this serialization roundtrip to set up the
> dofn, as opposed to just waiting to set up the dofn until we actually
> execute it and keeping it in memory.
>
>>
>> We won't deprecate 3.13 (and below) for quite some time though...
>>
> Is there any reason why we couldn't add zstd as a dependency for python
> versions <3.14?
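>
> For illustration, a minimal sketch of what such a conditional dependency
> could look like (assuming the third-party "zstandard" package as the
> backport; Python 3.14+ gets a stdlib module via PEP 784):
>
> # Sketch only: prefer the stdlib zstd module on Python 3.14+,
> # otherwise fall back to the third-party "zstandard" package.
> try:
>     from compression import zstd  # stdlib, Python 3.14+ (PEP 784)
>     compress, decompress = zstd.compress, zstd.decompress
> except ImportError:
>     import zstandard  # pip install zstandard
>     compress = zstandard.ZstdCompressor().compress
>     decompress = zstandard.ZstdDecompressor().decompress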
>
> [1]
> https://github.com/apache/beam/blob/9944acf243e4c0b53a0adbaec1b1579b8eccba0b/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1872
>
>
>
>>
>> > On Thu, Oct 2, 2025 at 9:16 AM Robert Bradshaw <[email protected]>
>> wrote:
>> >>
>> >> +1 to Claudius's comments that this compressed cloudpickle is only
>> >> used for pickling elements of the pipeline graph itself, not data.
>> >> This is probably where your overheads come from. (Our direct runner
>> >> still does all the serialization and deserialization that would be
>> >> required for remote, distributed execution for more fidelity to that
>> >> case). For these cases, we have found that compression can be quite
>> >> useful (e.g. often the pickled blobs contain long, common module paths
>> >> and such).
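>> >>
>> >> For instance (a toy illustration, not Beam code), pickled blobs full
>> >> of long names with shared prefixes compress dramatically:
>> >>
>> >> import bz2, pickle
>> >>
>> >> # Distinct names sharing long common prefixes, as in a pipeline graph.
>> >> blob = pickle.dumps(
>> >>     [f"apache_beam.transforms.core.DoFn_{i}" for i in range(100)])
>> >> print(len(blob), "->", len(bz2.compress(blob)))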
>> >>
>> >> That being said, 60s is certainly excessive (and far more than my
>> >> typical experience, e.g. our unit tests are significantly faster than
>> >> that). Are you perchance trying to pickle a (large?) main session? Or
>> >> do your DoFns hold onto large state? (How large are these pickles that
>> >> you're generating?)
>> >>
>> >> I would not be opposed to trying other compression libraries like LZ4
>> >> if it's available (though I don't know if the savings are significant
>> >> enough to merit a new dependency). The base64 step can almost
>> >> certainly be elided now that we've moved to the unified worker and
>> >> would be a good savings regardless:
>> >> https://github.com/apache/beam/pull/36359
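>> >>
>> >> (Purely as a sketch of the idea, not the PR's code: pickled payloads
>> >> could travel as raw bytes and be base64-encoded only at the JSON
>> >> boundary.)
>> >>
>> >> import base64, json
>> >>
>> >> def embed_in_json(payload: bytes) -> str:
>> >>     # Raw pickled bytes travel as-is elsewhere; encode only here.
>> >>     return json.dumps({"blob": base64.b64encode(payload).decode("ascii")})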
>> >>
>> >> On Thu, Oct 2, 2025 at 8:49 AM Claudius van der Merwe
>> >> <[email protected]> wrote:
>> >> >
>> >> > Hi Stephan,
>> >> >
>> >> > Thanks for raising this issue. Can you help me understand why
>> cloudpickle is pickling such large amounts of data in this case?
>> >> >
>> >> > The only cases where I would expect cloudpickle to be used for pickling are
>> >> > 1. Serializing DoFns, coders, etc. for the pipeline proto
>> >> > 2. Encoding the type (not the data contained by the type) for GBK
>> operations where the type is considered "special", e.g. NamedTuple, Protos,
>> dataclasses (see code)
>> >> >
>> >> > > We use domain-specific data structures (xarray.Dataset, wrapping
>> NumPy arrays) inside our Beam pipelines, which means data in our pipelines
>> is serialized with pickle.
>> >> > In this case the coder should fall back to the regular PickleCoder,
>> which uses the standard fast Python pickler.
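>> >> >
>> >> > (One quick way to check, assuming Beam's public coder registry API,
>> >> > is to ask which coder a given type resolves to:)
>> >> >
>> >> > # Unregistered types generally resolve to a pickle-based fallback.
>> >> > from apache_beam import coders
>> >> > import numpy as np
>> >> >
>> >> > print(coders.registry.get_coder(np.ndarray))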
>> >> >
>> >> > Can you give instructions to reproduce the slowness? Or do you know
>> what is causing so much data to be pickled by cloudpickle?
>> >> >
>> >> > Best,
>> >> > Claude
>> >> >
>> >> > On Thu, Oct 2, 2025 at 12:37 AM Stephan Hoyer via dev <
>> [email protected]> wrote:
>> >> >>
>> >> >> I maintain a project, Xarray-Beam, which we use for analytics on
>> large-scale scientific datasets, including multi-PB climate datasets.
>> >> >>
>> >> >> We use domain-specific data structures (xarray.Dataset, wrapping
>> NumPy arrays) inside our Beam pipelines, which means data in our pipelines
>> is serialized with pickle. My naive expectation was that this would still
>> yield reasonable performance, because NumPy arrays can be pickled and
>> unpickled very quickly in modern versions of pickle.
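>> >> >>
>> >> >> For example, with pickle protocol 5 and out-of-band buffers (a
>> standard pickle feature), an array can round-trip with essentially no
>> copying:
>> >> >>
>> >> >> import pickle
>> >> >> import numpy as np
>> >> >>
>> >> >> x = np.random.randn(1_000_000)
>> >> >> buffers = []
>> >> >> # Protocol 5 hands large buffers out-of-band instead of copying
>> >> >> # them into the pickle stream.
>> >> >> data = pickle.dumps(x, protocol=5, buffer_callback=buffers.append)
>> >> >> y = pickle.loads(data, buffers=buffers)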
>> >> >>
>> >> >> Unfortunately, I noticed that one of the demos in our documentation
>> (writing a mere 47 MB to disk) is painfully slow when written in Beam,
>> taking 60s to run using Beam's DirectRunner for an operation that Dask
>> completes in 127 ms. The main culprit is Beam's pickling utilities, which
>> apply Bzip2 compression and base64 encoding. If I remove these steps, Beam
>> runs in 1.3s -- still slow, but at least at interactive speeds.
>> In my microbenchmark (see below), pickling and unpickling a mere 8 MB of
>> binary data takes about 2 seconds with Beam, which is far slower than
>> network speeds on most machines. I also ran a more realistic end-to-end
>> workload with Google's internal Beam runner, which resulted in a 2-fold
>> savings in CPU usage.
>> >> >>
>> >> >> Would it be possible to improve this situation for all Beam users?
>> >> >> 1. Using the slow Bzip2 compression feels very inappropriate. There
>> are far faster modern compression libraries (e.g., LZ4), but given that
>> pickle is already a binary format, I would not expect large savings from
>> compression in most cases. (Dask uses the fallback approach of trying LZ4
>> and only using it if it results in significant savings; see the sketch
>> after this list.)
>> >> >> 2. Base64 encoding also feels quite wasteful. Comments in the code
>> suggest that this exists to allow for serialization of pickled values to
>> JSON, but surely data written as JSON is a small minority of the data
>> transferred in Beam pipelines, and protective encoding into base64 could
>> be done only when actually necessary.
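>> >> >>
>> >> >> As a sketch of the fallback approach from point 1 (assuming the
>> third-party "lz4" package; names are illustrative):
>> >> >>
>> >> >> import lz4.frame
>> >> >>
>> >> >> def maybe_compress(payload: bytes, min_ratio: float = 0.9) -> bytes:
>> >> >>     # Keep the compressed form only when it actually saves space;
>> >> >>     # a one-byte flag records which branch was taken.
>> >> >>     compressed = lz4.frame.compress(payload)
>> >> >>     if len(compressed) < min_ratio * len(payload):
>> >> >>         return b"\x01" + compressed
>> >> >>     return b"\x00" + payload
>> >> >>
>> >> >> def maybe_decompress(blob: bytes) -> bytes:
>> >> >>     if blob[:1] == b"\x01":
>> >> >>         return lz4.frame.decompress(blob[1:])
>> >> >>     return blob[1:]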
>> >> >>
>> >> >> I would love to hear what experienced Beam maintainers think about
>> this! Hopefully these are not hard changes to land.
>> >> >>
>> >> >> Best,
>> >> >> Stephan
>> >> >>
>> >> >> ------------------
>> >> >>
>> >> >> from apache_beam.internal import cloudpickle_pickler
>> >> >> import pickle
>> >> >> import numpy as np
>> >> >> import bz2
>> >> >> import base64
>> >> >>
>> >> >> x = np.random.randn(1_000_000).astype(dtype=np.float64)  # 8 MB
>> >> >>
>> >> >> %timeit pickle.loads(pickle.dumps(x))
>> >> >> # 10.8 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 100 loops
>> each)
>> >> >>
>> >> >> %timeit cloudpickle_pickler.loads(cloudpickle_pickler.dumps(x))
>> >> >> # 2.27 s ± 387 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>> >> >>
>> >> >> pickled = pickle.dumps(x)
>> >> >> compressed = bz2.compress(pickled, compresslevel=9)
>> >> >> b64ed = base64.b64encode(compressed)
>> >> >>
>> >> >> %timeit bz2.compress(pickled, compresslevel=9)
>> >> >> # 1.2 s ± 8.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>> >> >>
>> >> >> %timeit base64.b64encode(compressed)
>> >> >> # 58.9 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops
>> each)
>> >> >>
>> >> >> %timeit base64.b64decode(b64ed)
>> >> >> # 35.3 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops
>> each)
>> >> >>
>> >> >> %timeit bz2.decompress(compressed)
>> >> >> # 708 ms ± 53.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop
>> each)
>>
>
