Thanks for those results, Yibo! Looks like there's still more room for
improvement here. Yes, things are a little unstable, though I didn't
run into much trouble just getting the benchmark started - I will need
to find suitable hardware and iron out these issues. Note that I've
only implemented DoGet, and I haven't implemented concurrent streams,
which would explain why most benchmark configurations hang or error.

Since the last time, I've rewritten the prototype to use UCX's "active
message" functionality instead of trying to implement messages over
the "streams" API, which simplified the code (a rough sketch of the
pattern is included below, after the benchmark discussion). I also did
some refactoring along the lines of Yibo's prototype to share more
code between the gRPC and UCX implementations. Here are some benchmark
numbers:

For IPC (server/client on the same machine): UCX with shared memory
handily beats gRPC here. UCX with TCP isn't quite up to par, though.

gRPC:
128KiB batches: 4463 MiB/s
2MiB batches:   3537 MiB/s
32MiB batches:  1828 MiB/s

UCX (shared memory):
128KiB batches: 6500 MiB/s
2MiB batches:  13879 MiB/s
32MiB batches:  9045 MiB/s

UCX (TCP):
128KiB batches: 1069 MiB/s
2MiB batches:   1735 MiB/s
32MiB batches:  1602 MiB/s
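
(UCX picks transports automatically at runtime; to pin a particular
path for an A/B comparison like this, the UCX_TLS environment variable
can be set, e.g.:

  UCX_TLS=sm,self ./relwithdebinfo/arrow-flight-benchmark -transport ucx ...
  UCX_TLS=tcp ./relwithdebinfo/arrow-flight-benchmark -transport ucx ...

with "..." standing in for the rest of the benchmark flags as in the
logs linked below.)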

For RPC (server/client on different machines): Two t3.xlarge (4 vCPU,
16 GiB RAM) machines were used in AWS EC2. These have "up to" 5 Gbps
of bandwidth. This isn't really a scenario where UCX is expected to
shine; even so, UCX performs comparably to gRPC here.

gRPC:
128 KiB batches: 554 MiB/s
2 MiB batches:   575 MiB/s

UCX:
128 KiB batches: 546 MiB/s
2 MiB batches:   567 MiB/s

Raw test logs can be found here:
https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc

For IPC, the shared memory results are promising in that it could be
feasible to expose a library purely over Flight without worrying about
FFI bindings. The results also seem roughly comparable to what Yibo
observed in ARROW-15282 [1], meaning UCX will get us both a performant
shared memory transport and support for more exotic hardware.
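
For reference, the active-message pattern boils down to something like
the following. This is a simplified sketch rather than the actual
prototype code - error handling, request completion, and rendezvous
handling are elided, and it assumes a recent UCX with the nbx
active-message API:

  #include <ucp/api/ucp.h>

  constexpr unsigned kFlightDataAmId = 0;

  // Receiver: register a callback for our AM ID on the worker.
  static ucs_status_t OnFlightData(void* arg, const void* header,
                                   size_t header_length, void* data,
                                   size_t length,
                                   const ucp_am_recv_param_t* param) {
    // Parse the Flight method/metadata from `header` and the payload
    // from `data`; return UCS_INPROGRESS instead to keep `data` alive.
    return UCS_OK;
  }

  void RegisterHandler(ucp_worker_h worker) {
    ucp_am_handler_param_t param;
    param.field_mask =
        UCP_AM_HANDLER_PARAM_FIELD_ID | UCP_AM_HANDLER_PARAM_FIELD_CB;
    param.id = kFlightDataAmId;
    param.cb = OnFlightData;
    ucp_worker_set_am_recv_handler(worker, &param);
  }

  // Sender: one call carries a small header plus the data buffer; UCX
  // decides eager vs. rendezvous underneath.
  void SendFlightData(ucp_ep_h ep, const void* header, size_t header_len,
                      const void* payload, size_t payload_len) {
    ucp_request_param_t param = {};
    ucs_status_ptr_t req = ucp_am_send_nbx(ep, kFlightDataAmId, header,
                                           header_len, payload, payload_len,
                                           &param);
    // ...progress the worker until `req` completes...
    (void)req;
  }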

There's still much work to be done; at this point, I'd like to start
implementing the rest of the Flight methods, fixing up the many TODOs
scattered around, refactoring more things to share code between
gRPC/UCX, and finding and benchmarking some hardware that UCX has a
fast path for.

[1]: https://issues.apache.org/jira/browse/ARROW-15282

-David

On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
> Some updates.
> 
> I tested David's UCX transport patch over a 100Gb network. FlightRPC over 
> UCX/RDMA improves throughput by about 50%, with lower and flatter latency.
> And I think there are chances to improve further. See the test report [1].
> 
> For the data plane approach, the PoC shared memory data plane also 
> introduces a significant performance boost. Details at [2].
> 
> Glad to see there is big potential to improve FlightRPC performance.
> 
> [1] https://issues.apache.org/jira/browse/ARROW-15229
> [2] https://issues.apache.org/jira/browse/ARROW-15282
> 
> On 12/30/21 11:57 PM, David Li wrote:
> > Ah, I see.
> > 
> > I think both projects can proceed as well. At some point we will have to 
> > figure out how to merge them, but I think it's too early to see how exactly 
> > we will want to refactor things.
> > 
> > I looked over the code and I don't have any important comments for now. 
> > Looking forward to reviewing when it's ready.
> > 
> > -David
> > 
> > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
> >>
> >>
> >> On 12/29/21 11:03 PM, David Li wrote:
> >>> Awesome, thanks for sharing this too!
> >>>
> >>> The refactoring you have with DataClientStream is what I would like to do as 
> >>> well - I think much of the existing code can be adapted to be more 
> >>> transport-agnostic, and then it will be easier to support new transports 
> >>> (whether data-only or for all methods).
> >>>
> >>> Where do you see the gaps between gRPC and this? I think what would 
> >>> happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 
> >>> 3) client sees the unfamiliar prefix and creates a new client for the 
> >>> DoGet call (it would have to do this anyways if, for instance, the 
> >>> GetFlightInfo call returned the address of a different server).
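> >>>
> >>> To sketch the dispatch in step 3 (the helper names here are
> >>> hypothetical, not actual Flight API):
> >>>
> >>>   std::unique_ptr<FlightClient> ClientForLocation(const Location& loc) {
> >>>     // Pick the client implementation from the URI scheme, e.g.
> >>>     // grpc+tcp://, ucx://, or a hypothetical shm://.
> >>>     if (loc.scheme() == "shm") return MakeShmClient(loc);  // hypothetical
> >>>     return MakeGrpcClient(loc);                            // hypothetical
> >>>   }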
> >>>
> >>
> >> I mean implementation details. Some unit tests run longer than expected
> >> (data plane timeouts reading from an ended stream). It looks like the grpc
> >> stream finish message is not correctly intercepted and forwarded to the
> >> data plane. I don't think it's a big problem, it just needs some time to debug.
> >>
> >>> I also wonder how this stacks up to UCX's shared memory backend (I did 
> >>> not test this though).
> >>>
> >>
> >> I implemented a shared memory data plane only to verify and consolidate
> >> the data plane design, as it's the easiest (and still useful) driver. I also
> >> plan to implement a socket-based data plane - not useful in practice,
> >> only to make sure the design works okay across the network. Then we can add
> >> more useful drivers like UCX or DPDK (the benefit of DPDK is that it works on
> >> commodity hardware, unlike UCX/RDMA, which requires expensive equipment).
> >>
> >>> I would like to be able to support entirely new transports for certain 
> >>> cases (namely browser support - though perhaps one of the gRPC proxies 
> >>> would suffice there), but even in that case, we could make it so that a 
> >>> new transport only needs to implement the data plane methods. Only having 
> >>> to support the data plane methods would save significant implementation 
> >>> effort for all non-browser cases, so I think it's a worthwhile approach.
> >>>
> >>
> >> Thanks for your interest in this approach. My current plan is to first
> >> refactor the shared memory data plane to verify it beats grpc in local rpc
> >> by a considerable margin - otherwise there must be big mistakes in my
> >> design. After that I will fix the unit test issues and deliver it for
> >> community review.
> >>
> >> Anyway, don't let me block your implementations. And if you think it's
> >> useful, I can push the current code for more detailed discussion.
> >>
> >>> -David
> >>>
> >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> >>>> Thanks, David, for initiating the UCX integration - great work!
> >>>> I think a 5Gbps network is too limited for performance evaluation. I will 
> >>>> try the patch on a 100Gb RDMA network; hopefully we can see some 
> >>>> improvements.
> >>>> I once benchmarked Flight over a 100Gb network [1]: grpc-based throughput 
> >>>> is 2.4GB/s for one thread and 8.8GB/s for six threads, with about 60us 
> >>>> latency. I also benchmarked raw RDMA performance (same batch sizes as 
> >>>> Flight); one thread can achieve 9GB/s with 12us latency. Of course the 
> >>>> comparison is not fair. With David's patch, we can get a more realistic 
> >>>> comparison.
> >>>>
> >>>> I'm implementing a data plane approach in the hope that we can adopt new 
> >>>> data acceleration methods easily. My approach is to replace only the 
> >>>> FlightData transmission of DoGet/Put/Exchange with data plane drivers, 
> >>>> while grpc is still used for all rpc calls.
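> >>>>
> >>>> To sketch the idea (this interface is hypothetical, not the actual
> >>>> code in [2]):
> >>>>
> >>>>   // A driver moves the FlightData bodies over a fast path (shared
> >>>>   // memory, UCX, DPDK, ...) while grpc keeps carrying the control
> >>>>   // plane calls.
> >>>>   class DataPlaneDriver {
> >>>>    public:
> >>>>     virtual ~DataPlaneDriver() = default;
> >>>>     virtual Status SendPayload(const FlightPayload& payload) = 0;
> >>>>     virtual Status ReceivePayload(FlightData* out) = 0;
> >>>>   };
> >>>>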
> >>>> Code is at my github repo [2]. Besides the framework, I've just implemented 
> >>>> a shared memory data plane driver as a PoC. The Get/Put/Exchange unit tests 
> >>>> pass; TestCancel hangs, and some unit tests run longer than expected - 
> >>>> still debugging. The shared memory data plane performance is pretty bad 
> >>>> right now due to the repeated map/unmap for each read/write; pre-allocated 
> >>>> pages should improve it a lot - still experimenting.
> >>>>
> >>>> Would like to hear community comments.
> >>>>
> >>>> My personal opinion is that the data plane approach reuses the grpc 
> >>>> control plane and may make it easier to add new data acceleration 
> >>>> methods, but it needs to fit into grpc seamlessly (there are still gaps 
> >>>> to resolve). A new transport requires much more initial effort, but may 
> >>>> pay off later. And it looks like these two approaches don't conflict 
> >>>> with each other.
> >>>>
> >>>> [1] Test environment
> >>>> nics: mellanox connectx5
> >>>> hosts: client (neoverse n1), server (xeon gold 5218)
> >>>> os: ubuntu 20.04, linux kernel 5.4
> >>>> test case: 128k batch size, DoGet
> >>>>
> >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> >>>>
> >>>> ________________________________
> >>>> From: David Li <lidav...@apache.org>
> >>>> Sent: Wednesday, December 29, 2021 3:09 AM
> >>>> To: dev@arrow.apache.org <dev@arrow.apache.org>
> >>>> Subject: Re: Arrow in HPC
> >>>>
> >>>> I ended up drafting an implementation of Flight based on UCX, and doing 
> >>>> some of the necessary refactoring to support additional backends in the 
> >>>> future. It can run the Flight benchmark, and performance is about 
> >>>> comparable to gRPC, as tested on AWS EC2.
> >>>>
> >>>> The implementation is based on the UCP streams API. It's extremely
> >>>> bare-bones and is really only a proof of concept; a good amount of work 
> >>>> is needed to turn it into a usable implementation. I had hoped it would 
> >>>> perform markedly better than gRPC, at least in this early test, but this 
> >>>> seems not to be the case. That said: I am likely not using UCX properly, 
> >>>> UCX would still open up support for additional hardware, and this work 
> >>>> should allow other backends to be implemented more easily.
> >>>>
> >>>> The branch can be viewed at
> >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
> >>>>
> >>>> I've attached the benchmark output at the end.
> >>>>
> >>>> There are still quite a few TODOs and things that need investigating:
> >>>>
> >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
> >>>> - Concurrent requests are not supported (nor is making more than one
> >>>>     request on a connection), and the server does not support concurrent 
> >>>>     clients. We also need to decide whether to even support concurrent 
> >>>>     requests, and how (e.g. pooling multiple connections, implementing a 
> >>>>     gRPC/HTTP2-style protocol, or even possibly implementing HTTP2).
> >>>> - We need to make sure we properly handle errors, etc. everywhere.
> >>>> - Are we using UCX in a performant and idiomatic manner? Will the
> >>>>     implementation work well on RDMA and other specialized hardware?
> >>>> - Do we also need to support the UCX tag API?
> >>>> - Can we refactor out interfaces that allow sharing more of the
> >>>>     client/server implementation between different backends?
> >>>> - Are the abstractions sufficient to support other potential backends 
> >>>>     like MPI, libfabrics, or WebSockets?
> >>>>
> >>>> If anyone has experience with UCX, I'd appreciate any feedback. 
> >>>> Otherwise, I'm hoping to plan out and try to tackle some of the TODOs 
> >>>> above, and figure out how this effort can proceed.
> >>>>
> >>>> Antoine/Micah raised the possibility of extending gRPC instead. That 
> >>>> would be preferable, frankly, given that we might otherwise have to 
> >>>> re-implement a lot of what gRPC and HTTP2 provide ourselves. However, 
> >>>> the necessary proposal stalled and was dropped without much discussion:
> >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> >>>>
> >>>> Benchmark results (also uploaded at
> >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> >>>>
> >>>> Testing was done between two t3.xlarge instances in the same zone.
> >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> >>>>
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> >>>> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 
> >>>> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 
> >>>> -records_per_batch=4096
> >>>> Testing method: DoGet
> >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627 UCX  
> >>>> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627 UCX  
> >>>> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 131072
> >>>> Batches read: 10000
> >>>> Bytes read: 1310720000
> >>>> Nanos: 2165862969
> >>>> Speed: 577.137 MB/s
> >>>> Throughput: 4617.1 batches/s
> >>>> Latency mean: 214 us
> >>>> Latency quantile=0.5: 209 us
> >>>> Latency quantile=0.95: 340 us
> >>>> Latency quantile=0.99: 409 us
> >>>> Latency max: 6350 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> >>>> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 
> >>>> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 
> >>>> -records_per_batch=65536
> >>>> Testing method: DoGet
> >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627 UCX  
> >>>> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627 UCX  
> >>>> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 2097152
> >>>> Batches read: 10000
> >>>> Bytes read: 20971520000
> >>>> Nanos: 34184175236
> >>>> Speed: 585.066 MB/s
> >>>> Throughput: 292.533 batches/s
> >>>> Latency mean: 3415 us
> >>>> Latency quantile=0.5: 3408 us
> >>>> Latency quantile=0.95: 3549 us
> >>>> Latency quantile=0.99: 3800 us
> >>>> Latency max: 20236 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> >>>> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 
> >>>> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 
> >>>> -records_per_batch=4096
> >>>> Testing method: DoGet
> >>>> Using standalone TCP server
> >>>> Server host: 172.31.34.4
> >>>> Server port: 31337
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 131072
> >>>> Batches read: 10000
> >>>> Bytes read: 1310720000
> >>>> Nanos: 2375289668
> >>>> Speed: 526.252 MB/s
> >>>> Throughput: 4210.01 batches/s
> >>>> Latency mean: 235 us
> >>>> Latency quantile=0.5: 203 us
> >>>> Latency quantile=0.95: 328 us
> >>>> Latency quantile=0.99: 1377 us
> >>>> Latency max: 17860 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> >>>> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 
> >>>> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 
> >>>> -records_per_batch=65536
> >>>> Testing method: DoGet
> >>>> Using standalone TCP server
> >>>> Server host: 172.31.34.4
> >>>> Server port: 31337
> >>>> Number of perf runs: 1
> >>>> Number of concurrent gets/puts: 1
> >>>> Batch size: 2097152
> >>>> Batches read: 10000
> >>>> Bytes read: 20971520000
> >>>> Nanos: 34202704498
> >>>> Speed: 584.749 MB/s
> >>>> Throughput: 292.375 batches/s
> >>>> Latency mean: 3416 us
> >>>> Latency quantile=0.5: 3406 us
> >>>> Latency quantile=0.95: 3548 us
> >>>> Latency quantile=0.99: 3764 us
> >>>> Latency max: 17086 us
> >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 
> >>>> 1337 -Z -l 1M
> >>>> Connecting to host 172.31.34.4, port 1337
> >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
> >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
> >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
> >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
> >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
> >>>> [ ID] Interval           Transfer     Bitrate         Retr
> >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             
> >>>> sender
> >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  
> >>>> receiver
> >>>>
> >>>> iperf Done.
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> >>>>> "David Li" <lidav...@apache.org> writes:
> >>>>>
> >>>>>> Thanks for the clarification Yibo, looking forward to the results. 
> >>>>>> Even if it is a very hacky PoC it will be interesting to see how it 
> >>>>>> affects performance, though as Keith points out there are benefits in 
> >>>>>> general to UCX (or similar library), and we can work out the 
> >>>>>> implementation plan from there.
> >>>>>>
> >>>>>> To Benson's point - the work done to get UCX supported would pave the 
> >>>>>> way to supporting other backends as well. I'm personally not familiar 
> >>>>>> with UCX, MPI, etc. so is MPI here more about playing well with 
> >>>>>> established practices or does it also offer potential hardware 
> >>>>>> support/performance improvements like UCX would?
> >>>>>
> >>>>> There are two main implementations of MPI, MPICH and Open MPI, both of 
> >>>>> which are permissively licensed open source community projects. Both 
> >>>>> have direct support for UCX and unless your needs are very specific, 
> >>>>> the overhead of going through MPI is likely to be negligible. Both also 
> >>>>> have proprietary derivatives, such as Cray MPI (MPICH derivative) and 
> >>>>> Spectrum MPI (Open MPI derivative), which may have optimizations for 
> >>>>> proprietary networks. Both MPICH and Open MPI can be built without UCX, 
> >>>>> and this is often easier (UCX 'master' is more volatile in my 
> >>>>> experience).
> >>>>>
> >>>>> The vast majority of distributed memory scientific applications use MPI 
> >>>>> or higher level libraries, rather than writing directly to UCX (which 
> >>>>> provides less coverage of HPC networks). I think MPI compatibility is 
> >>>>> important.
> >>>>>
> >>>>>   From way up-thread (sorry):
> >>>>>
> >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As another
> >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with the 
> >>>>>>>>>>>> HPC
> >>>>>>>>>>>> space.
> >>>>>
> >>>>> MPI has collective operations like MPI_Allreduce (perform a reduction 
> >>>>> and give every process the result; these run in log(P) or better time 
> >>>>> with small constants -- 15 microseconds is typical for a cheap 
> >>>>> reduction operation on a million processes). MPI supports user-defined 
> >>>>> operations for reductions and prefix-scan operations. If we defined 
> >>>>> MPI_Ops for Arrow types, we could compute summary statistics and other 
> >>>>> algorithmic building blocks fast at arbitrary scale.
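> >>>>>
> >>>>> To make the mechanism concrete, a user-defined MPI_Op looks roughly
> >>>>> like this (a minimal sketch over a plain int64 buffer; a real Arrow
> >>>>> MPI_Op would also need to handle validity bitmaps and variable-width
> >>>>> layouts):
> >>>>>
> >>>>>   #include <mpi.h>
> >>>>>   #include <cstdint>
> >>>>>
> >>>>>   // Matches the MPI_User_function signature.
> >>>>>   static void SumInt64(void* in, void* inout, int* len, MPI_Datatype*) {
> >>>>>     auto* a = static_cast<const int64_t*>(in);
> >>>>>     auto* b = static_cast<int64_t*>(inout);
> >>>>>     for (int i = 0; i < *len; ++i) b[i] += a[i];
> >>>>>   }
> >>>>>
> >>>>>   // Usage: every rank contributes n local partial sums and receives
> >>>>>   // the global result.
> >>>>>   //   MPI_Op op;
> >>>>>   //   MPI_Op_create(&SumInt64, /*commute=*/1, &op);
> >>>>>   //   MPI_Allreduce(local, global, n, MPI_INT64_T, op, MPI_COMM_WORLD);
> >>>>>   //   MPI_Op_free(&op);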
> >>>>>
> >>>>> The collective execution model might not be everyone's bag, but MPI_Op 
> >>>>> can also be used in one-sided operations (MPI_Accumulate and 
> >>>>> MPI_Fetch_and_op) and dropping into collective mode has big advantages 
> >>>>> for certain algorithms in computational statistics/machine learning.
> >>>>>
> >>>>
> >>>
> >>
> > 
> 
