Thanks David for initiating the UCX integration, great work! I think a 5 Gbps network is too limited for performance evaluation. I will try the patch on a 100 Gb RDMA network; hopefully we can see some improvements. I once benchmarked Flight over a 100 Gb network [1]: gRPC-based throughput is 2.4 GB/s for one thread and 8.8 GB/s for six threads, with about 60 us latency. I also benchmarked raw RDMA performance (same batch sizes as Flight); one thread can achieve 9 GB/s with 12 us latency. Of course the comparison is not fair. With David's patch, we can get a more realistic comparison.

I'm also implementing a "data plane" approach, in the hope that we can adopt new data acceleration methods easily. The idea is to replace only the FlightData transmission of DoGet/DoPut/DoExchange with pluggable data plane drivers, while gRPC is still used for all RPC calls (a rough interface sketch is included at the end of this message). Code is at my GitHub repo [2]. Besides the framework, I have implemented a shared memory data plane driver as a proof of concept. The Get/Put/Exchange unit tests pass, but TestCancel hangs and some unit tests run longer than expected; I'm still debugging. Shared memory data plane performance is pretty bad at the moment, due to repeated map/unmap calls for each read/write; pre-allocated pages should improve it a lot, and I'm still experimenting. I would like to hear the community's comments. My personal opinion: the data plane approach reuses the gRPC control plane and may make it easier to add new data acceleration methods, but it needs to fit into gRPC seamlessly (there are still unresolved gaps). A new transport requires much more initial effort, but may pay off later. The two approaches don't appear to conflict with each other.

[1] Test environment
    NICs: Mellanox ConnectX-5
    hosts: client (Neoverse N1), server (Xeon Gold 5218)
    OS: Ubuntu 20.04, Linux kernel 5.4
    test case: 128k batch size, DoGet

[2] https://github.com/cyb70289/arrow/tree/flight-data-plane
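To make the data plane idea a bit more concrete, here is a rough, hypothetical C++ sketch of where a driver could plug in. None of these names come from the actual code in [2]; it only illustrates the split between the gRPC control plane and a pluggable data path.

    // Hypothetical sketch only; these names are not from the code in [2].
    // gRPC keeps carrying every RPC (GetFlightInfo, the DoGet/DoPut/DoExchange
    // calls themselves, cancellation), while the FlightData payloads of those
    // three streaming calls are handed to a pluggable data plane driver.
    #include <cstdint>
    #include <memory>
    #include <string>
    #include <vector>

    // Stand-in for one serialized record batch (IPC header + body buffers).
    struct DataPayload {
      std::vector<uint8_t> metadata;
      std::vector<uint8_t> body;
    };

    // One driver instance owns one data path (shared memory, UCX, RDMA, ...).
    class DataPlaneDriver {
     public:
      virtual ~DataPlaneDriver() = default;
      // Called once per data stream; `stream_id` is whatever identifier the
      // two sides negotiated over the gRPC control plane (e.g. call metadata).
      virtual bool Connect(const std::string& stream_id) = 0;
      // Blocking send/receive of one payload; a real driver would be async.
      virtual bool Send(const DataPayload& payload) = 0;
      virtual bool Recv(DataPayload* payload) = 0;
      virtual void Close() = 0;
    };

    // The shared-memory proof of concept would implement the same interface,
    // e.g. by copying payloads through a ring buffer in a shm segment.
    std::unique_ptr<DataPlaneDriver> MakeSharedMemoryDriver();  // hypothetical

In this picture, the shared memory proof of concept is just one implementation of the interface; a UCX or RDMA driver would be another.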
________________________________
From: David Li <lidav...@apache.org>
Sent: Wednesday, December 29, 2021 3:09 AM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: Arrow in HPC

I ended up drafting an implementation of Flight based on UCX, and doing some of the necessary refactoring to support additional backends in the future. It can run the Flight benchmark, and performance is about comparable to gRPC, as tested on AWS EC2. The implementation is based on the UCP streams API.

It's extremely bare-bones and is really only a proof of concept; a good amount of work is needed to turn it into a usable implementation. I had hoped it would perform markedly better than gRPC, at least in this early test, but this seems not to be the case. That said: I am likely not using UCX properly, UCX would still open up support for additional hardware, and this work should allow other backends to be implemented more easily.

The branch can be viewed at https://github.com/lidavidm/arrow/tree/flight-ucx

I've attached the benchmark output at the end. There are still quite a few TODOs and things that need investigating:

- Only DoGet and GetFlightInfo are implemented, and incompletely at that.
- Concurrent requests are not supported, or even making more than one request on a connection, nor does the server support concurrent clients. We also need to decide whether to even support concurrent requests, and how (e.g. pooling multiple connections, or implementing a gRPC/HTTP2 style protocol, or even possibly implementing HTTP2).
- We need to make sure we properly handle errors, etc. everywhere.
- Are we using UCX in a performant and idiomatic manner? Will the implementation work well on RDMA and other specialized hardware?
- Do we also need to support the UCX tag API?
- Can we refactor out interfaces that allow sharing more of the client/server implementation between different backends?
- Are the abstractions sufficient to support other potential backends like MPI, libfabrics, or WebSockets?

If anyone has experience with UCX, I'd appreciate any feedback. Otherwise, I'm hoping to plan out and try to tackle some of the TODOs above, and figure out how this effort can proceed.
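For illustration only (this is not code from the branch): a minimal sketch of the UCP stream calls mentioned above, assuming the context, worker, and endpoint have already been created (ucp_init, ucp_worker_create, ucp_ep_create) and ignoring zero-copy, flow control, and most error handling.

    #include <ucp/api/ucp.h>
    #include <cstdint>

    // Poll the worker until a non-blocking UCX request completes, then free it.
    static ucs_status_t WaitRequest(ucp_worker_h worker, ucs_status_ptr_t request) {
      if (request == nullptr) return UCS_OK;                 // completed inline
      if (UCS_PTR_IS_ERR(request)) return UCS_PTR_STATUS(request);
      ucs_status_t status;
      do {
        ucp_worker_progress(worker);                         // drive communication
        status = ucp_request_check_status(request);
      } while (status == UCS_INPROGRESS);
      ucp_request_free(request);
      return status;
    }

    // Send one length-prefixed message (e.g. a serialized FlightData) over an
    // already-connected stream endpoint. A real implementation would avoid the
    // extra send, handle errors properly, and be asynchronous.
    static ucs_status_t SendMessage(ucp_worker_h worker, ucp_ep_h ep,
                                    const void* data, uint64_t size) {
      ucp_request_param_t param;
      param.op_attr_mask = 0;  // defaults: contiguous bytes, no completion callback
      ucs_status_t status =
          WaitRequest(worker, ucp_stream_send_nbx(ep, &size, sizeof(size), &param));
      if (status != UCS_OK) return status;
      return WaitRequest(worker, ucp_stream_send_nbx(ep, data, size, &param));
    }

The receiving side would mirror this with ucp_stream_recv_nbx, first reading the length prefix and then the body.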
Antoine/Micah raised the possibility of extending gRPC instead. That would be preferable, frankly, given that otherwise we might have to re-implement a lot of what gRPC and HTTP2 provide by ourselves. However, the necessary proposal stalled and was dropped without much discussion: https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY

Benchmark results (also uploaded at https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):

Testing was done between two t3.xlarge instances in the same zone. t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
Testing method: DoGet
[1640703417.639373] [ip-172-31-37-78:10110:0] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703417.650068] [ip-172-31-37-78:10110:1] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2165862969
Speed: 577.137 MB/s
Throughput: 4617.1 batches/s
Latency mean: 214 us
Latency quantile=0.5: 209 us
Latency quantile=0.95: 340 us
Latency quantile=0.99: 409 us
Latency max: 6350 us

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
Testing method: DoGet
[1640703439.428785] [ip-172-31-37-78:10116:0] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703439.440359] [ip-172-31-37-78:10116:1] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34184175236
Speed: 585.066 MB/s
Throughput: 292.533 batches/s
Latency mean: 3415 us
Latency quantile=0.5: 3408 us
Latency quantile=0.95: 3549 us
Latency quantile=0.99: 3800 us
Latency max: 20236 us

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2375289668
Speed: 526.252 MB/s
Throughput: 4210.01 batches/s
Latency mean: 235 us
Latency quantile=0.5: 203 us
Latency quantile=0.95: 328 us
Latency quantile=0.99: 1377 us
Latency max: 17860 us

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34202704498
Speed: 584.749 MB/s
Throughput: 292.375 batches/s
Latency mean: 3416 us
Latency quantile=0.5: 3406 us
Latency quantile=0.95: 3548 us
Latency quantile=0.99: 3764 us
Latency max: 17086 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 1M
Connecting to host 172.31.34.4, port 1337
[  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
[  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
[  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
[  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
[  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36            sender
[  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                 receiver

iperf Done.

Best,
David

On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> "David Li" <lidav...@apache.org> writes:
>
> > Thanks for the clarification Yibo, looking forward to the results. Even if it is a very hacky PoC it will be interesting to see how it affects performance, though as Keith points out there are benefits in general to UCX (or similar library), and we can work out the implementation plan from there.
> >
> > To Benson's point - the work done to get UCX supported would pave the way to supporting other backends as well. I'm personally not familiar with UCX, MPI, etc. so is MPI here more about playing well with established practices or does it also offer potential hardware support/performance improvements like UCX would?
>
> There are two main implementations of MPI, MPICH and Open MPI, both of which are permissively licensed open source community projects. Both have direct support for UCX and unless your needs are very specific, the overhead of going through MPI is likely to be negligible. Both also have proprietary derivatives, such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI derivative), which may have optimizations for proprietary networks. Both MPICH and Open MPI can be built without UCX, and this is often easier (UCX 'master' is more volatile in my experience).
>
> The vast majority of distributed memory scientific applications use MPI or higher level libraries, rather than writing directly to UCX (which provides less coverage of HPC networks). I think MPI compatibility is important.
>
> From way up-thread (sorry):
>
> >> >>>>> Jed - how would you see MPI and Flight interacting? As another transport/alternative to UCX? I admit I'm not familiar with the HPC space.
>
> MPI has collective operations like MPI_Allreduce (perform a reduction and give every process the result; these run in log(P) or better time with small constants -- 15 microseconds is typical for a cheap reduction operation on a million processes). MPI supports user-defined operations for reductions and prefix-scan operations. If we defined MPI_Ops for Arrow types, we could compute summary statistics and other algorithmic building blocks fast at arbitrary scale.
>
> The collective execution model might not be everyone's bag, but MPI_Op can also be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op) and dropping into collective mode has big advantages for certain algorithms in computational statistics/machine learning.
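To make the user-defined reduction idea concrete, here is a minimal, generic MPI sketch (plain doubles, nothing Arrow-specific): it registers a custom MPI_Op and applies it with MPI_Allreduce. A hypothetical Arrow-aware MPI_Op would instead combine column buffers and validity bitmaps.

    #include <mpi.h>
    #include <cstdio>
    #include <vector>

    // User-defined reduction: element-wise sum. An Arrow-aware version would
    // interpret `in`/`inout` as serialized column buffers (values plus
    // validity bitmap) rather than a plain double array.
    static void VecSum(void* in, void* inout, int* len, MPI_Datatype* /*dt*/) {
      const double* a = static_cast<const double*>(in);
      double* b = static_cast<double*>(inout);
      for (int i = 0; i < *len; ++i) b[i] += a[i];
    }

    int main(int argc, char** argv) {
      MPI_Init(&argc, &argv);
      int rank = 0;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      // Each process contributes a local partial aggregate.
      std::vector<double> local = {1.0 * rank, 2.0 * rank, 3.0 * rank};
      std::vector<double> global(local.size(), 0.0);

      MPI_Op op;
      MPI_Op_create(&VecSum, /*commute=*/1, &op);

      // Every process receives the combined result.
      MPI_Allreduce(local.data(), global.data(), static_cast<int>(local.size()),
                    MPI_DOUBLE, op, MPI_COMM_WORLD);

      if (rank == 0) std::printf("sum[0]=%f\n", global[0]);

      MPI_Op_free(&op);
      MPI_Finalize();
      return 0;
    }

MPI requires the user function to be associative; marking it commutative as well gives the library extra freedom to reorder operands, enabling the tree-based algorithms behind the log(P) scaling Jed describes.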