Hi, I also have a related question: could you recommend a way to get batches in order when using a source node? If necessary, an approach that changes or wraps the source node's code would be acceptable.
Yaron.

________________________________
From: Li Jin <ice.xell...@gmail.com>
Sent: Tuesday, July 19, 2022 10:59 AM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: ExecutionContext, batch ordering clarification

Thanks Weston, two follow-up questions:

(1) What is the threading model when passing "executor=nullptr" to "ExecContext"? (Does it use only one thread?)

(2) For the file reader, if we want to ensure batches coming out of the reader are ordered but also have parallelism, I'd imagine doing something like using an additional thread for "read-ahead" while still keeping the output batches in order. This would reduce the downstream parallelism (for example, projection), but I think at least it would guarantee correctness?

On Tue, Jul 19, 2022 at 10:43 AM Weston Pace <weston.p...@gmail.com> wrote:

> If you are using a source node (which it appears you are) then it will
> be creating new thread tasks for each batch. So, in theory, these
> could get out of order. My guess is that the file reader is slow
> enough that by the time you load batch N from disk and decode it, you
> have a pretty good chance that compute has finished on batch N-1.
>
> If you run your program on a stressed CPU with limited cores you may
> find that you still receive batches out of order.
>
> Also note that your file reader is only going to read a single batch
> at a time and will not start reading the next batch until it has
> scheduled the compute task for the previous batch. This tends to be
> pretty inefficient, especially for a remote filesystem, but even for
> an HDD you typically want some parallelism in your read path. You
> might try using the scanner, although I know you will get out-of-order
> data in that case.
>
> -Weston
>
> On Tue, Jul 19, 2022 at 6:08 AM Ivan Chau <ivan.m.c...@gmail.com> wrote:
> >
> > Hi all,
> >
> > I am doing some investigation of the AsOfJoinNode, and consequently have
> > come across some strange behavior when experimenting with the
> > ExecutionContext and in-memory / file-streaming source nodes.
> >
> > Our AsOfJoin algorithm requires that the input be in chronological order
> > with respect to one of our columns, so the order of batches matters for
> > correctness. I found the following results and was wondering how this
> > behavior can be explained:
> >
> > When using in-memory objects (for example, reading a feather file into a
> > table object, or creating a table object from scratch), and instantiating
> > our execution context with a default memory pool and an
> > `arrow::internal::GetCpuThreadPool()` executor, batches do not come in
> > order. However, when instantiated with `nullptr` as the executor, batches
> > do come in order. This makes some sense, since there is no guarantee of
> > in-order batches if there are multiple threads running.
> >
> > When using file-streaming objects (see the implementation using
> > RecordBatchFileReader below), the batches are in order regardless of our
> > executor.
> >
> > I'm curious why we don't have the same problem using the CPU thread pool
> > for our ExecutionContext when using the RecordBatchFileReader approach,
> > and was wondering if anyone could shed some light on what might be
> > going on.
> >
> > Ivan
> >

```
// Wrapper to enable the use of RecordBatchFileReaders as RecordBatchReaders
class RecordBatchFileReaderWrapper : public arrow::ipc::RecordBatchReader {
  shared_ptr<arrow::ipc::RecordBatchFileReader> _reader;
  int _next;

 public:
  virtual ~RecordBatchFileReaderWrapper() {}
  RecordBatchFileReaderWrapper(
      shared_ptr<arrow::ipc::RecordBatchFileReader> reader)
      : _reader(reader), _next(0) {}

  virtual arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) {
    // cerr << "ReadNext _next=" << _next << "\n";
    if (_next < _reader->num_record_batches()) {
      ARROW_ASSIGN_OR_RAISE(*batch, _reader->ReadRecordBatch(_next++));
      // cerr << "\t --> " << (*batch)->num_rows() << "\n";
    } else {
      batch->reset();
      // cerr << "\t --> EOF\n";
    }

    return arrow::Status::OK();
  }

  virtual std::shared_ptr<arrow::Schema> schema() const {
    return _reader->schema();
  }
};

optional<arrow::compute::ExecNode*> make_arrow_ipc_reader_node(
    shared_ptr<arrow::compute::ExecPlan>& plan,
    shared_ptr<arrow::fs::FileSystem>& fs,
    const string& filename) {
  // TODO: error checking
  shared_ptr<arrow::io::RandomAccessFile> input =
      *fs->OpenInputFile(filename);
  shared_ptr<arrow::ipc::RecordBatchFileReader> in_reader =
      *arrow::ipc::RecordBatchFileReader::Open(input);
  shared_ptr<RecordBatchFileReaderWrapper> reader(
      new RecordBatchFileReaderWrapper(in_reader));

  auto schema = reader->schema();
  auto batch_gen = *arrow::compute::MakeReaderGenerator(
      std::move(reader), arrow::internal::GetCpuThreadPool());

  // cerr << "create source(" << filename << ")\n";
  return *arrow::compute::MakeExecNode(
      "source",    // registered type
      plan.get(),  // execution plan
      {},          // inputs
      arrow::compute::SourceNodeOptions(
          make_shared<arrow::Schema>(*schema), batch_gen));  // options
}
```