> I think the key problem is that the input stream is unordered. The > input stream is a ArrowArrayStream imported from python side, and then > declared to a "record_batch_reader_source", which is a unordered > source node. So the behavior is expected. > I think the RecordBatchReaderSourceOptions should add an ordering > parameter to indicate the input stream ordering. Otherwise, I need to > convert the record_batch_reader_source into a "record_batch_source" > with a record_batch generator.
I agree. Also, keep in mind that there is the "implicit order" which means "this source node has some kind of deterministic ordering even if it isn't reflected in any single column". In other words, if you have a CSV file for example it will always be implicitly ordered (by line number) even if the line number isn't a column. This should allow us to do things like "grab the first 10 rows" and get behavior the user expects even if their data isn't explicitly ordered. In most cases we can assume that data has some kind of batch order. The only time it does not is if the source itself is non-deterministic. For example, maybe the source is some kind of unordered scan from an external SQL source. > Also, I'd like to have a discuss on dataset scanner, is it produce a > stable sequence of record batches (as an implicit ordering) when the > underlying storage is not changed? Yes, both the old and new scan node are capable of doing this. The implicit order is given by the order of the fragments in the dataset (which we assume will always be consistent and, in the case of a FileSystemDataset, it is). In the old scan node you need to set the property `require_sequenced_output` on the ScanNodeOptions to true (I believe the new scan node will always sequence output but this property may eventually exist there too). > For my situation, the downstream > executor may crush, then it would request to continue from a > intermediate state (with a restart offset). I'd like to make it into a > fetch node to skip heading rows, but it seems not an optimized way. Regrettably the old scan node does not have skip implemented. It is a little tricky since we do not have a catalog and thus do not know how many rows every single file has. So we have to calculate the skip at runtime. I am planning to support this in the new scan node. > Maybe I should inspect fragments in the dataset, to skip reading > unnecessary files, and build a FlieSystemDataset on the fly? Yes, this should work today. On Tue, Jul 25, 2023 at 10:37 PM Wenbo Hu <huwenbo1...@gmail.com> wrote: > Replacing > ``` > ac::Declaration source{"record_batch_reader_source", > ac::RecordBatchReaderSourceNodeOptions{std::move(input)}}; > ``` > with > ``` > ac::RecordBatchSourceNodeOptions rb_source_options{ > input->schema(), [input]() { return > arrow::MakeFunctionIterator([input] { return input->Next(); }); }}; > ac::Declaration source{"record_batch_source", > std::move(rb_source_options)}; > ``` > Works as expected. > > Wenbo Hu <huwenbo1...@gmail.com> 于2023年7月26日周三 10:22写道: > > > > Hi, > > I'll open a issue on the DeclareToReader problem. > > I think the key problem is that the input stream is unordered. The > > input stream is a ArrowArrayStream imported from python side, and then > > declared to a "record_batch_reader_source", which is a unordered > > source node. So the behavior is expected. > > I think the RecordBatchReaderSourceOptions should add an ordering > > parameter to indicate the input stream ordering. Otherwise, I need to > > convert the record_batch_reader_source into a "record_batch_source" > > with a record_batch generator. > > Also, I'd like to have a discuss on dataset scanner, is it produce a > > stable sequence of record batches (as an implicit ordering) when the > > underlying storage is not changed? For my situation, the downstream > > executor may crush, then it would request to continue from a > > intermediate state (with a restart offset). I'd like to make it into a > > fetch node to skip heading rows, but it seems not an optimized way. > > Maybe I should inspect fragments in the dataset, to skip reading > > unnecessary files, and build a FlieSystemDataset on the fly? > > > > Weston Pace <weston.p...@gmail.com> 于2023年7月25日周二 23:44写道: > > > > > > > Reading the source code of exec_plan.cc, DeclarationToReader called > > > > DeclarationToRecordBatchGenerator, which ignores the sequence_output > > > > parameter in SinkNodeOptions, also, it calls validate which should > > > > fail if the SinkNodeOptions honors the sequence_output. Then it seems > > > > that DeclarationToReader cannot follow the input batch order? > > > > > > These methods should not be ignoring sequence_output. Do you want to > open > > > a bug? This should be a straightforward one to fix. > > > > > > > Then how the substrait works in this scenario? Does it output > > > > disorderly as well? > > > > > > Probably. Much of internal Substrait testing is probably using > > > DeclarationToTable or DeclarationToBatches. The ordered execution > hasn't > > > been adopted widely yet because the old scanner doesn't set the batch > index > > > and the new scanner isn't ready yet. This limits the usefulness to > data > > > that is already in memory (the in-memory sources do set the batch > index). > > > > > > I think your understanding of the concept is correct however. Can you > > > share a sample plan that is not working for you? If you use > > > DeclarationToTable do you get consistently ordered results? > > > > > > On Tue, Jul 25, 2023 at 7:06 AM Wenbo Hu <huwenbo1...@gmail.com> > wrote: > > > > > > > Reading the source code of exec_plan.cc, DeclarationToReader called > > > > DeclarationToRecordBatchGenerator, which ignores the sequence_output > > > > parameter in SinkNodeOptions, also, it calls validate which should > > > > fail if the SinkNodeOptions honors the sequence_output. Then it seems > > > > that DeclarationToReader cannot follow the input batch order? > > > > Then how the substrait works in this scenario? Does it output > > > > disorderly as well? > > > > > > > > Wenbo Hu <huwenbo1...@gmail.com> 于2023年7月25日周二 19:12写道: > > > > > > > > > > Hi, > > > > > I'm trying to zip two streams with same order but different > > > > processes. > > > > > For example, the original stream comes with two column 'id' and > > > > > 'age', and splits into two stream processed distributedly using > acero: > > > > > 1. hash the 'id' into a stream with single column 'bucket_id' and > 2. > > > > > classify 'age' into ['child', 'teenage', 'adult',...]. And then zip > > > > > into a single stream. > > > > > > > > > > [ 'id' | 'age' | many other columns] > > > > > | | | > > > > > ['bucket_id'] ['classify'] | > > > > > | | | > > > > > [zipped_stream | many_other_columns] > > > > > I was expecting both bucket_id and classify can keep the same > order as > > > > > the orginal stream before they are zipped. > > > > > According to document, "ordered execution" is using batch_index to > > > > > indicate the order of batches. > > > > > but acero::DeclarationToReader with a QueryOptions that > sequce_output > > > > > is set to true does not mean that it keeps the order if the input > > > > > stream is not ordered. But it doesn't fail during the execution > > > > > (bucket_id and classify are not specify any ordering). Then How > can I > > > > > make the acero produce a stream that keep the order as the original > > > > > input? > > > > > -- > > > > > --------------------- > > > > > Best Regards, > > > > > Wenbo Hu, > > > > > > > > > > > > > > > > -- > > > > --------------------- > > > > Best Regards, > > > > Wenbo Hu, > > > > > > > > > > > > -- > > --------------------- > > Best Regards, > > Wenbo Hu, > > > > -- > --------------------- > Best Regards, > Wenbo Hu, >