We have some use cases where we are combining over very large groups of
elements (e.g., computing the average of 1e5 to 1e6 elements,
corresponding to hourly weather observations over the past 50 years).
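For concreteness, here is a minimal sketch of the kind of combine I mean
(the station keys and values are made up; in practice each key has on the
order of 1e5 to 1e6 values):

    import apache_beam as beam

    # Minimal sketch: averaging hourly observations per station key.
    # Keys and values are illustrative only.
    with beam.Pipeline() as p:
        _ = (
            p
            | 'ReadObservations' >> beam.Create([
                ('station_a', 21.5),
                ('station_a', 19.0),
                ('station_b', 13.2),
            ])
            | 'MeanPerStation' >> beam.combiners.Mean.PerKey()
            | 'Print' >> beam.Map(print)
        )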

"with_hot_key_fanout" seems to be rather essential for performing these
calculations, but as far as I can tell it only performs a single level of
fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
1e3 element 1e3 times, and then sum those 1e3 results together.
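This is roughly how we apply it today (a sketch; "observations" is a
(station, value) PCollection like the one above, and fanout=1000 is only
an example value):

    import apache_beam as beam

    # Single-level fanout as it exists today: each key's ~1e6 values are
    # first combined into ~1000 partial results, and those ~1000 partials
    # are then merged in one final combine step.
    def mean_per_station(observations):
        return observations | 'MeanPerStation' >> beam.CombinePerKey(
            beam.combiners.MeanCombineFn()).with_hot_key_fanout(1000)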

My guess is that such combiners could be much more efficient if this were
done hierarchically, in a number of stages proportional to
log(element_count), e.g., summing 100 elements at a time across 3 stages,
or maybe 10 elements at a time across 6 stages. Dask uses such a "tree
reduction" strategy, controlled by the "split_every" parameter:
https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
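To make the request more concrete, here is a rough sketch of how one can
emulate such a tree reduction today by manually re-keying into
progressively fewer buckets (the helper names, bucket counts and the
sum/count representation are all just illustrative, not anything that
exists in Beam):

    import random
    import apache_beam as beam

    def to_sum_count(kv):
        # (key, value) -> (key, (sum, count)) so partials can be merged.
        key, value = kv
        return (key, (value, 1))

    def add_bucket(kv, n_buckets):
        # Scatter a key's elements across n_buckets random sub-keys.
        key, sum_count = kv
        return ((key, random.randrange(n_buckets)), sum_count)

    def drop_bucket(kv):
        # ((key, bucket), (sum, count)) -> (key, (sum, count))
        (key, _bucket), sum_count = kv
        return (key, sum_count)

    def merge_sum_counts(sum_counts):
        # Merge an iterable of (sum, count) partials into one (sum, count).
        sum_counts = list(sum_counts)
        return (sum(s for s, _ in sum_counts), sum(c for _, c in sum_counts))

    def tree_mean_per_key(pcoll, bucket_counts=(10_000, 100)):
        # Manual tree reduction: each stage scatters into fewer buckets, so
        # with ~1e6 values per key and bucket_counts=(10_000, 100) every
        # combine call sees on the order of 100 inputs, over 3 combine
        # levels.
        partials = pcoll | 'ToSumCount' >> beam.Map(to_sum_count)
        for stage, n_buckets in enumerate(bucket_counts):
            partials = (
                partials
                | f'Bucket{stage}' >> beam.Map(add_bucket, n_buckets)
                | f'PartialCombine{stage}' >> beam.CombinePerKey(
                    merge_sum_counts)
                | f'Unbucket{stage}' >> beam.Map(drop_bucket)
            )
        return (
            partials
            | 'FinalCombine' >> beam.CombinePerKey(merge_sum_counts)
            | 'ToMean' >> beam.Map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
        )

With ~1e6 values per key this behaves roughly like Dask with
split_every=100; having something along these lines built into
with_hot_key_fanout would avoid all the manual re-keying.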

I understand that the number of fanout stages cannot be computed
automatically in the Beam data model, since the size of each group is not
known when the pipeline is constructed, but it would still be nice to be
able to specify it manually. Has there been any thought about introducing
such a feature?

Thanks,
Stephan
