Thanks David. I think my original question was not precise enough, so let
me rephrase it:

My ultimate goal is to add an ibis source node:

class MyStorageTable(ibis.TableNode, sch.HasSchema):
    url = ... # e.g. "my_storage://my_path"
    begin = ... # e.g. "20220101"
    end = ... # e.g. "20220201"

and pass it to Acero and have Acero create a source node that knows how to
read from my_storage. Currently, I have a C++ class, shown here, that knows
how to read and write the data:

class MyStorageClient {
 public:
  /// \brief Construct a client
  MyStorageClient(const std::string& service_location);

  /// \brief Read data from a table as a stream
  /// \param[in] table_uri
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
  ReadStream(const std::string& table_uri, const std::string& start_time,
             const std::string& end_time);

  /// \brief Write data to a table as a stream
  /// This method will return a FlightStreamWriter that can be used for
  /// streaming data into the table
  /// \param[in] table_uri
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<DoPutResult> WriteStream(
      const std::string& table_uri,
      const std::shared_ptr<arrow::Schema>& schema,
      const std::string& start_time, const std::string& end_time);

  /// \brief Get schema of a table.
  /// \param[in] table_uri The Smooth table name, e.g.,
  ///     smooth:/research/user/ljin/test
  arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(
      const std::string& table_uri);
};

I think an Acero node's schema must be known when the node is created, so I
imagine I would implement a MyStorageExecNode that gets created by
SubstraitConsumer (via some registration mechanism in SubstraitConsumer):

(1) GetSchema is called in SubstraitConsumer when creating the node
(network call to the storage backend to get schema)
(2) ReadStream is called in either ExecNode::Init or
ExecNode::StartProducing to create the FlightStreamReader
(3) Some thread (either the Plan's execution thread or the thread owned by
MyStorageExecNode) will read from FlightStreamReader and send data
downstream.

Does that sound like the right approach or is there some other way I should
do this?
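
To make the three steps concrete, here is a minimal, self-contained sketch
of the lifecycle I have in mind. It deliberately does not use the real
Arrow or Acero APIs: Schema, Batch, FakeStreamReader, FakeStorageClient,
MyStorageSourceNode::ProduceAll and MakeMyStorageSourceNode are all
hypothetical stand-ins I made up for illustration, and which thread runs
the read loop is left open.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, NOT Arrow types: a schema is a list of column
// names and a batch only carries a row count.
using Schema = std::vector<std::string>;
struct Batch {
  int num_rows;
};

// Stand-in for FlightStreamReader: yields batches until exhausted.
class FakeStreamReader {
 public:
  explicit FakeStreamReader(std::vector<Batch> batches)
      : batches_(std::move(batches)) {}

  // Returns false once the stream is exhausted.
  bool Next(Batch* out) {
    if (pos_ == batches_.size()) return false;
    *out = batches_[pos_++];
    return true;
  }

 private:
  std::vector<Batch> batches_;
  std::size_t pos_ = 0;
};

// Stand-in for MyStorageClient; a real client would make network calls.
class FakeStorageClient {
 public:
  Schema GetSchema(const std::string& /*table_uri*/) {
    return Schema{"time", "value"};
  }
  std::unique_ptr<FakeStreamReader> ReadStream(
      const std::string& /*table_uri*/, const std::string& /*start_time*/,
      const std::string& /*end_time*/) {
    return std::make_unique<FakeStreamReader>(
        std::vector<Batch>{Batch{3}, Batch{5}, Batch{2}});
  }
};

// Sketch of the proposed source node (not derived from a real ExecNode).
class MyStorageSourceNode {
 public:
  using Downstream = std::function<void(const Batch&)>;

  MyStorageSourceNode(FakeStorageClient* client, std::string url,
                      std::string begin, std::string end, Schema schema,
                      Downstream downstream)
      : client_(client), url_(std::move(url)), begin_(std::move(begin)),
        end_(std::move(end)), schema_(std::move(schema)),
        downstream_(std::move(downstream)) {}

  // The schema is already known at construction time.
  const Schema& output_schema() const { return schema_; }

  // Step (2): open the stream when the plan starts.
  void StartProducing() {
    reader_ = client_->ReadStream(url_, begin_, end_);
  }

  // Step (3): drain the stream and push batches downstream. Whichever
  // thread calls this does the reading. Returns the number of batches.
  int ProduceAll() {
    if (!reader_) return 0;  // StartProducing was never called
    int num_batches = 0;
    Batch batch;
    while (reader_->Next(&batch)) {
      downstream_(batch);
      ++num_batches;
    }
    return num_batches;
  }

 private:
  FakeStorageClient* client_;
  std::string url_, begin_, end_;
  Schema schema_;
  Downstream downstream_;
  std::unique_ptr<FakeStreamReader> reader_;
};

// Step (1): the factory (the role I imagine SubstraitConsumer playing)
// resolves the schema before the node exists.
MyStorageSourceNode MakeMyStorageSourceNode(
    FakeStorageClient* client, const std::string& url,
    const std::string& begin, const std::string& end,
    MyStorageSourceNode::Downstream downstream) {
  Schema schema = client->GetSchema(url);
  return MyStorageSourceNode(client, url, begin, end, std::move(schema),
                             std::move(downstream));
}
```

A caller would build the node with MakeMyStorageSourceNode, call
StartProducing, and then call ProduceAll from whichever thread should own
the read loop. The open question for me is whether the real Acero
lifecycle maps onto these three hooks in the same way.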

On Wed, Aug 31, 2022 at 6:16 PM David Li <lidav...@apache.org> wrote:

> Hi Li,
>
> It'd depend on how exactly you expect everything to fit together, and I
> think the way you'd go about it would depend on what exactly the
> application is. For instance, you could have the application code do
> everything up through DoGet and get a reader, then create a SourceNode from
> the reader and continue from there.
>
> Otherwise, I would think the way to go would be to be able to create a
> node from a FlightDescriptor (which would contain the URL/parameters in
> your example). In that case, I think it'd fit into Arrow Dataset, under
> ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As a
> bonus, there's already good integration between Dataset and Acero and this
> should naturally do things like read the FlightEndpoints in parallel with
> readahead and so on.
>
> That means: you'd start with the FlightDescriptor, and create a Dataset
> from it. This will call GetFlightInfo under the hood. (There's a minor
> catch here: this assumes the service that returns the FlightInfo can embed
> an accurate schema into it. If that's not true, there'll have to be some
> finagling with various ways of getting the actual schema, depending on what
> exactly your service supports.) Once you have a Dataset, you can create an
> ExecPlan and proceed like normal.
>
> Of course, if you then want to get things into Python, R, Substrait,
> etc... that requires some more work - especially for Substrait where I'm
> not sure how best to encode a custom source like that.
>
> [1]: https://issues.apache.org/jira/browse/ARROW-10524
>
> -David
>
> On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > Hello!
> >
> > I have recently started to look into integrating Flight RPC with Acero
> > source/sink node.
> >
> > In Flight, the life cycle of a "read" request looks something like:
> >
> >    - User specifies a URL (e.g. my_storage://my_path) and parameters
> >    (e.g., begin = "20220101", end = "20220201")
> >    - Client issues GetFlightInfo and gets FlightInfo from the server
> >    - Client issues DoGet with the FlightInfo and gets a stream reader
> >    - Client calls Next until the stream is exhausted
> >
> > My question is, how does the above life cycle fit in an Acero node? In
> > other words, what are the proper places in Acero node lifecycle to issue
> > the corresponding flight RPC?
> >
> > Appreciate any thoughts,
> > Li
>
