If you are using a source node (which it appears you are), then it will be creating a new thread task for each batch, so in theory these could arrive out of order. My guess is that the file reader is slow enough that, by the time you have loaded batch N from disk and decoded it, there is a pretty good chance that compute has already finished on batch N-1.
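In other words, the ordering you see today is luck rather than a guarantee. If the join genuinely depends on order, one generic fix is to tag each batch with its index at the source and restore order downstream with a small reorder buffer. Here is a minimal sketch of that technique; the `ReorderBuffer` helper is hypothetical, not something Arrow provides:

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

// Minimal reorder buffer: accepts (index, batch) pairs in any arrival order
// and invokes the sink callback in strictly increasing index order.
template <typename Batch>
class ReorderBuffer {
 public:
  explicit ReorderBuffer(std::function<void(Batch)> sink)
      : sink_(std::move(sink)) {}

  void Push(int64_t index, Batch batch) {
    pending_.emplace(index, std::move(batch));
    // Flush the run of consecutive indices starting at next_.
    for (auto it = pending_.find(next_); it != pending_.end();
         it = pending_.find(next_)) {
      sink_(std::move(it->second));
      pending_.erase(it);
      ++next_;
    }
  }

 private:
  std::function<void(Batch)> sink_;
  std::map<int64_t, Batch> pending_;
  int64_t next_ = 0;  // next index to emit
};
```

Anything that delivers (index, batch) pairs, in any order, comes out of the sink in index order, so arrival order stops mattering.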
If you run your program on a stressed CPU with limited cores, you may find that you still receive batches out of order.

Also note that your file reader is only going to read a single batch at a time and will not start reading the next batch until it has scheduled the compute task for the previous one. This tends to be pretty inefficient, especially for a remote filesystem, but even for an HDD you typically want some parallelism in your read path. You might try using the scanner, although I know you will get out-of-order data in that case.
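If you stay with the RecordBatchReader approach below, one thing you could experiment with is wrapping the generator in a readahead layer so that reading overlaps with compute. Something like the following, assuming `arrow::MakeReadaheadGenerator` from `arrow/util/async_generator.h` (double-check the exact signature against your Arrow version); note that because `ReadNext` is inherently serial this pipelines reads against compute rather than issuing parallel reads:

```cpp
// Sketch only, reusing the `reader` from make_arrow_ipc_reader_node below.
// MakeReadaheadGenerator is assumed from arrow/util/async_generator.h;
// verify it against the Arrow version you build with.
auto batch_gen = *arrow::compute::MakeReaderGenerator(
    std::move(reader), arrow::internal::GetCpuThreadPool());
// Keep up to 8 batches in flight; they may still complete (and arrive)
// out of order.
auto readahead_gen =
    arrow::MakeReadaheadGenerator(std::move(batch_gen), /*max_readahead=*/8);
```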
-Weston

On Tue, Jul 19, 2022 at 6:08 AM Ivan Chau <ivan.m.c...@gmail.com> wrote:
>
> Hi all,
>
> I am doing some investigations of the AsOfJoinNode, and consequently have
> come across some strange behavior when experimenting with the
> ExecutionContext and in-memory / file-streaming source nodes.
>
> Our AsOfJoin algorithm requires that the input be in chronological order
> with respect to one of our columns, and as such, the order of batches
> matters for correctness. I found the following results and was wondering
> how such behavior can be explained:
>
> When using in-memory objects (for example, reading a feather file into a
> table object, or creating a table object from scratch), and instantiating
> our execution context with a default memory pool and an
> `arrow::internal::GetCpuThreadPool()` executor, batches do not come in
> order. However, when instantiated with `nullptr` as the executor, batches
> do come in order. This sort of makes sense, since there is no guarantee of
> in-order batches if there are multiple threads running.
>
> When using file-streaming objects (see the implementation using
> RecordBatchFileReader below), the batches are in order regardless of our
> executor.
>
> I'm curious why we don't have the same problems using the CPUThreadPool for
> our ExecutionContext when using the RecordBatchFileReader approach, and was
> wondering if anyone could shed some light on what might be going on.
>
> Ivan
>
> ```
> // Wrapper to enable the use of RecordBatchFileReaders as RecordBatchReaders
> class RecordBatchFileReaderWrapper : public arrow::ipc::RecordBatchReader {
>   shared_ptr<arrow::ipc::RecordBatchFileReader> _reader;
>   int _next;
>
>  public:
>   virtual ~RecordBatchFileReaderWrapper() {}
>   explicit RecordBatchFileReaderWrapper(
>       shared_ptr<arrow::ipc::RecordBatchFileReader> reader)
>       : _reader(reader), _next(0) {}
>
>   // Reads batches sequentially by index; resets *batch to signal
>   // end-of-stream.
>   virtual arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) {
>     if (_next < _reader->num_record_batches()) {
>       ARROW_ASSIGN_OR_RAISE(*batch, _reader->ReadRecordBatch(_next++));
>     } else {
>       batch->reset();
>     }
>     return arrow::Status::OK();
>   }
>
>   virtual std::shared_ptr<arrow::Schema> schema() const {
>     return _reader->schema();
>   }
> };
>
> optional<arrow::compute::ExecNode*> make_arrow_ipc_reader_node(
>     shared_ptr<arrow::compute::ExecPlan>& plan,
>     shared_ptr<arrow::fs::FileSystem>& fs,
>     const string& filename) {
>   // TODO: error checking
>   shared_ptr<arrow::io::RandomAccessFile> input = *fs->OpenInputFile(filename);
>   shared_ptr<arrow::ipc::RecordBatchFileReader> in_reader =
>       *arrow::ipc::RecordBatchFileReader::Open(input);
>   shared_ptr<RecordBatchFileReaderWrapper> reader(
>       new RecordBatchFileReaderWrapper(in_reader));
>
>   auto schema = reader->schema();
>   auto batch_gen = *arrow::compute::MakeReaderGenerator(
>       std::move(reader), arrow::internal::GetCpuThreadPool());
>
>   return *arrow::compute::MakeExecNode(
>       "source",     // registered factory name
>       plan.get(),   // execution plan
>       {},           // inputs
>       arrow::compute::SourceNodeOptions(
>           make_shared<arrow::Schema>(*schema), batch_gen));  // options
> }
> ```