Hi all, I've been investigating the AsOfJoinNode and have come across some surprising behavior while experimenting with the ExecutionContext and in-memory / file-streaming source nodes.
Our AsOfJoin algorithm requires its input to be in chronological order with respect to one of our columns, so the order of batches matters for correctness. I found the following results and was wondering how this behavior can be explained:

When using in-memory objects (for example, reading a Feather file into a Table, or creating a Table from scratch) and instantiating our ExecContext with the default memory pool and an `arrow::internal::GetCpuThreadPool()` executor, batches do not arrive in order. However, when the context is instantiated with `nullptr` as the executor, batches do arrive in order. That much makes sense: with multiple threads running, there is no guarantee of in-order batch delivery.

When using file-streaming objects (see the implementation using RecordBatchFileReader below), the batches arrive in order regardless of the executor. I'm curious why the RecordBatchFileReader approach doesn't hit the same ordering problem when the ExecContext uses the CPU thread pool, and was wondering if anyone could shed some light on what might be going on.
Ivan

```cpp
#include <memory>
#include <optional>
#include <string>

#include <arrow/compute/exec/exec_plan.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/reader.h>
#include <arrow/util/thread_pool.h>

// Wrapper to enable the use of RecordBatchFileReaders as RecordBatchReaders
class RecordBatchFileReaderWrapper : public arrow::ipc::RecordBatchReader {
  std::shared_ptr<arrow::ipc::RecordBatchFileReader> _reader;
  int _next;

 public:
  virtual ~RecordBatchFileReaderWrapper() {}
  explicit RecordBatchFileReaderWrapper(
      std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader)
      : _reader(std::move(reader)), _next(0) {}

  virtual arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) {
    // cerr << "ReadNext _next=" << _next << "\n";
    if (_next < _reader->num_record_batches()) {
      ARROW_ASSIGN_OR_RAISE(*batch, _reader->ReadRecordBatch(_next++));
      // cerr << "\t --> " << (*batch)->num_rows() << "\n";
    } else {
      batch->reset();  // null batch signals end-of-stream
      // cerr << "\t --> EOF\n";
    }
    return arrow::Status::OK();
  }

  virtual std::shared_ptr<arrow::Schema> schema() const {
    return _reader->schema();
  }
};

std::optional<arrow::compute::ExecNode*> make_arrow_ipc_reader_node(
    std::shared_ptr<arrow::compute::ExecPlan>& plan,
    std::shared_ptr<arrow::fs::FileSystem>& fs,
    const std::string& filename) {
  // TODO: error checking
  std::shared_ptr<arrow::io::RandomAccessFile> input =
      *fs->OpenInputFile(filename);
  std::shared_ptr<arrow::ipc::RecordBatchFileReader> in_reader =
      *arrow::ipc::RecordBatchFileReader::Open(input);
  auto reader = std::make_shared<RecordBatchFileReaderWrapper>(in_reader);

  auto schema = reader->schema();
  auto batch_gen = *arrow::compute::MakeReaderGenerator(
      std::move(reader), arrow::internal::GetCpuThreadPool());

  // cerr << "create source(" << filename << ")\n";
  return *arrow::compute::MakeExecNode(
      "source",    // registered factory name
      plan.get(),  // execution plan
      {},          // inputs
      arrow::compute::SourceNodeOptions(
          std::make_shared<arrow::Schema>(*schema), batch_gen));  // options
}
```