Update: I am going to try what David Li suggested here: https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
This seems to be the least amount of code. This does require calling "DoGet"
at Acero plan/node creation time rather than execution time, but I don't think
it's a big deal for now.

The alternative path of subclassing SourceNode and having ExecNode::Init or
ExecNode::StartProducing issue the call seems like quite a bit of change
(also, I don't think SourceNode is exposed via a public header). But let me
know if you think I am missing something.

Li

On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:

> Hi Li,
>
> Here's my 2 cents about the Ibis/Substrait part of this.
>
> An Ibis expression carries a schema. If you're planning to create an
> integrated Ibis/Substrait/Arrow solution, then you'll need the schema to be
> available to Ibis in Python. So, you'll need a Python wrapper for the C++
> implementation you have in mind for the GetSchema method. I think you
> should pass the schema obtained by (the wrapped) GetSchema to an Ibis node,
> rather than defining a new Ibis node that would have to access the network
> to get the schema on its own.
>
> Given the above, I agree with you that when the Acero node is created its
> schema would already be known.
>
>
> Yaron.
> ________________________________
> From: Li Jin <ice.xell...@gmail.com>
> Sent: Thursday, September 1, 2022 2:49 PM
> To: dev@arrow.apache.org <dev@arrow.apache.org>
> Subject: Re: Integration between Flight and Acero
>
> Thanks David. I think my original question might not have been accurate so
> I will try to rephrase my question:
>
> My ultimate goal is to add an ibis source node:
>
> class MyStorageTable(ibis.TableNode, sch.HasSchema):
>     url = ...  # e.g. "my_storage://my_path"
>     begin = ...  # e.g. "20220101"
>     end = ...  # e.g. "20220201"
>
> and pass it to Acero and have Acero create a source node that knows how to
> read from my_storage.
> Currently, I have a C++ class that knows how to read/write data and looks
> like this:
>
> class MyStorageClient {
>  public:
>   /// \brief Construct a client
>   MyStorageClient(const std::string& service_location);
>
>   /// \brief Read data from a table streamingly
>   /// \param[in] table_uri
>   /// \param[in] start_time The start time (inclusive), e.g., '20100101'
>   /// \param[in] end_time The end time (exclusive), e.g., '20100110'
>   arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>> ReadStream(
>       const std::string& table_uri, const std::string& start_time,
>       const std::string& end_time);
>
>   /// \brief Write data to a table streamingly
>   /// This method will return a FlightStreamWriter that can be used for
>   /// streaming data into
>   /// \param[in] table_uri
>   /// \param[in] start_time The start time (inclusive), e.g., '20100101'
>   /// \param[in] end_time The end time (exclusive), e.g., '20100110'
>   arrow::Result<DoPutResult> WriteStream(
>       const std::string& table_uri, const std::shared_ptr<arrow::Schema>& schema,
>       const std::string& start_time, const std::string& end_time);
>
>   /// \brief Get schema of a table.
>   /// \param[in] table_uri The Smooth table name, e.g.,
>   /// smooth:/research/user/ljin/test
>   arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(
>       const std::string& table_uri);
> };
>
> I think Acero node's schema must be known when the node is created, I'd
> imagine I would implement MyStorageExecNode that gets created by
> SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
>
> (1) GetSchema is called in SubstraitConsumer when creating the node
>     (network call to the storage backend to get schema)
> (2) ReadStream is called in either ExecNode::Init or
>     ExecNode::StartProducing to create the FlightStreamReader
> (3) Some thread (either the Plan's execution thread or the thread owned by
>     MyStorageExecNode) will read from FlightStreamReader and send data
>     downstream.
>
> Does that sound like the right approach or is there some other way I should
> do this?
>
> On Wed, Aug 31, 2022 at 6:16 PM David Li <lidav...@apache.org> wrote:
>
> > Hi Li,
> >
> > It'd depend on how exactly you expect everything to fit together, and I
> > think the way you'd go about it would depend on what exactly the
> > application is. For instance, you could have the application code do
> > everything up through DoGet and get a reader, then create a SourceNode
> > from the reader and continue from there.
> >
> > Otherwise, I would think the way to go would be to be able to create a
> > node from a FlightDescriptor (which would contain the URL/parameters in
> > your example). In that case, I think it'd fit into Arrow Dataset, under
> > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As a
> > bonus, there's already good integration between Dataset and Acero and
> > this should naturally do things like read the FlightEndpoints in parallel
> > with readahead and so on.
> >
> > That means: you'd start with the FlightDescriptor, and create a Dataset
> > from it. This will call GetFlightInfo under the hood. (There's a minor
> > catch here: this assumes the service that returns the FlightInfo can
> > embed an accurate schema into it. If that's not true, there'll have to be
> > some finagling with various ways of getting the actual schema, depending
> > on what exactly your service supports.) Once you have a Dataset, you can
> > create an ExecPlan and proceed like normal.
> >
> > Of course, if you then want to get things into Python, R, Substrait,
> > etc... that requires some more work - especially for Substrait where I'm
> > not sure how best to encode a custom source like that.
> >
> > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> >
> > -David
> >
> > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > > Hello!
> > >
> > > I have recently started to look into integrating Flight RPC with Acero
> > > source/sink node.
> > >
> > > In Flight, the life cycle of a "read" request looks something like:
> > >
> > >    - User specifies a URL (e.g. my_storage://my_path) and parameters
> > >    (e.g., begin = "20220101", end = "20220201")
> > >    - Client issues GetFlightInfo and gets FlightInfo from the server
> > >    - Client issues DoGet with the FlightInfo and gets a stream reader
> > >    - Client calls Next until the stream is exhausted
> > >
> > > My question is, how does the above life cycle fit in an Acero node? In
> > > other words, what are the proper places in the Acero node lifecycle to
> > > issue the corresponding Flight RPC?
> > >
> > > Appreciate any thoughts,
> > > Li
> > >
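
The trade-off discussed in this thread (issuing DoGet when the node is created versus when the plan starts executing) can be illustrated with a small Python sketch. Everything in it is a hypothetical stand-in: FakeStorageClient, EagerSource, and LazySource are not Arrow, Flight, or Acero APIs, and batches are plain lists. It only shows at which point each design would issue the RPCs.

```python
from typing import Iterator, List

Batch = List[int]  # stand-in for a record batch


class FakeStorageClient:
    """Hypothetical stand-in for the MyStorageClient described in the thread."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records the order of simulated RPCs

    def get_schema(self, table_uri: str) -> List[str]:
        self.calls.append("GetSchema")
        return ["time", "value"]

    def read_stream(self, table_uri: str, begin: str, end: str) -> Iterator[Batch]:
        # Models DoGet: the call is recorded immediately and a reader is returned.
        self.calls.append("DoGet")
        return iter([[1, 2], [3]])


class EagerSource:
    """Opens the stream at node creation time (the approach in the update)."""

    def __init__(self, client: FakeStorageClient, uri: str, begin: str, end: str) -> None:
        self.schema = client.get_schema(uri)
        self._reader = client.read_stream(uri, begin, end)  # DoGet happens here

    def start_producing(self) -> List[Batch]:
        return list(self._reader)


class LazySource:
    """Knows the schema at creation time but defers DoGet until execution."""

    def __init__(self, client: FakeStorageClient, uri: str, begin: str, end: str) -> None:
        self.schema = client.get_schema(uri)
        self._open = lambda: client.read_stream(uri, begin, end)

    def start_producing(self) -> List[Batch]:
        return list(self._open())  # DoGet happens here
```

In both variants the schema is available as soon as the node exists, which matches the point that an Acero node's schema must be known at creation. The difference is only whether the stream is already open while the plan is still being built.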