Yes, that sounds about right to me.

Additionally, I'd like getStream to be able to return something like
Future<ArrowReader>. That way we could free up the RPC handler thread.

Metadata would need some work, so possibly ArrowReader isn't the right 
interface, but something like it. (This is more a sketch of an idea than a 
concrete proposal.)
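
To make the shape of that idea a little more concrete, here is a rough,
self-contained Java sketch. Everything in it is hypothetical: BatchReader,
AsyncProducer, Sink, and serve are names invented for illustration, plain
Strings stand in for record batches and metadata, and none of this exists in
Flight today. The only point is that getStream hands back a future, and the
loop that drains the reader runs on a separate executor rather than on the
RPC handler thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch only; none of these types exist in Arrow Flight. */
class AsyncGetStreamSketch {

  /** Stand-in for an ArrowReader-like source that also carries per-batch metadata. */
  interface BatchReader extends AutoCloseable {
    boolean loadNextBatch();   // same idea as ArrowReader's loadNextBatch
    String currentBatch();     // stand-in for the loaded record batch
    String currentMetadata();  // assumption: app metadata attached to the current batch
    @Override
    void close();
  }

  /** Hypothetical producer whose getStream returns immediately with a future. */
  interface AsyncProducer {
    CompletableFuture<BatchReader> getStream(String ticket);
  }

  /** Stand-in for the outbound listener's putNext-with-metadata call. */
  interface Sink {
    void putNext(String batch, String metadata);
  }

  /** Drains the reader on the pump executor, so the calling thread is not blocked. */
  static CompletableFuture<Void> serve(
      AsyncProducer producer, String ticket, Sink sink, ExecutorService pump) {
    return producer.getStream(ticket).thenAcceptAsync(reader -> {
      try (BatchReader r = reader) {
        while (r.loadNextBatch()) {
          sink.putNext(r.currentBatch(), r.currentMetadata());
        }
      }
    }, pump);
  }

  /** Toy reader over an in-memory list, for demonstration only. */
  static BatchReader readerOf(List<String> batches) {
    Iterator<String> it = batches.iterator();
    return new BatchReader() {
      private String current;
      private int index = -1;

      public boolean loadNextBatch() {
        if (!it.hasNext()) {
          return false;
        }
        current = it.next();
        index++;
        return true;
      }

      public String currentBatch() { return current; }

      public String currentMetadata() { return "batch-" + index; }

      public void close() { }
    };
  }

  /** Runs the whole flow once and returns what the sink received. */
  static List<String> demo() {
    ExecutorService pump = Executors.newSingleThreadExecutor();
    try {
      List<String> sent = new ArrayList<>();
      AsyncProducer producer =
          ticket -> CompletableFuture.supplyAsync(() -> readerOf(List.of("a", "b", "c")));
      serve(producer, "ticket", (batch, meta) -> sent.add(batch + ":" + meta), pump).join();
      return sent;
    } finally {
      pump.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Note that this sketch ignores backpressure and cancellation entirely; a real
version would have to check readiness before each putNext.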

I still think it would be OK to expose setOnReadyHandler. There is a bit of 
tension here between wrapping gRPC entirely and trying not to expose it (so 
that we can offer alternative transports in the future), and allowing deeper 
integration with gRPC (which would likely be more immediately practically 
useful, at the cost of tying us to implementation details/quirks). 

And yes, ideally we'd let you choose how backpressure is handled, so that
each application could tailor it to the needs of its deployment. So maybe
the current blocking implementation is best after all (with the option of
setOnReadyHandler).
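
For reference, the blocking approach boils down to something like the sketch
below. This is not the actual CallbackBackpressureStrategy code; ReadyGate,
WaitResult, and their methods are made-up names, and the "transport" is just a
plain thread playing the role of gRPC's callback thread. It illustrates the
point from earlier in the thread: the wait only wakes up if onReady or
onCancel is delivered from some other thread, so a missed callback shows up as
a timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a blocking backpressure gate; not Flight's implementation. */
class BlockingGateSketch {

  enum WaitResult { READY, CANCELLED, TIMEOUT }

  static final class ReadyGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean ready;
    private boolean cancelled;

    /** Would be registered as the on-ready callback; runs on a transport thread. */
    void onReady() {
      lock.lock();
      try {
        ready = true;
        changed.signalAll();
      } finally {
        lock.unlock();
      }
    }

    /** Would be registered as the on-cancel callback. */
    void onCancel() {
      lock.lock();
      try {
        cancelled = true;
        changed.signalAll();
      } finally {
        lock.unlock();
      }
    }

    /**
     * Blocks the producer thread until the stream is ready, is cancelled, or the
     * timeout elapses. Interruption is treated as cancellation in this sketch.
     */
    WaitResult waitForReady(long timeoutMillis) {
      long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      lock.lock();
      try {
        while (!ready && !cancelled) {
          if (remaining <= 0) {
            return WaitResult.TIMEOUT;
          }
          remaining = changed.awaitNanos(remaining);
        }
        return cancelled ? WaitResult.CANCELLED : WaitResult.READY;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return WaitResult.CANCELLED;
      } finally {
        lock.unlock();
      }
    }
  }

  /** The producer waits on the calling thread; a second thread delivers onReady. */
  static WaitResult demo() {
    ReadyGate gate = new ReadyGate();
    Thread transport = new Thread(gate::onReady);
    transport.start();
    return gate.waitForReady(5_000);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

If the same thread were responsible for both waiting and delivering onReady,
waitForReady could only ever return TIMEOUT, which is the "stuck" case.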

On Sun, Mar 19, 2023, at 22:16, Nathaniel Jones wrote:
> Hi David,
>
> Thanks so much for the fast reply, that’s really helpful context.
>
> I like the idea you mentioned where the application provides an 
> ArrowReader. Would that story go something like the following?
> 1. Today in FlightService doGetCustom, a GetListener is passed to 
> producer.getStream - this leaves the application developer responsible 
> for handling gRPC details discussed above (they can choose to respect 
> OutboundStreamListener.isReady or not, etc.).
> 2. In the ArrowReader case you mentioned, that same call to 
> producer.getStream would instead (or as another option) return an 
> ArrowReader… then the FlightService code could say something like 
> “while loadNextBatch is true, (somehow) unload that RecordBatch and 
> pipe it through to the OutboundStreamListener’s putNext”
> — One nice thing about interacting with the OutboundStreamListener 
> directly is that application developers can putNext with metadata - I 
> wonder where metadata fits in the ArrowReader case?
> — That makes sense that it would be hard to get that async writing out 
> to gRPC to work flawlessly - maybe one option in this scenario where 
> Flight is deciding how to write out on the stream would be to default 
> to a “noop BackpressureStrategy” or a simple blocking one and let 
> application developers optionally override?
>
> Thanks,
> Nate  
>
> On 2023/03/12 17:12:41 David Li wrote:
>> Hi Nate,
>> 
>> That sounds about right to me (it's been a while since I've dug into 
>> gRPC-Java behavior). A better server API is something I've long wanted to 
>> consider and haven't had the time to; the current APIs try to let you write 
>> blocking/procedural code as much as possible which then does not mesh well 
>> with the actual gRPC APIs they attempt to wrap.
>> 
>> We could expose setOnReadyHandler, IMO. Though from what I recall, it's very 
>> tricky to use correctly and it's easy to get yourself "stuck" by missing a 
>> callback.
>> 
>> My hope for a better server API would be to eventually just have the 
>> application provide an ArrowReader (asynchronously) and then have the Flight 
>> implementation pull from that reader in the most efficient possible manner 
>> (though gRPC-Java makes that hard, what with the hardcoded backpressure 
>> threshold [1] - I think that may have been another reason why I didn't 
>> expose setOnReadyHandler before, since it would artificially limit your 
>> throughput).
>> 
>> [1]: https://github.com/grpc/proposal/pull/135
>> 
>> -David
>> 
>> On Fri, Mar 10, 2023, at 18:46, Nathaniel Jones wrote:
>> > Hello,
>> >
>> > I'm hoping to check my understanding around various ways to implement a
>> > DoGet handler with respect to flow control, and then inquire about
>> > potential future API changes.
>> >
>> > First, I'm aware of 3 ways to respect flow control when implementing a Java
>> > server's DoGet that have different characteristics:
>> >
>> >    1. Busy-Waiting / Thread.sleep()-ing:
>> >       1. Implement a blocking body that loops (and maybe sleeps) while
>> >       the ServerStreamListener's isReady is false (and respect
>> >       isCancelled, too)
>> >    2. Using BackpressureStrategy
>> >    <https://sourcegraph.com/github.com/apache/arrow/-/blob/java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java?L28:8>
>> >    (and specifically CallbackBackpressureStrategy
>> >    <https://sourcegraph.com/github.com/apache/arrow/-/blob/java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java?L75>
>> >    since one could implement option #1 above as a simple strategy):
>> >       1. My own experiments with CallbackBackpressureStrategy /
>> >       understanding from initial PR discussion
>> >       <https://github.com/apache/arrow/pull/8476#issuecomment-710777211>
>> >       demonstrate that the DoGet handler must be run on a separate
>> >       thread; you can't invoke the "waitForListener" on the thread that
>> >       gRPC uses to invoke this RPC because if you're blocking (in this
>> >       case Thread await-ing) on this gRPC thread, gRPC can't process
>> >       onReady callbacks for this RPC, and thus
>> >       CallbackBackpressureStrategy would never be notify-ed to wake up
>> >    3. Write a fully async implementation relying directly on underlying
>> >    CallStreamObserver's setOnReadyHandler:
>> >       1. This is similar in spirit to #2 above, but now operates
>> >       completely on threads from gRPC's thread pool (the onReady handler
>> >       *is* the DoGet logic). The code looks very roughly like:
>> >          1. make VectorSchemaRoot with some schema and allocator
>> >          2. On our ServerStreamListener, invoke listener.start(root)
>> >          immediately
>> >          3. set listener.setOnReadyHandler(<the DoGet logic to stream
>> >          data to client>)
>> >          4. So, no blocking
>> >
>> > My understanding of the trade-offs between options #2 and #3 above:
>> >
>> >    - In CallbackBackpressureStrategy with background threads, there'll be
>> >    (# threads in gRPC pool) + (sum of background threads across currently
>> >    running DoGet streams) threads. If the actual streaming logic is
>> >    intensive / CPU bound, it might be good for that to live on a
>> >    background thread, because if the gRPC threads were tied up in
>> >    intensive callbacks (and even exhausted), new RPC requests / ready and
>> >    cancel callbacks could be slow / stuck.
>> >    - On the other hand, for quick I/O bound work in DoGet logic, option #3
>> >    might work well if it's not worth the extra threads / context switching,
>> >    and gRPC threads can handle everything quickly.
>> >    - So overall, it seems like different workloads could demand a different
>> >    model, which should inform how DoGet logic is written.
>> >
>> >
>> > *Q1: *Is my understanding of the above points approximately correct? I'd
>> > appreciate any pointers on items I'm misunderstanding.
>> >
>> > *Q2: *Assuming option #3 has utility in some cases, I'm curious if
>> > there is a way in Flight to expose an easier API to implement DoGet
>> > fully asynchronously. I find it tricky to manage cleanup of the
>> > VectorSchemaRoot as well as being careful about things like respecting
>> > isCancelled becoming true. Developers also "need to know" that the async
>> > setOnReadyHandler exists at the gRPC layer and understand its benefits.
>> > I noticed that on
>> > ARROW-4484 <https://issues.apache.org/jira/browse/ARROW-4484>, which
>> > focuses on DoPut's busy-wait, the first comment mentions that this
>> > applies to DoGet as well - though it seems GetListener's
>> > waitUntilStreamReady
>> > <https://sourcegraph.com/github.com/apache/arrow/-/blob/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java?L189-192>
>> > doesn't busy-wait, I was curious if the spirit of the comment was
>> > similar to my question around an easier way to write the logic
>> > asynchronously? If so, would the idea be for Flight to wrap gRPC's
>> > callbacks and expose those to DoGet authors, while helping to abstract
>> > some cleanup items?
>> >
>> > Thank you for any feedback,
>> > Nate
>>
