Yes. If you need the source node to read in parallel OR if you have multiple fragments (especially if those fragments don't have identical schemas) then you want a dataset and not just a plain source node.
On Tue, Sep 13, 2022 at 1:55 PM David Li <lidav...@apache.org> wrote: > > Yeah, I concur with Weston. > > > To start with I think a custom factory function will be sufficient > > (e.g. look at MakeScanNode in scanner.cc for an example). So the > > options would somehow describe the coordinates of the flight endpoint. > > These 'coordinates' would be a FlightDescriptor. > > > However, it might be nice if "open a connection to the flight > > endpoint" happened during the call to StartProducing and not during > > the factory function call. This could maybe be a follow-up task. > > The factory would call GetFlightInfo (or maybe GetSchema, from what it sounds > like) to get the schema, but this wouldn't actually read any data. > StartProducing would then actually call DoGet to actually read data. > > --- > > The reason why I suggested adapting Flight to Dataset, assuming this matches > the semantics of your service, is because it encapsulates these steps, but > reuses all the machinery we already have: > > - Dataset discovery naturally becomes GetFlightInfo. (Semantically, this is > like beginning execution of a query, and returns one or more partitions where > the result set can be read.) > - Those partitions then each become a Fragment, and then they can be read in > parallel by Dataset. > > It sounds like the service in question here isn't quite that complex, though, > so no need to necessarily go that far. > > On Tue, Sep 13, 2022, at 19:18, Weston Pace wrote: > >> The alternative path of subclassing SourceNode and having ExecNode::Init or > >> ExecNode::StartProducing seems quite a bit of change (also I don't think > >> SourceNode is exposed via public header). But let me know if you think I am > >> missing something. > > > > Agreed that we don't want to go this route. David's suggestion is a > > good idea. However, this shouldn't be the responsibility of the > > caller exactly. > > > > In other words (and my lack of detailed knowledge about flight is > > probably going to leak here) there should still be a factory function > > (e.g. "flight_source" or something like that) and a custom options > > object (FlightSourceOptions). > > > > To start with I think a custom factory function will be sufficient > > (e.g. look at MakeScanNode in scanner.cc for an example). So the > > options would somehow describe the coordinates of the flight endpoint. > > The factory function would open a connection to the flight endpoint > > and convert this into a record batch reader. Then it would create one > > of the node's that Yaron has contributed and return that. > > > > However, it might be nice if "open a connection to the flight > > endpoint" happened during the call to StartProducing and not during > > the factory function call. This could maybe be a follow-up task. > > Perhaps source node could change so that, instead of accepting an > > AsyncGenerator, it accepts an AsyncGenerator factory function. Then > > it could execute that function during the call to StartProducing. > > > > On Tue, Sep 13, 2022 at 4:05 PM Li Jin <ice.xell...@gmail.com> wrote: > >> > >> Thanks Yaron for the pointer to that PR. > >> > >> On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com> wrote: > >> > >> > If you can wrap the flight reader as a RecordBatchReader, then another > >> > possibility is using an upcoming PR ( > >> > https://github.com/apache/arrow/pull/14041) that enables SourceNode to > >> > accept it. You would need to know the schema when configuring the > >> > SourceNode, but you won't need to derived from SourceNode. > >> > > >> > > >> > Yaron. > >> > ________________________________ > >> > From: Li Jin <ice.xell...@gmail.com> > >> > Sent: Tuesday, September 13, 2022 3:58 PM > >> > To: dev@arrow.apache.org <dev@arrow.apache.org> > >> > Subject: Re: Integration between Flight and Acero > >> > > >> > Update: > >> > > >> > I am going to try what David Li suggested here: > >> > https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v > >> > > >> > This seems to be the least amount of code. This does require calling > >> > "DoGet" at Acero plan/node creation time rather than execution time but I > >> > don't think it's a big deal for now. > >> > > >> > The alternative path of subclassing SourceNode and having ExecNode::Init > >> > or > >> > ExecNode::StartProducing seems quite a bit of change (also I don't think > >> > SourceNode is exposed via public header). But let me know if you think I > >> > am > >> > missing something. > >> > > >> > Li > >> > > >> > On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote: > >> > > >> > > Hi Li, > >> > > > >> > > Here's my 2 cents about the Ibis/Substrait part of this. > >> > > > >> > > An Ibis expression carries a schema. If you're planning to create an > >> > > integrated Ibis/Substrait/Arrow solution, then you'll need the schema > >> > > to > >> > be > >> > > available to Ibis in Python. So, you'll need a Python wrapper for the > >> > > C++ > >> > > implementation you have in mind for the GetSchema method. I think you > >> > > should pass the schema obtained by (the wrapped) GetSchema to an Ibis > >> > node, > >> > > rather than defining a new Ibis node that would have to access the > >> > network > >> > > to get the schema on its own. > >> > > > >> > > Given the above, I agree with you that when the Acero node is created > >> > > its > >> > > schema would already be known. > >> > > > >> > > > >> > > Yaron. > >> > > ________________________________ > >> > > From: Li Jin <ice.xell...@gmail.com> > >> > > Sent: Thursday, September 1, 2022 2:49 PM > >> > > To: dev@arrow.apache.org <dev@arrow.apache.org> > >> > > Subject: Re: Integration between Flight and Acero > >> > > > >> > > Thanks David. I think my original question might not have been accurate > >> > so > >> > > I will try to rephrase my question: > >> > > > >> > > My ultimate goal is to add an ibis source node: > >> > > > >> > > class MyStorageTable(ibis.TableNode, sch.HasSchema): > >> > > url = ... # e.g. "my_storage://my_path" > >> > > begin = ... # e.g. "20220101" > >> > > end = ... # e.g. "20220201" > >> > > > >> > > and pass it to Acero and have Acero create a source node that knows how > >> > to > >> > > read from my_storage. Currently, I have a C++ class that looks like > >> > > this > >> > > that knows how to read/write data: > >> > > > >> > > class MyStorageClient { > >> > > > >> > > public: > >> > > > >> > > /// \brief Construct a client > >> > > > >> > > MyStorageClient(const std::string& service_location); > >> > > > >> > > > >> > > > >> > > /// \brief Read data from a table streamingly > >> > > > >> > > /// \param[in] table_uri > >> > > > >> > > /// \param[in] start_time The start time (inclusive), e.g., > >> > > '20100101' > >> > > > >> > > /// \param[in] end_time The end time (exclusive), e.g., > >> > '20100110' > >> > > > >> > > > >> > > arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>> > >> > > ReadStream(const std::string& table_uri, const std::string& start_time, > >> > > const std::string& end_time); > >> > > > >> > > > >> > > > >> > > /// \brief Write data to a table streamingly > >> > > > >> > > /// This method will return a FlightStreamWriter that can be > >> > > used > >> > > for streaming data into > >> > > > >> > > /// \param[in] table_uri > >> > > > >> > > /// \param[in] start_time The start time (inclusive), e.g., > >> > > '20100101' > >> > > > >> > > /// \param[in] end_time The end time (exclusive), e.g., > >> > '20100110' > >> > > > >> > > arrow::Result<DoPutResult> WriteStream(const std::string& > >> > > table_uri, const std::shared_ptr<arrow::Schema> &schema, const > >> > std::string > >> > > &start_time, const std::string &end_time); > >> > > > >> > > > >> > > > >> > > /// \brief Get schema of a table. > >> > > > >> > > /// \param[in] table The Smooth table name, e.g., > >> > > smooth:/research/user/ljin/test > >> > > > >> > > arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(const > >> > > std::string& table_uri); > >> > > }; > >> > > > >> > > I think Acero node's schema must be known when the node is created, I'd > >> > > imagine I would implement MyStorageExecNode that gets created by > >> > > SubstraitConsumer (via some registration mechanism in > >> > > SubstraitConsumer): > >> > > > >> > > (1) GetSchema is called in SubstraitConsumer when creating the node > >> > > (network call to the storage backend to get schema) > >> > > (2) ReadStream is called in either ExecNode::Init or > >> > > ExecNode::StartProducing > >> > > to create the FlightStreamReader (3) Some thread (either the Plan's > >> > > execution thread or the thread owned by MyStorageExecNode) will read > >> > > from > >> > > FlightStreamReader and send data downstream. > >> > > > >> > > Does that sound like the right approach or is there some other way I > >> > should > >> > > do this? > >> > > > >> > > On Wed, Aug 31, 2022 at 6:16 PM David Li <lidav...@apache.org> wrote: > >> > > > >> > > > Hi Li, > >> > > > > >> > > > It'd depend on how exactly you expect everything to fit together, > >> > > > and I > >> > > > think the way you'd go about it would depend on what exactly the > >> > > > application is. For instance, you could have the application code do > >> > > > everything up through DoGet and get a reader, then create a > >> > > > SourceNode > >> > > from > >> > > > the reader and continue from there. > >> > > > > >> > > > Otherwise, I would think the way to go would be to be able to create > >> > > > a > >> > > > node from a FlightDescriptor (which would contain the URL/parameters > >> > > > in > >> > > > your example). In that case, I think it'd fit into Arrow Dataset, > >> > > > under > >> > > > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset > >> > > > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. > >> > > > As > >> > a > >> > > > bonus, there's already good integration between Dataset and Acero and > >> > > this > >> > > > should naturally do things like read the FlightEndpoints in parallel > >> > with > >> > > > readahead and so on. > >> > > > > >> > > > That means: you'd start with the FlightDescriptor, and create a > >> > > > Dataset > >> > > > from it. This will call GetFlightInfo under the hood. (There's a > >> > > > minor > >> > > > catch here: this assumes the service that returns the FlightInfo can > >> > > embed > >> > > > an accurate schema into it. If that's not true, there'll have to be > >> > some > >> > > > finagling with various ways of getting the actual schema, depending > >> > > > on > >> > > what > >> > > > exactly your service supports.) Once you have a Dataset, you can > >> > > > create > >> > > an > >> > > > ExecPlan and proceed like normal. > >> > > > > >> > > > Of course, if you then want to get things into Python, R, Substrait, > >> > > > etc... that requires some more work - especially for Substrait where > >> > I'm > >> > > > not sure how best to encode a custom source like that. > >> > > > > >> > > > [1]: https://issues.apache.org/jira/browse/ARROW-10524 > >> > > > > >> > > > -David > >> > > > > >> > > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote: > >> > > > > Hello! > >> > > > > > >> > > > > I have recently started to look into integrating Flight RPC with > >> > Acero > >> > > > > source/sink node. > >> > > > > > >> > > > > In Flight, the life cycle of a "read" request looks sth like: > >> > > > > > >> > > > > - User specifies a URL (e.g. my_storage://my_path) and parameter > >> > > > (e.g., > >> > > > > begin = "20220101", end = "20220201") > >> > > > > - Client issue GetFlightInfo and get FlightInfo from server > >> > > > > - Client issue DoGet with the FlightInfo and get a stream reader > >> > > > > - Client calls Nextuntil stream is exhausted > >> > > > > > >> > > > > My question is, how does the above life cycle fit in an Acero node? > >> > In > >> > > > > other words, what are the proper places in Acero node lifecycle to > >> > > issue > >> > > > > the corresponding flight RPC? > >> > > > > > >> > > > > Appreciate any thoughts, > >> > > > > Li > >> > > > > >> > > > >> >