This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4d931ff1c0 ARROW-16852: [C++] Migrate remaining kernels to use
ExecSpan, remove ExecBatchIterator (#13630)
4d931ff1c0 is described below
commit 4d931ff1c0f5661a9b134c514555c8d769001759
Author: Wes McKinney <[email protected]>
AuthorDate: Tue Jul 19 16:26:46 2022 -0500
ARROW-16852: [C++] Migrate remaining kernels to use ExecSpan, remove
ExecBatchIterator (#13630)
This completes the port to use ExecSpan everywhere. I also changed the
ExecBatchIterator benchmark to use ExecSpanIterator to show the performance
improvement in input splitting that we have discussed in the past.
Splitting inputs into small ExecSpans:
```
------------------------------------------------------------------------------------------
Benchmark                       Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------
BM_ExecSpanIterator/1024    205671 ns       205667 ns         3395 items_per_second=4.86223k/s
BM_ExecSpanIterator/4096     54749 ns        54750 ns        13121 items_per_second=18.265k/s
BM_ExecSpanIterator/16384    15979 ns        15979 ns        42628 items_per_second=62.5824k/s
BM_ExecSpanIterator/65536     5597 ns         5597 ns       125099 items_per_second=178.668k/s
```
Splitting inputs into small ExecBatches:
```
------------------------------------------------------------------------------------------
Benchmark                        Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------
BM_ExecBatchIterator/1024   17163432 ns     17163171 ns           41 items_per_second=58.2643/s
BM_ExecBatchIterator/4096    4243467 ns      4243316 ns          163 items_per_second=235.665/s
BM_ExecBatchIterator/16384   1093680 ns      1093638 ns          620 items_per_second=914.38/s
BM_ExecBatchIterator/65536    272451 ns       272435 ns         2584 items_per_second=3.6706k/s
```
Because the input in this benchmark has 1M elements, splitting it into 1024
chunks of size 1024 adds only about 0.2 ms of overhead with ExecSpanIterator
(205671 ns per pass) versus 17.16 ms with ExecBatchIterator (17163432 ns per
pass), a more than 80x improvement.
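For reference, the iteration pattern being measured (and now used by the
executors in this patch) initializes the iterator once against an owning
ExecBatch and then repeatedly refills a single non-owning ExecSpan. A minimal
sketch using the internal APIs from this diff; the empty loop body stands in
for kernel work:
```
#include "arrow/compute/exec.h"           // ExecBatch, ExecSpan
#include "arrow/compute/exec_internal.h"  // detail::ExecSpanIterator

// The span is reused across iterations, so splitting does not allocate or
// copy per-chunk ArrayData the way ExecBatchIterator did.
arrow::Status IterateInSpans(const arrow::compute::ExecBatch& batch,
                             int64_t max_chunksize) {
  arrow::compute::detail::ExecSpanIterator it;
  ARROW_RETURN_NOT_OK(it.Init(batch, max_chunksize));
  arrow::compute::ExecSpan span;
  while (it.Next(&span)) {
    // span.values holds non-owning ArraySpan/Scalar views of length
    // <= max_chunksize; a kernel would consume `span` here.
  }
  return arrow::Status::OK();
}
```
The same Init/Next pattern appears in ScalarAggExecutor and GroupBy in the
diff below.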
This won't by itself do much to impact performance in Acero, but the work
here is a precondition for several things the community can explore in the
future:
* A leaner ExecuteScalarExpression implementation that reuses temporary
allocations (ARROW-16758)
* Parallel expression evaluation
* Better defining morsel (~1M elements) versus task (~1K elements)
granularity in execution (see the sketch after this list)
* Work stealing so that we don't "hog" the thread pools and can keep work
pinned to a particular CPU core when other things are going on at the same
time
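To make the morsel/task distinction above concrete, here is a hypothetical
sketch (illustrative only; the names and the scheduling callback are not
Arrow APIs): a morsel is the ~1M-row unit handed out by a scan, subdivided
into ~1K-row tasks that a scheduler could hand to worker threads:
```
#include <algorithm>
#include <cstdint>
#include <functional>

constexpr int64_t kMorselLength = 1 << 20;  // unit of scan/IO parallelism
constexpr int64_t kTaskLength = 1 << 10;    // cache-friendly unit of compute

// schedule_task receives the absolute row offset and length of each task
void SplitMorselIntoTasks(
    int64_t morsel_offset, int64_t morsel_length,
    const std::function<void(int64_t offset, int64_t length)>& schedule_task) {
  for (int64_t pos = 0; pos < morsel_length; pos += kTaskLength) {
    schedule_task(morsel_offset + pos,
                  std::min(kTaskLength, morsel_length - pos));
  }
}
```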
Authored-by: Wes McKinney <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
---
cpp/src/arrow/array/data.cc | 6 +-
cpp/src/arrow/array/data.h | 15 ++-
cpp/src/arrow/compute/exec.cc | 142 ++++-----------------
cpp/src/arrow/compute/exec.h | 34 +++--
cpp/src/arrow/compute/exec/aggregate.cc | 31 +++--
cpp/src/arrow/compute/exec/aggregate_node.cc | 25 ++--
cpp/src/arrow/compute/exec_internal.h | 40 +-----
cpp/src/arrow/compute/exec_test.cc | 131 -------------------
cpp/src/arrow/compute/function_benchmark.cc | 26 ++--
cpp/src/arrow/compute/function_test.cc | 8 +-
cpp/src/arrow/compute/kernel.h | 49 +++----
cpp/src/arrow/compute/kernels/aggregate_basic.cc | 60 ++++-----
.../compute/kernels/aggregate_basic_internal.h | 37 +++---
cpp/src/arrow/compute/kernels/aggregate_internal.h | 12 +-
cpp/src/arrow/compute/kernels/aggregate_mode.cc | 28 ----
.../arrow/compute/kernels/aggregate_quantile.cc | 42 ------
cpp/src/arrow/compute/kernels/aggregate_tdigest.cc | 10 +-
cpp/src/arrow/compute/kernels/aggregate_var_std.cc | 36 +++---
cpp/src/arrow/compute/kernels/hash_aggregate.cc | 140 ++++++++++----------
.../arrow/compute/kernels/hash_aggregate_test.cc | 31 +++--
.../arrow/compute/kernels/scalar_cast_numeric.cc | 8 +-
cpp/src/arrow/compute/kernels/scalar_nested.cc | 10 +-
cpp/src/arrow/compute/row/grouper.cc | 42 +++---
cpp/src/arrow/compute/row/grouper.h | 2 +-
cpp/src/arrow/dataset/partition.cc | 6 +-
25 files changed, 312 insertions(+), 659 deletions(-)
diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index d3f28758d9..0cfa9fcd2e 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -138,7 +138,11 @@ int64_t ArrayData::GetNullCount() const {
void ArraySpan::SetMembers(const ArrayData& data) {
this->type = data.type.get();
this->length = data.length;
- this->null_count = data.null_count.load();
+ if (this->type->id() == Type::NA) {
+ this->null_count = this->length;
+ } else {
+ this->null_count = data.null_count.load();
+ }
this->offset = data.offset;
for (int i = 0; i < static_cast<int>(data.buffers.size()); ++i) {
diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h
index 78643ae14a..dde66ac79c 100644
--- a/cpp/src/arrow/array/data.h
+++ b/cpp/src/arrow/array/data.h
@@ -25,6 +25,7 @@
#include "arrow/buffer.h"
#include "arrow/result.h"
+#include "arrow/type.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
@@ -351,14 +352,14 @@ struct ARROW_EXPORT ArraySpan {
}
}
- void AddOffset(int64_t offset) {
- this->offset += offset;
- this->null_count = kUnknownNullCount;
- }
-
- void SetOffset(int64_t offset) {
+ void SetSlice(int64_t offset, int64_t length) {
this->offset = offset;
- this->null_count = kUnknownNullCount;
+ this->length = length;
+ if (this->type->id() != Type::NA) {
+ this->null_count = kUnknownNullCount;
+ } else {
+ this->null_count = this->length;
+ }
}
/// \brief Return null count, or compute and set it if it's not known
diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 4dc5cdc542..cf91bada6c 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -219,107 +219,8 @@ void ComputeDataPreallocate(const DataType& type,
namespace detail {
-ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
- int64_t max_chunksize)
- : args_(std::move(args)),
- position_(0),
- length_(length),
- max_chunksize_(max_chunksize) {
- chunk_indexes_.resize(args_.size(), 0);
- chunk_positions_.resize(args_.size(), 0);
-}
-
-Result<std::unique_ptr<ExecBatchIterator>> ExecBatchIterator::Make(
- std::vector<Datum> args, int64_t max_chunksize) {
- for (const auto& arg : args) {
- if (!(arg.is_arraylike() || arg.is_scalar())) {
- return Status::Invalid(
- "ExecBatchIterator only works with Scalar, Array, and "
- "ChunkedArray arguments");
- }
- }
-
- int64_t length = -1;
- bool length_set = false;
- for (auto& arg : args) {
- if (arg.is_scalar()) {
- continue;
- }
- if (!length_set) {
- length = arg.length();
- length_set = true;
- } else {
- if (arg.length() != length) {
- return Status::Invalid("Array arguments must all be the same length");
- }
- }
- }
-
- if (!length_set) {
- // All scalar case, to be removed soon
- length = 1;
- }
-
- max_chunksize = std::min(length, max_chunksize);
-
- return std::unique_ptr<ExecBatchIterator>(
- new ExecBatchIterator(std::move(args), length, max_chunksize));
-}
-
-bool ExecBatchIterator::Next(ExecBatch* batch) {
- if (position_ == length_) {
- return false;
- }
-
- // Determine how large the common contiguous "slice" of all the arguments is
- int64_t iteration_size = std::min(length_ - position_, max_chunksize_);
-
- // If length_ is 0, then this loop will never execute
- for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) {
- // If the argument is not a chunked array, it's either a Scalar or Array,
- // in which case it doesn't influence the size of this batch. Note that if
- // the args are all scalars the batch length is 1
- if (args_[i].kind() != Datum::CHUNKED_ARRAY) {
- continue;
- }
- const ChunkedArray& arg = *args_[i].chunked_array();
- std::shared_ptr<Array> current_chunk;
- while (true) {
- current_chunk = arg.chunk(chunk_indexes_[i]);
- if (chunk_positions_[i] == current_chunk->length()) {
- // Chunk is zero-length, or was exhausted in the previous iteration
- chunk_positions_[i] = 0;
- ++chunk_indexes_[i];
- continue;
- }
- break;
- }
- iteration_size =
- std::min(current_chunk->length() - chunk_positions_[i], iteration_size);
- }
-
- // Now, fill the batch
- batch->values.resize(args_.size());
- batch->length = iteration_size;
- for (size_t i = 0; i < args_.size(); ++i) {
- if (args_[i].is_scalar()) {
- batch->values[i] = args_[i].scalar();
- } else if (args_[i].is_array()) {
- batch->values[i] = args_[i].array()->Slice(position_, iteration_size);
- } else {
- const ChunkedArray& carr = *args_[i].chunked_array();
- const auto& chunk = carr.chunk(chunk_indexes_[i]);
- batch->values[i] = chunk->data()->Slice(chunk_positions_[i], iteration_size);
- chunk_positions_[i] += iteration_size;
- }
- }
- position_ += iteration_size;
- DCHECK_LE(position_, length_);
- return true;
-}
-
// ----------------------------------------------------------------------
-// ExecSpanIterator; to eventually replace ExecBatchIterator
+// ExecSpanIterator
namespace {
@@ -348,7 +249,8 @@ bool CheckIfAllScalar(const ExecBatch& batch) {
} // namespace
-Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize) {
+Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
+ bool promote_if_all_scalars) {
if (batch.num_values() > 0) {
// Validate arguments
bool all_args_same_length = false;
@@ -363,6 +265,7 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize) {
args_ = &batch.values;
initialized_ = have_chunked_arrays_ = false;
have_all_scalars_ = CheckIfAllScalar(batch);
+ promote_if_all_scalars_ = promote_if_all_scalars;
position_ = 0;
length_ = batch.length;
chunk_indexes_.clear();
@@ -443,7 +346,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
}
}
- if (have_all_scalars_) {
+ if (have_all_scalars_ && promote_if_all_scalars_) {
PromoteExecSpanScalars(span);
}
@@ -465,8 +368,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
const Datum& arg = args_->at(i);
if (!arg.is_scalar()) {
ArraySpan* arr = &span->values[i].array;
- arr->length = iteration_size;
- arr->SetOffset(value_positions_[i] + value_offsets_[i]);
+ arr->SetSlice(value_positions_[i] + value_offsets_[i], iteration_size);
value_positions_[i] += iteration_size;
}
}
@@ -858,11 +760,12 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// Populate and then reuse the ArraySpan inside
output_span->SetMembers(*preallocation);
output_span->offset = 0;
+ int64_t result_offset = 0;
while (span_iterator_.Next(&input)) {
// Set absolute output span position and length
- output_span->length = input.length;
+ output_span->SetSlice(result_offset, input.length);
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
- output_span->SetOffset(span_iterator_.position());
+ result_offset = span_iterator_.position();
}
// Kernel execution is complete; emit result
@@ -1138,19 +1041,15 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
return KernelExecutorImpl<ScalarAggregateKernel>::Init(ctx, args);
}
- Status Execute(const ExecBatch& args, ExecListener* listener) override {
- return ExecuteImpl(args.values, listener);
- }
-
- Status ExecuteImpl(const std::vector<Datum>& args, ExecListener* listener) {
- ARROW_ASSIGN_OR_RAISE(
- batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize()));
+ Status Execute(const ExecBatch& batch, ExecListener* listener) override {
+ RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize(),
+ /*promote_if_all_scalars=*/false));
- ExecBatch batch;
- while (batch_iterator_->Next(&batch)) {
+ ExecSpan span;
+ while (span_iterator_.Next(&span)) {
// TODO: implement parallelism
- if (batch.length > 0) {
- RETURN_NOT_OK(Consume(batch));
+ if (span.length > 0) {
+ RETURN_NOT_OK(Consume(span));
}
}
@@ -1167,7 +1066,10 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
}
private:
- Status Consume(const ExecBatch& batch) {
+ Status Consume(const ExecSpan& span) {
+ // TODO(wesm): this is odd and should be examined soon -- only one state
+ // "should" be needed per thread of execution
+
// FIXME(ARROW-11840) don't merge *any* aggregates for every batch
ARROW_ASSIGN_OR_RAISE(auto batch_state,
kernel_->init(kernel_ctx_, {kernel_, *input_types_, options_}));
@@ -1179,12 +1081,12 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
KernelContext batch_ctx(exec_context());
batch_ctx.SetState(batch_state.get());
- RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch));
+ RETURN_NOT_OK(kernel_->consume(&batch_ctx, span));
RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state()));
return Status::OK();
}
- std::unique_ptr<ExecBatchIterator> batch_iterator_;
+ ExecSpanIterator span_iterator_;
const std::vector<TypeHolder>* input_types_;
const FunctionOptions* options_;
};
diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h
index f0b951dccb..cdd3daf7f7 100644
--- a/cpp/src/arrow/compute/exec.h
+++ b/cpp/src/arrow/compute/exec.h
@@ -209,8 +209,7 @@ struct ARROW_EXPORT ExecBatch {
/// case, it would have scalar rows with length greater than 1.
///
/// If the array values are of length 0 then the length is 0 regardless of
- /// whether any values are Scalar. In general ExecBatch objects are produced
- /// by ExecBatchIterator which by design does not yield length-0 batches.
+ /// whether any values are Scalar.
int64_t length = 0;
/// \brief The sum of bytes in each buffer referenced by the batch
@@ -253,7 +252,7 @@ inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals
inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); }
struct ExecValue {
- ArraySpan array;
+ ArraySpan array = {};
const Scalar* scalar = NULLPTR;
ExecValue(Scalar* scalar) // NOLINT implicit conversion
@@ -373,22 +372,6 @@ struct ARROW_EXPORT ExecSpan {
return values[i];
}
- void AddOffset(int64_t offset) {
- for (ExecValue& value : values) {
- if (value.is_array()) {
- value.array.AddOffset(offset);
- }
- }
- }
-
- void SetOffset(int64_t offset) {
- for (ExecValue& value : values) {
- if (value.is_array()) {
- value.array.SetOffset(offset);
- }
- }
- }
-
/// \brief A convenience for the number of values / arguments.
int num_values() const { return static_cast<int>(values.size()); }
@@ -400,6 +383,19 @@ struct ARROW_EXPORT ExecSpan {
return result;
}
+ ExecBatch ToExecBatch() const {
+ ExecBatch result;
+ result.length = this->length;
+ for (const ExecValue& value : this->values) {
+ if (value.is_array()) {
+ result.values.push_back(value.array.ToArrayData());
+ } else {
+ result.values.push_back(value.scalar->GetSharedPtr());
+ }
+ }
+ return result;
+ }
+
int64_t length = 0;
std::vector<ExecValue> values;
};
diff --git a/cpp/src/arrow/compute/exec/aggregate.cc b/cpp/src/arrow/compute/exec/aggregate.cc
index 5cb9a9c563..cc2c464d42 100644
--- a/cpp/src/arrow/compute/exec/aggregate.cc
+++ b/cpp/src/arrow/compute/exec/aggregate.cc
@@ -110,11 +110,12 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
std::vector<std::vector<std::unique_ptr<KernelState>>> states;
FieldVector out_fields;
- using arrow::compute::detail::ExecBatchIterator;
- std::unique_ptr<ExecBatchIterator> argument_batch_iterator;
+ using arrow::compute::detail::ExecSpanIterator;
+ ExecSpanIterator argument_iterator;
+ ExecBatch args_batch;
if (!arguments.empty()) {
- ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments));
+ ARROW_ASSIGN_OR_RAISE(args_batch, ExecBatch::Make(arguments));
// Construct and initialize HashAggregateKernels
auto argument_types = args_batch.GetTypes();
@@ -129,9 +130,7 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
ARROW_ASSIGN_OR_RAISE(
out_fields, ResolveKernels(aggregates, kernels, states[0], ctx,
argument_types));
- ARROW_ASSIGN_OR_RAISE(
- argument_batch_iterator,
- ExecBatchIterator::Make(args_batch.values, ctx->exec_chunksize()));
+ RETURN_NOT_OK(argument_iterator.Init(args_batch, ctx->exec_chunksize()));
}
// Construct Groupers
@@ -151,15 +150,13 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
out_fields.push_back(field("key_" + std::to_string(i++),
key_type.GetSharedPtr()));
}
- ARROW_ASSIGN_OR_RAISE(
- auto key_batch_iterator,
- ExecBatchIterator::Make(keys_batch.values, ctx->exec_chunksize()));
+ ExecSpanIterator key_iterator;
+ RETURN_NOT_OK(key_iterator.Init(keys_batch, ctx->exec_chunksize()));
// start "streaming" execution
- ExecBatch key_batch, argument_batch;
- while ((argument_batch_iterator == NULLPTR ||
- argument_batch_iterator->Next(&argument_batch)) &&
- key_batch_iterator->Next(&key_batch)) {
+ ExecSpan key_batch, argument_batch;
+ while ((arguments.empty() || argument_iterator.Next(&argument_batch)) &&
+ key_iterator.Next(&key_batch)) {
if (key_batch.length == 0) continue;
task_group->Append([&, key_batch, argument_batch] {
@@ -180,9 +177,10 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
for (size_t i = 0; i < kernels.size(); ++i) {
KernelContext batch_ctx{ctx};
batch_ctx.SetState(states[thread_index][i].get());
- ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch}));
+ ExecSpan kernel_batch({argument_batch[i], *id_batch.array()},
+ argument_batch.length);
RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups()));
- RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
+ RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, kernel_batch));
}
return Status::OK();
@@ -194,7 +192,8 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
// Merge if necessary
for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques());
- ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
+ ARROW_ASSIGN_OR_RAISE(Datum transposition,
+ groupers[0]->Consume(ExecSpan(other_keys)));
groupers[thread_index].reset();
for (size_t idx = 0; idx < kernels.size(); ++idx) {
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index 96aa56b80c..cca266ad69 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -137,7 +137,7 @@ class ScalarAggregateNode : public ExecNode {
const char* kind_name() const override { return "ScalarAggregateNode"; }
- Status DoConsume(const ExecBatch& batch, size_t thread_index) {
+ Status DoConsume(const ExecSpan& batch, size_t thread_index) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
{{"aggregate", ToStringExtra()},
@@ -153,7 +153,7 @@ class ScalarAggregateNode : public ExecNode {
KernelContext batch_ctx{plan()->exec_context()};
batch_ctx.SetState(states_[i][thread_index].get());
- ExecBatch single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length};
+ ExecSpan single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length};
RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
}
return Status::OK();
@@ -170,7 +170,7 @@ class ScalarAggregateNode : public ExecNode {
auto thread_index = plan_->GetThreadIndex();
- if (ErrorIfNotOk(DoConsume(std::move(batch), thread_index))) return;
+ if (ErrorIfNotOk(DoConsume(ExecSpan(batch), thread_index))) return;
if (input_counter_.Increment()) {
ErrorIfNotOk(Finish());
@@ -360,7 +360,7 @@ class GroupByNode : public ExecNode {
const char* kind_name() const override { return "GroupByNode"; }
- Status Consume(ExecBatch batch) {
+ Status Consume(ExecSpan batch) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
{{"group_by", ToStringExtra()},
@@ -376,11 +376,11 @@ class GroupByNode : public ExecNode {
RETURN_NOT_OK(InitLocalStateIfNeeded(state));
// Create a batch with key columns
- std::vector<Datum> keys(key_field_ids_.size());
+ std::vector<ExecValue> keys(key_field_ids_.size());
for (size_t i = 0; i < key_field_ids_.size(); ++i) {
- keys[i] = batch.values[key_field_ids_[i]];
+ keys[i] = batch[key_field_ids_[i]];
}
- ExecBatch key_batch(std::move(keys), batch.length);
+ ExecSpan key_batch(std::move(keys), batch.length);
// Create a batch with group ids
ARROW_ASSIGN_OR_RAISE(Datum id_batch, state->grouper->Consume(key_batch));
@@ -396,10 +396,8 @@ class GroupByNode : public ExecNode {
KernelContext kernel_ctx{ctx_};
kernel_ctx.SetState(state->agg_states[i].get());
- ARROW_ASSIGN_OR_RAISE(
- auto agg_batch,
- ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch}));
-
+ ExecSpan agg_batch({batch[agg_src_field_ids_[i]], ExecValue(*id_batch.array())},
+ batch.length);
RETURN_NOT_OK(agg_kernels_[i]->resize(&kernel_ctx, state->grouper->num_groups()));
RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch));
}
@@ -419,7 +417,8 @@ class GroupByNode : public ExecNode {
}
ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, state->grouper->GetUniques());
- ARROW_ASSIGN_OR_RAISE(Datum transposition, state0->grouper->Consume(other_keys));
+ ARROW_ASSIGN_OR_RAISE(Datum transposition,
+ state0->grouper->Consume(ExecSpan(other_keys)));
state->grouper.reset();
for (size_t i = 0; i < agg_kernels_.size(); ++i) {
@@ -515,7 +514,7 @@ class GroupByNode : public ExecNode {
DCHECK_EQ(input, inputs_[0]);
- if (ErrorIfNotOk(Consume(std::move(batch)))) return;
+ if (ErrorIfNotOk(Consume(ExecSpan(batch)))) return;
if (input_counter_.Increment()) {
ErrorIfNotOk(OutputResult());
diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h
index afca289c20..8beff2a6c6 100644
--- a/cpp/src/arrow/compute/exec_internal.h
+++ b/cpp/src/arrow/compute/exec_internal.h
@@ -39,39 +39,6 @@ static constexpr int64_t kDefaultMaxChunksize = std::numeric_limits<int64_t>::ma
namespace detail {
-/// \brief Break std::vector<Datum> into a sequence of ExecBatch for kernel
-/// execution
-class ARROW_EXPORT ExecBatchIterator {
- public:
- /// \brief Construct iterator and do basic argument validation
- ///
- /// \param[in] args the Datum argument, must be all array-like or scalar
- /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending
- /// on the chunk layout of ChunkedArray.
- static Result<std::unique_ptr<ExecBatchIterator>> Make(
- std::vector<Datum> args, int64_t max_chunksize = kDefaultMaxChunksize);
-
- /// \brief Compute the next batch. Always returns at least one batch. Return
- /// false if the iterator is exhausted
- bool Next(ExecBatch* batch);
-
- int64_t length() const { return length_; }
-
- int64_t position() const { return position_; }
-
- int64_t max_chunksize() const { return max_chunksize_; }
-
- private:
- ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
-
- std::vector<Datum> args_;
- std::vector<int> chunk_indexes_;
- std::vector<int64_t> chunk_positions_;
- int64_t position_;
- int64_t length_;
- int64_t max_chunksize_;
-};
-
/// \brief Break std::vector<Datum> into a sequence of non-owning
/// ExecSpan for kernel execution. The lifetime of the Datum vector
/// must be longer than the lifetime of this object
@@ -84,7 +51,11 @@ class ARROW_EXPORT ExecSpanIterator {
/// \param[in] batch the input ExecBatch
/// \param[in] max_chunksize the maximum length of each ExecSpan. Depending
/// on the chunk layout of ChunkedArray.
- Status Init(const ExecBatch& batch, int64_t max_chunksize = kDefaultMaxChunksize);
+ /// \param[in] promote_if_all_scalars if all of the values are scalars,
+ /// return them in each ExecSpan as ArraySpan of length 1. This must be set
+ /// to true for Scalar and Vector executors but false for Aggregators
+ Status Init(const ExecBatch& batch, int64_t max_chunksize = kDefaultMaxChunksize,
+ bool promote_if_all_scalars = true);
/// \brief Compute the next span by updating the state of the
/// previous span object. You must keep passing in the previous
@@ -110,6 +81,7 @@ class ARROW_EXPORT ExecSpanIterator {
bool initialized_ = false;
bool have_chunked_arrays_ = false;
bool have_all_scalars_ = false;
+ bool promote_if_all_scalars_ = true;
const std::vector<Datum>* args_;
std::vector<int> chunk_indexes_;
std::vector<int64_t> value_positions_;
diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc
index 573f4aee4a..c31309da93 100644
--- a/cpp/src/arrow/compute/exec_test.cc
+++ b/cpp/src/arrow/compute/exec_test.cc
@@ -658,137 +658,6 @@ TEST_F(TestPropagateNullsSpans, NullOutputTypeNoop) {
ASSERT_EQ(nullptr, result.buffers[0].data);
}
-// ----------------------------------------------------------------------
-// ExecBatchIterator
-
-class TestExecBatchIterator : public TestComputeInternals {
- public:
- void SetupIterator(std::vector<Datum> args,
- int64_t max_chunksize = kDefaultMaxChunksize) {
- ASSERT_OK_AND_ASSIGN(iterator_,
- ExecBatchIterator::Make(std::move(args), max_chunksize));
- }
- void CheckIteration(const std::vector<Datum>& args, int chunksize,
- const std::vector<int>& ex_batch_sizes) {
- SetupIterator(args, chunksize);
- ExecBatch batch;
- int64_t position = 0;
- for (size_t i = 0; i < ex_batch_sizes.size(); ++i) {
- ASSERT_EQ(position, iterator_->position());
- ASSERT_TRUE(iterator_->Next(&batch));
- ASSERT_EQ(ex_batch_sizes[i], batch.length);
-
- for (size_t j = 0; j < args.size(); ++j) {
- switch (args[j].kind()) {
- case Datum::SCALAR:
- ASSERT_TRUE(args[j].scalar()->Equals(batch[j].scalar()));
- break;
- case Datum::ARRAY:
- AssertArraysEqual(*args[j].make_array()->Slice(position, batch.length),
- *batch[j].make_array());
- break;
- case Datum::CHUNKED_ARRAY: {
- const ChunkedArray& carr = *args[j].chunked_array();
- if (batch.length == 0) {
- ASSERT_EQ(0, carr.length());
- } else {
- auto arg_slice = carr.Slice(position, batch.length);
- // The sliced ChunkedArrays should only ever be 1 chunk
- ASSERT_EQ(1, arg_slice->num_chunks());
- AssertArraysEqual(*arg_slice->chunk(0), *batch[j].make_array());
- }
- } break;
- default:
- break;
- }
- }
- position += ex_batch_sizes[i];
- }
- // Ensure that the iterator is exhausted
- ASSERT_FALSE(iterator_->Next(&batch));
-
- ASSERT_EQ(iterator_->length(), iterator_->position());
- }
-
- protected:
- std::unique_ptr<ExecBatchIterator> iterator_;
-};
-
-TEST_F(TestExecBatchIterator, Basics) {
- const int64_t length = 100;
-
- // Simple case with a single chunk
- std::vector<Datum> args = {Datum(GetInt32Array(length)), Datum(GetFloat64Array(length)),
- Datum(std::make_shared<Int32Scalar>(3))};
- SetupIterator(args);
-
- ExecBatch batch;
- ASSERT_TRUE(iterator_->Next(&batch));
- ASSERT_EQ(3, batch.values.size());
- ASSERT_EQ(3, batch.num_values());
- ASSERT_EQ(length, batch.length);
-
- std::vector<TypeHolder> types = batch.GetTypes();
- ASSERT_EQ(types[0], int32());
- ASSERT_EQ(types[1], float64());
- ASSERT_EQ(types[2], int32());
-
- AssertArraysEqual(*args[0].make_array(), *batch[0].make_array());
- AssertArraysEqual(*args[1].make_array(), *batch[1].make_array());
- ASSERT_TRUE(args[2].scalar()->Equals(batch[2].scalar()));
-
- ASSERT_EQ(length, iterator_->position());
- ASSERT_FALSE(iterator_->Next(&batch));
-
- // Split into chunks of size 16
- CheckIteration(args, /*chunksize=*/16, {16, 16, 16, 16, 16, 16, 4});
-}
-
-TEST_F(TestExecBatchIterator, InputValidation) {
- std::vector<Datum> args = {Datum(GetInt32Array(10)), Datum(GetInt32Array(9))};
- ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args));
-
- args = {Datum(GetInt32Array(9)), Datum(GetInt32Array(10))};
- ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args));
-
- args = {Datum(GetInt32Array(10))};
- ASSERT_OK_AND_ASSIGN(auto iterator, ExecBatchIterator::Make(args));
- ASSERT_EQ(10, iterator->max_chunksize());
-}
-
-TEST_F(TestExecBatchIterator, ChunkedArrays) {
- std::vector<Datum> args = {Datum(GetInt32Chunked({0, 20, 10})),
- Datum(GetInt32Chunked({15, 15})), Datum(GetInt32Array(30)),
- Datum(std::make_shared<Int32Scalar>(5)),
- Datum(MakeNullScalar(boolean()))};
-
- CheckIteration(args, /*chunksize=*/10, {10, 5, 5, 10});
- CheckIteration(args, /*chunksize=*/20, {15, 5, 10});
- CheckIteration(args, /*chunksize=*/30, {15, 5, 10});
-}
-
-TEST_F(TestExecBatchIterator, ZeroLengthInputs) {
- auto carr = std::shared_ptr<ChunkedArray>(new ChunkedArray({}, int32()));
-
- auto CheckArgs = [&](const std::vector<Datum>& args) {
- auto iterator = ExecBatchIterator::Make(args).ValueOrDie();
- ExecBatch batch;
- ASSERT_FALSE(iterator->Next(&batch));
- };
-
- // Zero-length ChunkedArray with zero chunks
- std::vector<Datum> args = {Datum(carr)};
- CheckArgs(args);
-
- // Zero-length array
- args = {Datum(GetInt32Array(0))};
- CheckArgs(args);
-
- // ChunkedArray with single empty chunk
- args = {Datum(GetInt32Chunked({0}))};
- CheckArgs(args);
-}
-
// ----------------------------------------------------------------------
// ExecSpanIterator tests
diff --git a/cpp/src/arrow/compute/function_benchmark.cc b/cpp/src/arrow/compute/function_benchmark.cc
index 791052358e..c7850b841c 100644
--- a/cpp/src/arrow/compute/function_benchmark.cc
+++ b/cpp/src/arrow/compute/function_benchmark.cc
@@ -174,11 +174,13 @@ void BM_ExecuteScalarKernelOnScalar(benchmark::State& state) {
state.SetItemsProcessed(state.iterations() * N);
}
-void BM_ExecBatchIterator(benchmark::State& state) {
- // Measure overhead related to splitting ExecBatch into smaller ExecBatches
- // for parallelism or more optimal CPU cache affinity
+void BM_ExecSpanIterator(benchmark::State& state) {
+ // Measure overhead related to splitting ExecBatch into smaller non-owning
+ // ExecSpans for parallelism or more optimal CPU cache affinity
random::RandomArrayGenerator rag(kSeed);
+ using ::arrow::compute::detail::ExecSpanIterator;
+
const int64_t length = 1 << 20;
const int num_fields = 32;
@@ -187,22 +189,24 @@ void BM_ExecBatchIterator(benchmark::State& state) {
args[i] = rag.Int64(length, 0, 100)->data();
}
+ ExecBatch batch(args, length);
+
const int64_t blocksize = state.range(0);
+ ExecSpanIterator it;
for (auto _ : state) {
- std::unique_ptr<detail::ExecBatchIterator> it =
- *detail::ExecBatchIterator::Make(args, blocksize);
- ExecBatch batch;
- while (it->Next(&batch)) {
+ ABORT_NOT_OK(it.Init(batch, blocksize));
+ ExecSpan span;
+ while (it.Next(&span)) {
for (int i = 0; i < num_fields; ++i) {
- auto data = batch.values[i].array()->buffers[1]->data();
+ auto data = span[i].array.buffers[1].data;
benchmark::DoNotOptimize(data);
}
}
- benchmark::DoNotOptimize(batch);
+ benchmark::DoNotOptimize(span);
}
// Provides comparability across blocksizes by looking at the iterations per
// second. So 1000 iterations/second means that input splitting associated
- // with ExecBatchIterator takes up 1ms every time.
+ // with ExecSpanIterator takes up 1ms every time.
state.SetItemsProcessed(state.iterations());
}
@@ -211,7 +215,7 @@ BENCHMARK(BM_CastDispatchBaseline);
BENCHMARK(BM_AddDispatch);
BENCHMARK(BM_ExecuteScalarFunctionOnScalar);
BENCHMARK(BM_ExecuteScalarKernelOnScalar);
-BENCHMARK(BM_ExecBatchIterator)->RangeMultiplier(4)->Range(1024, 64 * 1024);
+BENCHMARK(BM_ExecSpanIterator)->RangeMultiplier(4)->Range(1024, 64 * 1024);
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc
index 94daa6baa9..ea151e81f0 100644
--- a/cpp/src/arrow/compute/function_test.cc
+++ b/cpp/src/arrow/compute/function_test.cc
@@ -153,7 +153,7 @@ TEST(FunctionOptions, Equality) {
}
}
-struct ExecBatch;
+struct ExecSpan;
TEST(Arity, Basics) {
auto nullary = Arity::Nullary();
@@ -310,10 +310,8 @@ Result<std::unique_ptr<KernelState>> NoopInit(KernelContext*, const KernelInitAr
return nullptr;
}
-Status NoopConsume(KernelContext*, const ExecBatch&) { return Status::OK(); }
-Status NoopMerge(KernelContext*, const KernelState&, KernelState*) {
- return Status::OK();
-}
+Status NoopConsume(KernelContext*, const ExecSpan&) { return Status::OK(); }
+Status NoopMerge(KernelContext*, KernelState&&, KernelState*) { return Status::OK(); }
Status NoopFinalize(KernelContext*, Datum*) { return Status::OK(); }
TEST(ScalarAggregateFunction, DispatchExact) {
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index 5463a2de57..d8960308df 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -258,7 +258,7 @@ class ARROW_EXPORT OutputType {
///
/// This function SHOULD _not_ be used to check for arity, that is to be
/// performed one or more layers above.
- typedef Result<TypeHolder> (*Resolver)(KernelContext*, const std::vector<TypeHolder>&);
+ using Resolver = Result<TypeHolder> (*)(KernelContext*, const std::vector<TypeHolder>&);
/// \brief Output an exact type
OutputType(std::shared_ptr<DataType> type) // NOLINT implicit construction
@@ -500,7 +500,7 @@ struct Kernel {
/// endeavor to write into pre-allocated memory if they are able, though for
/// some kernels (e.g. in cases when a builder like StringBuilder must be
/// employed) this may not be possible.
-typedef Status (*ArrayKernelExec)(KernelContext*, const ExecSpan&, ExecResult*);
+using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*);
/// \brief Kernel data structure for implementations of ScalarFunction. In
/// addition to the members found in Kernel, contains the null handling
@@ -548,7 +548,7 @@ struct VectorKernel : public Kernel {
/// \brief Function for executing a stateful VectorKernel against a
/// ChunkedArray input. Does not need to be defined for all VectorKernels
- typedef Status (*ChunkedExec)(KernelContext*, const ExecBatch&, Datum* out);
+ using ChunkedExec = Status (*)(KernelContext*, const ExecBatch&, Datum* out);
VectorKernel() = default;
@@ -609,20 +609,17 @@ struct VectorKernel : public Kernel {
// ----------------------------------------------------------------------
// ScalarAggregateKernel (for ScalarAggregateFunction)
-using ScalarAggregateConsume = std::function<Status(KernelContext*, const ExecBatch&)>;
-
-using ScalarAggregateMerge =
- std::function<Status(KernelContext*, KernelState&&, KernelState*)>;
-
+using ScalarAggregateConsume = Status (*)(KernelContext*, const ExecSpan&);
+using ScalarAggregateMerge = Status (*)(KernelContext*, KernelState&&, KernelState*);
// Finalize returns Datum to permit multiple return values
-using ScalarAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;
+using ScalarAggregateFinalize = Status (*)(KernelContext*, Datum*);
/// \brief Kernel data structure for implementations of
/// ScalarAggregateFunction. The four necessary components of an aggregation
/// kernel are the init, consume, merge, and finalize functions.
///
/// * init: creates a new KernelState for a kernel.
-/// * consume: processes an ExecBatch and updates the KernelState found in the
+/// * consume: processes an ExecSpan and updates the KernelState found in the
/// KernelContext.
/// * merge: combines one KernelState with another.
/// * finalize: produces the end result of the aggregation using the
@@ -634,16 +631,16 @@ struct ScalarAggregateKernel : public Kernel {
ScalarAggregateConsume consume, ScalarAggregateMerge merge,
ScalarAggregateFinalize finalize)
: Kernel(std::move(sig), std::move(init)),
- consume(std::move(consume)),
- merge(std::move(merge)),
- finalize(std::move(finalize)) {}
+ consume(consume),
+ merge(merge),
+ finalize(finalize) {}
ScalarAggregateKernel(std::vector<InputType> in_types, OutputType out_type,
KernelInit init, ScalarAggregateConsume consume,
ScalarAggregateMerge merge, ScalarAggregateFinalize finalize)
: ScalarAggregateKernel(
KernelSignature::Make(std::move(in_types), std::move(out_type)),
- std::move(init), std::move(consume), std::move(merge), std::move(finalize)) {}
+ std::move(init), consume, merge, finalize) {}
/// \brief Merge a vector of KernelStates into a single KernelState.
/// The merged state will be returned and will be set on the KernelContext.
@@ -659,15 +656,12 @@ struct ScalarAggregateKernel : public Kernel {
// ----------------------------------------------------------------------
// HashAggregateKernel (for HashAggregateFunction)
-using HashAggregateResize = std::function<Status(KernelContext*, int64_t)>;
-
-using HashAggregateConsume = std::function<Status(KernelContext*, const ExecBatch&)>;
-
-using HashAggregateMerge =
- std::function<Status(KernelContext*, KernelState&&, const ArrayData&)>;
+using HashAggregateResize = Status (*)(KernelContext*, int64_t);
+using HashAggregateConsume = Status (*)(KernelContext*, const ExecSpan&);
+using HashAggregateMerge = Status (*)(KernelContext*, KernelState&&, const ArrayData&);
// Finalize returns Datum to permit multiple return values
-using HashAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;
+using HashAggregateFinalize = Status (*)(KernelContext*, Datum*);
/// \brief Kernel data structure for implementations of
/// HashAggregateFunction. The four necessary components of an aggregation
@@ -675,7 +669,7 @@ using HashAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;
///
/// * init: creates a new KernelState for a kernel.
/// * resize: ensure that the KernelState can accommodate the specified number of groups.
-/// * consume: processes an ExecBatch (which includes the argument as well
+/// * consume: processes an ExecSpan (which includes the argument as well
/// as an array of group identifiers) and updates the KernelState found in the
/// KernelContext.
/// * merge: combines one KernelState with another.
@@ -688,10 +682,10 @@ struct HashAggregateKernel : public Kernel {
HashAggregateResize resize, HashAggregateConsume consume,
HashAggregateMerge merge, HashAggregateFinalize finalize)
: Kernel(std::move(sig), std::move(init)),
- resize(std::move(resize)),
- consume(std::move(consume)),
- merge(std::move(merge)),
- finalize(std::move(finalize)) {}
+ resize(resize),
+ consume(consume),
+ merge(merge),
+ finalize(finalize) {}
HashAggregateKernel(std::vector<InputType> in_types, OutputType out_type,
KernelInit init, HashAggregateConsume consume,
@@ -699,8 +693,7 @@ struct HashAggregateKernel : public Kernel {
HashAggregateFinalize finalize)
: HashAggregateKernel(
KernelSignature::Make(std::move(in_types), std::move(out_type)),
- std::move(init), std::move(resize), std::move(consume), std::move(merge),
- std::move(finalize)) {}
+ std::move(init), resize, consume, merge, finalize) {}
HashAggregateResize resize;
HashAggregateConsume consume;
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index fec483318e..400ccbdf9f 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -30,7 +30,7 @@ namespace internal {
namespace {
-Status AggregateConsume(KernelContext* ctx, const ExecBatch& batch) {
+Status AggregateConsume(KernelContext* ctx, const ExecSpan& batch) {
return checked_cast<ScalarAggregator*>(ctx->state())->Consume(ctx, batch);
}
@@ -71,16 +71,16 @@ namespace {
struct CountImpl : public ScalarAggregator {
explicit CountImpl(CountOptions options) : options(std::move(options)) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (options.mode == CountOptions::ALL) {
this->non_nulls += batch.length;
} else if (batch[0].is_array()) {
- const ArrayData& input = *batch[0].array();
+ const ArraySpan& input = batch[0].array;
const int64_t nulls = input.GetNullCount();
this->nulls += nulls;
this->non_nulls += input.length - nulls;
} else {
- const Scalar& input = *batch[0].scalar();
+ const Scalar& input = *batch[0].scalar;
this->nulls += !input.is_valid * batch.length;
this->non_nulls += input.is_valid * batch.length;
}
@@ -133,9 +133,9 @@ struct CountDistinctImpl : public ScalarAggregator {
explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
: options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (batch[0].is_array()) {
- const ArrayData& arr = *batch[0].array();
+ const ArraySpan& arr = batch[0].array;
this->has_nulls = arr.GetNullCount() > 0;
auto visit_null = []() { return Status::OK(); };
@@ -144,9 +144,8 @@ struct CountDistinctImpl : public ScalarAggregator {
return memo_table_->GetOrInsert(arg, &y);
};
RETURN_NOT_OK(VisitArraySpanInline<Type>(arr, visit_value, visit_null));
-
} else {
- const Scalar& input = *batch[0].scalar();
+ const Scalar& input = *batch[0].scalar;
this->has_nulls = !input.is_valid;
if (input.is_valid) {
@@ -156,7 +155,6 @@ struct CountDistinctImpl : public ScalarAggregator {
}
this->non_nulls = memo_table_->size();
-
return Status::OK();
}
@@ -292,11 +290,11 @@ struct ProductImpl : public ScalarAggregator {
product(MultiplyTraits<AccType>::one(*out_type)),
nulls_observed(false) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (batch[0].is_array()) {
- const auto& data = batch[0].array();
- this->count += data->length - data->GetNullCount();
- this->nulls_observed = this->nulls_observed || data->GetNullCount();
+ const ArraySpan& data = batch[0].array;
+ this->count += data.length - data.GetNullCount();
+ this->nulls_observed = this->nulls_observed || data.GetNullCount();
if (!options.skip_nulls && this->nulls_observed) {
// Short-circuit
@@ -304,14 +302,14 @@ struct ProductImpl : public ScalarAggregator {
}
internal::VisitArrayValuesInline<ArrowType>(
- *data,
+ data,
[&](typename TypeTraits<ArrowType>::CType value) {
this->product =
MultiplyTraits<AccType>::Multiply(*out_type, this->product,
value);
},
[] {});
} else {
- const auto& data = *batch[0].scalar();
+ const Scalar& data = *batch[0].scalar;
this->count += data.is_valid * batch.length;
this->nulls_observed = this->nulls_observed || !data.is_valid;
if (data.is_valid) {
@@ -461,23 +459,24 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
struct BooleanAnyImpl : public ScalarAggregator {
explicit BooleanAnyImpl(ScalarAggregateOptions options) : options(std::move(options)) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
// short-circuit if seen a True already
if (this->any == true && this->count >= options.min_count) {
return Status::OK();
}
if (batch[0].is_scalar()) {
- const auto& scalar = *batch[0].scalar();
+ const Scalar& scalar = *batch[0].scalar;
this->has_nulls = !scalar.is_valid;
this->any = scalar.is_valid && checked_cast<const BooleanScalar&>(scalar).value;
this->count += scalar.is_valid;
return Status::OK();
}
- const auto& data = *batch[0].array();
+ const ArraySpan& data = batch[0].array;
this->has_nulls = data.GetNullCount() > 0;
this->count += data.length - data.GetNullCount();
arrow::internal::OptionalBinaryBitBlockCounter counter(
- data.buffers[0], data.offset, data.buffers[1], data.offset, data.length);
+ data.buffers[0].data, data.offset, data.buffers[1].data, data.offset,
+ data.length);
int64_t position = 0;
while (position < data.length) {
const auto block = counter.NextAndBlock();
@@ -527,7 +526,7 @@ Result<std::unique_ptr<KernelState>> AnyInit(KernelContext*, const KernelInitArg
struct BooleanAllImpl : public ScalarAggregator {
explicit BooleanAllImpl(ScalarAggregateOptions options) : options(std::move(options)) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
// short-circuit if seen a false already
if (this->all == false && this->count >= options.min_count) {
return Status::OK();
@@ -537,17 +536,18 @@ struct BooleanAllImpl : public ScalarAggregator {
return Status::OK();
}
if (batch[0].is_scalar()) {
- const auto& scalar = *batch[0].scalar();
+ const Scalar& scalar = *batch[0].scalar;
this->has_nulls = !scalar.is_valid;
this->count += scalar.is_valid;
this->all = !scalar.is_valid || checked_cast<const BooleanScalar&>(scalar).value;
return Status::OK();
}
- const auto& data = *batch[0].array();
+ const ArraySpan& data = batch[0].array;
this->has_nulls = data.GetNullCount() > 0;
this->count += data.length - data.GetNullCount();
arrow::internal::OptionalBinaryBitBlockCounter counter(
- data.buffers[1], data.offset, data.buffers[0], data.offset, data.length);
+ data.buffers[1].data, data.offset, data.buffers[0].data, data.offset,
+ data.length);
int64_t position = 0;
while (position < data.length) {
const auto block = counter.NextOrNotBlock();
@@ -605,7 +605,7 @@ struct IndexImpl : public ScalarAggregator {
}
}
- Status Consume(KernelContext* ctx, const ExecBatch& batch) override {
+ Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
// short-circuit
if (index >= 0 || !options.value->is_valid) {
return Status::OK();
@@ -615,8 +615,8 @@ struct IndexImpl : public ScalarAggregator {
if (batch[0].is_scalar()) {
seen = batch.length;
- if (batch[0].scalar()->is_valid) {
- const ArgValue v = internal::UnboxScalar<ArgType>::Unbox(*batch[0].scalar());
+ if (batch[0].scalar->is_valid) {
+ const ArgValue v = internal::UnboxScalar<ArgType>::Unbox(*batch[0].scalar);
if (v == desired) {
index = 0;
return Status::Cancelled("Found");
@@ -625,12 +625,12 @@ struct IndexImpl : public ScalarAggregator {
return Status::OK();
}
- auto input = batch[0].array();
- seen = input->length;
+ const ArraySpan& input = batch[0].array;
+ seen = input.length;
int64_t i = 0;
ARROW_UNUSED(internal::VisitArrayValuesInline<ArgType>(
- *input,
+ input,
[&](ArgValue v) -> Status {
if (v == desired) {
index = i;
@@ -671,7 +671,7 @@ template <>
struct IndexImpl<NullType> : public ScalarAggregator {
explicit IndexImpl(IndexOptions, KernelState*) {}
- Status Consume(KernelContext*, const ExecBatch&) override { return Status::OK(); }
+ Status Consume(KernelContext*, const ExecSpan&) override { return Status::OK(); }
Status MergeFrom(KernelContext*, KernelState&&) override { return Status::OK(); }
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
index c945e7f27f..bd2fe53460 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
@@ -24,6 +24,7 @@
#include "arrow/compute/kernels/aggregate_internal.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/kernels/common.h"
+#include "arrow/compute/kernels/util_internal.h"
#include "arrow/util/align_util.h"
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/decimal.h"
@@ -68,11 +69,11 @@ struct SumImpl : public ScalarAggregator {
SumImpl(std::shared_ptr<DataType> out_type, const ScalarAggregateOptions& options_)
: out_type(out_type), options(options_) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (batch[0].is_array()) {
- const auto& data = batch[0].array();
- this->count += data->length - data->GetNullCount();
- this->nulls_observed = this->nulls_observed || data->GetNullCount();
+ const ArraySpan& data = batch[0].array;
+ this->count += data.length - data.GetNullCount();
+ this->nulls_observed = this->nulls_observed || data.GetNullCount();
if (!options.skip_nulls && this->nulls_observed) {
// Short-circuit
@@ -80,12 +81,12 @@ struct SumImpl : public ScalarAggregator {
}
if (is_boolean_type<ArrowType>::value) {
- this->sum += static_cast<SumCType>(BooleanArray(data).true_count());
+ this->sum += GetTrueCount(data);
} else {
- this->sum += SumArray<CType, SumCType, SimdLevel>(*data);
+ this->sum += SumArray<CType, SumCType, SimdLevel>(data);
}
} else {
- const auto& data = *batch[0].scalar();
+ const Scalar& data = *batch[0].scalar;
this->count += data.is_valid * batch.length;
this->nulls_observed = this->nulls_observed || !data.is_valid;
if (data.is_valid) {
@@ -126,8 +127,8 @@ struct NullImpl : public ScalarAggregator {
explicit NullImpl(const ScalarAggregateOptions& options_) : options(options_) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
- if (batch[0].is_scalar() || batch[0].array()->GetNullCount() > 0) {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
+ if (batch[0].is_scalar() || batch[0].array.GetNullCount() > 0) {
// If the batch is a scalar or an array with elements, set is_empty to false
is_empty = false;
}
@@ -428,11 +429,11 @@ struct MinMaxImpl : public ScalarAggregator {
this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (batch[0].is_array()) {
- return ConsumeArray(ArrayType(batch[0].array()));
+ return ConsumeArray(batch[0].array);
}
- return ConsumeScalar(*batch[0].scalar());
+ return ConsumeScalar(*batch[0].scalar);
}
Status ConsumeScalar(const Scalar& scalar) {
@@ -448,9 +449,11 @@ struct MinMaxImpl : public ScalarAggregator {
return Status::OK();
}
- Status ConsumeArray(const ArrayType& arr) {
+ Status ConsumeArray(const ArraySpan& arr_span) {
StateType local;
+ ArrayType arr(arr_span.ToArrayData());
+
const auto null_count = arr.null_count();
local.has_nulls = null_count > 0;
this->count += arr.length() - null_count;
@@ -566,12 +569,12 @@ struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType, SimdLevel> {
using MinMaxImpl<BooleanType, SimdLevel>::MinMaxImpl;
using MinMaxImpl<BooleanType, SimdLevel>::options;
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (ARROW_PREDICT_FALSE(batch[0].is_scalar())) {
- return ConsumeScalar(checked_cast<const BooleanScalar&>(*batch[0].scalar()));
+ return ConsumeScalar(checked_cast<const BooleanScalar&>(*batch[0].scalar));
}
StateType local;
- ArrayType arr(batch[0].array());
+ ArrayType arr(batch[0].array.ToArrayData());
const auto arr_length = arr.length();
const auto null_count = arr.null_count();
@@ -608,7 +611,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType, SimdLevel> {
};
struct NullMinMaxImpl : public ScalarAggregator {
- Status Consume(KernelContext*, const ExecBatch& batch) override { return Status::OK(); }
+ Status Consume(KernelContext*, const ExecSpan& batch) override { return Status::OK(); }
Status MergeFrom(KernelContext*, KernelState&& src) override { return Status::OK(); }
diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h
index 07378e3ce8..8db74bfe0c 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h
@@ -91,7 +91,7 @@ struct MultiplyTraits<Type, enable_if_decimal<Type>> {
};
struct ScalarAggregator : public KernelState {
- virtual Status Consume(KernelContext* ctx, const ExecBatch& batch) = 0;
+ virtual Status Consume(KernelContext* ctx, const ExecSpan& batch) = 0;
virtual Status MergeFrom(KernelContext* ctx, KernelState&& src) = 0;
virtual Status Finalize(KernelContext* ctx, Datum* out) = 0;
};
@@ -142,7 +142,7 @@ struct GetSumType<T, enable_if_decimal<T>> {
template <typename ValueType, typename SumType, SimdLevel::type SimdLevel,
typename ValueFunc>
enable_if_t<std::is_floating_point<SumType>::value, SumType> SumArray(
- const ArrayData& data, ValueFunc&& func) {
+ const ArraySpan& data, ValueFunc&& func) {
using arrow::internal::VisitSetBitRunsVoid;
const int64_t data_size = data.length - data.GetNullCount();
@@ -182,7 +182,7 @@ enable_if_t<std::is_floating_point<SumType>::value, SumType> SumArray(
};
const ValueType* values = data.GetValues<ValueType>(1);
- VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ VisitSetBitRunsVoid(data.buffers[0].data, data.offset, data.length,
[&](int64_t pos, int64_t len) {
const ValueType* v = &values[pos];
// unsigned division by constant is cheaper than signed one
@@ -219,12 +219,12 @@ enable_if_t<std::is_floating_point<SumType>::value, SumType> SumArray(
template <typename ValueType, typename SumType, SimdLevel::type SimdLevel,
typename ValueFunc>
enable_if_t<!std::is_floating_point<SumType>::value, SumType> SumArray(
- const ArrayData& data, ValueFunc&& func) {
+ const ArraySpan& data, ValueFunc&& func) {
using arrow::internal::VisitSetBitRunsVoid;
SumType sum = 0;
const ValueType* values = data.GetValues<ValueType>(1);
- VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ VisitSetBitRunsVoid(data.buffers[0].data, data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
sum += func(values[pos + i]);
@@ -234,7 +234,7 @@ enable_if_t<!std::is_floating_point<SumType>::value, SumType> SumArray(
}
template <typename ValueType, typename SumType, SimdLevel::type SimdLevel>
-SumType SumArray(const ArrayData& data) {
+SumType SumArray(const ArraySpan& data) {
return SumArray<ValueType, SumType, SimdLevel>(
data, [](ValueType v) { return static_cast<SumType>(v); });
}
diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
index 263eced9e3..7f379a0719 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
@@ -416,31 +416,6 @@ struct Moder<InType, enable_if_decimal<InType>> {
SortModer<InType> impl;
};
-template <typename T>
-Status ScalarMode(KernelContext* ctx, const Scalar& scalar, ExecResult* out) {
- using CType = typename TypeTraits<T>::CType;
-
- const ModeOptions& options = ModeState::Get(ctx);
- if ((!options.skip_nulls && !scalar.is_valid) ||
- (static_cast<uint32_t>(scalar.is_valid) < options.min_count)) {
- return PrepareOutput<T>(/*n=*/0, ctx, *out->type(), out).status();
- }
-
- if (scalar.is_valid) {
- bool called = false;
- return Finalize<T>(ctx, *out->type(), out, [&]() {
- if (!called) {
- called = true;
- return std::pair<CType, uint64_t>(UnboxScalar<T>::Unbox(scalar), 1);
- }
- return std::pair<CType, uint64_t>(static_cast<CType>(0), kCountEOF);
- });
- }
- return Finalize<T>(ctx, *out->type(), out, []() {
- return std::pair<CType, uint64_t>(static_cast<CType>(0), kCountEOF);
- });
-}
-
Status CheckOptions(KernelContext* ctx) {
if (ctx->state() == nullptr) {
return Status::Invalid("Mode requires ModeOptions");
@@ -456,9 +431,6 @@ template <typename OutTypeUnused, typename InType>
struct ModeExecutor {
static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
RETURN_NOT_OK(CheckOptions(ctx));
- if (batch[0].is_scalar()) {
- return ScalarMode<InType>(ctx, *batch[0].scalar, out);
- }
return Moder<InType>().impl.Exec(ctx, batch, out);
}
};
diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
index 921de15c31..32a5d127dc 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
@@ -471,45 +471,6 @@ struct ExactQuantiler<InType, enable_if_t<is_decimal_type<InType>::value>> {
SortQuantiler<InType> impl;
};
-template <typename T>
-Status ScalarQuantile(KernelContext* ctx, const Scalar& scalar, ExecResult* out) {
- const QuantileOptions& options = QuantileState::Get(ctx);
- using CType = typename TypeTraits<T>::CType;
- ArrayData* output = out->array_data().get();
- output->length = options.q.size();
- auto out_type = IsDataPoint(options) ? scalar.type : float64();
- ARROW_ASSIGN_OR_RAISE(output->buffers[1],
- ctx->Allocate(output->length * out_type->byte_width()));
-
- if (!scalar.is_valid || options.min_count > 1) {
- output->null_count = output->length;
- ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(output->length));
- bit_util::SetBitsTo(output->buffers[0]->mutable_data(), /*offset=*/0, output->length,
- false);
- if (IsDataPoint(options)) {
- CType* out_buffer = output->template GetMutableValues<CType>(1);
- std::fill(out_buffer, out_buffer + output->length, CType(0));
- } else {
- double* out_buffer = output->template GetMutableValues<double>(1);
- std::fill(out_buffer, out_buffer + output->length, 0.0);
- }
- return Status::OK();
- }
- output->null_count = 0;
- if (IsDataPoint(options)) {
- CType* out_buffer = output->template GetMutableValues<CType>(1);
- for (int64_t i = 0; i < output->length; i++) {
- out_buffer[i] = UnboxScalar<T>::Unbox(scalar);
- }
- } else {
- double* out_buffer = output->template GetMutableValues<double>(1);
- for (int64_t i = 0; i < output->length; i++) {
- out_buffer[i] = DataPointToDouble(UnboxScalar<T>::Unbox(scalar), *scalar.type);
- }
- }
- return Status::OK();
-}
-
Status CheckQuantileOptions(KernelContext* ctx) {
if (ctx->state() == nullptr) {
return Status::Invalid("Quantile requires QuantileOptions");
@@ -531,9 +492,6 @@ template <typename OutputTypeUnused, typename InType>
struct QuantileExecutor {
static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
RETURN_NOT_OK(CheckQuantileOptions(ctx));
- if (batch[0].is_scalar()) {
- return ScalarQuantile<InType>(ctx, *batch[0].scalar, out);
- }
return ExactQuantiler<InType>().impl.Exec(ctx, batch[0].array, out);
}
};
diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
index cfb7d3c3b3..0e00537e3c 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
@@ -54,19 +54,19 @@ struct TDigestImpl : public ScalarAggregator {
double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); }
double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); }
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (!this->all_valid) return Status::OK();
if (!options.skip_nulls && batch[0].null_count() > 0) {
this->all_valid = false;
return Status::OK();
}
if (batch[0].is_array()) {
- const ArrayData& data = *batch[0].array();
+ const ArraySpan& data = batch[0].array;
const CType* values = data.GetValues<CType>(1);
if (data.length > data.GetNullCount()) {
this->count += data.length - data.GetNullCount();
- VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ VisitSetBitRunsVoid(data.buffers[0].data, data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
this->tdigest.NanAdd(ToDouble(values[pos + i]));
@@ -74,8 +74,8 @@ struct TDigestImpl : public ScalarAggregator {
});
}
} else {
- const CType value = UnboxScalar<ArrowType>::Unbox(*batch[0].scalar());
- if (batch[0].scalar()->is_valid) {
+ const CType value = UnboxScalar<ArrowType>::Unbox(*batch[0].scalar);
+ if (batch[0].scalar->is_valid) {
this->count += 1;
for (int64_t i = 0; i < batch.length; i++) {
this->tdigest.NanAdd(ToDouble(value));
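The tdigest change is representative of the whole migration: ArrayData owns its buffers as std::shared_ptr<Buffer>, while the non-owning ArraySpan exposes raw BufferSpan views, hence `buffers[0].data` replacing `buffers[0]->data()`. A side-by-side sketch (helper name hypothetical):
```
#include "arrow/array/data.h"  // arrow::ArrayData, arrow::ArraySpan

// Hypothetical helper contrasting the two validity-bitmap access patterns.
const uint8_t* ValidityOf(const arrow::ArrayData& data) {
  // Owning path: the shared_ptr must be checked before dereferencing
  return data.buffers[0] ? data.buffers[0]->data() : nullptr;
}

const uint8_t* ValidityOf(const arrow::ArraySpan& span) {
  // Non-owning path: BufferSpan::data is already a raw pointer (may be null)
  return span.buffers[0].data;
}
```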
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index 1f9a26960b..1693e95278 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -54,19 +54,19 @@ struct VarStdState {
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm
template <typename T = ArrowType>
enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4)> Consume(
- const ArrayType& array) {
- this->all_valid = array.null_count() == 0;
- int64_t count = array.length() - array.null_count();
+ const ArraySpan& array) {
+ this->all_valid = array.GetNullCount() == 0;
+ int64_t count = array.length - array.GetNullCount();
if (count == 0 || (!this->all_valid && !options.skip_nulls)) {
return;
}
using SumType = typename internal::GetSumType<T>::SumType;
- SumType sum = internal::SumArray<CType, SumType, SimdLevel::NONE>(*array.data());
+ SumType sum = internal::SumArray<CType, SumType, SimdLevel::NONE>(array);
const double mean = ToDouble(sum) / count;
const double m2 = internal::SumArray<CType, double, SimdLevel::NONE>(
- *array.data(), [this, mean](CType value) {
+ array, [this, mean](CType value) {
const double v = ToDouble(value);
return (v - mean) * (v - mean);
});
@@ -79,29 +79,30 @@ struct VarStdState {
// int32/16/8: textbook one pass algorithm with integer arithmetic
template <typename T = ArrowType>
enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4)> Consume(
- const ArrayType& array) {
+ const ArraySpan& array) {
// max number of elements that sum will not overflow int64 (2Gi int32 elements)
// for uint32: 0 <= sum < 2^63 (int64 >= 0)
// for int32: -2^62 <= sum < 2^62
constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
- this->all_valid = array.null_count() == 0;
+ this->all_valid = array.GetNullCount() == 0;
if (!this->all_valid && !options.skip_nulls) return;
int64_t start_index = 0;
- int64_t valid_count = array.length() - array.null_count();
+ int64_t valid_count = array.length - array.GetNullCount();
+ ArraySpan slice = array;
while (valid_count > 0) {
// process in chunks that overflow will never happen
- const auto slice = array.Slice(start_index, max_length);
- const int64_t count = slice->length() - slice->null_count();
- start_index += max_length;
+ slice.SetSlice(start_index + array.offset,
+ std::min(max_length, array.length - start_index));
+ const int64_t count = slice.length - slice.GetNullCount();
+ start_index += slice.length;
valid_count -= count;
if (count > 0) {
IntegerVarStd<ArrowType> var_std;
- const ArrayData& data = *slice->data();
- const CType* values = data.GetValues<CType>(1);
- VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ const CType* values = slice.GetValues<CType>(1);
+ VisitSetBitRunsVoid(slice.buffers[0].data, slice.offset, slice.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
const auto value = values[pos + i];
@@ -166,12 +167,11 @@ struct VarStdImpl : public ScalarAggregator {
const VarianceOptions& options, VarOrStd return_type)
: out_type(out_type), state(decimal_scale, options), return_type(return_type) {}
- Status Consume(KernelContext*, const ExecBatch& batch) override {
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
if (batch[0].is_array()) {
- ArrayType array(batch[0].array());
- this->state.Consume(array);
+ this->state.Consume(batch[0].array);
} else {
- this->state.Consume(*batch[0].scalar(), batch.length);
+ this->state.Consume(*batch[0].scalar, batch.length);
}
return Status::OK();
}
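The integer Consume rewrite above also shows the new chunking idiom: instead of materializing `Array::Slice` results (a shared_ptr allocation per chunk), a stack-local ArraySpan is re-pointed at successive windows with SetSlice. A standalone sketch of the loop shape, with the consumer elided (names illustrative):
```
#include <algorithm>
#include <cstdint>
#include "arrow/array/data.h"  // arrow::ArraySpan

// Sketch: SetSlice takes an absolute offset, hence the `+ array.offset`;
// max_length is a caller-chosen chunk size.
void ForEachChunk(const arrow::ArraySpan& array, int64_t max_length) {
  arrow::ArraySpan slice = array;  // shallow copy, no allocation
  int64_t start_index = 0;
  while (start_index < array.length) {
    slice.SetSlice(start_index + array.offset,
                   std::min(max_length, array.length - start_index));
    // ... process slice.GetValues<T>(1) over slice.length elements ...
    start_index += slice.length;
  }
}
```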
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 49c88324a9..4537c32eb3 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -72,7 +72,7 @@ struct GroupedAggregator : KernelState {
virtual Status Resize(int64_t new_num_groups) = 0;
- virtual Status Consume(const ExecBatch& batch) = 0;
+ virtual Status Consume(const ExecSpan& batch) = 0;
virtual Status Merge(GroupedAggregator&& other, const ArrayData& group_id_mapping) = 0;
@@ -92,7 +92,7 @@ Result<std::unique_ptr<KernelState>> HashAggregateInit(KernelContext* ctx,
Status HashAggregateResize(KernelContext* ctx, int64_t num_groups) {
return checked_cast<GroupedAggregator*>(ctx->state())->Resize(num_groups);
}
-Status HashAggregateConsume(KernelContext* ctx, const ExecBatch& batch) {
+Status HashAggregateConsume(KernelContext* ctx, const ExecSpan& batch) {
return checked_cast<GroupedAggregator*>(ctx->state())->Consume(batch);
}
Status HashAggregateMerge(KernelContext* ctx, KernelState&& other,
@@ -167,17 +167,17 @@ struct GroupedValueTraits<BooleanType> {
template <typename Type, typename ConsumeValue, typename ConsumeNull>
typename arrow::internal::call_traits::enable_if_return<ConsumeValue, void>::type
-VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
+VisitGroupedValues(const ExecSpan& batch, ConsumeValue&& valid_func,
ConsumeNull&& null_func) {
- auto g = batch[1].array()->GetValues<uint32_t>(1);
+ auto g = batch[1].array.GetValues<uint32_t>(1);
if (batch[0].is_array()) {
VisitArrayValuesInline<Type>(
- *batch[0].array(),
+ batch[0].array,
[&](typename TypeTraits<Type>::CType val) { valid_func(*g++, val); },
[&]() { null_func(*g++); });
return;
}
- const auto& input = *batch[0].scalar();
+ const Scalar& input = *batch[0].scalar;
if (input.is_valid) {
const auto val = UnboxScalar<Type>::Unbox(input);
for (int64_t i = 0; i < batch.length; i++) {
@@ -192,16 +192,16 @@ VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
template <typename Type, typename ConsumeValue, typename ConsumeNull>
typename arrow::internal::call_traits::enable_if_return<ConsumeValue, Status>::type
-VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
+VisitGroupedValues(const ExecSpan& batch, ConsumeValue&& valid_func,
ConsumeNull&& null_func) {
- auto g = batch[1].array()->GetValues<uint32_t>(1);
+ auto g = batch[1].array.GetValues<uint32_t>(1);
if (batch[0].is_array()) {
return VisitArrayValuesInline<Type>(
- *batch[0].array(),
+ batch[0].array,
[&](typename GetViewType<Type>::T val) { return valid_func(*g++, val); },
[&]() { return null_func(*g++); });
}
- const auto& input = *batch[0].scalar();
+ const Scalar& input = *batch[0].scalar;
if (input.is_valid) {
const auto val = UnboxScalar<Type>::Unbox(input);
for (int64_t i = 0; i < batch.length; i++) {
@@ -216,7 +216,7 @@ VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
}
template <typename Type, typename ConsumeValue>
-void VisitGroupedValuesNonNull(const ExecBatch& batch, ConsumeValue&& valid_func) {
+void VisitGroupedValuesNonNull(const ExecSpan& batch, ConsumeValue&& valid_func) {
VisitGroupedValues<Type>(batch, std::forward<ConsumeValue>(valid_func),
[](uint32_t) {});
}
@@ -251,20 +251,20 @@ struct GroupedCountImpl : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
- auto g_begin = batch[1].array()->GetValues<uint32_t>(1);
+ auto g_begin = batch[1].array.GetValues<uint32_t>(1);
if (options_.mode == CountOptions::ALL) {
for (int64_t i = 0; i < batch.length; ++i, ++g_begin) {
counts[*g_begin] += 1;
}
} else if (batch[0].is_array()) {
- const auto& input = batch[0].array();
+ const ArraySpan& input = batch[0].array;
if (options_.mode == CountOptions::ONLY_VALID) {
- if (input->type->id() != arrow::Type::NA) {
+ if (input.type->id() != arrow::Type::NA) {
arrow::internal::VisitSetBitRunsVoid(
- input->buffers[0], input->offset, input->length,
+ input.buffers[0].data, input.offset, input.length,
[&](int64_t offset, int64_t length) {
auto g = g_begin + offset;
for (int64_t i = 0; i < length; ++i, ++g) {
@@ -273,19 +273,19 @@ struct GroupedCountImpl : public GroupedAggregator {
});
}
} else { // ONLY_NULL
- if (input->type->id() == arrow::Type::NA) {
+ if (input.type->id() == arrow::Type::NA) {
for (int64_t i = 0; i < batch.length; ++i, ++g_begin) {
counts[*g_begin] += 1;
}
- } else if (input->MayHaveNulls()) {
- auto end = input->offset + input->length;
- for (int64_t i = input->offset; i < end; ++i, ++g_begin) {
- counts[*g_begin] += !bit_util::GetBit(input->buffers[0]->data(), i);
+ } else if (input.MayHaveNulls()) {
+ auto end = input.offset + input.length;
+ for (int64_t i = input.offset; i < end; ++i, ++g_begin) {
+ counts[*g_begin] += !bit_util::GetBit(input.buffers[0].data, i);
}
}
}
} else {
- const auto& input = *batch[0].scalar();
+ const Scalar& input = *batch[0].scalar;
if (options_.mode == CountOptions::ONLY_VALID) {
for (int64_t i = 0; i < batch.length; ++i, ++g_begin) {
counts[*g_begin] += input.is_valid;
@@ -339,7 +339,7 @@ struct GroupedReducingAggregator : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
CType* reduced = reduced_.mutable_data();
int64_t* counts = counts_.mutable_data();
uint8_t* no_nulls = no_nulls_.mutable_data();
@@ -457,7 +457,7 @@ struct GroupedNullImpl : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override { return Status::OK(); }
+ Status Consume(const ExecSpan& batch) override { return Status::OK(); }
Status Merge(GroupedAggregator&& raw_other,
const ArrayData& group_id_mapping) override {
@@ -747,13 +747,13 @@ struct GroupedVarStdImpl : public GroupedAggregator {
return value.ToDouble(decimal_scale_);
}
- Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+ Status Consume(const ExecSpan& batch) override { return ConsumeImpl(batch); }
// float/double/int64/decimal: calculate `m2` (sum((X-mean)^2)) with
// `two pass algorithm` (see aggregate_var_std.cc)
template <typename T = Type>
enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
- const ExecBatch& batch) {
+ const ExecSpan& batch) {
using SumType = typename internal::GetSumType<T>::SumType;
GroupedVarStdImpl<Type> state;
@@ -799,14 +799,14 @@ struct GroupedVarStdImpl : public GroupedAggregator {
// aggregate_var_std.cc)
template <typename T = Type>
enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
- const ExecBatch& batch) {
+ const ExecSpan& batch) {
// max number of elements that sum will not overflow int64 (2Gi int32 elements)
// for uint32: 0 <= sum < 2^63 (int64 >= 0)
// for int32: -2^62 <= sum < 2^62
constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
- const auto g = batch[1].array()->GetValues<uint32_t>(1);
- if (batch[0].is_scalar() && !batch[0].scalar()->is_valid) {
+ const auto g = batch[1].array.GetValues<uint32_t>(1);
+ if (batch[0].is_scalar() && !batch[0].scalar->is_valid) {
uint8_t* no_nulls = no_nulls_.mutable_data();
for (int64_t i = 0; i < batch.length; i++) {
bit_util::ClearBit(no_nulls, g[i]);
@@ -839,7 +839,7 @@ struct GroupedVarStdImpl : public GroupedAggregator {
uint8_t* other_no_nulls = state.no_nulls_.mutable_data();
if (batch[0].is_array()) {
- const auto& array = *batch[0].array();
+ const ArraySpan& array = batch[0].array;
const CType* values = array.GetValues<CType>(1);
auto visit_values = [&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
@@ -851,7 +851,7 @@ struct GroupedVarStdImpl : public GroupedAggregator {
if (array.MayHaveNulls()) {
arrow::internal::BitRunReader reader(
- array.buffers[0]->data(), array.offset + start_index,
+ array.buffers[0].data, array.offset + start_index,
std::min(max_length, batch.length - start_index));
int64_t position = 0;
while (true) {
@@ -870,7 +870,7 @@ struct GroupedVarStdImpl : public GroupedAggregator {
visit_values(0, array.length);
}
} else {
- const auto value = UnboxScalar<Type>::Unbox(*batch[0].scalar());
+ const auto value = UnboxScalar<Type>::Unbox(*batch[0].scalar);
for (int64_t i = 0; i < std::min(max_length, batch.length - start_index); ++i) {
const int64_t index = start_index + i;
var_std[g[index]].ConsumeOne(value);
@@ -1052,7 +1052,7 @@ struct GroupedTDigestImpl : public GroupedAggregator {
return value.ToDouble(decimal_scale_);
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
int64_t* counts = counts_.mutable_data();
uint8_t* no_nulls = no_nulls_.mutable_data();
VisitGroupedValues<Type>(
@@ -1263,7 +1263,7 @@ struct GroupedMinMaxImpl final : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
auto raw_mins = mins_.mutable_data();
auto raw_maxes = maxes_.mutable_data();
@@ -1370,7 +1370,7 @@ struct GroupedMinMaxImpl<Type,
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
return VisitGroupedValues<Type>(
batch,
[&](uint32_t g, util::string_view val) {
@@ -1518,7 +1518,7 @@ struct GroupedNullMinMaxImpl final : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override { return Status::OK(); }
+ Status Consume(const ExecSpan& batch) override { return Status::OK(); }
Status Merge(GroupedAggregator&& raw_other,
const ArrayData& group_id_mapping) override {
@@ -1663,18 +1663,18 @@ struct GroupedBooleanAggregator : public GroupedAggregator {
return counts_.Append(added_groups, 0);
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
uint8_t* reduced = reduced_.mutable_data();
uint8_t* no_nulls = no_nulls_.mutable_data();
int64_t* counts = counts_.mutable_data();
- auto g = batch[1].array()->GetValues<uint32_t>(1);
+ auto g = batch[1].array.GetValues<uint32_t>(1);
if (batch[0].is_array()) {
- const auto& input = *batch[0].array();
- const uint8_t* bitmap = input.buffers[1]->data();
+ const ArraySpan& input = batch[0].array;
+ const uint8_t* bitmap = input.buffers[1].data;
if (input.MayHaveNulls()) {
arrow::internal::VisitBitBlocksVoid(
- input.buffers[0]->data(), input.offset, input.length,
+ input.buffers[0].data, input.offset, input.length,
[&](int64_t position) {
counts[*g]++;
Impl::UpdateGroupWith(reduced, *g, bit_util::GetBit(bitmap, position));
@@ -1694,7 +1694,7 @@ struct GroupedBooleanAggregator : public GroupedAggregator {
});
}
} else {
- const auto& input = *batch[0].scalar();
+ const Scalar& input = *batch[0].scalar;
if (input.is_valid) {
const bool value = UnboxScalar<BooleanType>::Unbox(input);
for (int64_t i = 0; i < batch.length; i++) {
@@ -1828,7 +1828,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
ARROW_ASSIGN_OR_RAISE(std::ignore, grouper_->Consume(batch));
return Status::OK();
}
@@ -1839,8 +1839,8 @@ struct GroupedCountDistinctImpl : public GroupedAggregator {
// Get (value, group_id) pairs, then translate the group IDs and consume them
// ourselves
- ARROW_ASSIGN_OR_RAISE(auto uniques, other->grouper_->GetUniques());
- ARROW_ASSIGN_OR_RAISE(auto remapped_g,
+ ARROW_ASSIGN_OR_RAISE(ExecBatch uniques, other->grouper_->GetUniques());
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> remapped_g,
AllocateBuffer(uniques.length * sizeof(uint32_t), pool_));
const auto* g_mapping = group_id_mapping.GetValues<uint32_t>(1);
@@ -1850,10 +1850,10 @@ struct GroupedCountDistinctImpl : public GroupedAggregator {
for (int64_t i = 0; i < uniques.length; i++) {
g[i] = g_mapping[other_g[i]];
}
- uniques.values[1] =
- ArrayData::Make(uint32(), uniques.length, {nullptr, std::move(remapped_g)});
- return Consume(std::move(uniques));
+ ExecSpan uniques_span(uniques);
+ uniques_span.values[1].array.SetBuffer(1, remapped_g);
+ return Consume(uniques_span);
}
Result<Datum> Finalize() override {
@@ -1990,7 +1990,7 @@ struct GroupedOneImpl final : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
auto raw_ones_ = ones_.mutable_data();
return VisitGroupedValues<Type>(
@@ -2049,7 +2049,7 @@ struct GroupedNullOneImpl : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override { return Status::OK(); }
+ Status Consume(const ExecSpan& batch) override { return Status::OK(); }
Status Merge(GroupedAggregator&& raw_other,
const ArrayData& group_id_mapping) override {
@@ -2089,7 +2089,7 @@ struct GroupedOneImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
return VisitGroupedValues<Type>(
batch,
[&](uint32_t g, util::string_view val) -> Status {
@@ -2292,17 +2292,17 @@ struct GroupedListImpl final : public GroupedAggregator {
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
- const auto& values_array_data = batch[0].array();
- int64_t num_values = values_array_data->length;
+ Status Consume(const ExecSpan& batch) override {
+ const ArraySpan& values_array_data = batch[0].array;
+ const ArraySpan& groups_array_data = batch[1].array;
- const auto& groups_array_data = batch[1].array();
- const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
- DCHECK_EQ(groups_array_data->offset, 0);
+ int64_t num_values = values_array_data.length;
+ const auto* groups = groups_array_data.GetValues<uint32_t>(1, 0);
+ DCHECK_EQ(groups_array_data.offset, 0);
RETURN_NOT_OK(groups_.Append(groups, num_values));
- int64_t offset = values_array_data->offset;
- const uint8_t* values = values_array_data->buffers[1]->data();
+ int64_t offset = values_array_data.offset;
+ const uint8_t* values = values_array_data.buffers[1].data;
RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
if (batch[0].null_count() > 0) {
@@ -2310,7 +2310,7 @@ struct GroupedListImpl final : public GroupedAggregator {
has_nulls_ = true;
RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
}
- const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+ const uint8_t* values_bitmap = values_array_data.buffers[0].data;
RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
&values_bitmap_, values_bitmap, offset, num_values));
} else if (has_nulls_) {
@@ -2399,20 +2399,20 @@ struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
return Status::OK();
}
- Status Consume(const ExecBatch& batch) override {
- const auto& values_array_data = batch[0].array();
- int64_t num_values = values_array_data->length;
- int64_t offset = values_array_data->offset;
+ Status Consume(const ExecSpan& batch) override {
+ const ArraySpan& values_array_data = batch[0].array;
+ int64_t num_values = values_array_data.length;
+ int64_t offset = values_array_data.offset;
- const auto& groups_array_data = batch[1].array();
- const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
- DCHECK_EQ(groups_array_data->offset, 0);
+ const ArraySpan& groups_array_data = batch[1].array;
+ const uint32_t* groups = groups_array_data.GetValues<uint32_t>(1, 0);
+ DCHECK_EQ(groups_array_data.offset, 0);
RETURN_NOT_OK(groups_.Append(groups, num_values));
if (batch[0].null_count() == 0) {
RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
} else {
- const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+ const uint8_t* values_bitmap = values_array_data.buffers[0].data;
RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
&values_bitmap_, values_bitmap, offset, num_values));
}
@@ -2553,9 +2553,9 @@ struct GroupedNullListImpl : public GroupedAggregator {
return counts_.Append(added_groups, 0);
}
- Status Consume(const ExecBatch& batch) override {
+ Status Consume(const ExecSpan& batch) override {
int64_t* counts = counts_.mutable_data();
- const auto* g_begin = batch[1].array()->GetValues<uint32_t>(1);
+ const auto* g_begin = batch[1].array.GetValues<uint32_t>(1);
for (int64_t i = 0; i < batch.length; ++i, ++g_begin) {
counts[*g_begin] += 1;
}
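All of the Consume overrides in this file share one access pattern: the group ids arrive as the second value of the span and are read as a raw uint32_t pointer, while the first value is branched on is_array() with no shared_ptr traffic. A condensed sketch of that idiom, loosely modeled on GroupedCountImpl (names and the counts container are illustrative):
```
#include <cstdint>
#include <vector>
#include "arrow/compute/exec.h"  // arrow::compute::ExecSpan

// Condensed sketch of the span consumption idiom used by the hash
// aggregates; `counts` is assumed sized to the number of groups.
void CountPerGroup(const arrow::compute::ExecSpan& batch,
                   std::vector<int64_t>* counts) {
  const uint32_t* g = batch[1].array.GetValues<uint32_t>(1);
  for (int64_t i = 0; i < batch.length; ++i) {
    (*counts)[g[i]] += 1;
  }
}
```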
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 156e589612..f599f9abb6 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -77,7 +77,7 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_batch.GetTypes()));
- ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+ ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(ExecSpan(key_batch)));
ARROW_ASSIGN_OR_RAISE(
auto groupings,
@@ -184,7 +184,7 @@ Result<Datum> GroupByUsingExecPlan(const std::vector<Datum>& arguments,
const std::vector<Datum>& keys,
const std::vector<Aggregate>& aggregates,
bool use_threads, ExecContext* ctx) {
- using arrow::compute::detail::ExecBatchIterator;
+ using arrow::compute::detail::ExecSpanIterator;
FieldVector scan_fields(arguments.size() + keys.size());
std::vector<std::string> key_names(keys.size());
@@ -202,14 +202,15 @@ Result<Datum> GroupByUsingExecPlan(const std::vector<Datum>& arguments,
inputs.reserve(inputs.size() + keys.size());
inputs.insert(inputs.end(), keys.begin(), keys.end());
- ARROW_ASSIGN_OR_RAISE(auto batch_iterator,
- ExecBatchIterator::Make(inputs, ctx->exec_chunksize()));
+ ExecSpanIterator span_iterator;
+ ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make(inputs));
+ RETURN_NOT_OK(span_iterator.Init(batch, ctx->exec_chunksize()));
BatchesWithSchema input;
input.schema = schema(std::move(scan_fields));
- ExecBatch batch;
- while (batch_iterator->Next(&batch)) {
- if (batch.length == 0) continue;
- input.batches.push_back(std::move(batch));
+ ExecSpan span;
+ while (span_iterator.Next(&span)) {
+ if (span.length == 0) continue;
+ input.batches.push_back(span.ToExecBatch());
}
return GroupByUsingExecPlan(input, key_names, aggregates, use_threads, ctx);
@@ -388,7 +389,7 @@ struct TestGrouper {
}
void ConsumeAndValidate(const ExecBatch& key_batch, Datum* ids = nullptr) {
- ASSERT_OK_AND_ASSIGN(Datum id_batch, grouper_->Consume(key_batch));
+ ASSERT_OK_AND_ASSIGN(Datum id_batch, grouper_->Consume(ExecSpan(key_batch)));
ValidateConsume(key_batch, id_batch);
@@ -536,11 +537,13 @@ TEST(Grouper, DictKey) {
g.ExpectConsume({WithIndices(" [3, 1, null, 0, 2]")},
ArrayFromJSON(uint32(), "[3, 1, 4, 0, 2]"));
- EXPECT_RAISES_WITH_MESSAGE_THAT(
- NotImplemented, HasSubstr("Unifying differing dictionaries"),
- g.grouper_->Consume(*ExecBatch::Make({*DictionaryArray::FromArrays(
- ArrayFromJSON(int32(), "[0, 1]"),
- ArrayFromJSON(utf8(), R"(["different", "dictionary"])"))})));
+ auto dict_arr = *DictionaryArray::FromArrays(
+ ArrayFromJSON(int32(), "[0, 1]"),
+ ArrayFromJSON(utf8(), R"(["different", "dictionary"])"));
+ ExecSpan dict_span({*dict_arr->data()}, 2);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented,
+ HasSubstr("Unifying differing dictionaries"),
+ g.grouper_->Consume(dict_span));
}
TEST(Grouper, StringInt64Key) {
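The test change above illustrates the ExecSpanIterator replacement for ExecBatchIterator: the iterator is a plain value type (no unique_ptr factory), is Init()ed with an owning ExecBatch, and yields non-owning ExecSpans. A standalone sketch of the loop (function name illustrative):
```
#include "arrow/compute/exec.h"
#include "arrow/compute/exec_internal.h"  // arrow::compute::detail::ExecSpanIterator

// Sketch: spans only borrow batch's buffers; ToExecBatch() promotes a
// span back to an owning ExecBatch when ownership is required.
arrow::Status IterateInChunks(const arrow::compute::ExecBatch& batch,
                              int64_t chunksize) {
  arrow::compute::detail::ExecSpanIterator it;
  ARROW_RETURN_NOT_OK(it.Init(batch, chunksize));
  arrow::compute::ExecSpan span;
  while (it.Next(&span)) {
    // span.length <= chunksize here
  }
  return arrow::Status::OK();
}
```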
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
index 61e8e90ddd..8d36cff6ae 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
@@ -194,13 +194,7 @@ Status CheckIntegerFloatTruncateImpl(const ExecValue& input) {
const int64_t limit = FloatingIntegerBound<OutT>::value;
InScalarType bound_lower(IsSigned ? -limit : 0);
InScalarType bound_upper(limit);
-
- if (input.is_scalar()) {
- ArraySpan span(*input.scalar);
- return CheckIntegersInRange(span, bound_lower, bound_upper);
- } else {
- return CheckIntegersInRange(input.array, bound_lower, bound_upper);
- }
+ return CheckIntegersInRange(input.array, bound_lower, bound_upper);
}
Status CheckForIntegerToFloatingTruncation(const ExecValue& input, Type::type out_type) {
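The deleted branch wrapped a scalar in a temporary ArraySpan; with the span-based executor the cast kernels only see arrays at this point, so `input.array` can be used unconditionally. For orientation, a simplified, null-ignoring stand-in for what a range check over an ArraySpan looks like (this is not the real CheckIntegersInRange, which is generic over types):
```
#include <cstdint>
#include "arrow/array/data.h"
#include "arrow/status.h"

// Simplified stand-in: scan raw values of an int64 ArraySpan for bounds.
arrow::Status CheckInt64InRange(const arrow::ArraySpan& span,
                                int64_t lo, int64_t hi) {
  const int64_t* values = span.GetValues<int64_t>(1);
  for (int64_t i = 0; i < span.length; ++i) {
    if (values[i] < lo || values[i] > hi) {
      return arrow::Status::Invalid("Integer value out of bounds");
    }
  }
  return arrow::Status::OK();
}
```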
diff --git a/cpp/src/arrow/compute/kernels/scalar_nested.cc b/cpp/src/arrow/compute/kernels/scalar_nested.cc
index d3e72bea34..0b6118812a 100644
--- a/cpp/src/arrow/compute/kernels/scalar_nested.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_nested.cc
@@ -99,7 +99,7 @@ struct ListElement {
const ArraySpan& list_values = list.child_data[0];
const offset_type* offsets = list.GetValues<offset_type>(1);
- IndexValueType index;
+ IndexValueType index = 0;
RETURN_NOT_OK(GetListElementIndex<IndexScalarType>(batch[1], &index));
std::unique_ptr<ArrayBuilder> builder;
@@ -138,7 +138,7 @@ struct FixedSizeListElement {
const ArraySpan& list = batch[0].array;
const ArraySpan& list_values = list.child_data[0];
- IndexValueType index;
+ IndexValueType index = 0;
RETURN_NOT_OK(GetListElementIndex<IndexScalarType>(batch[1], &index));
std::unique_ptr<ArrayBuilder> builder;
@@ -446,8 +446,7 @@ struct MapLookupFunctor {
const int32_t item_size = offsets[map_index + 1] - offsets[map_index];
// Adjust the keys view to just the map slot that we are about to search
- map_keys.SetOffset(item_offset);
- map_keys.length = item_size;
+ map_keys.SetSlice(item_offset, item_size);
bool found_at_least_one_key = false;
RETURN_NOT_OK(FindMatchingIndices(map_keys, query_key, [&](int64_t key_index) {
@@ -477,8 +476,7 @@ struct MapLookupFunctor {
const int32_t item_size = offsets[map_index + 1] - offsets[map_index];
// Adjust the keys view to just the map slot that we are about to search
- map_keys.SetOffset(item_offset);
- map_keys.length = item_size;
+ map_keys.SetSlice(item_offset, item_size);
ARROW_ASSIGN_OR_RAISE(
int64_t item_index,
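Two small fixes ride along in this file: `index` is now zero-initialized (presumably to quiet maybe-uninitialized warnings, since GetListElementIndex assigns it on success), and the two-step SetOffset-plus-length adjustment collapses into a single SetSlice call. A sketch of the one-call form (helper name illustrative):
```
#include <cstdint>
#include "arrow/array/data.h"  // arrow::ArraySpan

// Sketch: SetSlice(offset, length) re-windows the span in place of
// SetOffset(...) followed by a manual length assignment.
void FocusOnMapSlot(arrow::ArraySpan* map_keys, int64_t item_offset,
                    int32_t item_size) {
  map_keys->SetSlice(item_offset, item_size);
}
```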
diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc
index 28ebc9f196..d6d00c2cce 100644
--- a/cpp/src/arrow/compute/row/grouper.cc
+++ b/cpp/src/arrow/compute/row/grouper.cc
@@ -99,16 +99,10 @@ struct GrouperImpl : Grouper {
return std::move(impl);
}
- Result<Datum> Consume(const ExecBatch& batch) override {
+ Result<Datum> Consume(const ExecSpan& batch) override {
std::vector<int32_t> offsets_batch(batch.length + 1);
for (int i = 0; i < batch.num_values(); ++i) {
- ExecValue value;
- if (batch[i].is_array()) {
- value.SetArray(*batch[i].array());
- } else {
- value.SetScalar(batch[i].scalar().get());
- }
- encoders_[i]->AddLength(value, batch.length, offsets_batch.data());
+ encoders_[i]->AddLength(batch[i], batch.length, offsets_batch.data());
}
int32_t total_length = 0;
@@ -126,13 +120,7 @@ struct GrouperImpl : Grouper {
}
for (int i = 0; i < batch.num_values(); ++i) {
- ExecValue value;
- if (batch[i].is_array()) {
- value.SetArray(*batch[i].array());
- } else {
- value.SetScalar(batch[i].scalar().get());
- }
- RETURN_NOT_OK(encoders_[i]->Encode(value, batch.length, key_buf_ptrs.data()));
+ RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, key_buf_ptrs.data()));
}
TypedBufferBuilder<uint32_t> group_ids_batch(ctx_->memory_pool());
@@ -281,11 +269,11 @@ struct GrouperFastImpl : Grouper {
~GrouperFastImpl() { map_.cleanup(); }
- Result<Datum> Consume(const ExecBatch& batch) override {
+ Result<Datum> Consume(const ExecSpan& batch) override {
// ARROW-14027: broadcast scalar arguments for now
for (int i = 0; i < batch.num_values(); i++) {
- if (batch.values[i].is_scalar()) {
- ExecBatch expanded = batch;
+ if (batch[i].is_scalar()) {
+ ExecBatch expanded = batch.ToExecBatch();
for (int j = i; j < expanded.num_values(); j++) {
if (expanded.values[j].is_scalar()) {
ARROW_ASSIGN_OR_RAISE(
@@ -294,20 +282,20 @@ struct GrouperFastImpl : Grouper {
ctx_->memory_pool()));
}
}
- return ConsumeImpl(expanded);
+ return ConsumeImpl(ExecSpan(expanded));
}
}
return ConsumeImpl(batch);
}
- Result<Datum> ConsumeImpl(const ExecBatch& batch) {
+ Result<Datum> ConsumeImpl(const ExecSpan& batch) {
int64_t num_rows = batch.length;
int num_columns = batch.num_values();
// Process dictionaries
for (int icol = 0; icol < num_columns; ++icol) {
if (key_types_[icol].id() == Type::DICTIONARY) {
- auto data = batch[icol].array();
- auto dict = MakeArray(data->dictionary);
+ const ArraySpan& data = batch[icol].array;
+ auto dict = MakeArray(data.dictionary().ToArrayData());
if (dictionaries_[icol]) {
if (!dictionaries_[icol]->Equals(dict)) {
// TODO(bkietz) unify if necessary. For now, just error if any batch's
@@ -331,16 +319,16 @@ struct GrouperFastImpl : Grouper {
// Skip if the key's type is NULL
if (key_types_[icol].id() != Type::NA) {
- if (batch[icol].array()->buffers[0] != NULLPTR) {
- non_nulls = batch[icol].array()->buffers[0]->data();
+ if (batch[icol].array.buffers[0].data != NULLPTR) {
+ non_nulls = batch[icol].array.buffers[0].data;
}
- fixedlen = batch[icol].array()->buffers[1]->data();
+ fixedlen = batch[icol].array.buffers[1].data;
if (!col_metadata_[icol].is_fixed_length) {
- varlen = batch[icol].array()->buffers[2]->data();
+ varlen = batch[icol].array.buffers[2].data;
}
}
- int64_t offset = batch[icol].array()->offset;
+ int64_t offset = batch[icol].array.offset;
auto col_base = KeyColumnArray(col_metadata_[icol], offset + num_rows, non_nulls,
fixedlen, varlen);
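The ARROW-14027 fallback in GrouperFastImpl above shows how a non-owning span cooperates with owning data when scalars must be broadcast: the span is promoted with ToExecBatch(), scalars are expanded to full-length arrays, and the result is re-wrapped in an ExecSpan. A sketch of just the broadcast step (function name illustrative; MakeArrayFromScalar is the public helper the original code uses):
```
#include "arrow/api.h"           // arrow::MakeArrayFromScalar, arrow::Datum
#include "arrow/compute/exec.h"  // arrow::compute::ExecBatch, ExecSpan

// Sketch: expand any scalar values of a span into arrays of span length.
arrow::Result<arrow::compute::ExecBatch> BroadcastScalars(
    const arrow::compute::ExecSpan& batch, arrow::MemoryPool* pool) {
  arrow::compute::ExecBatch expanded = batch.ToExecBatch();
  for (int j = 0; j < expanded.num_values(); ++j) {
    if (expanded.values[j].is_scalar()) {
      ARROW_ASSIGN_OR_RAISE(
          expanded.values[j],
          arrow::MakeArrayFromScalar(*expanded.values[j].scalar(),
                                     batch.length, pool));
    }
  }
  return expanded;
}
```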
diff --git a/cpp/src/arrow/compute/row/grouper.h b/cpp/src/arrow/compute/row/grouper.h
index 4c10679457..ce09adf09b 100644
--- a/cpp/src/arrow/compute/row/grouper.h
+++ b/cpp/src/arrow/compute/row/grouper.h
@@ -42,7 +42,7 @@ class ARROW_EXPORT Grouper {
/// Consume a batch of keys, producing the corresponding group ids as an integer array.
/// Currently only uint32 indices will be produced, eventually the bit width will only
/// be as wide as necessary.
- virtual Result<Datum> Consume(const ExecBatch& batch) = 0;
+ virtual Result<Datum> Consume(const ExecSpan& batch) = 0;
/// Get current unique keys. May be called multiple times.
virtual Result<ExecBatch> GetUniques() = 0;
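With the interface change above, callers that hold an owning ExecBatch of key columns now wrap it in a non-owning ExecSpan view before calling Consume, as the updated tests do. A minimal caller-side sketch (function name illustrative):
```
#include "arrow/compute/exec.h"         // arrow::compute::ExecBatch, ExecSpan
#include "arrow/compute/row/grouper.h"  // arrow::compute::Grouper

// Sketch: Consume returns the uint32 group-id array as a Datum; the span
// only borrows the batch's buffers for the duration of the call.
arrow::Result<arrow::Datum> GroupIds(arrow::compute::Grouper* grouper,
                                     const arrow::compute::ExecBatch& keys) {
  return grouper->Consume(arrow::compute::ExecSpan(keys));
}
```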
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index a210c947a3..26abc10e6b 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -161,10 +161,10 @@ Result<Partitioning::PartitionedBatches> KeyValuePartitioning::Partition(
return PartitionedBatches{{batch}, {compute::literal(true)}};
}
- // assemble an ExecBatch of the key columns
- compute::ExecBatch key_batch({}, batch->num_rows());
+ // assemble an ExecSpan of the key columns
+ compute::ExecSpan key_batch({}, batch->num_rows());
for (int i : key_indices) {
- key_batch.values.emplace_back(batch->column_data(i));
+ key_batch.values.emplace_back(ArraySpan(*batch->column_data(i)));
}
ARROW_ASSIGN_OR_RAISE(auto grouper, compute::Grouper::Make(key_batch.GetTypes()));
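The partitioning change shows the producer side of the same contract: the key ExecSpan is assembled from ArraySpan views over the record batch's columns, so no buffers are copied and the span is only valid while the batch is alive. A standalone sketch of the pattern (function name illustrative):
```
#include <vector>
#include "arrow/array/data.h"    // arrow::ArraySpan
#include "arrow/compute/exec.h"  // arrow::compute::ExecSpan
#include "arrow/record_batch.h"

// Sketch: the returned span borrows from `batch` and must not outlive it.
arrow::compute::ExecSpan MakeKeySpan(const arrow::RecordBatch& batch,
                                     const std::vector<int>& key_indices) {
  arrow::compute::ExecSpan key_span({}, batch.num_rows());
  for (int i : key_indices) {
    key_span.values.emplace_back(arrow::ArraySpan(*batch.column_data(i)));
  }
  return key_span;
}
```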