Hi all,

I've been doing some investigation of the AsOfJoinNode and have come
across some strange behavior while experimenting with the
ExecutionContext and in-memory / file-streaming source nodes.

Our AsOfJoin algorithm requires that the input be in chronological order
with respect to one of its columns, so the order of batches matters for
correctness. I observed the following results and am wondering how this
behavior can be explained:

When using in-memory objects (for example, reading a Feather file into a
table, or building a table from scratch) and instantiating our
execution context with the default memory pool and an
`arrow::internal::GetCpuThreadPool()` executor, batches do not arrive in
order. However, when instantiated with `nullptr` as the executor, batches
do arrive in order. That much makes sense: with multiple threads running,
there is no guarantee of in-order batch delivery.

When using file-streaming objects (see the implementation using
RecordBatchFileReader below), the batches arrive in order regardless of
the executor.

I'm curious why the RecordBatchFileReader approach doesn't run into the
same problem when the ExecutionContext uses the CPU thread pool, and would
appreciate it if anyone could shed some light on what might be going on.

Ivan

```
// Wrapper to enable the use of a RecordBatchFileReader as a RecordBatchReader
class RecordBatchFileReaderWrapper : public arrow::ipc::RecordBatchReader {
  std::shared_ptr<arrow::ipc::RecordBatchFileReader> _reader;
  int _next;

 public:
  explicit RecordBatchFileReaderWrapper(
      std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader)
      : _reader(std::move(reader)), _next(0) {}
  virtual ~RecordBatchFileReaderWrapper() {}

  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
    if (_next < _reader->num_record_batches()) {
      // Batches are read sequentially, in file order.
      ARROW_ASSIGN_OR_RAISE(*batch, _reader->ReadRecordBatch(_next++));
    } else {
      batch->reset();  // signal end-of-stream
    }
    return arrow::Status::OK();
  }

  std::shared_ptr<arrow::Schema> schema() const override {
    return _reader->schema();
  }
};

std::optional<arrow::compute::ExecNode*> make_arrow_ipc_reader_node(
    std::shared_ptr<arrow::compute::ExecPlan>& plan,
    std::shared_ptr<arrow::fs::FileSystem>& fs,
    const std::string& filename) {
  // TODO: error checking
  std::shared_ptr<arrow::io::RandomAccessFile> input =
      *fs->OpenInputFile(filename);
  std::shared_ptr<arrow::ipc::RecordBatchFileReader> in_reader =
      *arrow::ipc::RecordBatchFileReader::Open(input);
  auto reader = std::make_shared<RecordBatchFileReaderWrapper>(in_reader);

  auto schema = reader->schema();
  auto batch_gen = *arrow::compute::MakeReaderGenerator(
      std::move(reader), arrow::internal::GetCpuThreadPool());

  return *arrow::compute::MakeExecNode(
      "source",    // registered factory name
      plan.get(),  // execution plan
      {},          // inputs
      arrow::compute::SourceNodeOptions(
          std::make_shared<arrow::Schema>(*schema), batch_gen));
}
```
