GitHub user moba15 edited a discussion: Arrow-Datasets C++: Scanner::Scan visitor is executing serially despite use_threads=true

Hi all,
I am working with the Apache Arrow C++ Dataset API to scan multiple Parquet 
files. My goal is to process RecordBatches in parallel using a callback 
function without materializing the entire table at once.

Following the Dataset tutorial, I am using `Scanner::Scan`. According to its
documentation:

> If multiple threads are used (via use_threads), the visitor will be invoked
> from those threads and is responsible for any synchronization.
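Taken at face value, this contract means a visitor that writes to shared state has to protect that state itself. Below is a minimal, standard-library-only sketch of such a visitor. `FakeBatch` is a hypothetical stand-in for `TaggedRecordBatch`, and `InvokeConcurrently` only simulates a multi-threaded caller with `std::thread`; it is an assumption for illustration, not how Arrow actually schedules the visitor.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for arrow::dataset::TaggedRecordBatch;
// only the row count matters for this sketch.
struct FakeBatch {
    std::int64_t num_rows;
};

// Shared state updated by the visitor. Since the visitor may run on several
// threads at once, every access goes through the mutex.
struct RowCounter {
    std::mutex mu;
    std::int64_t total_rows = 0;
    std::int64_t batches = 0;

    void Visit(const FakeBatch& batch) {
        std::lock_guard<std::mutex> lock(mu);
        total_rows += batch.num_rows;
        ++batches;
    }
};

// Simulated multi-threaded caller (hypothetical): `num_threads` threads each
// deliver `per_thread` batches of `rows` rows to the same visitor.
void InvokeConcurrently(const std::function<void(const FakeBatch&)>& visitor,
                        int num_threads, int per_thread, std::int64_t rows) {
    assert(num_threads > 0 && per_thread >= 0);
    std::vector<std::thread> threads;
    for (int t = 0; t < num_threads; ++t) {
        threads.emplace_back([&visitor, per_thread, rows] {
            for (int i = 0; i < per_thread; ++i) visitor(FakeBatch{rows});
        });
    }
    // Join before returning so `visitor` outlives every call.
    for (auto& th : threads) th.join();
}
```

In the real example, `ProcessBatch` would need the same kind of lock around anything it shares between calls.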

However, in my implementation the visitor is called strictly serially: each call
begins only after the previous one has returned, even though `use_threads` is
set to `true`. I have also tried `ScanBatchesUnordered`, but I see the same
serial behavior.
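One way to confirm this without reading timestamps is to count how many visitor calls are in flight at the same moment. The sketch below uses a hypothetical `ConcurrencyProbe` helper (not part of Arrow). Wrapping the body of `ProcessBatch` in a `ProbeScope` and printing `max_in_flight()` after the scan should report 1 for the serial behavior described here, and a larger number if the visitor really ran on several threads at once.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical helper (not part of Arrow): records the peak number of
// concurrent Enter()/Exit() pairs.
class ConcurrencyProbe {
public:
    void Enter() {
        int now = ++in_flight_;
        // Raise max_ to `now` if it is larger; on a failed exchange, `prev`
        // is refreshed with the current value and the condition is rechecked.
        int prev = max_.load();
        while (now > prev && !max_.compare_exchange_weak(prev, now)) {
        }
    }
    void Exit() {
        int before = in_flight_.fetch_sub(1);
        assert(before > 0 && "Exit() without matching Enter()");
        (void)before;  // avoid an unused-variable warning under NDEBUG
    }
    int max_in_flight() const { return max_.load(); }

private:
    std::atomic<int> in_flight_{0};
    std::atomic<int> max_{0};
};

// RAII guard so Exit() runs even if the visitor returns early.
struct ProbeScope {
    explicit ProbeScope(ConcurrencyProbe& p) : probe(p) { probe.Enter(); }
    ~ProbeScope() { probe.Exit(); }
    ConcurrencyProbe& probe;
};

// Mirrors ProcessBatch from the example: hold the probe while "working".
void SimulatedVisit(ConcurrencyProbe& probe, std::chrono::milliseconds work) {
    ProbeScope scope(probe);
    std::this_thread::sleep_for(work);
}
```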

Minimal Working Example:

```C++
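#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/filesystem/api.h>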

// Visitor passed to Scanner::Scan; logs which thread received each batch.
arrow::Status ProcessBatch(const arrow::dataset::TaggedRecordBatch& tagged_batch) {
    // Note: streaming a system_clock::time_point with << requires C++20.
    std::cerr << "ThreadId " << std::this_thread::get_id() << " got batch with "
              << tagged_batch.record_batch->num_rows() << " rows at "
              << std::chrono::system_clock::now() << "\n";
    // Simulate processing time.
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return arrow::Status::OK();
}

arrow::Status ScanWholeDataset(
    const std::shared_ptr<arrow::fs::FileSystem>& filesystem,
    const std::shared_ptr<arrow::dataset::FileFormat>& format,
    const std::string& base_dir) {
    // Create custom scan options
    auto customOption = std::make_shared<arrow::dataset::ScanOptions>();
    customOption->use_threads = true;
    customOption->fragment_readahead = 20;

    arrow::fs::FileSelector selector;
    selector.base_dir = base_dir;
    selector.recursive = true;

    ARROW_ASSIGN_OR_RAISE(
        auto factory,
        arrow::dataset::FileSystemDatasetFactory::Make(
            filesystem, selector, format,
            arrow::dataset::FileSystemFactoryOptions()));

    ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
    arrow::dataset::ScannerBuilder scan_builder(dataset, customOption);

    ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder.Finish());
    // Call Scan with the callback and propagate any error it returns.
    ARROW_RETURN_NOT_OK(scanner->Scan(ProcessBatch));
    return arrow::Status::OK();
}

arrow::Status Test() {
    ARROW_RETURN_NOT_OK(arrow::compute::Initialize());
    
    std::string base_path = "xxx";
    std::string root_path;
    std::string uri = "file://xxx";
    ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::FileSystemFromUri(uri, &root_path));
    auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

    ARROW_RETURN_NOT_OK(ScanWholeDataset(fs, format, base_path));

    return arrow::Status::OK();
}

int main() {
    auto status = Test();
    if (!status.ok()) {
        std::cerr << "Error: " << status.message() << std::endl;
        return 1;
    }
    return 0;
}
```

Observed Behavior

When running this against 6 Parquet files (approx. 5 GiB total), every batch is
received by the same thread, and consecutive timestamps are 5 seconds apart,
which is exactly the sleep in `ProcessBatch`.

```
ThreadId 133083219080896 got batch with 122880 rows at 2026-03-20 09:25:04.164492317
ThreadId 133083219080896 got batch with 122880 rows at 2026-03-20 09:25:09.165123649
ThreadId 133083219080896 got batch with 122880 rows at 2026-03-20 09:25:14.167036139
```

Apache Arrow version: 22

Am I missing a configuration step in `ScanOptions` or `ScannerBuilder` that
actually triggers parallel execution of the visitor? Is there a preferred way to
handle parallel callbacks in the Dataset API?
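One possible workaround, assuming `Scan` keeps delivering batches one at a time, is to make the visitor cheap: it only enqueues each batch for a pool of worker threads and returns, so the slow per-batch work overlaps. The sketch below uses only the standard library; `FakeBatch` and `BoundedWorkQueue` are hypothetical names. In real code the queued item would be `tagged_batch.record_batch` (a `std::shared_ptr<arrow::RecordBatch>`), with the visitor calling `Push` and returning `arrow::Status::OK()`. Error handling from the workers is omitted.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a RecordBatch; only the row count is used.
struct FakeBatch {
    std::int64_t num_rows;
};

// Hypothetical helper: a bounded queue drained by a fixed set of workers.
// Push() blocks while the queue is full, capping batches held in memory.
class BoundedWorkQueue {
public:
    BoundedWorkQueue(std::size_t num_workers, std::size_t capacity,
                     std::function<void(const FakeBatch&)> work)
        : capacity_(capacity), work_(std::move(work)) {
        assert(capacity_ >= 1 && "a zero capacity would block Push forever");
        for (std::size_t i = 0; i < num_workers; ++i) {
            workers_.emplace_back([this] { WorkerLoop(); });
        }
    }

    ~BoundedWorkQueue() { Close(); }

    // Called from the (serial) scan visitor; returns once the batch is queued.
    void Push(FakeBatch batch) {
        std::unique_lock<std::mutex> lock(mu_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push_back(batch);
        not_empty_.notify_one();
    }

    // Signals end of input and waits until every queued batch is processed.
    void Close() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            closed_ = true;
        }
        not_empty_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
    }

private:
    void WorkerLoop() {
        for (;;) {
            FakeBatch batch{0};
            {
                std::unique_lock<std::mutex> lock(mu_);
                not_empty_.wait(lock, [this] { return closed_ || !queue_.empty(); });
                if (queue_.empty()) return;  // closed and fully drained
                batch = queue_.front();
                queue_.pop_front();
                not_full_.notify_one();
            }
            work_(batch);  // runs outside the lock, so workers overlap
        }
    }

    const std::size_t capacity_;
    std::function<void(const FakeBatch&)> work_;
    std::mutex mu_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<FakeBatch> queue_;
    bool closed_ = false;
    std::vector<std::thread> workers_;  // declared last: started after the rest
};
```

The capacity bound is a design choice: when the workers fall behind, `Push` blocks the visitor, so the queue cannot grow without limit. Calling `Close()` after `Scan` returns ensures every queued batch has been processed before the results are used.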
  
Thanks for your help

GitHub link: https://github.com/apache/arrow/discussions/49568
