I maintain a project, Xarray-Beam <https://github.com/google/xarray-beam>, which we use for analytics on large-scale scientific datasets, including multi-PB climate datasets <https://github.com/google-research/arco-era5>.
We use domain-specific data structures (xarray.Dataset, wrapping NumPy arrays) inside our Beam pipelines, which means data in our pipelines is serialized with pickle. My naive expectation was that this would still yield reasonable performance, because NumPy arrays can be pickled and unpickled very quickly in modern versions of pickle. Unfortunately, I noticed that one of the demos in our documentation <https://xarray-beam.readthedocs.io/en/latest/high-level.html#data-model> (writing a mere 47 MB to disk), is painfully slow when written in Beam, taking 60s to run using Beam's DirectRunner for an operation that Dask completes in 127 ms. The main culprit is Beam's pickling utilities, which apply Bzip2 compression and base64 encoding <https://github.com/apache/beam/blob/a0831e0d4b1a36f8dd1d9c16ef388c02c6620e1a/sdks/python/apache_beam/internal/cloudpickle_pickler.py#L150-L162>. If I remove these compression steps, Beam runs in 1.3s -- still slow, but at least at interactive speeds. In my microbenchmark (see below), pickling and unpickling a mere 8 MB of binary data takes about 2 seconds with Beam, which is far slower than network speeds on most machines. I also ran a more realistic end-to-end workload with Google's internal Beam runner, which resulted in a 2-fold savings in CPU usage. Would it be possible to improve this situation for all Beam users? 1. Using the slow Bzip2 compression feels very inappropriate. There are far faster modern compression libraries (e.g., LZ4), but given pickle is a binary format already, I would not expect large savings from compressions in most cases. (Dask uses the fall-back approach of trying LZ4 <https://distributed.dask.org/en/latest/protocol.html>, and only using it if it results in significant saving.) 2. Base 64 encoding also feels quite wasteful. Comments in the code suggest that this exists to allow for serialization of pickled values to JSON, but surely data written in JSON is a small minority of data transferred in Beam pipelines, and protective coding into base64 could be done only when actually necessary. Would love to hear from experienced Beam maintainers what you think about this! Hopefully these are not hard changes to land. Best, Stephan ------------------ from apache_beam.internal import cloudpickle_pickler import pickle import numpy as np import bz2 import base64 x = np.random.randn(1_000_000).astype(dtype=np.float64) # 8 MB %timeit pickle.loads(pickle.dumps(x)) # 10.8 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 100 loops each) %timeit cloudpickle_pickler.loads(cloudpickle_pickler.dumps(x)) # 2.27 s ± 387 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) pickled = pickle.dumps(x) compressed = bz2.compress(pickled, compresslevel=9) b64ed = base64.b64encode(compressed) %timeit bz2.compress(pickled, compresslevel=9) # 1.2 s ± 8.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) %timeit base64.b64encode(compressed) # 58.9 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) %timeit base64.b64decode(b64ed) # 35.3 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) %timeit bz2.decompress(compressed) # 708 ms ± 53.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
