Thanks David for initiating the UCX integration, great work!
I think a 5 Gbps network is too limited for performance evaluation. I will
try the patch on a 100 Gb RDMA network; hopefully we can see some
improvements.
I once benchmarked Flight over a 100 Gb network [1]. The gRPC-based
throughput is 2.4 GB/s for one thread and 8.8 GB/s for six threads, with
about 60 us latency. I also benchmarked raw RDMA performance (same batch
sizes as Flight): one thread can achieve 9 GB/s with 12 us latency. Of
course the comparison is not fair. With David's patch, we can get a more
realistic comparison.
I'm implementing a data plane approach in the hope that we can adopt new
data acceleration methods easily. My approach is to replace only the
FlightData transmission of DoGet/DoPut/DoExchange with data plane drivers,
while gRPC is still used for all RPC calls.
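Roughly, the driver interface I have in mind looks like the sketch below
(simplified, with illustrative names only; the actual code in [2] may look
different):

    // Hypothetical interface sketch; names are illustrative, not the exact
    // classes in [2]. The driver moves the FlightData payloads of
    // DoGet/DoPut/DoExchange out-of-band (shared memory, UCX, ...), while
    // gRPC still carries every RPC call.
    #include <memory>
    #include "arrow/buffer.h"
    #include "arrow/status.h"

    class DataPlaneDriver {
     public:
      virtual ~DataPlaneDriver() = default;

      // Send one serialized FlightData message (metadata + body buffers).
      virtual arrow::Status WritePayload(const arrow::Buffer& metadata,
                                         const arrow::Buffer& body) = 0;

      // Receive the next message; sets *end_of_stream when the peer closes.
      virtual arrow::Status ReadPayload(std::shared_ptr<arrow::Buffer>* metadata,
                                        std::shared_ptr<arrow::Buffer>* body,
                                        bool* end_of_stream) = 0;
    };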
Code is at my GitHub repo [2]. Besides the framework, I have only
implemented a shared memory data plane driver as a PoC. The Get/Put/Exchange
unit tests pass, TestCancel hangs, and some unit tests run longer than
expected; I'm still debugging.
The shared memory data plane performance is pretty bad right now because
each read/write maps and unmaps the shared memory again; pre-allocating the
pages and keeping them mapped should improve it a lot. I'm still
experimenting.
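The "map once, reuse" idea is roughly the following (a simplified sketch
using POSIX shared memory and illustrative names, not necessarily the
actual driver code in [2]):

    // Pre-map a shared memory region once and reuse it for every payload,
    // instead of mmap()/munmap() on each read/write.
    #include <fcntl.h>
    #include <sys/mman.h>
    #include <unistd.h>
    #include <cstring>
    #include <stdexcept>

    class ShmRegion {
     public:
      ShmRegion(const char* name, size_t size) : size_(size) {
        int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
        if (fd < 0 || ftruncate(fd, size) != 0) {
          throw std::runtime_error("shm_open/ftruncate failed");
        }
        // Map once at construction; all later reads/writes reuse this mapping.
        addr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        close(fd);
        if (addr_ == MAP_FAILED) throw std::runtime_error("mmap failed");
      }
      ~ShmRegion() { munmap(addr_, size_); }

      // Copy a payload into the pre-mapped region (no map/unmap per call).
      void Write(const void* data, size_t len, size_t offset) {
        std::memcpy(static_cast<char*>(addr_) + offset, data, len);
      }

     private:
      void* addr_;
      size_t size_;
    };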
Would like to hear community comments.
My personal opinion is that the data plane approach reuses the gRPC control
plane and may make it easier to add new data acceleration methods, but it
needs to fit into gRPC seamlessly (there are still unresolved gaps). A new
transport requires much more initial effort, but may pay off later. And it
looks like the two approaches don't conflict with each other.
[1] Test environment
nics: mellanox connectx5
hosts: client (neoverse n1), server (xeon gold 5218)
os: ubuntu 20.04, linux kernel 5.4
test case: 128k batch size, DoGet
[2] https://github.com/cyb70289/arrow/tree/flight-data-plane
________________________________
From: David Li <lidav...@apache.org>
Sent: Wednesday, December 29, 2021 3:09 AM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: Arrow in HPC
I ended up drafting an implementation of Flight based on UCX, and doing some
of the necessary refactoring to support additional backends in the future.
It can run the Flight benchmark, and performance is about comparable to
gRPC, as tested on AWS EC2.
The implementation is based on the UCP streams API. It's extremely
bare-bones and is really only a proof of concept; a good amount of work is
needed to turn it into a usable implementation. I had hoped it would perform
markedly better than gRPC, at least in this early test, but this seems not
to be the case. That said: I am likely not using UCX properly, UCX would
still open up support for additional hardware, and this work should allow
other backends to be implemented more easily.
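For those less familiar with UCX, the UCP stream calls involved look
roughly like the sketch below (simplified, not the code in the branch;
worker/endpoint setup and real error handling are omitted):

    // Send/receive one message over an already-connected UCP stream endpoint.
    #include <ucp/api/ucp.h>
    #include <cstdlib>

    // Progress the worker until the request completes.
    static void WaitRequest(ucp_worker_h worker, void* request) {
      if (request == nullptr) return;             // completed immediately
      if (UCS_PTR_IS_ERR(request)) std::abort();  // a real impl returns Status
      while (ucp_request_check_status(request) == UCS_INPROGRESS) {
        ucp_worker_progress(worker);
      }
      ucp_request_free(request);
    }

    void SendBuffer(ucp_worker_h worker, ucp_ep_h ep, const void* data, size_t len) {
      ucp_request_param_t param;
      param.op_attr_mask = 0;  // defaults: contiguous bytes
      WaitRequest(worker, ucp_stream_send_nbx(ep, data, len, &param));
    }

    void RecvBuffer(ucp_worker_h worker, ucp_ep_h ep, void* data, size_t len) {
      ucp_request_param_t param;
      param.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
      param.flags = UCP_STREAM_RECV_FLAG_WATERFALL;  // wait for the full length
      size_t received = 0;
      WaitRequest(worker, ucp_stream_recv_nbx(ep, data, len, &received, &param));
    }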
The branch can be viewed at
https://github.com/lidavidm/arrow/tree/flight-ucx
I've attached the benchmark output at the end.
There are still quite a few TODOs and things that need investigating:
- Only DoGet and GetFlightInfo are implemented, and incompletely at that.
- Concurrent requests are not supported, nor is making more than one
request on a connection, and the server does not support concurrent
clients. We also need to decide whether to even support concurrent
requests, and how (e.g. pooling multiple connections, implementing a
gRPC/HTTP2-style protocol, or even possibly implementing HTTP2).
- We need to make sure we properly handle errors, etc. everywhere.
- Are we using UCX in a performant and idiomatic manner? Will the
implementation work well on RDMA and other specialized hardware?
- Do we also need to support the UCX tag API? (A rough sketch of tag-based
send/recv follows this list, for comparison.)
- Can we refactor out interfaces that allow sharing more of the
client/server implementation between different backends?
- Are the abstractions sufficient to support other potential backends like
MPI, libfabric, or WebSockets?
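For comparison with the stream API, the UCX tag API looks roughly like this
(again a simplified sketch, not code from the branch):

    // Tag-based send/recv: the receiver matches messages by tag (e.g. one tag
    // per request/stream) instead of relying on byte-stream ordering.
    #include <ucp/api/ucp.h>
    #include <cstdint>

    static void Wait(ucp_worker_h worker, void* request) {
      if (request == nullptr || UCS_PTR_IS_ERR(request)) return;  // real code checks errors
      while (ucp_request_check_status(request) == UCS_INPROGRESS) {
        ucp_worker_progress(worker);
      }
      ucp_request_free(request);
    }

    void TagSend(ucp_worker_h worker, ucp_ep_h ep, const void* data, size_t len,
                 uint64_t tag) {
      ucp_request_param_t param;
      param.op_attr_mask = 0;
      Wait(worker, ucp_tag_send_nbx(ep, data, len, tag, &param));
    }

    void TagRecv(ucp_worker_h worker, void* data, size_t len, uint64_t tag) {
      ucp_request_param_t param;
      param.op_attr_mask = 0;
      // A tag mask of all ones: match exactly this tag.
      Wait(worker, ucp_tag_recv_nbx(worker, data, len, tag, ~uint64_t(0), &param));
    }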
If anyone has experience with UCX, I'd appreciate any feedback. Otherwise,
I'm hoping to plan out and try to tackle some of the TODOs above, and figure
out how this effort can proceed.
Antoine/Micah raised the possibility of extending gRPC instead. That would
be preferable, frankly, given that otherwise we might have to re-implement a
lot of what gRPC and HTTP2 provide ourselves. However, the necessary
proposal stalled and was dropped without much discussion:
https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
Benchmark results (also uploaded at
https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
Testing was done between two t3.xlarge instances in the same zone.
t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4
-num_streams=1 -num_threads=1 -records_per_stream=40960000
-records_per_batch=4096
Testing method: DoGet
[1640703417.639373] [ip-172-31-37-78:10110:0] ucp_worker.c:1627 UCX INFO
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703417.650068] [ip-172-31-37-78:10110:1] ucp_worker.c:1627 UCX INFO
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2165862969
Speed: 577.137 MB/s
Throughput: 4617.1 batches/s
Latency mean: 214 us
Latency quantile=0.5: 209 us
Latency quantile=0.95: 340 us
Latency quantile=0.99: 409 us
Latency max: 6350 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4
-num_streams=1 -num_threads=1 -records_per_stream=655360000
-records_per_batch=65536
Testing method: DoGet
[1640703439.428785] [ip-172-31-37-78:10116:0] ucp_worker.c:1627 UCX INFO
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703439.440359] [ip-172-31-37-78:10116:1] ucp_worker.c:1627 UCX INFO
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34184175236
Speed: 585.066 MB/s
Throughput: 292.533 batches/s
Latency mean: 3415 us
Latency quantile=0.5: 3408 us
Latency quantile=0.95: 3549 us
Latency quantile=0.99: 3800 us
Latency max: 20236 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
-records_per_batch=4096
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2375289668
Speed: 526.252 MB/s
Throughput: 4210.01 batches/s
Latency mean: 235 us
Latency quantile=0.5: 203 us
Latency quantile=0.95: 328 us
Latency quantile=0.99: 1377 us
Latency max: 17860 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
-records_per_batch=65536
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34202704498
Speed: 584.749 MB/s
Throughput: 292.375 batches/s
Latency mean: 3416 us
Latency quantile=0.5: 3406 us
Latency quantile=0.95: 3548 us
Latency quantile=0.99: 3764 us
Latency max: 17086 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l
1M
Connecting to host 172.31.34.4, port 1337
[ 5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
[ ID] Interval Transfer Bitrate Retr Cwnd
[ 5] 0.00-1.00 sec 572 MBytes 4.79 Gbits/sec 36 2.35 MBytes
[ 5] 1.00-2.00 sec 582 MBytes 4.88 Gbits/sec 0 2.43 MBytes
[ 5] 2.00-3.00 sec 585 MBytes 4.91 Gbits/sec 0 2.43 MBytes
[ 5] 3.00-4.00 sec 587 MBytes 4.92 Gbits/sec 0 2.43 MBytes
[ 5] 4.00-5.00 sec 587 MBytes 4.92 Gbits/sec 0 2.43 MBytes
[ 5] 5.00-6.00 sec 586 MBytes 4.91 Gbits/sec 0 2.43 MBytes
[ 5] 6.00-7.00 sec 586 MBytes 4.92 Gbits/sec 0 2.43 MBytes
[ 5] 7.00-8.00 sec 580 MBytes 4.87 Gbits/sec 0 2.43 MBytes
[ 5] 8.00-9.00 sec 584 MBytes 4.89 Gbits/sec 0 2.43 MBytes
[ 5] 9.00-10.00 sec 577 MBytes 4.84 Gbits/sec 0 2.43 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval Transfer Bitrate Retr
[ 5] 0.00-10.00 sec 5.69 GBytes 4.89 Gbits/sec 36 sender
[ 5] 0.00-10.00 sec 5.69 GBytes 4.88 Gbits/sec receiver
iperf Done.
Best,
David
On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
"David Li" <lidav...@apache.org> writes:
Thanks for the clarification Yibo, looking forward to the results. Even if
it is a very hacky PoC it will be interesting to see how it affects
performance, though as Keith points out there are benefits in general to
UCX (or a similar library), and we can work out the implementation plan
from there.
To Benson's point - the work done to get UCX supported would pave the way
to supporting other backends as well. I'm personally not familiar with UCX,
MPI, etc., so is MPI here more about playing well with established
practices, or does it also offer potential hardware support/performance
improvements like UCX would?
There are two main implementations of MPI, MPICH and Open MPI, both of which
are permissively licensed open source community projects. Both have direct
support for UCX and unless your needs are very specific, the overhead of going
through MPI is likely to be negligible. Both also have proprietary derivatives,
such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI derivative),
which may have optimizations for proprietary networks. Both MPICH and Open MPI
can be built without UCX, and this is often easier (UCX 'master' is more
volatile in my experience).
The vast majority of distributed memory scientific applications use MPI or
higher level libraries, rather than writing directly to UCX (which provides
less coverage of HPC networks). I think MPI compatibility is important.
From way up-thread (sorry):
Jed - how would you see MPI and Flight interacting? As another
transport/alternative to UCX? I admit I'm not familiar with the HPC
space.
MPI has collective operations like MPI_Allreduce (perform a reduction and give
every process the result; these run in log(P) or better time with small
constants -- 15 microseconds is typical for a cheap reduction operation on a
million processes). MPI supports user-defined operations for reductions and
prefix-scan operations. If we defined MPI_Ops for Arrow types, we could compute
summary statistics and other algorithmic building blocks fast at arbitrary
scale.
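To make that concrete, a simplified sketch of a user-defined MPI_Op (this
just shows the mechanism over plain doubles; a real Arrow integration would
define the op over Arrow types/buffers):

    // Illustrative only: a user-defined MPI_Op combining (sum, count) pairs,
    // the kind of building block a distributed mean over a column would use.
    #include <mpi.h>

    // Combine (sum, count) pairs elementwise; *len is the number of pairs.
    static void SumCountOp(void* in, void* inout, int* len, MPI_Datatype*) {
      auto* a = static_cast<double*>(in);
      auto* b = static_cast<double*>(inout);
      for (int i = 0; i < *len; ++i) {
        b[2 * i] += a[2 * i];          // partial sums
        b[2 * i + 1] += a[2 * i + 1];  // partial counts
      }
    }

    int main(int argc, char** argv) {
      MPI_Init(&argc, &argv);

      // A derived datatype for the (sum, count) pair.
      MPI_Datatype pair_type;
      MPI_Type_contiguous(2, MPI_DOUBLE, &pair_type);
      MPI_Type_commit(&pair_type);

      MPI_Op op;
      MPI_Op_create(&SumCountOp, /*commute=*/1, &op);

      double local[2] = {/*local sum=*/42.0, /*local count=*/10.0};
      double global[2];
      // Every rank gets the global sum and count in ~log(P) time.
      MPI_Allreduce(local, global, 1, pair_type, op, MPI_COMM_WORLD);
      double mean = global[0] / global[1];
      (void)mean;

      MPI_Op_free(&op);
      MPI_Type_free(&pair_type);
      MPI_Finalize();
      return 0;
    }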
The collective execution model might not be everyone's bag, but MPI_Op can also
be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op) and
dropping into collective mode has big advantages for certain algorithms in
computational statistics/machine learning.