@david I think this is really interesting and a really good starting point. :-) Like Keith said, this paves the way to using shared-memory execution backends. In our experience at Cylon, these backends give good scalability for data engineering jobs.
While sending/receiving Arrow payloads is essential, I feel that HPC use cases may require more operations on top of Arrow data structures for this to be widely adoptable. A few examples would be collective operations and non-blocking operations (isend/irecv). This is the main motivation for Cylon: we've been implementing operations such as shuffle (~alltoall), all-gather, broadcast, etc. on top of Arrow tables and arrays.

I'd also like to point out a challenge we encountered while using Arrow IPC/Flight. AFAIU, the current serialization utilities copy the data into a new contiguous buffer (e.g. arrow::ipc::SerializeRecordBatch), which would then be handed to MPI/UCX. Because of this, we stepped outside Arrow serialization and instead use a separate struct that tracks the individual buffers and their lengths; the buffers are then sent/received individually with non-blocking communication operations, along the lines of the sketch below. I hope this UCX HPC effort can alleviate this serialization problem.
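To make that concrete, here is a minimal sketch of the buffer-wise send using MPI's non-blocking isend as the transport, assuming flat (non-nested) arrays; SendBatchBuffers, the length-prefix framing, and the tag scheme are illustrative only, not Cylon's actual code:

    // Hypothetical sketch: post one non-blocking MPI_Isend per Arrow buffer,
    // avoiding the copy into a single contiguous IPC buffer.
    // Flat arrays only; nested types (child_data), error handling, and
    // buffers larger than INT_MAX bytes (which would need chunking) are
    // not covered.
    #include <mpi.h>
    #include <arrow/api.h>

    #include <deque>
    #include <vector>

    arrow::Status SendBatchBuffers(const arrow::RecordBatch& batch, int dest,
                                   MPI_Comm comm) {
      std::vector<MPI_Request> requests;
      std::deque<int64_t> lengths;  // deque keeps addresses stable on push_back
      int tag = 0;
      for (const auto& column : batch.columns()) {
        for (const auto& buffer : column->data()->buffers) {
          if (buffer == nullptr) continue;  // e.g. an absent validity bitmap
          // Send the length first so the receiver can allocate a matching
          // buffer.
          lengths.push_back(buffer->size());
          requests.emplace_back();
          MPI_Isend(&lengths.back(), 1, MPI_INT64_T, dest, tag++, comm,
                    &requests.back());
          // Then send the buffer contents in place -- no intermediate copy.
          requests.emplace_back();
          MPI_Isend(buffer->data(), static_cast<int>(buffer->size()), MPI_BYTE,
                    dest, tag++, comm, &requests.back());
        }
      }
      MPI_Waitall(static_cast<int>(requests.size()), requests.data(),
                  MPI_STATUSES_IGNORE);
      return arrow::Status::OK();
    }

The receiving side would mirror this with MPI_Irecv into Arrow-allocated buffers, so the data lands directly in its final location.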
On Tue, Dec 28, 2021 at 2:10 PM David Li <lidav...@apache.org> wrote:

> I ended up drafting an implementation of Flight based on UCX, and doing
> some of the necessary refactoring to support additional backends in the
> future. It can run the Flight benchmark, and performance is about
> comparable to gRPC, as tested on AWS EC2.
>
> The implementation is based on the UCP streams API. It's extremely
> bare-bones and is really only a proof of concept; a good amount of work
> is needed to turn it into a usable implementation. I had hoped it would
> perform markedly better than gRPC, at least in this early test, but this
> seems not to be the case. That said: I am likely not using UCX properly,
> UCX would still open up support for additional hardware, and this work
> should allow other backends to be implemented more easily.
>
> The branch can be viewed at
> https://github.com/lidavidm/arrow/tree/flight-ucx
>
> I've attached the benchmark output at the end.
>
> There are still quite a few TODOs and things that need investigating:
>
> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
> - Concurrent requests are not supported, nor is making more than one
>   request on a connection, nor does the server support concurrent
>   clients. We also need to decide whether to even support concurrent
>   requests, and how (e.g. pooling multiple connections, implementing a
>   gRPC/HTTP2-style protocol, or even possibly implementing HTTP2).
> - We need to make sure we properly handle errors, etc. everywhere.
> - Are we using UCX in a performant and idiomatic manner? Will the
>   implementation work well on RDMA and other specialized hardware?
> - Do we also need to support the UCX tag API?
> - Can we refactor out interfaces that allow sharing more of the
>   client/server implementation between different backends?
> - Are the abstractions sufficient to support other potential backends
>   like MPI, libfabric, or WebSockets?
>
> If anyone has experience with UCX, I'd appreciate any feedback.
> Otherwise, I'm hoping to plan out and try to tackle some of the TODOs
> above, and figure out how this effort can proceed.
>
> Antoine/Micah raised the possibility of extending gRPC instead. That
> would be preferable, frankly, given that otherwise we might have to
> re-implement a lot of what gRPC and HTTP2 provide ourselves. However,
> the necessary proposal stalled and was dropped without much discussion:
> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
>
> Benchmark results (also uploaded at
> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
>
> Testing was done between two t3.xlarge instances in the same zone.
> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
>
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> Testing method: DoGet
> [1640703417.639373] [ip-172-31-37-78:10110:0] ucp_worker.c:1627 UCX
> INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> [1640703417.650068] [ip-172-31-37-78:10110:1] ucp_worker.c:1627 UCX
> INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 131072
> Batches read: 10000
> Bytes read: 1310720000
> Nanos: 2165862969
> Speed: 577.137 MB/s
> Throughput: 4617.1 batches/s
> Latency mean: 214 us
> Latency quantile=0.5: 209 us
> Latency quantile=0.95: 340 us
> Latency quantile=0.99: 409 us
> Latency max: 6350 us
>
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> Testing method: DoGet
> [1640703439.428785] [ip-172-31-37-78:10116:0] ucp_worker.c:1627 UCX
> INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> [1640703439.440359] [ip-172-31-37-78:10116:1] ucp_worker.c:1627 UCX
> INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 2097152
> Batches read: 10000
> Bytes read: 20971520000
> Nanos: 34184175236
> Speed: 585.066 MB/s
> Throughput: 292.533 batches/s
> Latency mean: 3415 us
> Latency quantile=0.5: 3408 us
> Latency quantile=0.95: 3549 us
> Latency quantile=0.99: 3800 us
> Latency max: 20236 us
>
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> Testing method: DoGet
> Using standalone TCP server
> Server host: 172.31.34.4
> Server port: 31337
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 131072
> Batches read: 10000
> Bytes read: 1310720000
> Nanos: 2375289668
> Speed: 526.252 MB/s
> Throughput: 4210.01 batches/s
> Latency mean: 235 us
> Latency quantile=0.5: 203 us
> Latency quantile=0.95: 328 us
> Latency quantile=0.99: 1377 us
> Latency max: 17860 us
>
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> Testing method: DoGet
> Using standalone TCP server
> Server host: 172.31.34.4
> Server port: 31337
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 2097152
> Batches read: 10000
> Bytes read: 20971520000
> Nanos: 34202704498
> Speed: 584.749 MB/s
> Throughput: 292.375 batches/s
> Latency mean: 3416 us
> Latency quantile=0.5: 3406 us
> Latency quantile=0.95: 3548 us
> Latency quantile=0.99: 3764 us
> Latency max: 17086 us
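[Summarizing the four runs above (numbers copied from the output; the iperf3 run below puts the raw TCP ceiling at ~4.9 Gbits/sec):

    Transport  Batch size  Speed         Throughput         Mean latency  p99 latency
    ucx        131072      577.137 MB/s  4617.1 batches/s   214 us        409 us
    ucx        2097152     585.066 MB/s  292.533 batches/s  3415 us       3800 us
    grpc       131072      526.252 MB/s  4210.01 batches/s  235 us        1377 us
    grpc       2097152     584.749 MB/s  292.375 batches/s  3416 us       3764 us

Both transports saturate the link at the larger batch size; UCX's advantage shows mainly with small batches and in the latency tail.]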
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337
> -Z -l 1M
> Connecting to host 172.31.34.4, port 1337
> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
> - - - - - - - - - - - - - - - - - - - - - - - - -
> [ ID] Interval           Transfer     Bitrate         Retr
> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36  sender
> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec       receiver
>
> iperf Done.
>
> Best,
> David
>
> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> > "David Li" <lidav...@apache.org> writes:
> >
> > > Thanks for the clarification, Yibo, looking forward to the results.
> > > Even if it is a very hacky PoC, it will be interesting to see how it
> > > affects performance, though as Keith points out there are benefits in
> > > general to UCX (or a similar library), and we can work out the
> > > implementation plan from there.
> > >
> > > To Benson's point - the work done to get UCX supported would pave the
> > > way to supporting other backends as well. I'm personally not familiar
> > > with UCX, MPI, etc., so is MPI here more about playing well with
> > > established practices, or does it also offer potential hardware
> > > support/performance improvements like UCX would?
> >
> > There are two main implementations of MPI, MPICH and Open MPI, both of
> > which are permissively licensed open-source community projects. Both
> > have direct support for UCX, and unless your needs are very specific,
> > the overhead of going through MPI is likely to be negligible. Both also
> > have proprietary derivatives, such as Cray MPI (an MPICH derivative)
> > and Spectrum MPI (an Open MPI derivative), which may have optimizations
> > for proprietary networks. Both MPICH and Open MPI can be built without
> > UCX, and this is often easier (UCX 'master' is more volatile in my
> > experience).
> >
> > The vast majority of distributed-memory scientific applications use MPI
> > or higher-level libraries, rather than writing directly to UCX (which
> > provides less coverage of HPC networks). I think MPI compatibility is
> > important.
> >
> > From way up-thread (sorry):
> >
> > >> >>>>> Jed - how would you see MPI and Flight interacting? As another
> > >> >>>>> transport/alternative to UCX? I admit I'm not familiar with
> > >> >>>>> the HPC space.
> >
> > MPI has collective operations like MPI_Allreduce (perform a reduction
> > and give every process the result; these run in log(P) or better time
> > with small constants -- 15 microseconds is typical for a cheap
> > reduction operation on a million processes). MPI supports user-defined
> > operations for reductions and prefix-scan operations. If we defined
> > MPI_Ops for Arrow types, we could compute summary statistics and other
> > algorithmic building blocks fast at arbitrary scale.
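[To make the MPI_Op idea concrete: a minimal, hypothetical sketch of a user-defined reduction, using a plain element-wise double sum as a stand-in for an Arrow Float64 column (for which the built-in MPI_SUM would of course suffice); a real MPI_Op for Arrow types would also have to handle validity bitmaps and variable-length layouts:

    #include <mpi.h>

    #include <vector>

    // Element-wise sum; MPI invokes this at each step of the reduction tree.
    void SumDoubles(void* invec, void* inoutvec, int* len, MPI_Datatype*) {
      auto* in = static_cast<double*>(invec);
      auto* inout = static_cast<double*>(inoutvec);
      for (int i = 0; i < *len; ++i) inout[i] += in[i];
    }

    int main(int argc, char** argv) {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      // Pretend these are the values of an Arrow Float64Array on this rank.
      std::vector<double> local = {1.0 * rank, 2.0 * rank};
      std::vector<double> global(local.size());

      MPI_Op op;
      MPI_Op_create(&SumDoubles, /*commute=*/1, &op);
      // Every rank receives the element-wise sum across all ranks.
      MPI_Allreduce(local.data(), global.data(),
                    static_cast<int>(local.size()), MPI_DOUBLE, op,
                    MPI_COMM_WORLD);
      MPI_Op_free(&op);
      MPI_Finalize();
      return 0;
    }

The same user-defined op can also drive MPI_Reduce and the prefix-scan MPI_Scan mentioned above.]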
> > The collective execution model might not be everyone's bag, but MPI_Op
> > can also be used in one-sided operations (MPI_Accumulate and
> > MPI_Fetch_and_op), and dropping into collective mode has big advantages
> > for certain algorithms in computational statistics/machine learning.

--
Niranda Perera
https://niranda.dev/
@n1r44 <https://twitter.com/N1R44>