> Currently, it is not
> possible to assign the implicit ordering in the scan node. Such an option
> has been added in other nodes [0]. This problem is mentioned here [1]. I
> have started to work on it [2] but I am unsure how to move forward
> because I did not find any clear roadmap about ordering in general.

The original plans for ordering are quite old at this point (from 2021 it
looks like):
https://docs.google.com/document/d/1MfVE9td9D4n5y-PTn66kk4-9xG7feXs1zSFf-qxQgPs/edit?usp=sharing

The goal was to support multi-threaded ordered execution by tagging batches
with a sequential index, allowing operators to introduce jitter, and
resequencing as needed with small serialized critical sections.
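
As a rough illustration of that idea (a Python sketch, not Acero's actual
C++ code; the Resequencer class and its insert method are names made up
for this example), batches carry a sequential index, may arrive out of
order, and are released only once every earlier index has been seen:

```python
import heapq
from typing import Any, List, Tuple


class Resequencer:
    """Buffers out-of-order (index, batch) pairs and releases them in index order.

    Hypothetical helper for illustration only; not an Arrow/Acero API.
    """

    def __init__(self) -> None:
        self._heap: List[Tuple[int, Any]] = []  # min-heap keyed on batch index
        self._next_index = 0                    # next index allowed to leave

    def insert(self, index: int, batch: Any) -> List[Any]:
        """Add one batch; return every batch that is now ready, in order."""
        heapq.heappush(self._heap, (index, batch))
        ready = []
        # Release the contiguous run that starts at the expected index.
        while self._heap and self._heap[0][0] == self._next_index:
            ready.append(heapq.heappop(self._heap)[1])
            self._next_index += 1
        return ready


# Batches arrive with jitter: index 2 shows up before 0 and 1.
rs = Resequencer()
arrivals = [(2, "c"), (0, "a"), (1, "b"), (3, "d")]
emitted = [rs.insert(i, b) for i, b in arrivals]
print(emitted)
```

Only the bookkeeping inside insert would need to be serialized in a
threaded setting; the work that produces each batch can stay parallel.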

> This also affects asof-join node. Since the node relies on ordered data
> and dataset asserts no implicit ordering, it causes obscure errors in
> threaded execution [3].

Yes, the asof-join node was designed without using the above ordering
concept (I believe it predates the actual implementation of ordering
somewhat, or was at least concurrent with it).  It was initially designed
to run only in serial execution (serial compute; I/O is always parallel,
IIRC).  I'm not aware of anyone having changed this, but it has been over a
year since I actively worked on Acero, so I'm not sure I would have
noticed.

> I fixed this node by inserting SerialSequencingQueue in the asof-join
> node [5], and adding implicit ordering in dataset. In general I think
> asof-join should require implicit ordering (or any kind of ordering) on
> all inputs.

Yes, something like this is where I hoped the node would eventually end up.

> Could you please review my code? I would appreciate any feedback to help
> improve it. Thanks in advance for your feedback.

I will attempt to look at it over the weekend though maybe someone else
has/will get to it first.  Conceptually, your approach is where I was
hoping things would go.  The scanner, when tagged for ordered output,
should be able to assign an implicit ordering.  The asof join node should
be able to use a sequencer instead of requiring serial execution.

My main concern would be that the asof join node's sidecar thread model
(having a dedicated processing thread) is rather unique to that node and
there were various deadlocks and race conditions encountered getting
everything just right, even for so-called serial execution.  However, as
long as the node's InputReceived is called in-order and one-at-a-time,
which I think is what the SerialSequencingQueue gives you, then perhaps
things are ok.
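
To make the "in-order and one-at-a-time" property concrete, here is a
minimal Python sketch of a queue with that guarantee. It is written under
my own assumptions and is not the SerialSequencingQueue implementation:
the SerialResequencer class, its insert method, and the handler callback
(standing in for InputReceived) are all hypothetical names.

```python
import heapq
import threading
from typing import Any, Callable, List, Tuple


class SerialResequencer:
    """Delivers batches to `handler` strictly in index order, one call at a time.

    Hypothetical sketch for illustration only; not an Arrow/Acero API.
    """

    def __init__(self, handler: Callable[[Any], None]) -> None:
        self._handler = handler
        self._lock = threading.Lock()
        self._heap: List[Tuple[int, Any]] = []
        self._next_index = 0
        self._draining = False  # True while some thread is delivering batches

    def insert(self, index: int, batch: Any) -> None:
        with self._lock:
            heapq.heappush(self._heap, (index, batch))
            if self._draining:
                return  # the current drainer will pick this batch up
            self._draining = True
        while True:
            with self._lock:
                if not self._heap or self._heap[0][0] != self._next_index:
                    self._draining = False
                    return
                _, ready = heapq.heappop(self._heap)
                self._next_index += 1
            # Called outside the lock, but only by the single drainer thread.
            self._handler(ready)


# 50 threads insert batches in a scrambled order; the handler sees 0..49.
seen: List[int] = []
queue = SerialResequencer(seen.append)
indices = [(i * 7) % 50 for i in range(50)]  # a fixed permutation of 0..49
threads = [threading.Thread(target=queue.insert, args=(i, i)) for i in indices]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen == list(range(50)))
```

The handler runs on whichever thread happens to be draining, so it must
not block waiting on work that only another insert call can provide.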

On Thu, Oct 3, 2024 at 5:27 AM Kamil Tokarek
<kamil.toka...@secom.com.pl.invalid> wrote:

> Hello,
> I would like to raise the subject of ordering. Currently, it is not
> possible to assign the implicit ordering in the scan node. Such an option
> has been added in other nodes [0]. This problem is mentioned here [1]. I
> have started to work on it [2] but I am unsure how to move forward
> because I did not find any clear roadmap about ordering in general.
>
> This also affects asof-join node. Since the node relies on ordered data
> and dataset asserts no implicit ordering, it causes obscure errors in
> threaded execution [3]. The asof-join node also does not sequence its
> input. I fixed this node by inserting SerialSequencingQueue in the
> asof-join node [5], and adding implicit ordering in dataset. In general I
> think asof-join should require implicit ordering (or any kind of
> ordering) on all inputs. I created a pull request with the following
> changes [7]:
>
> 1. Assert implicit ordering with the
> ScanNodeOptions.require_sequenced_output option enabled. [4]
>
> 2. Add SerialSequencingQueue in asof-join node inputs and require
> implicit ordering on input [5]
>
> 3. Modify asof-join node tests to test threaded operation [6]
>
>
> Could you please review my code? I would appreciate any feedback to help
> improve it. Thanks in advance for your feedback.
>
> [0] https://github.com/apache/arrow/pull/34137/commits/bcc1692dbeb5693508ea89e961b4eaf91170d71d
> [1] https://github.com/apache/arrow/issues/34698
> [2] https://github.com/mroz45/arrow/commits/Ordering/
> [3] https://github.com/apache/arrow/issues/41706
> [4] https://github.com/mroz45/arrow/commit/7a14586b83641d1bfa1b037f3f2377eb6c911f55
> [5] https://github.com/apache/arrow/pull/44083/commits/c8047bb8e3d8c83f12070507f3cdc43cb6ee6152
> [6] https://github.com/apache/arrow/pull/44083/commits/59da79331aefda3fc434e74eb1458cd0e195c879
> [7] https://github.com/apache/arrow/pull/44083
>
> Related issue: https://github.com/apache/arrow/issues/26818
>
