Hi Stephan,

Thanks for raising this issue. Can you help me understand why cloudpickle
is pickling such large amounts of data in this case?

The only cases where I would expect cloudpickle to be used for pickling are:
1. Serializing DoFns, coders, etc. for the pipeline proto
2. Encoding the type (*not* the data contained by the type) for GBK
operations where the type is considered "special", e.g. NamedTuple, protos,
dataclasses (see the code
<https://github.com/apache/beam/blob/d78d004d0a5846547381c9476d6732846365497c/sdks/python/apache_beam/coders/coder_impl.py#L492>,
and the rough illustration below)
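
To make the distinction concrete, here is a rough, hypothetical illustration
(using the standalone cloudpickle and numpy packages; the Chunk class is made
up, not Beam code): pickling the *type* of a value is tiny compared with
pickling the value itself.

import pickle
import typing

import cloudpickle
import numpy as np

class Chunk(typing.NamedTuple):
    data: np.ndarray

chunk = Chunk(data=np.random.randn(1_000_000))  # ~8 MB payload

# Pickling the class (what case 2 above covers) is small -- typically well
# under a few KB -- while pickling the value includes the full 8 MB array.
print(len(cloudpickle.dumps(Chunk)))
print(len(pickle.dumps(chunk)))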

> We use domain-specific data structures (xarray.Dataset, wrapping NumPy
> arrays) inside our Beam pipelines, which means data in our pipelines is
> serialized with pickle.

In this case the coder should fall back
<https://github.com/apache/beam/blob/d78d004d0a5846547381c9476d6732846365497c/sdks/python/apache_beam/coders/coders.py#L1029>
to the regular PickleCoder
<https://github.com/apache/beam/blob/d78d004d0a5846547381c9476d6732846365497c/sdks/python/apache_beam/coders/coders.py#L896>,
which uses the standard (fast) Python pickler.
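
As a quick sanity check, here is a minimal sketch (using PickleCoder from the
Beam Python SDK linked above, with an assumed 8 MB array) of what that
element-level fallback path should cost:

import numpy as np
from apache_beam.coders import coders

x = np.random.randn(1_000_000)  # ~8 MB of float64 data

# PickleCoder round-trips the array with the standard pickle protocol, so
# this should take milliseconds rather than seconds.
coder = coders.PickleCoder()
roundtripped = coder.decode(coder.encode(x))
assert np.array_equal(roundtripped, x)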

Can you give instructions to reproduce the slowness? Or do you know what is
causing so much data to be pickled by cloudpickle?

Best,
Claude

On Thu, Oct 2, 2025 at 12:37 AM Stephan Hoyer via dev <[email protected]>
wrote:

> I maintain a project, Xarray-Beam <https://github.com/google/xarray-beam>,
> which we use for analytics on large-scale scientific datasets, including
> multi-PB climate datasets <https://github.com/google-research/arco-era5>.
>
> We use domain-specific data structures (xarray.Dataset, wrapping NumPy
> arrays) inside our Beam pipelines, which means data in our pipelines is
> serialized with pickle. My naive expectation was that this would still
> yield reasonable performance, because NumPy arrays can be pickled and
> unpickled very quickly in modern versions of pickle.
>
> Unfortunately, I noticed that one of the demos in our documentation
> <https://xarray-beam.readthedocs.io/en/latest/high-level.html#data-model> 
> (writing
> a mere 47 MB to disk) is painfully slow when written in Beam, taking 60s
> to run using Beam's DirectRunner for an operation that Dask completes in
> 127 ms. The main culprit is Beam's pickling utilities, which apply Bzip2
> compression and base64 encoding
> <https://github.com/apache/beam/blob/a0831e0d4b1a36f8dd1d9c16ef388c02c6620e1a/sdks/python/apache_beam/internal/cloudpickle_pickler.py#L150-L162>.
> If I remove these compression steps, Beam runs in 1.3s -- still slow, but
> at least at interactive speeds. In my microbenchmark (see below), pickling
> and unpickling a mere 8 MB of binary data takes about 2 seconds with Beam,
> which is far slower than network speeds on most machines. I also ran a more
> realistic end-to-end workload with Google's internal Beam runner, which
> resulted in a 2-fold savings in CPU usage.
>
> Would it be possible to improve this situation for all Beam users?
> 1. Using the slow Bzip2 compression feels very inappropriate. There are
> far faster modern compression libraries (e.g., LZ4), but given pickle is a
> binary format already, I would not expect large savings from compression
> in most cases. (Dask uses the fall-back approach of trying LZ4
> <https://distributed.dask.org/en/latest/protocol.html>, and only using it
> if it results in significant savings.)
> 2. Base64 encoding also feels quite wasteful. Comments in the code
> suggest that this exists to allow for serialization of pickled values to
> JSON, but surely data written in JSON is a small minority of data
> transferred in Beam pipelines, and protective base64 encoding could be
> applied only when actually necessary. (A rough sketch of both ideas is
> below.)
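>
> Below is a rough sketch of what I mean -- not Dask's or Beam's actual
> code, just the shape of it (lz4.frame comes from the third-party lz4
> package, and the function names here are made up):
>
> import base64
> import lz4.frame  # third-party 'lz4' package, assumed installed
>
> def maybe_compress(payload: bytes, min_ratio: float = 0.9) -> bytes:
>     # Keep the LZ4 form only if it meaningfully shrinks the payload;
>     # a one-byte marker tells the reader whether to decompress.
>     compressed = lz4.frame.compress(payload)
>     if len(compressed) < min_ratio * len(payload):
>         return b"\x01" + compressed
>     return b"\x00" + payload
>
> def to_text_safe(payload: bytes, needs_json: bool) -> bytes:
>     # Base64-encode only when the payload actually has to live inside JSON.
>     return base64.b64encode(payload) if needs_json else payload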
>
> Would love to hear from experienced Beam maintainers what you think about
> this! Hopefully these are not hard changes to land.
>
> Best,
> Stephan
>
> ------------------
>
> from apache_beam.internal import cloudpickle_pickler
> import pickle
> import numpy as np
> import bz2
> import base64
>
> x = np.random.randn(1_000_000).astype(dtype=np.float64)  # 8 MB
>
> %timeit pickle.loads(pickle.dumps(x))
> # 10.8 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
>
> %timeit cloudpickle_pickler.loads(cloudpickle_pickler.dumps(x))
> # 2.27 s ± 387 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
> pickled = pickle.dumps(x)
> compressed = bz2.compress(pickled, compresslevel=9)
> b64ed = base64.b64encode(compressed)
>
> %timeit bz2.compress(pickled, compresslevel=9)
> # 1.2 s ± 8.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
> %timeit base64.b64encode(compressed)
> # 58.9 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
> %timeit base64.b64decode(b64ed)
> # 35.3 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
> %timeit bz2.decompress(compressed)
> # 708 ms ± 53.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
