So far it seems as if pyarrow is completely ignoring the RecordBatch.length field. More info to follow...
On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen <j...@jgm.org> wrote:
> Crikey! I'll do some testing around that and suggest some test cases to ensure it continues to work, assuming that it does.
>
> -John
>
> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney <wesmck...@gmail.com> wrote:
>> Thanks for the attachment, it's helpful.
>>
>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen <j...@jgm.org> wrote:
>> > Attachments referred to in previous two messages:
>> > https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
>> >
>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen <j...@jgm.org> wrote:
>> > > Thanks, Wes, for the thoughtful reply. I really appreciate the engagement. In order to clarify things a bit, I am attaching a graphic of how our application will take record-wise (row-oriented) data from an event source and incrementally populate a pre-allocated Arrow-compatible buffer, including for variable-length fields. (Obviously at this stage I am not using the reference implementation Arrow code, although that would be a goal... to contribute that back to the project.)
>> > >
>> > > For the sake of simplicity these are non-nullable fields. As a result, a reader of "y" that has no knowledge of the "utilized" metadata would get a long string (zeros, spaces, uninitialized, or whatever we decide for the pre-allocation model) for the record just beyond the last utilized record.
>> > >
>> > > I don't see any "big O"-analysis problems with this approach. The space/time tradeoff is that we have to guess how much room to allocate for variable-length fields. We will probably almost always be wrong. This ends up in "wasted" space. However, we can do calculations based on these partially filled batches that take full advantage of the columnar layout. (Here I've shown the case where we had too little variable-length buffer set aside, resulting in "wasted" rows. The flip side is that rows achieve full utilization but there is wasted variable-length buffer if we guess incorrectly in the other direction.)
>> > >
>> > > I proposed a few things that are "nice to have," but really what I'm eyeing is the ability for a reader -- any reader (e.g. pyarrow) -- to see that some of the rows in a RecordBatch are not to be read, based on the new "utilized" (or whatever name) metadata. That single tweak to the metadata -- and readers honoring it -- is the core of the proposal. (Proposal 4.) This would indicate that the attached example (or something similar) is the blessed approach for those seeking to accumulate events and process them while still expecting more data, with the heavier-weight task of creating a new pre-allocated batch being a rare occurrence.
>>
>> So the "length" field in RecordBatch is already the utilized number of rows. The body buffers can certainly have excess unused space. So your application can mutate the Flatbuffer "length" field in-place as new records are filled in.
>>
>> > > Notice that the mutability is only in the sense of "appending." The current doctrine of total immutability would be revised to refer to the immutability of only the already-populated rows.
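A minimal pyarrow sketch of the point above (the column name and capacity are made up): the declared batch length is the utilized row count, while the underlying buffer keeps spare room for rows that have not been filled in yet.

    import numpy as np
    import pyarrow as pa

    # Pre-allocate room for 1024 int64 values, but declare only 3 rows as utilized.
    capacity = 1024
    storage = np.zeros(capacity, dtype=np.int64)
    storage[:3] = [10, 20, 30]

    # Zero-copy view over the oversized buffer; the declared length, not the
    # buffer size, determines how many rows a reader sees.
    arr = pa.Array.from_buffers(pa.int64(), 3, [None, pa.py_buffer(storage)])
    batch = pa.RecordBatch.from_arrays([arr], names=["x"])

    print(batch.num_rows)   # 3, even though the buffer could hold 1024
    print(batch.column(0))  # [10, 20, 30]

For fixed-width columns, "appending" is then just an in-place write into the spare region plus an update of the declared length the next time the batch is exposed.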
>> > > It gives folks an option other than choosing the lesser of two evils: on the one hand, length-1 RecordBatches that don't result in a stream that is computationally efficient; on the other hand, adding artificial latency by accumulating events before "freezing" a larger batch and only then making it available to computation.
>> > >
>> > > -John
>> > >
>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney <wesmck...@gmail.com> wrote:
>> > >> hi John,
>> > >>
>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen <j...@jgm.org> wrote:
>> > >> > During my time building financial analytics and trading systems (23 years!), both the "batch processing" and "stream processing" paradigms have been extensively used by myself and by colleagues.
>> > >> >
>> > >> > Unfortunately, the tools used in these paradigms have not successfully overlapped. For example, an analyst might use a Python notebook with pandas to do some batch analysis. Then, for acceptable latency and throughput, a C++ programmer must implement the same schemas and processing logic in order to analyze real-time data for real-time decision support. (Time horizons are often sub-second or even sub-millisecond for an acceptable reaction to an event. The most aggressive software-based systems, leaving custom hardware aside other than things like kernel-bypass NICs, target 10s of microseconds for a full round trip from data ingestion to decision.)
>> > >> >
>> > >> > As a result, TCO is more than doubled. A doubling can be accounted for by two implementations that share little or nothing in the way of architecture. Then additional effort is required to ensure that these implementations continue to behave the same way and are upgraded in lock-step.
>> > >> >
>> > >> > Arrow purports to be a "bridge" technology that eases one of the pain points of working in different ecosystems by providing a common event stream data structure. (Discussion of common processing techniques is beyond the scope of this discussion. Suffice it to say that a streaming algo can always be run in batch, but not vice versa.)
>> > >> >
>> > >> > Arrow seems to be growing up primarily in the batch processing world. One publication notes that "the missing piece is streaming, where the velocity of incoming data poses a special challenge. There are some early experiments to populate Arrow nodes in microbatches..." [1] Part of our discussion could be a response to this observation. In what ways is it true or false? What are the plans to remedy this shortcoming, if it exists? What steps can be taken now to ease the transition to low-latency streaming support in the future?
>> > >>
>> > >> The Arrow columnar format describes a collection of records with values between records being placed adjacent to each other in memory. If you break that assumption, you don't have a columnar format anymore. So I don't see where the "shortcoming" is. We don't have any software in the project for managing the creation of record batches in a streaming application, but this seems like an interesting area of expansion for the project.
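To make the adjacency point concrete, here is a small sketch (column names are arbitrary) showing that all values of a string column are packed into one contiguous data buffer, which is why growing or inserting a value in place would require shifting everything after it and rewriting the offsets.

    import pyarrow as pa

    # Values within each column are adjacent in memory: one offsets buffer and
    # one contiguous data buffer back the whole string column.
    batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 2, 3], type=pa.int64()),
         pa.array(["buy", "sell", "cancel"])],
        names=["qty", "side"])

    validity, offsets, data = batch.column(1).buffers()
    print(data.to_pybytes())        # b'buysellcancel' -- all values packed together
    print(offsets.size, data.size)  # appending a value means growing both buffers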
>> > >> Note that many contributors have already expanded the surface area of what's in the Arrow libraries in many directions.
>> > >>
>> > >> Streaming data collection is yet another area of expansion, but _personally_ it is not on the short list of projects that I will be working on (or asking my direct or indirect colleagues to work on). Since this is a project made up of volunteers, it's up to contributors to drive new directions for the project by writing design documents and pull requests.
>> > >>
>> > >> > In my own experience, a successful strategy for stream processing where context (i.e. recent past events) must be considered by calculations is to pre-allocate memory for event collection, to organize this memory in a columnar layout, and to run incremental calculations at each event ingress into the partially populated memory. [Fig 1] When the pre-allocated memory has been exhausted, allocate a new batch of column-wise memory and continue. When a batch is no longer pertinent to the calculation look-back window, free the memory back to the heap or pool.
>> > >> >
>> > >> > Here we run into the first philosophical barrier with Arrow, where "Arrow data is immutable." [2] There is currently little or no consideration for reading a partially constructed RecordBatch, e.g. one with only some of the rows containing event data at the present moment in time.
>> > >>
>> > >> It seems like the use case you have heavily revolves around mutating pre-allocated, memory-mapped datasets that are being consumed by other processes on the same host. So you want to incrementally fill some memory-mapped data that you've already exposed to another process.
>> > >>
>> > >> Because of the memory layout for variable-size and nested cells, it is impossible in general to mutate Arrow record batches. This is not a philosophical position: this was a deliberate technical decision to guarantee data locality for scans and predictable O(1) random access on variable-length and nested data.
>> > >>
>> > >> Technically speaking, you can mutate memory in-place for fixed-size types in-RAM or on-disk, if you want to. It's an "off-label" use case, but no one is saying you can't do this.
>> > >>
>> > >> > Proposal 1: Shift the Arrow "immutability" doctrine to apply to populated records of a RecordBatch instead of to all records?
>> > >>
>> > >> Per above, this is impossible in generality. You can't alter variable-length or nested records without rewriting the record batch.
>> > >>
>> > >> > As an alternative approach, a RecordBatch can be used as a single Record (batch length of one). [Fig 2] In this approach the benefit of the columnar layout is lost for look-back window processing.
>> > >> >
>> > >> > Another alternative approach is to collect an entire RecordBatch before stepping through it with the stream processing calculation. [Fig 3] With this approach some columnar processing benefit can be recovered; however, artificial latency is introduced. As tolerance for delays in decision support dwindles, this model will be of increasingly limited value. It is already unworkable in many areas of finance.
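A rough illustration of the "off-label" fixed-width case described above; the file name, sizes, and the absence of any reader/writer synchronization are simplifications, not a recommended design.

    import numpy as np
    import pyarrow as pa

    # Pre-allocate an on-disk column of fixed-width values and fill it in place.
    capacity = 4096
    col = np.memmap("/tmp/prices.f64", dtype=np.float64, mode="w+",
                    shape=(capacity,))

    utilized = 0
    for price in (101.5, 101.75, 101.25):   # incoming events
        col[utilized] = price               # in-place "append" into spare capacity
        utilized += 1
    col.flush()

    # Another process mapping the same file could expose the populated prefix as
    # Arrow data without copying; only `utilized` must be communicated out-of-band.
    arr = pa.Array.from_buffers(pa.float64(), utilized, [None, pa.py_buffer(col)])
    print(pa.RecordBatch.from_arrays([arr], names=["price"]).to_pydict())
    # {'price': [101.5, 101.75, 101.25]}

None of this carries over to strings or nested types, which is exactly the variable-length hiccup raised next.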
>> > >> > When considering the Arrow format and variable-length values such as strings, the pre-allocation approach (and subsequent processing of a partially populated batch) encounters a hiccup. How do we know the amount of buffer space to pre-allocate? If we allocate too much buffer for variable-length data, some of it will be unused. If we allocate too little buffer for variable-length data, some row entities will be unusable. (Additional "rows" remain, but when populating string fields there is no longer string storage space to point them to.)
>> > >> >
>> > >> > As with many optimization space/time tradeoff problems, the solution seems to be to guess. Pre-allocation sets aside variable-length buffer storage based on the typical "expected size" of the variable-length data. This can result in some unused rows, as discussed above. [Fig 4] In fact it will necessarily result in one unused row unless the last row's value for each variable-length field exactly fills the remaining space in that field's variable-length data buffer. Consider the case where there is more variable-length buffer space than data:
>> > >> >
>> > >> > Given variable-length field x, last row index y, variable-length buffer v, and beginning offset o into v:
>> > >> > x[y] begins at o
>> > >> > x[y] ends at the offset of the next record; there is no next record, so x[y] ends after the total remaining area in the variable-length buffer... however, this is too much!
>> > >>
>> > >> It isn't clear to me what you're proposing. It sounds like you want a major redesign of the columnar format to permit in-place mutation of strings. I doubt that would be possible at this point.
>> > >>
>> > >> > Proposal 2: [low priority] Create an "expected length" statistic in the Schema for variable-length fields?
>> > >> >
>> > >> > Proposal 3: [low priority] Create metadata to store the index into variable-length data that represents the end of the value for the last record? Alternatively: a row is "wasted," however pre-allocation is inexact to begin with.
>> > >> >
>> > >> > Proposal 4: Add metadata to indicate to a RecordBatch reader that only some of the rows are to be utilized. [Fig 5] This is useful not only when processing a batch that is still under construction, but also for "closed" batches that were not able to be fully populated due to an imperfect projection of variable-length storage.
>> > >> >
>> > >> > On this last proposal, Wes has weighed in:
>> > >> >
>> > >> > "I believe your use case can be addressed by pre-allocating record batches and maintaining application level metadata about what portion of the record batches has been 'filled' (so the unfilled records can be dropped by slicing). I don't think any change to the binary protocol is warranted." [3]
>> > >>
>> > >> My personal opinion is that a solution to the problem you have can be composed from the components (combined with some new pieces of code) that we have developed in the project already.
>> > >>
>> > >> So the "application level" could be an add-on C++ component in the Apache Arrow project. Call it a "memory-mapped streaming data collector" that pre-allocates on-disk record batches (of only fixed-size or even possibly dictionary-encoded types) and then fills them incrementally as bits of data come in, updating some auxiliary metadata that other processes can use to determine what portion of the Arrow IPC messages to "slice off".
>> > >>
>> > >> > Concerns with positioning this at the app level:
>> > >> >
>> > >> > 1- Do we need to address or begin to address the overall concern of how Arrow data structures are to be used in "true" (non-microbatch) streaming environments, cf. [1] in the last paragraph, as a *first-class* usage pattern? If so, is now the time?
>> > >> >
>> > >> > > if you break that design invariant you don't have a columnar format anymore.
>> > >>
>> > >> Arrow provides a binary protocol for describing payload data on the wire (or on-disk, or in-memory, all the same). I don't see how it is in conflict with streaming environments, unless the streaming application has difficulty collecting multiple records into Arrow record batches. In that case, it's a system trade-off. Currently people are using Avro with Kafka and sending one record at a time, but then they're also spending a lot of CPU cycles on serialization.
>> > >>
>> > >> > 2- If we can even make broad-stroke attempts at data structure features that are likely to be useful when streaming becomes a first-class citizen, it reduces the chances of "breaking" format changes in the future. I do not believe the proposals place an undue hardship on batch processing paradigms. We are currently discussing making a breaking change to the IPC format [4], so there is a window of opportunity to consider features useful for streaming? (Current clients can feel free to ignore the proposed "utilized" metadata of RecordBatch.)
>> > >>
>> > >> I think the perception that streaming is not a first-class citizen is an editorialization (e.g. the article you cited was an editorial written by an industry analyst based on an interview with Jacques and me). Columnar data formats in general are designed to work with more than one value at a time (which we are calling a "batch," but I think that's conflating terminology with the "batch processing" paradigm of Hadoop, etc.).
>> > >>
>> > >> > 3- Part of the promise of Arrow is that applications are not a world unto themselves, but interoperate with other Arrow-compliant systems. In my case, I would like users to be able to examine RecordBatches in tools such as pyarrow without needing to be aware of any streaming app-specific metadata. For example, a researcher may pull in an IPC "File" containing N RecordBatch messages corresponding to those in Fig 4. I would very much like for this casual user to not have to apply N slice operations based on out-of-band data to get to the data that is relevant.
>> > >>
>> > >> Per above, should this become a standard enough use case, I think that code can be developed in the Apache project to address it.
>> > >>
>> > >> > Devil's advocate:
>> > >> >
>> > >> > 1- Concurrent access to a mutable (growing) RecordBatch will require synchronization of some sort to get consistent metadata reads. Since the above proposals do not specify how this synchronization will occur for tools such as pyarrow (we can imagine a Python user getting synchronized access to File metadata and mapping a read-only area before the writer is allowed to continue "appending" to this batch, or batches to this File), some "unusual" code will be required anyway, so what is the harm of consulting side-band data for slicing all the batches as part of this "unusual" code? [Potential response: Yes, but it is still one less thing to worry about, and perhaps first-class support for common synchronization patterns can be forthcoming? These patterns may not require further format changes?]
>> > >> >
>> > >> > My overall concern is that I see a lot of wasted effort dealing with the "impedance mismatch" between batch-oriented and streaming systems. I believe that "best practices" will begin (and continue!) to prefer tools that help bridge the gap. Certainly this is the case in my own work. I agree with the appraisal at the end of the ZDNet article. If the above is not a helpful solution, what other steps can be made? Or if Arrow is intentionally confined to batch processing for the foreseeable future (in terms of first-class support), I'm interested in the rationale. Perhaps the feeling is that we avoid scope creep now (which I understand can be never-ending) even if it means a certain breaking change in the future?
>> > >>
>> > >> There are some semantic issues with what "streaming" and "batch" mean. When people see "streaming" nowadays they think "Kafka" (or Kafka-like). Single events flow in and out of streaming computation nodes (e.g. https://apache.github.io/incubator-heron/ or others). The "streaming" is more about computational semantics than data representation.
>> > >>
>> > >> The Arrow columnar format fundamentally deals with multiple records at a time (you can have a record batch with size 1, but that is not going to be efficient). But I do not think Arrow is "intentionally confined" to batch processing. If it makes sense to use a columnar format to represent data in a streaming application, then you can certainly use it for that. I'm aware of people successfully using Arrow with Kafka, for example.
>> > >>
>> > >> - Wes
>> > >>
>> > >> > Who else encounters the need to mix/match batch and streaming, and what are your experiences?
>> > >> >
>> > >> > Thanks for the further consideration and discussion!
>> > >> >
>> > >> > [1] https://zd.net/2H0LlBY
>> > >> > [2] https://arrow.apache.org/docs/python/data.html
>> > >> > [3] https://bit.ly/2J5sENZ
>> > >> > [4] https://bit.ly/2Yske8L
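Tying the variable-length discussion above together, here is a sketch (the field name, capacities, and the append helper are all made up) of pre-allocating a string column's offsets and value buffers, filling them in place, and exposing only the utilized rows to readers. A batch that was instead written at its full pre-allocated length can be trimmed the same way with the zero-copy RecordBatch.slice.

    import numpy as np
    import pyarrow as pa

    # Pre-allocated string column: guessed row capacity and value-buffer size.
    row_capacity = 8
    value_capacity = 64
    offsets = np.zeros(row_capacity + 1, dtype=np.int32)  # value offsets
    values = np.zeros(value_capacity, dtype=np.uint8)     # string storage

    def append(s: bytes, utilized: int) -> int:
        """Append one string value in place; return the new utilized row count."""
        start = offsets[utilized]
        values[start:start + len(s)] = np.frombuffer(s, dtype=np.uint8)
        offsets[utilized + 1] = start + len(s)
        return utilized + 1

    utilized = 0
    for event in (b"buy", b"sell", b"cancel"):
        utilized = append(event, utilized)

    # The declared length is the utilized row count; both buffers keep their
    # unused tail, which readers never see.
    arr = pa.Array.from_buffers(
        pa.string(), utilized,
        [None, pa.py_buffer(offsets), pa.py_buffer(values)])
    batch = pa.RecordBatch.from_arrays([arr], names=["event"])
    print(batch.num_rows)    # 3
    print(batch.column(0))   # ["buy", "sell", "cancel"]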