Thanks Yaron for the pointer to that PR.

On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili <rt...@hotmail.com> wrote:

> If you can wrap the flight reader as a RecordBatchReader, then another
> possibility is using an upcoming PR (
> https://github.com/apache/arrow/pull/14041) that enables SourceNode to
> accept it. You would need to know the schema when configuring the
> SourceNode, but you won't need to derived from SourceNode.
>
>
> Yaron.
> ________________________________
> From: Li Jin <ice.xell...@gmail.com>
> Sent: Tuesday, September 13, 2022 3:58 PM
> To: dev@arrow.apache.org <dev@arrow.apache.org>
> Subject: Re: Integration between Flight and Acero
>
> Update:
>
> I am going to try what David Li suggested here:
> https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
>
> This seems to be the least amount of code. This does require calling
> "DoGet" at Acero plan/node creation time rather than execution time but I
> don't think it's a big deal for now.
>
> The alternative path of subclassing SourceNode and having ExecNode::Init or
> ExecNode::StartProducing seems quite a bit of change (also I don't think
> SourceNode is exposed via public header). But let me know if you think I am
> missing something.
>
> Li
>
> On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > Hi Li,
> >
> > Here's my 2 cents about the Ibis/Substrait part of this.
> >
> > An Ibis expression carries a schema. If you're planning to create an
> > integrated Ibis/Substrait/Arrow solution, then you'll need the schema to
> be
> > available to Ibis in Python. So, you'll need a Python wrapper for the C++
> > implementation you have in mind for the GetSchema method. I think you
> > should pass the schema obtained by (the wrapped) GetSchema to an Ibis
> node,
> > rather than defining a new Ibis node that would have to access the
> network
> > to get the schema on its own.
> >
> > Given the above, I agree with you that when the Acero node is created its
> > schema would already be known.
> >
> >
> > Yaron.
> > ________________________________
> > From: Li Jin <ice.xell...@gmail.com>
> > Sent: Thursday, September 1, 2022 2:49 PM
> > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > Subject: Re: Integration between Flight and Acero
> >
> > Thanks David. I think my original question might not have been accurate
> so
> > I will try to rephrase my question:
> >
> > My ultimate goal is to add an ibis source node:
> >
> > class MyStorageTable(ibis.TableNode, sch.HasSchema):
> >     url = ... # e.g. "my_storage://my_path"
> >     begin = ... # e.g. "20220101"
> >     end = ... # e.g. "20220201"
> >
> > and pass it to Acero and have Acero create a source node that knows how
> to
> > read from my_storage. Currently, I have a C++ class that looks like this
> > that knows how to read/write data:
> >
> > class MyStorageClient {
> >
> >     public:
> >
> >         /// \brief Construct a client
> >
> >         MyStorageClient(const std::string& service_location);
> >
> >
> >
> >         /// \brief Read data from a table streamingly
> >
> >         /// \param[in] table_uri
> >
> >         /// \param[in] start_time The start time (inclusive), e.g.,
> > '20100101'
> >
> >         /// \param[in] end_time The end time (exclusive), e.g.,
> '20100110'
> >
> >         arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>>
> > ReadStream(const std::string& table_uri, const std::string& start_time,
> > const std::string& end_time);
> >
> >
> >
> >         /// \brief Write data to a table streamingly
> >
> >         /// This method will return a FlightStreamWriter that can be used
> > for streaming data into
> >
> >         /// \param[in] table_uri
> >
> >         /// \param[in] start_time The start time (inclusive), e.g.,
> > '20100101'
> >
> >         /// \param[in] end_time The end time (exclusive), e.g.,
> '20100110'
> >
> >         arrow::Result<DoPutResult> WriteStream(const std::string&
> > table_uri, const std::shared_ptr<arrow::Schema> &schema, const
> std::string
> > &start_time, const std::string &end_time);
> >
> >
> >
> >         /// \brief Get schema of a table.
> >
> >         /// \param[in] table The Smooth table name, e.g.,
> > smooth:/research/user/ljin/test
> >
> >         arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(const
> > std::string& table_uri);
> >     };
> >
> > I think Acero node's schema must be known when the node is created, I'd
> > imagine I would implement MyStorageExecNode that gets created by
> > SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
> >
> > (1) GetSchema is called in SubstraitConsumer when creating the node
> > (network call to the storage backend to get schema)
> > (2) ReadStream is called in either ExecNode::Init or
> > ExecNode::StartProducing
> > to create the FlightStreamReader (3) Some thread (either the Plan's
> > execution thread or the thread owned by MyStorageExecNode) will read from
> > FlightStreamReader and send data downstream.
> >
> > Does that sound like the right approach or is there some other way I
> should
> > do this?
> >
> > On Wed, Aug 31, 2022 at 6:16 PM David Li <lidav...@apache.org> wrote:
> >
> > > Hi Li,
> > >
> > > It'd depend on how exactly you expect everything to fit together, and I
> > > think the way you'd go about it would depend on what exactly the
> > > application is. For instance, you could have the application code do
> > > everything up through DoGet and get a reader, then create a SourceNode
> > from
> > > the reader and continue from there.
> > >
> > > Otherwise, I would think the way to go would be to be able to create a
> > > node from a FlightDescriptor (which would contain the URL/parameters in
> > > your example). In that case, I think it'd fit into Arrow Dataset, under
> > > ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> > > discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As
> a
> > > bonus, there's already good integration between Dataset and Acero and
> > this
> > > should naturally do things like read the FlightEndpoints in parallel
> with
> > > readahead and so on.
> > >
> > > That means: you'd start with the FlightDescriptor, and create a Dataset
> > > from it. This will call GetFlightInfo under the hood. (There's a minor
> > > catch here: this assumes the service that returns the FlightInfo can
> > embed
> > > an accurate schema into it. If that's not true, there'll have to be
> some
> > > finagling with various ways of getting the actual schema, depending on
> > what
> > > exactly your service supports.) Once you have a Dataset, you can create
> > an
> > > ExecPlan and proceed like normal.
> > >
> > > Of course, if you then want to get things into Python, R, Substrait,
> > > etc... that requires some more work - especially for Substrait where
> I'm
> > > not sure how best to encode a custom source like that.
> > >
> > > [1]: https://issues.apache.org/jira/browse/ARROW-10524
> > >
> > > -David
> > >
> > > On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > > > Hello!
> > > >
> > > > I have recently started to look into integrating Flight RPC with
> Acero
> > > > source/sink node.
> > > >
> > > > In Flight, the life cycle of a "read" request looks sth like:
> > > >
> > > >    - User specifies a URL (e.g. my_storage://my_path) and parameter
> > > (e.g.,
> > > >    begin = "20220101", end = "20220201")
> > > >    - Client issue GetFlightInfo and get FlightInfo from server
> > > >    - Client issue DoGet with the FlightInfo and get a stream reader
> > > >    - Client calls Nextuntil stream is exhausted
> > > >
> > > > My question is, how does the above life cycle fit in an Acero node?
> In
> > > > other words, what are the proper places in Acero node lifecycle to
> > issue
> > > > the corresponding flight RPC?
> > > >
> > > > Appreciate any thoughts,
> > > > Li
> > >
> >
>

Reply via email to