>
> And then having two sets of buffers is the same as having two record
> batches, albeit you need both sets to be delivered together, as noted.


It seems that atomic application could also be something controlled in
metadata (i.e. this is batch 1 of X)?

The schema evolution question is interesting; it could be useful in other
contexts as well (e.g. switching dictionary encoding on/off).
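
For what it's worth, here is a rough sketch of how a client could use such
metadata to apply an update atomically (plain Java; the updateId / partIndex /
partCount fields are hypothetical illustrations, not something Barrage defines
today). The client simply buffers parts until the whole logical update has
arrived:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.function.Consumer;

    // Buffers the pieces of one logical update and applies them all at once.
    final class AtomicUpdateAssembler<P> {
      private final Map<Long, TreeMap<Integer, P>> pending = new HashMap<>();
      private final Consumer<List<P>> applyAtomically;

      AtomicUpdateAssembler(Consumer<List<P>> applyAtomically) {
        this.applyAtomically = applyAtomically;
      }

      // Metadata on each payload would carry (updateId, partIndex, partCount).
      void onPayload(long updateId, int partIndex, int partCount, P payload) {
        final TreeMap<Integer, P> parts =
            pending.computeIfAbsent(updateId, k -> new TreeMap<>());
        parts.put(partIndex, payload);
        if (parts.size() == partCount) {
          pending.remove(updateId);
          applyAtomically.accept(new ArrayList<>(parts.values()));
        }
      }
    }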

-Micah


On Fri, Mar 5, 2021 at 11:42 AM David Li <lidav...@apache.org> wrote:

> (responses inline)
>
> On Thu, Mar 4, 2021, at 17:26, Nate Bauernfeind wrote:
> > Regarding the BarrageRecordBatch:
> >
> > I have been concatenating them; it’s one batch with two sets of arrow
> > payloads. They don’t have separate metadata headers; the update is to be
> > applied atomically. I have only studied the Java Arrow Flight
> > implementation, and I believe it is usable, perhaps with some minor changes.
> > The piece of code in Flight that does the deserialization takes two
> > parallel lists/iterators: a `Buffer` list (these describe the length of a
> > section of the body payload) and a `FieldNode` list (these describe num
> > rows and null_count). Each field node has 2-3 buffers depending on schema
> > type. Buffers are allowed to have a length of 0, to omit their payloads;
> > this, for example, is how you omit the validity buffer when null_count is
> > zero.
> >
> > The proposed barrage payload keeps this structural pattern (list of
> > buffers, list of field nodes) with the following modifications:
> > - we only include field nodes / buffers for subscribed columns
> > - the first set of field nodes is for added rows; these may be omitted
> > if there are no added rows included in the update
> > - the second set of field nodes is for modified rows; we omit columns
> > that have no modifications included in the update
> >
> > I believe the only thing that is missing is the ability to control the
> > field types to be deserialized (like a third list/iterator parallel to
> > field nodes and buffers).
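> >
> > Roughly, I picture that third parallel input as a per-column inclusion
> > mask. A sketch in plain Java of what such a parser loop could look like
> > (the FieldNode / BufferMeta / ColumnData types are hypothetical stand-ins
> > for the real Arrow message classes):
> >
> >     import java.util.ArrayList;
> >     import java.util.Iterator;
> >     import java.util.List;
> >
> >     // Hypothetical stand-ins for the flatbuffer-described metadata.
> >     record FieldNode(long length, long nullCount) {}
> >     record BufferMeta(long offset, long length) {}
> >     record ColumnData(FieldNode node, List<BufferMeta> buffers) {}
> >
> >     final class SubsetReader {
> >       // included[i]: is column i of the schema present in this payload?
> >       // buffersPerColumn[i]: how many buffers column i's type uses (2-3).
> >       static List<ColumnData> read(boolean[] included, int[] buffersPerColumn,
> >                                    Iterator<FieldNode> nodes,
> >                                    Iterator<BufferMeta> buffers) {
> >         final List<ColumnData> out = new ArrayList<>();
> >         for (int col = 0; col < included.length; col++) {
> >           if (!included[col]) {
> >             out.add(null); // column omitted; consume nothing for it
> >             continue;
> >           }
> >           final FieldNode node = nodes.next();
> >           final List<BufferMeta> bufs = new ArrayList<>();
> >           for (int b = 0; b < buffersPerColumn[col]; b++) {
> >             bufs.add(buffers.next()); // zero-length means payload omitted
> >           }
> >           out.add(new ColumnData(node, bufs));
> >         }
> >         return out;
> >       }
> >     }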
>
> Right. I think we're on the same page here, but looking at this from
> different angles. I think being able to control which columns to
> deserialize/being able to only include a subset of buffers is essentially
> equivalent to having a stream with schema evolution. And then having two
> sets of buffers is the same as having two record batches, albeit you need
> both sets to be delivered together, as noted. Regardless, we can work out
> how to handle this.
>
> >
> > Note that the BarrageRecordBatch.addedRowsIncluded,
> > BarrageFieldNode.addedRows, BarrageFieldNode.modifiedRows and
> > BarrageFieldNode.includedRows (all part of the flatbuffer metadata) are
> > intended to be used by code one layer of abstraction higher than the
> > actual wire-format parser. The parser doesn't really need them except to
> > know which columns to expect in the payload. Technically, we could encode
> > the field nodes / buffers as empty, too (but why be wasteful if this
> > information is already encoded?).
>
> Right - presumably this could go in the Flight metadata instead of having
> to be inlined into the batch's metadata.
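>
> For example, a rough sketch of the receiving side against the Java Flight
> client (assuming its per-message metadata accessor; the metadata parsing
> itself would be whatever flatbuffer Barrage defines, so it is left as a
> comment):
>
>     import org.apache.arrow.flight.FlightClient;
>     import org.apache.arrow.flight.FlightStream;
>     import org.apache.arrow.flight.Location;
>     import org.apache.arrow.flight.Ticket;
>     import org.apache.arrow.memory.ArrowBuf;
>     import org.apache.arrow.memory.BufferAllocator;
>     import org.apache.arrow.memory.RootAllocator;
>     import org.apache.arrow.vector.VectorSchemaRoot;
>
>     public class MetadataOnTheSide {
>       public static void main(String[] args) throws Exception {
>         try (BufferAllocator allocator = new RootAllocator();
>              FlightClient client = FlightClient.builder(allocator,
>                  Location.forGrpcInsecure("localhost", 8815)).build();
>              FlightStream stream =
>                  client.getStream(new Ticket("my-subscription".getBytes()))) {
>           while (stream.next()) {
>             VectorSchemaRoot root = stream.getRoot(); // the Arrow data itself
>             ArrowBuf appMetadata = stream.getLatestMetadata(); // row sets etc.
>             if (appMetadata != null) {
>               // parse the (hypothetical) Barrage flatbuffer: added/modified
>               // row sets, included columns, and so on
>             }
>             // apply root's rows to local state as directed by the metadata
>           }
>         }
>       }
>     }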
>
> >
> > Regarding Browser Flight Support:
> >
> > Was this company FactSet by chance? (I saw they are mentioned in the JS
> > thread that recently was bumped on the dev list.)
> >
> > I looked at the ticket and wanted to comment on how we are handling
> > bi-directional streams for our web-ui. We use ArrowFlight's concept of
> > Ticket to allow a client to create and identify temporary state (new
> > tables / views / REPL sessions / etc). Any bidirectional stream we support
> > also has a server-streaming only variant with the ability for the client
> > to attach a Ticket to reference/identify that stream. The client may then
> > send a message, out-of-band, to the Ticket. They are sequenced by the
> > client (since gRPC doesn't guarantee ordered delivery) and delivered to
> > the piece of code controlling that server-stream. It does require that
> > the server be a bit stateful; but it works =).
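> >
> > As a rough illustration of the re-sequencing piece (plain Java with
> > hypothetical names; the real thing sits behind our gRPC handlers), the
> > server just buffers out-of-order messages until the next expected
> > sequence number shows up:
> >
> >     import java.util.TreeMap;
> >     import java.util.function.Consumer;
> >
> >     // One instance per exported server-stream Ticket.
> >     final class OutOfBandResequencer<M> {
> >       private final Consumer<M> streamHandler; // code controlling the stream
> >       private final TreeMap<Long, M> buffered = new TreeMap<>();
> >       private long nextExpected = 0;
> >
> >       OutOfBandResequencer(Consumer<M> streamHandler) {
> >         this.streamHandler = streamHandler;
> >       }
> >
> >       // Called from the out-of-band unary RPC; sequence numbers are
> >       // assigned by the client.
> >       synchronized void onMessage(long sequence, M message) {
> >         buffered.put(sequence, message);
> >         while (!buffered.isEmpty() && buffered.firstKey() == nextExpected) {
> >           streamHandler.accept(buffered.pollFirstEntry().getValue());
> >           nextExpected++;
> >         }
> >       }
> >     }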
>
> I still can't figure out who it was and now I wonder if it was all in my
> imagination. I'm hoping they'll see this and chime in, in the spirit of
> community participation :)
>
> I agree bidirectionality will be a challenge. I think WebSockets has been
> proposed as well, but that is also stateful (well, as soon as you have
> bidirectionality, you're going to have statefulness).
>
> >
> > On Thu, Mar 4, 2021 at 6:58 AM David Li <lidav...@apache.org> wrote:
> >
> > > Re: the multiple batches, that makes sense. In that case, depending on
> > > how exactly the two record batches are laid out, I'd suggest considering
> > > a Union of Struct columns (where a Struct is essentially interchangeable
> > > with a record batch or table) - that would let you encode two distinct
> > > record batches inside the same physical batch. Or if the two batches
> > > have identical schemas, you could just concatenate them and include
> > > indices in your metadata.
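> > >
> > > For the second option, a rough sketch of the receiving side (plain Java
> > > against the Arrow vector API; the addedRowCount value would come from
> > > your metadata and is hypothetical here):
> > >
> > >     import java.util.List;
> > >
> > >     import org.apache.arrow.vector.FieldVector;
> > >     import org.apache.arrow.vector.VectorSchemaRoot;
> > >
> > >     final class ConcatenatedBatchSplitter {
> > >       interface RowSink {
> > >         void onAddedRow(List<FieldVector> columns, int rowIndex);
> > >         void onModifiedRow(List<FieldVector> columns, int rowIndex);
> > >       }
> > >
> > >       // Rows [0, addedRowCount) are adds; the rest are modifies.
> > >       static void split(VectorSchemaRoot root, int addedRowCount,
> > >                         RowSink sink) {
> > >         final List<FieldVector> columns = root.getFieldVectors();
> > >         final int rowCount = root.getRowCount();
> > >         for (int row = 0; row < rowCount; row++) {
> > >           if (row < addedRowCount) {
> > >             sink.onAddedRow(columns, row);
> > >           } else {
> > >             sink.onModifiedRow(columns, row);
> > >           }
> > >         }
> > >       }
> > >     }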
> > >
> > > As for browser Flight support - there's an existing ticket:
> > > https://issues.apache.org/jira/browse/ARROW-9860
> > >
> > > I was sure I had seen another organization talking about browser
> > > support recently, but now I can't find them. I'll update here if I do
> > > figure it out.
> > >
> > > Best,
> > > David
> > >
> > > On Wed, Mar 3, 2021, at 21:00, Nate Bauernfeind wrote:
> > > > >  if each payload has two batches with different purposes [...]
> > > >
> > > > The purposes of the payloads are slightly different, however they are
> > > > intended to be applied atomically. If there are guarantees by the
> > > > table operation generating the updates then those guarantees are only
> > > > valid on each boundary of applying the update to your local state. In
> > > > a sense, one is relatively useless without the other. Record batches
> > > > fit well in map-reduce paradigms / algorithms, but what we have is
> > > > stateful to enable/support incremental updates. For example, sorting a
> > > > flight of data is best done map-reduce-style and requires one to
> > > > re-sort the entire data set when it changes. Our approach focuses on
> > > > producing incremental updates which are used to manipulate your
> > > > existing client state using a much smaller footprint (in both time and
> > > > space). You can imagine, in the sort scenario, if you evaluate the
> > > > table after adding rows but before modifying existing rows your table
> > > > won’t be sorted between the two updates. The client would then need to
> > > > wait until it receives the pair of RecordBatches anyways, so it seems
> > > > more natural to deliver them together.
> > > >
> > > > > As a side note - is said UI browser-based? Another project recently
> > > > > was planning to look at JavaScript support for Flight (using
> > > > > WebSockets as the transport, IIRC) and it might make sense to join
> > > > > forces if that’s a path you were also going to pursue.
> > > >
> > > > Yes, our UI runs in the browser, although table operations themselves
> > > > run on the server to keep the browser lean and fast. That said, the
> > > > browser isn’t the only target for the API we’re iterating on. We’re
> > > > engaged in a rewrite to unify our “first-class” Java API for
> > > > intra-engine (server, heavyweight client) usage and our cross-language
> > > > (Javascript/C++/C#/Python) “open” API. Our existing customers use the
> > > > engine to drive multi-process data applications, REPL/notebook
> > > > experiences, and dashboards. We are preserving these capabilities as we
> > > > make the engine available as open source software. One goal of the OSS
> > > > effort is to produce a singular modern API that’s more interoperable
> > > > with the data science and development community as a whole. In the
> > > > interest of minimizing entry/egress points, we are migrating to gRPC
> > > > for everything in addition to the data IPC layer, so not just the
> > > > barrage/arrow-flight piece.
> > > >
> > > > The point of all this is to make the Deephaven engine as accessible as
> > > > possible for a broad user base, including developers using the API from
> > > > their language of choice or scripts/code running co-located within an
> > > > engine process. Our software can be used to explore or build
> > > > applications and visualizations around static as well as real-time data
> > > > (imagine joins, aggregations, sorts, filters, time-series joins, etc),
> > > > perform table operations with code or with a few clicks in a GUI, or as
> > > > a building-block in a multi-stage data pipeline. We think making
> > > > ourselves as interoperable as possible with tools built on Arrow is an
> > > > important part of attaining this goal.
> > > >
> > > > That said, we have run into quite a few pain points migrating to gRPC,
> > > > such as 1) no client-side streaming is supported by any browser, 2)
> > > > today, server-side streams require a proxy layer of some sort (such as
> > > > envoy), 3) flatbuffer’s javascript/typescript support is a little weak,
> > > > and I’m sure there are others that aren’t coming to mind at the moment.
> > > > We have some interesting solutions to these problems, but, today, these
> > > > issues are a decent chunk of our focus. That said, the UI is usable
> > > > today by our enterprise clients, but it interacts with the server over
> > > > websockets and a protocol that is heavily influenced by 10 years of
> > > > existing proprietary java-to-java IPC (which are NOT friendly to being
> > > > robust over intermittent failures). Today, we’re just heads-down going
> > > > the gRPC route and hoping that eventually browsers get around to better
> > > > support for some of this stuff (so, maybe one day a proxy isn’t
> > > > required, etc). Some of our RPCs make most sense as bidirectional
> > > > streams, but to support our web-ui we also have a server-streaming
> > > > variant to which we can pass data “out-of-band” via a unary call
> > > > referencing the particular server stream. It’s fun stuff! I’m actually
> > > > very excited about it even if the text doesn’t sound that way =).
> > > >
> > > > If you can point me to that project/person/post we’d love to get in
> > > > touch and are excited to share whatever can be shared.
> > > >
> > > > Nate
> > > >
> > > > On Wed, Mar 3, 2021 at 4:22 PM David Li <lidav...@apache.org> wrote:
> > > >
> > > > > Ah okay, thank you for clarifying! In that case, if each payload has
> > > > > two batches with different purposes - might it make sense to just
> > > > > make that two different payloads, and set a flag/enum in the metadata
> > > > > to indicate how to interpret the batch? Then you'd be officially the
> > > > > same as Arrow Flight :)
> > > > >
> > > > > As a side note - is said UI browser-based? Another project recently
> > > > > was planning to look at JavaScript support for Flight (using
> > > > > WebSockets as the transport, IIRC) and it might make sense to join
> > > > > forces if that's a path you were also going to pursue.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Wed, Mar 3, 2021, at 18:05, Nate Bauernfeind wrote:
> > > > > > Thanks for the interest =).
> > > > > >
> > > > > > > However, if I understand right, you're sending data without a
> > > > > > > fixed schema [...]
> > > > > >
> > > > > > The dataset does have a known schema ahead of time, which is
> > > > > > similar to Flight. However, as you point out, the subscription can
> > > > > > change which columns it is interested in without re-acquiring data
> > > > > > for columns it was already subscribed to. This is mostly for
> > > > > > convenience. We use it primarily to limit which columns are sent to
> > > > > > our user interface until the user scrolls them into view.
> > > > > >
> > > > > > The enhancement of the RecordBatch here, aside from the additional
> > > > > > metadata, is only in that the payload has two sets of RecordBatch
> > > > > > payloads. The first payload is for added rows; every added row must
> > > > > > send data for each column subscribed; based on the subscribed
> > > > > > columns this is otherwise fixed width (in the number of columns /
> > > > > > buffers). The second payload is for modified rows. Here we only
> > > > > > send the columns that have rows that are modified. Aside from this
> > > > > > difference, I have been aiming to be compatible enough to be able
> > > > > > to reuse the payload parsing that is already written for Arrow.
> > > > > >
> > > > > > > I don't quite see why it couldn't be carried as metadata on the
> > > > > > > side of a record batch, instead of having to duplicate the record
> > > > > > > batch structure [...]
> > > > > >
> > > > > > Whoa, this is a good point. I have iterated on this a few times to
> > > > > > get it closer to Arrow's setup and did not realize that 'BarrageData'
> > > > > > is now officially identical to `FlightData`. This is an instance of
> > > > > > being too close to the project and forgetting to step back once in a
> > > > > > while.
> > > > > >
> > > > > > > Flight already has a bidirectional streaming endpoint, DoExchange,
> > > > > > > that allows arbitrary payloads (with mixed metadata/data or only
> > > > > > > one of the two), which seems like it should be able to cover the
> > > > > > > SubscriptionRequest endpoint.
> > > > > >
> > > > > > This is exactly the kind of feedback I'm looking for! I wasn't
> > > > > > seeing the solution where the client-side stream doesn't actually
> > > > > > need payload and that the subscription changes can be described with
> > > > > > another flatbuffer metadata type. I like that.
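> > > > > >
> > > > > > As a rough sketch of what that metadata-only subscription-change
> > > > > > message might carry and how the server could fold it into the
> > > > > > active subscription (plain Java with hypothetical fields; the real
> > > > > > message would be a flatbuffer riding in app_metadata on the
> > > > > > DoExchange stream):
> > > > > >
> > > > > >     import java.util.BitSet;
> > > > > >
> > > > > >     // Hypothetical contents of a subscription change; it carries no
> > > > > >     // Arrow data at all.
> > > > > >     record SubscriptionChange(BitSet subscribedColumns,
> > > > > >                               long viewportStartRow,
> > > > > >                               long viewportEndRow) {}
> > > > > >
> > > > > >     final class SubscriptionState {
> > > > > >       private BitSet columns = new BitSet();
> > > > > >       private long viewportStart = 0;
> > > > > >       private long viewportEnd = Long.MAX_VALUE;
> > > > > >
> > > > > >       // Server side: merge each change; later BarrageRecordBatch
> > > > > >       // payloads are built against the updated columns / viewport.
> > > > > >       synchronized void apply(SubscriptionChange change) {
> > > > > >         this.columns = (BitSet) change.subscribedColumns().clone();
> > > > > >         this.viewportStart = change.viewportStartRow();
> > > > > >         this.viewportEnd = change.viewportEndRow();
> > > > > >       }
> > > > > >
> > > > > >       synchronized boolean isSubscribed(int columnIndex) {
> > > > > >         return columns.get(columnIndex);
> > > > > >       }
> > > > > >     }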
> > > > > >
> > > > > > Thanks David!
> > > > > > Nate
> > > > > >
> > > > > > On Wed, Mar 3, 2021 at 3:28 PM David Li <lidav...@apache.org> wrote:
> > > > > >
> > > > > > > Hey Nate,
> > > > > > >
> > > > > > > Thanks for sharing this & for the detailed docs and writeup. I
> > > > > > > think your use case is interesting, but I'd like to clarify a few
> > > > > > > things.
> > > > > > >
> > > > > > > I would say Arrow Flight doesn't try to impose a particular model,
> > > > > > > but I agree that Barrage does things that aren't easily doable
> > > > > > > with Flight. Flight does name concepts in a way that suggests how
> > > > > > > to apply it to something that looks like a database, but you can
> > > > > > > mostly think of Flight as an efficient way to transfer Arrow data
> > > > > > > over the network upon which you can layer further semantics.
> > > > > > >
> > > > > > > However, if I understand right, you're sending data without a
> > > > > > > fixed schema, in the sense that each BarrageRecordBatch may have
> > > > > > > only a subset of the columns declared up front, or may carry new
> > > > > > > columns? I think this is the main thing you can't easily do
> > > > > > > currently, as Flight (and Arrow IPC in general) assumes a fixed
> > > > > > > schema (and expects all columns in a batch to have the same
> > > > > > > length).
> > > > > > >
> > > > > > > Otherwise, the encoding for identifying rows and changes is
> > > > > > > interesting, but I don't quite see why it couldn't be carried as
> > > > > > > metadata on the side of a record batch, instead of having to
> > > > > > > duplicate the record batch structure, except for the
> > > > > > > aforementioned schema issue. And in that case it might be better
> > > > > > > to work out the schema evolution issue & any ergonomic issues with
> > > > > > > Flight's existing metadata fields/API that would prevent you from
> > > > > > > using them, as that way you (and we!) don't have to fully
> > > > > > > duplicate one of Arrow's format definitions. Similarly, Flight
> > > > > > > already has a bidirectional streaming endpoint, DoExchange, that
> > > > > > > allows arbitrary payloads (with mixed metadata/data or only one of
> > > > > > > the two), which seems like it should be able to cover the
> > > > > > > SubscriptionRequest endpoint.
> > > > > > >
> > > > > > > Best,
> > > > > > > David
> > > > > > >
> > > > > > > On Wed, Mar 3, 2021, at 16:08, Nate Bauernfeind wrote:
> > > > > > > > Hello,
> > > > > > > >
> > > > > > > > My colleagues at Deephaven Data Labs and I have been addressing
> > > > > > > > problems at the intersection of data-driven applications, data
> > > > > > > > science, and updating (/ticking) data for some years.
> > > > > > > >
> > > > > > > > Deephaven has a query engine that supports updating tabular data
> > > > > > > > via a protocol that communicates precise changes about datasets,
> > > > > > > > such as 1) which rows were removed, 2) which rows were added, 3)
> > > > > > > > which rows were modified (and for which columns). We are
> > > > > > > > inspired by Arrow and would like to adopt a version of this
> > > > > > > > protocol that adheres to goals similar to Arrow and Arrow
> > > > > > > > Flight.
> > > > > > > >
> > > > > > > > Out of the box, Arrow Flight is insufficient to represent such a
> > > > > > > > stream of changes. For example, because you cannot identify a
> > > > > > > > particular row within an Arrow Flight, you cannot indicate which
> > > > > > > > rows were removed or modified.
> > > > > > > >
> > > > > > > > The project integrates with Arrow Flight at the header-metadata
> > > > > > > > level. We have preliminarily named the project Barrage as in a
> > > > > > > > "barrage of arrows" which plays in the same "namespace" as a
> > > > > > > > "flight of arrows."
> > > > > > > >
> > > > > > > > We built this as part of an initiative to modernize and open up
> > > > > > > > our table IPC mechanisms. This is part of a larger open source
> > > > > > > > effort which will become more visible in the next month or so
> > > > > > > > once we've finished the work necessary to share our core
> > > > > > > > software components, including a unified static and real time
> > > > > > > > query engine complete with data visualization tools, a REPL
> > > > > > > > experience, Jupyter integration, and more.
> > > > > > > >
> > > > > > > > I would like to find out:
> > > > > > > > - if we have understood the primary goals of Arrow, and are
> > > > > > > > honoring them as closely as possible
> > > > > > > > - if there are other projects that might benefit from sharing
> > > > > > > > this extension of Arrow Flight
> > > > > > > > - if there are any gaps that are best addressed early on to
> > > > > > > > maximize future compatibility
> > > > > > > >
> > > > > > > > A great place to digest the concepts that differ from Arrow
> > > > > > > > Flight is here:
> > > > > > > > https://deephaven.github.io/barrage/Concepts.html
> > > > > > > >
> > > > > > > > The proposed protocol can be perused here:
> > > > > > > > https://github.com/deephaven/barrage
> > > > > > > >
> > > > > > > > Internally, we already have a java server and java client
> > > > > > > > implemented as a working proof of concept for our use case.
> > > > > > > >
> > > > > > > > I really look forward to your feedback; thank you!
> > > > > > > >
> > > > > > > > Nate Bauernfeind
> > > > > > > >
> > > > > > > > Deephaven Data Labs - https://deephaven.io/
> > > > > > > > --
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > >
> >
> >
> > --
> >
>
