> Reading the source code of exec_plan.cc, DeclarationToReader calls
> DeclarationToRecordBatchGenerator, which ignores the sequence_output
> parameter in SinkNodeOptions. It also calls validate, which should
> fail if SinkNodeOptions honors sequence_output. So it seems
> that DeclarationToReader cannot follow the input batch order?

These methods should not be ignoring sequence_output.  Do you want to open
a bug?  This should be a straightforward one to fix.

> Then how does Substrait work in this scenario? Does it produce
> unordered output as well?

Probably.  Most of the internal Substrait testing likely uses
DeclarationToTable or DeclarationToBatches.  Ordered execution hasn't
been widely adopted yet because the old scanner doesn't set the batch index
and the new scanner isn't ready yet.  This limits its usefulness to data
that is already in memory (the in-memory sources do set the batch index).
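
To illustrate the idea, here is a conceptual sketch in plain Python.  It is
not Acero's actual implementation, and the resequence helper and the
(index, batch) tuples are made up for this example.  Each batch carries an
index assigned by the source, and a sequencing sink holds back any batch that
arrives early until all batches before it have been emitted.

```python
import heapq


def resequence(batches):
    """Yield (batch_index, batch) pairs in index order.

    `batches` is an iterable of (batch_index, batch) pairs that may arrive
    in any order, as they might from a multi-threaded plan. Indices are
    assumed (for this sketch) to be unique and to start at 0 with no gaps.
    """
    pending = []    # min-heap of batches that arrived before their turn
    next_index = 0  # index of the batch that may be emitted next
    for index, batch in batches:
        heapq.heappush(pending, (index, batch))
        # Emit every batch that is now contiguous with what was emitted so far.
        while pending and pending[0][0] == next_index:
            yield heapq.heappop(pending)
            next_index += 1


# Hypothetical arrival order from a parallel plan.
arrived = [(2, "c"), (0, "a"), (1, "b"), (3, "d")]
ordered = [batch for _, batch in resequence(arrived)]
```

If the source never assigns an index (as with the old scanner), there is
nothing to sort on, which is why sequencing is only useful for sources that
set it.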

I think your understanding of the concept is correct, however.  Can you
share a sample plan that is not working for you?  If you use
DeclarationToTable, do you get consistently ordered results?
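
For reference, once both branches emit their batches in the source order, the
zip described in the original question amounts to combining batches
column-wise, position by position.  A rough sketch in plain Python, assuming
each batch is modelled as a dict of column name to list of values (the
zip_streams and num_rows helpers and the sample data are hypothetical, not an
Acero API):

```python
from itertools import zip_longest

_MISSING = object()  # sentinel marking a stream that ended early


def num_rows(batch):
    """Return the row count of a batch, checking that all columns agree."""
    lengths = {len(values) for values in batch.values()}
    if len(lengths) != 1:
        raise ValueError("columns in a batch must have the same length")
    return lengths.pop()


def zip_streams(left, right):
    """Combine two identically ordered streams of batches column-wise."""
    for l_batch, r_batch in zip_longest(left, right, fillvalue=_MISSING):
        if l_batch is _MISSING or r_batch is _MISSING:
            raise ValueError("streams have a different number of batches")
        if num_rows(l_batch) != num_rows(r_batch):
            raise ValueError("batches at the same position differ in row count")
        if l_batch.keys() & r_batch.keys():
            raise ValueError("column names overlap")
        yield {**l_batch, **r_batch}


# Hypothetical outputs of the two branches, already in source order.
bucket_ids = [{"bucket_id": [3, 1]}, {"bucket_id": [2]}]
classes = [{"classify": ["child", "adult"]}, {"classify": ["teenage"]}]
zipped = list(zip_streams(bucket_ids, classes))
```

This only gives correct rows if both branches preserve the batch order and
batch boundaries of the input, which is exactly what ordered execution is
meant to guarantee.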

On Tue, Jul 25, 2023 at 7:06 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:

> Reading the source code of exec_plan.cc, DeclarationToReader calls
> DeclarationToRecordBatchGenerator, which ignores the sequence_output
> parameter in SinkNodeOptions. It also calls validate, which should
> fail if SinkNodeOptions honors sequence_output. So it seems
> that DeclarationToReader cannot follow the input batch order?
> Then how does Substrait work in this scenario? Does it produce
> unordered output as well?
>
> On Tue, Jul 25, 2023 at 19:12, Wenbo Hu <huwenbo1...@gmail.com> wrote:
> >
> > Hi,
> >     I'm trying to zip two streams that have the same order but go
> > through different processing.
> >     For example, the original stream comes with two columns, 'id' and
> > 'age', and is split into two streams that are processed in a
> > distributed fashion using Acero: 1. hash 'id' into a stream with a
> > single column 'bucket_id', and 2. classify 'age' into ['child',
> > 'teenage', 'adult', ...]. The two streams are then zipped into a
> > single stream.
> >
> >        [     'id'     |    'age'     | many other columns ]
> >               |              |                  |
> >        ['bucket_id']   ['classify']             |
> >               |              |                  |
> >            [ zipped_stream | many_other_columns ]
> > I was expecting both bucket_id and classify to keep the same order as
> > the original stream before they are zipped.
> > According to the documentation, "ordered execution" uses batch_index to
> > indicate the order of batches.
> > But calling acero::DeclarationToReader with a QueryOptions whose
> > sequence_output is set to true does not keep the order if the input
> > stream is not ordered, and it doesn't fail during execution either
> > (bucket_id and classify do not specify any ordering). How can I
> > make Acero produce a stream that keeps the order of the original
> > input?
> > --
> > ---------------------
> > Best Regards,
> > Wenbo Hu,
>
>
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,
>
