Some updates.

I tested David's UCX transport patch over a 100Gb network. Flight RPC over UCX/RDMA improves throughput by about 50%, with lower and flatter latency.
And I think there is room for further improvement. See the test report [1].

For the data plane approach, the PoC shared memory data plane also brings a significant performance boost. Details at [2].

Glad to see there is big potential to improve Flight RPC performance.

[1] https://issues.apache.org/jira/browse/ARROW-15229
[2] https://issues.apache.org/jira/browse/ARROW-15282

On 12/30/21 11:57 PM, David Li wrote:
Ah, I see.

I think both projects can proceed as well. At some point we will have to figure 
out how to merge them, but I think it's too early to see how exactly we will 
want to refactor things.

I looked over the code and I don't have any important comments for now. Looking 
forward to reviewing when it's ready.

-David

On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:


On 12/29/21 11:03 PM, David Li wrote:
Awesome, thanks for sharing this too!

The refactoring you have with DataClientStream is what I would like to do as well 
- I think much of the existing code can be adapted to be more 
transport-agnostic, and then it will be easier to support new transports 
(whether data-only or for all methods).

Where do you see the gaps between gRPC and this? I think what would happen is 
1) client calls GetFlightInfo 2) server returns a `shm://` URI 3) client sees 
the unfamiliar prefix and creates a new client for the DoGet call (it would 
have to do this anyways if, for instance, the GetFlightInfo call returned the 
address of a different server).
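
To make the dispatch in step 3 concrete, here is a toy sketch in Python (the names `register_transport` and `ShmClient` are illustrative only, not actual Arrow Flight APIs): a registry keyed by URI scheme, consulted when the client sees an endpoint location.

```python
from urllib.parse import urlparse

# Hypothetical registry mapping URI schemes to data-client factories.
_TRANSPORTS = {}

def register_transport(scheme, factory):
    _TRANSPORTS[scheme] = factory

def client_for(uri):
    """Create a client for an endpoint URI based on its scheme."""
    scheme = urlparse(uri).scheme
    try:
        return _TRANSPORTS[scheme](uri)
    except KeyError:
        raise ValueError(f"no transport registered for scheme {scheme!r}")

class ShmClient:
    """Stand-in for a shared-memory DoGet client."""
    def __init__(self, uri):
        self.segment = urlparse(uri).path  # e.g. the shm segment to attach to

register_transport("shm", ShmClient)

# GetFlightInfo returned a "shm://" URI; the prefix picks the transport.
client = client_for("shm:///flight-segment-0")
```

An unknown scheme falls through to an error, which is also where a client would fall back to creating a plain gRPC client for a different server address.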


I mean implementation details. Some unit tests run longer than expected
(the data plane times out reading from an ended stream). It looks like the
gRPC stream finish message is not correctly intercepted and forwarded to the
data plane. I don't think it's a big problem; it just needs some time to debug.

I also wonder how this stacks up to UCX's shared memory backend (I did not test 
this though).


I implemented a shared memory data plane only to verify and consolidate
the data plane design, as it's the easiest (and still useful) driver. I also
plan to implement a socket-based data plane, not useful in practice, only
to make sure the design works across a network. Then we can add more
useful drivers like UCX or DPDK (the benefit of DPDK is that it works on
commodity hardware, unlike UCX/RDMA, which requires expensive equipment).
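
For illustration, a minimal sketch (Python, with hypothetical names; the real drivers would be C++ carrying FlightData payloads) of what such a driver interface could look like, with a toy socket-based driver that sends length-prefixed frames:

```python
import abc
import socket
import struct

class DataPlaneDriver(abc.ABC):
    """Hypothetical driver interface: only the data stream goes through
    here; all RPC calls stay on gRPC."""

    @abc.abstractmethod
    def write(self, payload: bytes) -> None: ...

    @abc.abstractmethod
    def read(self) -> bytes: ...

class SocketDriver(DataPlaneDriver):
    """Toy socket driver: 4-byte length prefix, then the payload."""

    def __init__(self, sock):
        self.sock = sock

    def write(self, payload):
        self.sock.sendall(struct.pack("!I", len(payload)) + payload)

    def _read_exact(self, n):
        buf = b""
        while len(buf) < n:
            chunk = self.sock.recv(n - len(buf))
            if not chunk:
                raise EOFError("stream ended")
            buf += chunk
        return buf

    def read(self):
        (length,) = struct.unpack("!I", self._read_exact(4))
        return self._read_exact(length)

# Loopback demo with a connected socket pair.
a, b = socket.socketpair()
tx, rx = SocketDriver(a), SocketDriver(b)
tx.write(b"record batch bytes")
```

A UCX or DPDK driver would implement the same two methods over its own transport.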

I would like to be able to support entire new transports for certain cases 
(namely browser support - though perhaps one of the gRPC proxies would suffice 
there), but even in that case, we could make it so that a new transport only 
needs to implement the data plane methods. Only having to support the data 
plane methods would save significant implementation effort for all non-browser 
cases so I think it's a worthwhile approach.


Thanks for your interest in this approach. My current plan is to first
refactor the shared memory data plane to verify it beats gRPC in local RPC
by a considerable margin; otherwise there must be big mistakes in my
design. After that I will fix the unit test issues and submit the code for
community review.

Anyway, don't let me block your implementation. And if you think it's
useful, I can push my current code for more detailed discussion.

-David

On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
Thanks David for initiating the UCX integration, great work!
I think a 5Gbps network is too limited for performance evaluation. I will try the 
patch on a 100Gb RDMA network; hopefully we can see some improvements.
I once benchmarked Flight over a 100Gb network [1]: gRPC-based throughput is 
2.4GB/s for one thread and 8.8GB/s for six threads, with about 60us latency. I also 
benchmarked raw RDMA performance (same batch sizes as Flight): one thread can 
achieve 9GB/s with 12us latency. Of course the comparison is not fair. With 
David's patch, we can get a more realistic comparison.

I'm implementing a data plane approach in the hope that we can adopt new data 
acceleration methods easily. My approach is to replace only the FlightData 
transmission of DoGet/Put/Exchange with data plane drivers, while gRPC is still 
used for all RPC calls.
Code is at my GitHub repo [2]. Besides the framework, I have implemented a 
shared memory data plane driver as a PoC. The Get/Put/Exchange unit tests pass; 
TestCancel hangs and some unit tests run longer than expected, still debugging. 
The shared memory data plane performance is pretty bad right now, due to repeated 
map/unmap for each read/write; pre-allocated pages should improve it a lot, still 
experimenting.
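
The pre-allocated-pages idea can be sketched like this (Python's multiprocessing.shared_memory as a stand-in for the real C++ implementation; all names are hypothetical): map one region once and reuse it for every batch, instead of mapping/unmapping per read/write.

```python
from multiprocessing import shared_memory

class ShmDataPlane:
    """Toy data plane: one region mapped up front and reused for every
    batch, avoiding a map/unmap per read/write."""

    def __init__(self, size=1 << 20):
        self.shm = shared_memory.SharedMemory(create=True, size=size)

    def write(self, offset, payload):
        # Copy the batch into the pre-mapped region.
        self.shm.buf[offset:offset + len(payload)] = payload
        return offset, len(payload)  # location the peer would read from

    def read(self, offset, length):
        return bytes(self.shm.buf[offset:offset + length])

    def close(self):
        self.shm.close()
        self.shm.unlink()

plane = ShmDataPlane(size=4096)
off, length = plane.write(0, b"batch-0 payload")
roundtrip = plane.read(off, length)
plane.close()
```

The sketch leaves out allocation and reader/writer synchronization, which the real driver would need.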

Would like to hear community comments.

My personal opinion: the data plane approach reuses the gRPC control plane and 
may make it easier to add new data acceleration methods, but it needs to fit into 
gRPC seamlessly (there are still unresolved gaps). A new transport requires much 
more initial effort, but may pay off later. And it looks like these two approaches 
don't conflict with each other.

[1] Test environment
nics: mellanox connectx5
hosts: client (neoverse n1), server (xeon gold 5218)
os: ubuntu 20.04, linux kernel 5.4
test case: 128k batch size, DoGet

[2] https://github.com/cyb70289/arrow/tree/flight-data-plane

________________________________
From: David Li <lidav...@apache.org>
Sent: Wednesday, December 29, 2021 3:09 AM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: Arrow in HPC

I ended up drafting an implementation of Flight based on UCX, and doing some
of the necessary refactoring to support additional backends in the future.
It can run the Flight benchmark, and performance is about comparable to
gRPC, as tested on AWS EC2.

The implementation is based on the UCP streams API. It's extremely
bare-bones and is really only a proof of concept; a good amount of work is
needed to turn it into a usable implementation. I had hoped it would perform
markedly better than gRPC, at least in this early test, but this seems not
to be the case. That said: I am likely not using UCX properly, UCX would
still open up support for additional hardware, and this work should allow
other backends to be implemented more easily.

The branch can be viewed at
https://github.com/lidavidm/arrow/tree/flight-ucx

I've attached the benchmark output at the end.

There are still quite a few TODOs and things that need investigating:

- Only DoGet and GetFlightInfo are implemented, and incompletely at that.
- Concurrent requests are not supported, nor even making more than one
    request on a connection, and the server does not support concurrent clients.
    We also need to decide whether to even support concurrent requests, and
    how (e.g. pooling multiple connections, or implementing a gRPC/HTTP2-style
    protocol, or even possibly implementing HTTP2).
- We need to make sure we properly handle errors, etc. everywhere.
- Are we using UCX in a performant and idiomatic manner? Will the
    implementation work well on RDMA and other specialized hardware?
- Do we also need to support the UCX tag API?
- Can we refactor out interfaces that allow sharing more of the
    client/server implementation between different backends?
- Are the abstractions sufficient to support other potential backends like
    MPI, libfabrics, or WebSockets?
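
On the concurrent-requests point, a gRPC/HTTP2-style protocol essentially means tagging each frame with a stream id so independent requests can interleave on one connection. A toy sketch of that framing (Python; hypothetical format, not a proposal for the actual wire protocol):

```python
import struct

# Each frame: 4-byte stream id + 4-byte length + payload, so frames from
# different requests can interleave on one connection, HTTP/2-style.
def pack_frame(stream_id, payload):
    return struct.pack("!II", stream_id, len(payload)) + payload

def unpack_frames(buf):
    """Split a byte buffer back into (stream_id, payload) frames."""
    frames = []
    while buf:
        stream_id, length = struct.unpack_from("!II", buf)
        frames.append((stream_id, buf[8:8 + length]))
        buf = buf[8 + length:]
    return frames

# Two concurrent DoGets sharing one connection.
wire = (pack_frame(1, b"DoGet #1 batch")
        + pack_frame(3, b"DoGet #2 batch")
        + pack_frame(1, b"more of #1"))
```

The receiver demultiplexes by stream id; flow control and frame size limits are the hard parts HTTP2 adds on top.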

If anyone has experience with UCX, I'd appreciate any feedback. Otherwise,
I'm hoping to plan out and try to tackle some of the TODOs above, and figure
out how this effort can proceed.

Antoine/Micah raised the possibility of extending gRPC instead. That would
be preferable, frankly, given that otherwise we might have to re-implement a
lot of what gRPC and HTTP2 provide ourselves. However, the necessary
proposal stalled and was dropped without much discussion:
https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY

Benchmark results (also uploaded at
https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):

Testing was done between two t3.xlarge instances in the same zone.
t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 
-num_streams=1 -num_threads=1 -records_per_stream=40960000 
-records_per_batch=4096
Testing method: DoGet
[1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627 UCX  INFO  
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627 UCX  INFO  
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2165862969
Speed: 577.137 MB/s
Throughput: 4617.1 batches/s
Latency mean: 214 us
Latency quantile=0.5: 209 us
Latency quantile=0.95: 340 us
Latency quantile=0.99: 409 us
Latency max: 6350 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 
-num_streams=1 -num_threads=1 -records_per_stream=655360000 
-records_per_batch=65536
Testing method: DoGet
[1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627 UCX  INFO  
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627 UCX  INFO  
ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34184175236
Speed: 585.066 MB/s
Throughput: 292.533 batches/s
Latency mean: 3415 us
Latency quantile=0.5: 3408 us
Latency quantile=0.95: 3549 us
Latency quantile=0.99: 3800 us
Latency max: 20236 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 
172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 
-records_per_batch=4096
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2375289668
Speed: 526.252 MB/s
Throughput: 4210.01 batches/s
Latency mean: 235 us
Latency quantile=0.5: 203 us
Latency quantile=0.95: 328 us
Latency quantile=0.99: 1377 us
Latency max: 17860 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 
172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 
-records_per_batch=65536
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34202704498
Speed: 584.749 MB/s
Throughput: 292.375 batches/s
Latency mean: 3416 us
Latency quantile=0.5: 3406 us
Latency quantile=0.95: 3548 us
Latency quantile=0.99: 3764 us
Latency max: 17086 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 
1M
Connecting to host 172.31.34.4, port 1337
[  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
[  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
[  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
[  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
[  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             sender
[  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  receiver

iperf Done.

Best,
David

On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
"David Li" <lidav...@apache.org> writes:

Thanks for the clarification, Yibo, looking forward to the results. Even if it 
is a very hacky PoC it will be interesting to see how it affects performance, 
though as Keith points out there are benefits in general to UCX (or a similar 
library), and we can work out the implementation plan from there.

To Benson's point - the work done to get UCX supported would pave the way to 
supporting other backends as well. I'm personally not familiar with UCX, MPI, 
etc. so is MPI here more about playing well with established practices or does 
it also offer potential hardware support/performance improvements like UCX 
would?

There are two main implementations of MPI, MPICH and Open MPI, both of which 
are permissively licensed open source community projects. Both have direct 
support for UCX and unless your needs are very specific, the overhead of going 
through MPI is likely to be negligible. Both also have proprietary derivatives, 
such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI derivative), 
which may have optimizations for proprietary networks. Both MPICH and Open MPI 
can be built without UCX, and this is often easier (UCX 'master' is more 
volatile in my experience).

The vast majority of distributed memory scientific applications use MPI or 
higher level libraries, rather than writing directly to UCX (which provides 
less coverage of HPC networks). I think MPI compatibility is important.

From way up-thread (sorry):

Jed - how would you see MPI and Flight interacting? As another
transport/alternative to UCX? I admit I'm not familiar with the HPC
space.

MPI has collective operations like MPI_Allreduce (perform a reduction and give 
every process the result; these run in log(P) or better time with small 
constants -- 15 microseconds is typical for a cheap reduction operation on a 
million processes). MPI supports user-defined operations for reductions and 
prefix-scan operations. If we defined MPI_Ops for Arrow types, we could compute 
summary statistics and other algorithmic building blocks fast at arbitrary 
scale.
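
To illustrate the log(P) claim, here is a toy simulation (plain Python, not MPI) of the recursive-doubling pattern behind MPI_Allreduce, with a user-defined op as MPI_Op allows; for P processes (a power of two here) every rank holds the full result after log2(P) exchange rounds:

```python
def allreduce_sim(values, op):
    """Simulate recursive-doubling allreduce: after each round every rank
    holds the reduction over a group of doubling size, so all ranks hold
    the full result after log2(P) rounds (P must be a power of two here)."""
    vals = list(values)
    p = len(vals)
    dist, rounds = 1, 0
    while dist < p:
        # Each rank exchanges with the partner whose rank differs in one bit.
        vals = [op(vals[rank], vals[rank ^ dist]) for rank in range(p)]
        dist *= 2
        rounds += 1
    return vals, rounds

# 8 "processes" summing their ranks 0..7: result 28 on every rank, 3 rounds.
results, rounds = allreduce_sim(range(8), lambda a, b: a + b)
```

A million processes would need only 20 such rounds, which is why the constants Jed quotes stay small.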

The collective execution model might not be everyone's bag, but MPI_Op can also 
be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op) and 
dropping into collective mode has big advantages for certain algorithms in 
computational statistics/machine learning.




