+1 to Claudius's comments that this compressed cloudpickle is only
used for pickling elements of the pipeline graph itself, not data.
This is probably where your overheads come from. (Our direct runner
still does all the serialization and deserialization that remote,
distributed execution would require, for more fidelity to that case.)
For these cases, we have found that compression can be quite
useful (e.g. often the pickled blobs contain long, common module paths
and such).
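
As a rough illustration of why such blobs compress well, here is a
minimal sketch using only the standard library. The module paths are
made up for illustration, and this is not Beam's actual pickling code:

```python
import bz2
import pickle

# Hypothetical stand-in for a pickled graph element: many fully
# qualified names that share a long common prefix.
names = [
    f"my_company.pipelines.transforms.stage_{i}.ProcessRecordsFn"
    for i in range(200)
]

blob = pickle.dumps(names)
compressed = bz2.compress(blob, compresslevel=9)

# Repeated prefixes make the compressed form much smaller.
print(f"raw: {len(blob)} bytes, bz2: {len(compressed)} bytes")

# The round trip is lossless.
restored = pickle.loads(bz2.decompress(compressed))
```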

That being said, 60s is certainly excessive (and far more than my
typical experience, e.g. our unit tests are significantly faster than
that). Are you perchance trying to pickle a (large?) main session? Or
do your DoFns hold onto large state? (How large are these pickles that
you're generating?)
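
One quick way to answer the size question is to measure the pickled
bytes directly. A minimal sketch, assuming plain dicts as stand-ins for
whatever state a DoFn holds (the helper name and the example objects
are hypothetical):

```python
import pickle


def pickled_size(obj, dumps=pickle.dumps):
    """Return the number of bytes that `dumps` produces for `obj`."""
    return len(dumps(obj))


# Hypothetical stand-ins for DoFn state: a few parameters versus a
# large in-memory lookup table.
small_state = {"scale": 2.0, "name": "rescale"}
large_state = {"table": bytes(1_000_000)}

small_size = pickled_size(small_state)
large_size = pickled_size(large_state)
print(f"small: {small_size} bytes, large: {large_size} bytes")

# To measure what Beam itself would produce, one could pass
# cloudpickle_pickler.dumps (from apache_beam.internal) as `dumps`.
```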

I would not be opposed to trying other compression libraries like LZ4
if it's available (though I don't know if the savings are significant
enough to merit a new dependency). The base64 step can almost
certainly be elided now that we've moved to the unified worker and
would be a good savings regardless:
https://github.com/apache/beam/pull/36359
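
For reference, the "compress only if it helps" idea, with no base64
step, could look roughly like the sketch below. zlib stands in for LZ4
so the example needs no extra dependency; the function names, the
one-byte tag, and the 10% threshold are all assumptions for
illustration, not Beam's or Dask's actual implementation:

```python
import pickle
import zlib

# One-byte tags marking how the payload was stored (hypothetical).
RAW = b"\x00"
ZLIB = b"\x01"


def dumps_maybe_compressed(obj, min_saving=0.1):
    """Pickle obj; keep the compressed form only if it saves enough."""
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    compressed = zlib.compress(payload, 1)  # level 1: favor speed
    if len(compressed) <= len(payload) * (1 - min_saving):
        return ZLIB + compressed
    return RAW + payload  # raw bytes, no base64 wrapping


def loads_maybe_compressed(data):
    """Inverse of dumps_maybe_compressed."""
    tag, body = data[:1], data[1:]
    if tag == ZLIB:
        body = zlib.decompress(body)
    return pickle.loads(body)
```

Incompressible payloads (e.g. random float arrays) pay for one fast
compression attempt and are then stored raw, while repetitive payloads
still get the size reduction.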

On Thu, Oct 2, 2025 at 8:49 AM Claudius van der Merwe
<[email protected]> wrote:
>
> Hi Stephan,
>
> Thanks for raising this issue. Can you help me understand why cloudpickle is
> pickling such large amounts of data in this case?
>
> The only cases I would expect cloudpickle to be used for pickling is
> 1. Serializing DoFns, coders, etc. for the pipeline proto
> 2. Encoding the type (not the data contained by the type) for GBK operations
> where the type is considered "special", e.g. NamedTuple, protos, and dataclass
> types (see code)
>
> > We use domain-specific data structures (xarray.Dataset, wrapping NumPy 
> > arrays) inside our Beam pipelines, which means data in our pipelines is 
> > serialized with pickle.
> In this case the coder should fall back to the regular PickleCoder, which
> uses the regular fast Python pickler.
>
> Can you give instructions to reproduce the slowness? Or do you know what is 
> causing so much data to be pickled by cloudpickle?
>
> Best,
> Claude
>
> On Thu, Oct 2, 2025 at 12:37 AM Stephan Hoyer via dev <[email protected]> 
> wrote:
>>
>> I maintain a project, Xarray-Beam, which we use for analytics on large-scale 
>> scientific datasets, including multi-PB climate datasets.
>>
>> We use domain-specific data structures (xarray.Dataset, wrapping NumPy 
>> arrays) inside our Beam pipelines, which means data in our pipelines is 
>> serialized with pickle. My naive expectation was that this would still yield 
>> reasonable performance, because NumPy arrays can be pickled and unpickled 
>> very quickly in modern versions of pickle.
>>
>> Unfortunately, I noticed that one of the demos in our documentation (writing
>> a mere 47 MB to disk) is painfully slow when written in Beam, taking 60s to
>> run using Beam's DirectRunner for an operation that Dask completes in 127 
>> ms. The main culprit is Beam's pickling utilities, which apply Bzip2 
>> compression and base64 encoding. If I remove these compression steps, Beam 
>> runs in 1.3s -- still slow, but at least at interactive speeds. In my 
>> microbenchmark (see below), pickling and unpickling a mere 8 MB of binary 
>> data takes about 2 seconds with Beam, which is far slower than network 
>> speeds on most machines. I also ran a more realistic end-to-end workload 
>> with Google's internal Beam runner, which resulted in a 2-fold savings in 
>> CPU usage.
>>
>> Would it be possible to improve this situation for all Beam users?
>> 1. Using the slow Bzip2 compression feels very inappropriate. There are far 
>> faster modern compression libraries (e.g., LZ4), but given pickle is a 
>> binary format already, I would not expect large savings from compression in
>> most cases. (Dask uses the fall-back approach of trying LZ4, and only using 
>> it if it results in significant saving.)
>> 2. Base64 encoding also feels quite wasteful. Comments in the code suggest
>> that this exists to allow for serialization of pickled values to JSON, but 
>> surely data written in JSON is a small minority of data transferred in Beam 
>> pipelines, and protective coding into base64 could be done only when 
>> actually necessary.
>>
>> Would love to hear from experienced Beam maintainers what you think about 
>> this! Hopefully these are not hard changes to land.
>>
>> Best,
>> Stephan
>>
>> ------------------
>>
>> from apache_beam.internal import cloudpickle_pickler
>> import pickle
>> import numpy as np
>> import bz2
>> import base64
>>
>> x = np.random.randn(1_000_000).astype(dtype=np.float64)  # 8 MB
>>
>> %timeit pickle.loads(pickle.dumps(x))
>> # 10.8 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
>>
>> %timeit cloudpickle_pickler.loads(cloudpickle_pickler.dumps(x))
>> # 2.27 s ± 387 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>>
>> pickled = pickle.dumps(x)
>> compressed = bz2.compress(pickled, compresslevel=9)
>> b64ed = base64.b64encode(compressed)
>>
>> %timeit bz2.compress(pickled, compresslevel=9)
>> # 1.2 s ± 8.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>>
>> %timeit base64.b64encode(compressed)
>> # 58.9 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
>>
>> %timeit base64.b64decode(b64ed)
>> # 35.3 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>>
>> %timeit bz2.decompress(compressed)
>> # 708 ms ± 53.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
