I maintain a project, Xarray-Beam <https://github.com/google/xarray-beam>,
which we use for analytics on large-scale scientific datasets,
including multi-PB
climate datasets <https://github.com/google-research/arco-era5>.

We use domain-specific data structures (xarray.Dataset, wrapping NumPy
arrays) inside our Beam pipelines, which means the data flowing through our
pipelines is serialized with pickle. My naive expectation was that this
would still yield reasonable performance, because NumPy arrays can be
pickled and unpickled very quickly with modern versions of pickle.

Unfortunately, I noticed that one of the demos in our documentation
<https://xarray-beam.readthedocs.io/en/latest/high-level.html#data-model>
(writing a mere 47 MB to disk) is painfully slow when written in Beam,
taking 60s to run with Beam's DirectRunner for an operation that Dask
completes in 127 ms. The main culprits are Beam's pickling utilities, which
apply bzip2 compression and base64 encoding
<https://github.com/apache/beam/blob/a0831e0d4b1a36f8dd1d9c16ef388c02c6620e1a/sdks/python/apache_beam/internal/cloudpickle_pickler.py#L150-L162>.
If I remove these compression steps, Beam runs in 1.3s -- still slow, but
at least at interactive speeds. In my microbenchmark (see below), pickling
and unpickling a mere 8 MB of binary data takes about 2 seconds with Beam,
which is far slower than network speeds on most machines. I also ran a more
realistic end-to-end workload on Google's internal Beam runner, where
removing these steps yielded a 2-fold savings in CPU usage.

Would it be possible to improve this situation for all Beam users?
1. Using the slow bzip2 compression feels very inappropriate. There are far
faster modern compression libraries (e.g., LZ4), but given that pickle is
already a binary format, I would not expect large savings from compression
in most cases. (Dask uses the fallback approach of trying LZ4
<https://distributed.dask.org/en/latest/protocol.html> and only keeping it
if it results in significant savings; see the sketch after this list.)
2. Base64 encoding also feels quite wasteful. Comments in the code suggest
that it exists to allow pickled values to be serialized to JSON, but surely
JSON-embedded data is a small minority of the data transferred in Beam
pipelines, and the protective base64 encoding could be applied only when it
is actually needed.
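
To make the first idea concrete, here is a rough sketch of a Dask-style
fallback (assuming the third-party lz4 package; the function names, header
bytes, and threshold are illustrative, not an existing Beam API):

import pickle

import lz4.frame  # third-party 'lz4' package

def maybe_compressed_dumps(obj, min_ratio=0.9):
    # Pickle, then keep the LZ4-compressed form only if it shrinks the
    # payload by a meaningful margin; otherwise send the raw pickle.
    pickled = pickle.dumps(obj)
    compressed = lz4.frame.compress(pickled)
    if len(compressed) < min_ratio * len(pickled):
        return b"lz4\0" + compressed  # small header records the encoding
    return b"raw\0" + pickled

def maybe_compressed_loads(data):
    header, payload = data[:4], data[4:]
    if header == b"lz4\0":
        return pickle.loads(lz4.frame.decompress(payload))
    return pickle.loads(payload)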

I would love to hear what experienced Beam maintainers think about this!
Hopefully these are not hard changes to land.

Best,
Stephan

------------------

from apache_beam.internal import cloudpickle_pickler
import pickle
import numpy as np
import bz2
import base64

x = np.random.randn(1_000_000).astype(dtype=np.float64)  # 8 MB

%timeit pickle.loads(pickle.dumps(x))
# 10.8 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit cloudpickle_pickler.loads(cloudpickle_pickler.dumps(x))
# 2.27 s ± 387 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Decompose the steps Beam's pickler performs, to see where the time goes:
pickled = pickle.dumps(x)
compressed = bz2.compress(pickled, compresslevel=9)
b64ed = base64.b64encode(compressed)

%timeit bz2.compress(pickled, compresslevel=9)
# 1.2 s ± 8.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit base64.b64encode(compressed)
# 58.9 ms ± 19.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit base64.b64decode(b64ed)
# 35.3 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit bz2.decompress(compressed)
# 708 ms ± 53.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
