@david
I think this is really interesting and a really good starting point. :-)
Like Keith said, this paves the way to using shared-memory execution
backends. In our experience at Cylon, these backends give good scalability
for data engineering jobs.

While sending/receiving Arrow payloads is essential, I feel the HPC use
cases may require more operations on top of Arrow data structures for this
to be widely adoptable. A few examples would be collective operations and
non-blocking operations (isend/irecv). This is the main motivation for
Cylon: we've been implementing operations such as shuffle (~alltoall),
all-gather, broadcast, etc. on top of Arrow tables and arrays.
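
To illustrate, a broadcast can be as simple as serializing on the root
rank and broadcasting the bytes. A minimal, untested sketch (note the
extra copy this incurs, which I come back to below):

    // Naive MPI broadcast of a record batch: serialize on the root into
    // one contiguous IPC buffer, broadcast the length, then the bytes.
    // Error handling and >2GB chunking omitted for brevity.
    #include <arrow/api.h>
    #include <arrow/ipc/api.h>
    #include <mpi.h>

    std::shared_ptr<arrow::Buffer> BroadcastBatch(
        const std::shared_ptr<arrow::RecordBatch>& batch, int root,
        MPI_Comm comm) {
      int rank;
      MPI_Comm_rank(comm, &rank);
      std::shared_ptr<arrow::Buffer> payload;
      int64_t length = 0;
      if (rank == root) {
        // Copies all of the batch's buffers into one contiguous buffer.
        payload = arrow::ipc::SerializeRecordBatch(
                      *batch, arrow::ipc::IpcWriteOptions::Defaults())
                      .ValueOrDie();
        length = payload->size();
      }
      MPI_Bcast(&length, 1, MPI_INT64_T, root, comm);
      if (rank != root) {
        payload = arrow::AllocateBuffer(length).ValueOrDie();
      }
      MPI_Bcast(const_cast<uint8_t*>(payload->data()),
                static_cast<int>(length), MPI_BYTE, root, comm);
      // Non-root ranks can reconstruct via arrow::ipc::ReadRecordBatch.
      return payload;
    }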

I'd like to point out a challenge we encountered while using Arrow
IPC/Flight. AFAIU the current serialization utilities copy data into a new
contiguous buffer (e.g. arrow::ipc::SerializeRecordBatch), which is then
handed to MPI/UCX. Because of this, we effectively stepped outside of
Arrow serialization: we use a separate struct that tracks the individual
buffers and their lengths, and the buffers are then sent/received
individually using non-blocking communication operations.
I hope this UCX HPC effort can alleviate this serialization problem.
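
Roughly, the per-buffer approach looks like the following (a simplified
sketch, not our actual Cylon code; metadata such as buffer counts,
lengths, and types is exchanged separately):

    // Send each buffer of an ArrayData individually with non-blocking
    // MPI calls, avoiding the copy into one contiguous IPC buffer.
    #include <arrow/api.h>
    #include <mpi.h>
    #include <vector>

    void IsendArrayBuffers(const std::shared_ptr<arrow::ArrayData>& data,
                           int dest, int tag_base, MPI_Comm comm,
                           std::vector<MPI_Request>* requests) {
      for (size_t i = 0; i < data->buffers.size(); ++i) {
        const auto& buf = data->buffers[i];
        if (buf == nullptr) continue;  // e.g. absent validity bitmap
        MPI_Request req;
        MPI_Isend(buf->data(), static_cast<int>(buf->size()), MPI_BYTE,
                  dest, tag_base + static_cast<int>(i), comm, &req);
        requests->push_back(req);
      }
      // The caller drives completion, e.g. with MPI_Waitall.
    }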



On Tue, Dec 28, 2021 at 2:10 PM David Li <lidav...@apache.org> wrote:

> I ended up drafting an implementation of Flight based on UCX, and doing
> some of the necessary refactoring to support additional backends in the
> future. It can run the Flight benchmark, and performance is about
> comparable to gRPC, as tested on AWS EC2.
>
> The implementation is based on the UCP streams API. It's extremely
> bare-bones and is really only a proof of concept; a good amount of work
> is needed to turn it into a usable implementation. I had hoped it would
> perform markedly better than gRPC, at least in this early test, but this
> seems not to be the case. That said: I am likely not using UCX properly,
> UCX would still open up support for additional hardware, and this work
> should allow other backends to be implemented more easily.
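>
> For reference, the send path boils down to something like the following
> (a condensed, simplified sketch, not the exact code in the branch):
>
>     // Sketch: blocking send of one buffer over a UCP stream endpoint.
>     // Assumes the ucp_ep_h and ucp_worker_h are already set up.
>     #include <ucp/api/ucp.h>
>
>     ucs_status_t StreamSend(ucp_ep_h ep, ucp_worker_h worker,
>                             const void* data, size_t length) {
>       ucp_request_param_t param;
>       param.op_attr_mask = 0;  // no callback; poll for completion below
>       ucs_status_ptr_t req = ucp_stream_send_nbx(ep, data, length, &param);
>       if (req == nullptr) return UCS_OK;  // completed immediately
>       if (UCS_PTR_IS_ERR(req)) return UCS_PTR_STATUS(req);
>       ucs_status_t status;
>       do {
>         ucp_worker_progress(worker);  // drive communication forward
>         status = ucp_request_check_status(req);
>       } while (status == UCS_INPROGRESS);
>       ucp_request_free(req);
>       return status;
>     }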
>
> The branch can be viewed at
> https://github.com/lidavidm/arrow/tree/flight-ucx
>
> I've attached the benchmark output at the end.
>
> There are still quite a few TODOs and things that need investigating:
>
> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
> - Concurrent requests are not supported (nor even making more than one
>   request on a connection), and the server does not support concurrent
>   clients. We also need to decide whether to support concurrent requests
>   at all, and how (e.g. pooling multiple connections, implementing a
>   gRPC/HTTP2-style protocol, or possibly even implementing HTTP2).
> - We need to make sure we properly handle errors, etc. everywhere.
> - Are we using UCX in a performant and idiomatic manner? Will the
>   implementation work well on RDMA and other specialized hardware?
> - Do we also need to support the UCX tag API?
> - Can we refactor out interfaces that allow sharing more of the
>   client/server implementation between different backends?
> - Are the abstractions sufficient to support other potential backends like
>   MPI, libfabric, or WebSockets?
>
> If anyone has experience with UCX, I'd appreciate any feedback.
> Otherwise, I'm hoping to plan out and try to tackle some of the TODOs
> above, and figure out how this effort can proceed.
>
> Antoine/Micah raised the possibility of extending gRPC instead. That
> would be preferable, frankly, given that otherwise we might have to
> re-implement a lot of what gRPC and HTTP2 provide ourselves. However, the
> necessary proposal stalled and was dropped without much discussion:
> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
>
> Benchmark results (also uploaded at
> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
>
> Testing was done between two t3.xlarge instances in the same zone.
> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
>
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> Testing method: DoGet
> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627 UCX
> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627 UCX
> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 131072
> Batches read: 10000
> Bytes read: 1310720000
> Nanos: 2165862969
> Speed: 577.137 MB/s
> Throughput: 4617.1 batches/s
> Latency mean: 214 us
> Latency quantile=0.5: 209 us
> Latency quantile=0.95: 340 us
> Latency quantile=0.99: 409 us
> Latency max: 6350 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> Testing method: DoGet
> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627 UCX
> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627 UCX
> INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 2097152
> Batches read: 10000
> Bytes read: 20971520000
> Nanos: 34184175236
> Speed: 585.066 MB/s
> Throughput: 292.533 batches/s
> Latency mean: 3415 us
> Latency quantile=0.5: 3408 us
> Latency quantile=0.95: 3549 us
> Latency quantile=0.99: 3800 us
> Latency max: 20236 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> Testing method: DoGet
> Using standalone TCP server
> Server host: 172.31.34.4
> Server port: 31337
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 131072
> Batches read: 10000
> Bytes read: 1310720000
> Nanos: 2375289668
> Speed: 526.252 MB/s
> Throughput: 4210.01 batches/s
> Latency mean: 235 us
> Latency quantile=0.5: 203 us
> Latency quantile=0.95: 328 us
> Latency quantile=0.99: 1377 us
> Latency max: 17860 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> Testing method: DoGet
> Using standalone TCP server
> Server host: 172.31.34.4
> Server port: 31337
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 2097152
> Batches read: 10000
> Bytes read: 20971520000
> Nanos: 34202704498
> Speed: 584.749 MB/s
> Throughput: 292.375 batches/s
> Latency mean: 3416 us
> Latency quantile=0.5: 3406 us
> Latency quantile=0.95: 3548 us
> Latency quantile=0.99: 3764 us
> Latency max: 17086 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337
> -Z -l 1M
> Connecting to host 172.31.34.4, port 1337
> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
> - - - - - - - - - - - - - - - - - - - - - - - - -
> [ ID] Interval           Transfer     Bitrate         Retr
> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36            sender
> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                 receiver
>
> iperf Done.
>
> Best,
> David
>
> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> > "David Li" <lidav...@apache.org> writes:
> >
> > > Thanks for the clarification, Yibo, looking forward to the results.
> > > Even if it is a very hacky PoC it will be interesting to see how it
> > > affects performance, though as Keith points out there are benefits in
> > > general to UCX (or a similar library), and we can work out the
> > > implementation plan from there.
> > >
> > > To Benson's point - the work done to get UCX supported would pave
> > > the way to supporting other backends as well. I'm personally not
> > > familiar with UCX, MPI, etc. so is MPI here more about playing well
> > > with established practices or does it also offer potential hardware
> > > support/performance improvements like UCX would?
> >
> > There are two main implementations of MPI, MPICH and Open MPI, both of
> > which are permissively licensed open source community projects. Both
> > have direct support for UCX and unless your needs are very specific,
> > the overhead of going through MPI is likely to be negligible. Both also
> > have proprietary derivatives, such as Cray MPI (MPICH derivative) and
> > Spectrum MPI (Open MPI derivative), which may have optimizations for
> > proprietary networks. Both MPICH and Open MPI can be built without UCX,
> > and this is often easier (UCX 'master' is more volatile in my
> > experience).
> >
> > The vast majority of distributed memory scientific applications use
> > MPI or higher level libraries, rather than writing directly to UCX
> > (which provides less coverage of HPC networks). I think MPI
> > compatibility is important.
> >
> > From way up-thread (sorry):
> >
> > >> >>>>> Jed - how would you see MPI and Flight interacting? As another
> > >> >>>>> transport/alternative to UCX? I admit I'm not familiar with
> > >> >>>>> the HPC space.
> >
> > MPI has collective operations like MPI_Allreduce (perform a reduction
> > and give every process the result; these run in log(P) or better time
> > with small constants -- 15 microseconds is typical for a cheap
> > reduction operation on a million processes). MPI supports user-defined
> > operations for reductions and prefix-scan operations. If we defined
> > MPI_Ops for Arrow types, we could compute summary statistics and other
> > algorithmic building blocks fast at arbitrary scale.
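> >
> > As a hypothetical sketch (MPI_SUM already covers this particular case,
> > and a real op for Arrow types would also need to fold validity
> > bitmaps), a user-defined reduction has this shape:
> >
> >     // User-defined MPI_Op: element-wise sum over double values.
> >     #include <mpi.h>
> >
> >     void SumDoubles(void* in, void* inout, int* len, MPI_Datatype*) {
> >       const double* a = static_cast<const double*>(in);
> >       double* b = static_cast<double*>(inout);
> >       for (int i = 0; i < *len; ++i) b[i] += a[i];
> >     }
> >
> >     // MPI_Op op;
> >     // MPI_Op_create(&SumDoubles, /*commute=*/1, &op);
> >     // MPI_Allreduce(local, global, n, MPI_DOUBLE, op, comm);
> >     // MPI_Op_free(&op);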
> >
> > The collective execution model might not be everyone's bag, but MPI_Op
> > can also be used in one-sided operations (MPI_Accumulate and
> > MPI_Fetch_and_op), and dropping into collective mode has big advantages
> > for certain algorithms in computational statistics/machine learning.
> >
>


-- 
Niranda Perera
https://niranda.dev/
@n1r44 <https://twitter.com/N1R44>
