Now that we have the benchmark, it would be a good idea to start devising possible solutions to this issue. I recognize that the particular interface of ExecBatchIterator may not be something we want to preserve, but we could instead focus on the general batch-splitting problem for purposes of parallelization: how do we enable our functions to execute on a smaller "window" of an ExecBatch in a lightweight way (i.e. without calling ArrayData::Slice, which is too expensive)? One approach is to refactor the ArrayKernelExec API around some variant of the Span/ArraySpan that Antoine suggested earlier.
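
To make the windowing idea concrete, here is a minimal, self-contained sketch of what a non-owning span could look like. It is not Arrow code and not a proposed API: FakeArrayData is a hypothetical stand-in for ArrayData that holds a single int32 buffer, and MakeSpan, Window, SumKernel and SumInChunks are names invented for illustration. The only point is that moving the window copies a pointer and two integers, with no shared_ptr copies and no ArrayData::Slice call.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for arrow::ArrayData (assumption: one int32 buffer).
struct FakeArrayData {
  std::vector<int32_t> values;  // stand-in for the values buffer
  int64_t offset = 0;           // logical offset into `values`
  int64_t length = 0;           // logical length starting at `offset`
};

// Non-owning view, modelled on the ArraySpan suggested earlier in the thread.
struct ArraySpan {
  const FakeArrayData* data;  // not owned, no reference counting
  int64_t offset;             // relative to data->offset
  int64_t length;

  int64_t absolute_offset() const { return offset + data->offset; }
  const int32_t* raw_values() const {
    return data->values.data() + absolute_offset();
  }
};

// View over the whole logical array.
inline ArraySpan MakeSpan(const FakeArrayData& data) {
  return {&data, 0, data.length};
}

// Narrow a span to a sub-window: copies one pointer and two integers.
inline ArraySpan Window(const ArraySpan& parent, int64_t offset,
                        int64_t length) {
  assert(offset >= 0 && length >= 0 && offset + length <= parent.length);
  return {parent.data, parent.offset + offset, length};
}

// Toy "kernel": sums the values visible through the span.
inline int64_t SumKernel(const ArraySpan& span) {
  const int32_t* values = span.raw_values();
  int64_t total = 0;
  for (int64_t i = 0; i < span.length; ++i) total += values[i];
  return total;
}

// Runs the kernel window by window, as a batch splitter might.
inline int64_t SumInChunks(const ArraySpan& whole, int64_t chunk_size) {
  assert(chunk_size > 0);
  int64_t total = 0;
  for (int64_t start = 0; start < whole.length; start += chunk_size) {
    const int64_t len = std::min(chunk_size, whole.length - start);
    total += SumKernel(Window(whole, start, len));
  }
  return total;
}
```

A real version would also have to handle validity bitmaps, multiple buffers, child data and Scalar inputs, none of which this sketch attempts.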

On Thu, Jul 29, 2021 at 1:06 AM Eduardo Ponce <edponc...@gmail.com> wrote:
>
> My mistake, I confused the input type to kernels as Datums, when they are
> in fact Scalar and ArrayData.
> I agree that SIMD details should not be exposed in the kernel API.
>
> ~Eduardo
>
> On Wed, Jul 28, 2021 at 6:38 PM Wes McKinney <wesmck...@gmail.com> wrote:
>
> > On Wed, Jul 28, 2021 at 5:23 PM Eduardo Ponce <edponc...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > I agree with supporting finer-grained parallelism in the compute
> > > operators.
> > > I think that incorporating a Datum-like span, would allow expressing
> > > parallelism not only
> > > on a per-thread basis but can also be used to represent SIMD spans, where
> > > span length
> > > is directed by vector ISA, "L2" cache line or NUMA-aware, and the
> > > materialized data type being processed.
> > > For compute functions, data parallelism between threads is equivalent to
> > > data parallelism in
> > > SIMD (thread == vector lane). My intention is not to derail this
> > > conversation to discuss SIMD
> > > but rather to acknowledge the suggested approach as a possible solution
> > > for it. I can definitely
> > > put together a PR for how a span interface looks for SIMDified compute
> > > functions.
> >
> > My principal concern is making sure we have an input/output splitting
> > solution (building the arguments that have to be passed to the
> > ArrayKernelExec function) that is not as heavyweight as the one we
> > have now, without trying to pack in additional functionality.
> >
> > That said, I'm not sure how SIMD spans are related to Datum, because
> > this seems to be something that's done in the kernel implementation
> > itself? If a Scalar is passed into a SIMD-enabled kernel, then the
> > Scalar value would be replicated however many times as necessary into
> > a simd_batch<T> and used for the kernel evaluation. I'm not sure why
> > we would externalize SIMD details in the kernel API?
> >
> > > ~Eduardo
> > >
> > > On Wed, Jul 28, 2021 at 5:36 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > >
> > > > On Wed, Jul 28, 2021 at 5:39 AM Antoine Pitrou <anto...@python.org> wrote:
> > > > >
> > > > >
> > > > > On 28/07/2021 at 03:33, Wes McKinney wrote:
> > > > > >
> > > > > > I don't have the solution worked out for this, but the basic gist is:
> > > > > >
> > > > > > * To be 10-100x more efficient ExecBatch slicing cannot call
> > > > > >   ArrayData::Slice for every field like it does now
> > > > > > * Atomics associated with interacting with shared_ptr<ArrayData> /
> > > > > >   shared_ptr<Buffer> do add meaningful overhead
> > > > > > * The way that array kernels are currently implemented would need to
> > > > > >   shift to accommodate the changes needed to make ExecBatch lighter
> > > > > >   weight.
> > > > > >
> > > > > > One initial option is to move the "batch offset" to the top level of
> > > > > > ExecBatch (to remove the need to copy ArrayData), but then quite a bit
> > > > > > of code would need to be adapted to combine that offset with the
> > > > > > ArrayData's offsets to compute memory addresses. If this memory
> > > > > > address arithmetic has leaked into kernel implementations, this might
> > > > > > be a good opportunity to unleak it. That wouldn't fix the
> > > > > > shared_ptr/atomics overhead, so I'm open to ideas about how that could
> > > > > > be addressed also.
> > > > >
> > > > > We could have a non-owning ArraySpan:
> > > > >
> > > > > struct ArraySpan {
> > > > >   ArrayData* data;
> > > > >   const int64_t offset, length;
> > > > >
> > > > >   int64_t absolute_offset() const {
> > > > >     return offset + data->offset;
> > > > >   }
> > > > > };
> > > > >
> > > > > And/or a more general (Datum-like) Span:
> > > > >
> > > > > class Span {
> > > > >   util::variant<ArraySpan*, Scalar*> datum_;
> > > > >
> > > > >  public:
> > > > >   // Datum-like accessors
> > > > > };
> > > > >
> > > > > or
> > > > >
> > > > > class Span {
> > > > >   util::variant<ArrayData*, Scalar*> datum_;
> > > > >   const int64_t offset_, length_;
> > > > >
> > > > >  public:
> > > > >   // Datum-like accessors
> > > > > };
> > > > >
> > > > >
> > > > > Then ExecBatch could be a glorified std::vector<Span>.
> > > >
> > > > Yes, something like this might work. To make this complete, we would
> > > > also want to change the out-argument of ArrayKernelExec to be
> > > > something lighter-weight than Datum*
> > > >
> > > > std::function<Status(KernelContext*, const ExecBatch&, Datum*)>
> > > >
> > > > One thing to navigate would be kernels that do zero-copy [1]: the
> > > > output passed to a kernel would need to be a mutable Span that can
> > > > communicate to zero-copy implementations whether buffers can be
> > > > replaced outright or whether the span is a slice of some other
> > > > ArrayData and a memcopy is required.
> > > >
> > > > A prototype along with some benchmarks would help assess whether a
> > > > proposed design addresses the slicing cost to satisfaction. From a
> > > > glance through some of the kernel implementations, the porting would
> > > > be a labor-intensive but probably fairly mechanical project once the
> > > > details are worked out.
> > > >
> > > > [1]: https://github.com/apache/arrow/blob/apache-arrow-5.0.0/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc#L226
> > > >
> > > > > (also, Arrow would probably still benefit from a small vector
> > > > > implementation... at least for internals, because we can't easily
> > > > > expose it in public-facing APIs in place of regular std::vector)
> > > > >
> > > > > Regards
> > > > >
> > > > > Antoine.