Ah, yes, thanks for the reminder. That's one of the things that needs to be 
addressed for sure.
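
As a rough illustration of the concern quoted below (this is not code from
the UCX prototype), here is a minimal Python sketch contrasting a busy-polling
loop with an event-driven wait. It uses only the standard `selectors` module,
which picks epoll on Linux. The socket pair, the names `wait_busy`,
`wait_event`, `send_later` and `demo`, the payload, and the 50 ms delay are all
made up for the example.

```python
import selectors
import socket
import threading
import time


def wait_busy(sock, deadline_s=2.0):
    """Busy-poll a non-blocking socket. Returns (data, number_of_polls)."""
    sock.setblocking(False)
    polls = 0
    end = time.monotonic() + deadline_s
    while time.monotonic() < end:
        polls += 1
        try:
            return sock.recv(4096), polls
        except BlockingIOError:
            continue  # nothing to read yet: spin again, burning CPU
    raise TimeoutError("no data before deadline")


def wait_event(sock, deadline_s=2.0):
    """Sleep in the kernel until the socket is readable. Returns (data, wakeups)."""
    sock.setblocking(False)
    # DefaultSelector chooses epoll on Linux and kqueue on BSD/macOS.
    with selectors.DefaultSelector() as sel:
        sel.register(sock, selectors.EVENT_READ)
        events = sel.select(timeout=deadline_s)  # blocks without spinning
        if not events:
            raise TimeoutError("no data before deadline")
        return sock.recv(4096), 1


def send_later(sock, payload, delay_s):
    """Hypothetical stand-in for a peer that produces data after a delay."""
    def run():
        time.sleep(delay_s)
        sock.sendall(payload)

    thread = threading.Thread(target=run)
    thread.start()
    return thread


def demo(wait_fn):
    """Run one wait strategy against a delayed sender on a local socket pair."""
    sender, receiver = socket.socketpair()
    try:
        thread = send_later(sender, b"batch", 0.05)
        result = wait_fn(receiver)
        thread.join()
        return result
    finally:
        sender.close()
        receiver.close()


if __name__ == "__main__":
    print("busy polling:", demo(wait_busy))
    print("event driven:", demo(wait_event))
```

Both strategies receive the same payload. The busy loop calls `recv` repeatedly
while nothing is available, whereas the selector version blocks in a single
`select` call until the kernel reports the socket as readable.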

-David

On Tue, Jan 18, 2022, at 17:48, Supun Kamburugamuve wrote:
> One general observation: I think this implementation uses polling to
> check progress. Because of the client-server semantics of Arrow Flight,
> you may need to use interrupt-based polling like epoll to avoid busy
> looping.
> 
> Best,
> Supun..
> 
> On Tue, Jan 18, 2022 at 8:13 AM David Li <lidav...@apache.org> wrote:
> 
> > Thanks for those results, Yibo! Looks like there's still more room for
> > improvement here. Yes, things are a little unstable, though I didn't
> > have that much trouble just starting the benchmark - I will need
> > to find suitable hardware and iron out these issues. Note that I've
> > only implemented DoGet, and I haven't implemented concurrent streams,
> > which would explain why most benchmark configurations hang or error.
> >
> > Since the last time, I've rewritten the prototype to use UCX's "active
> > message" functionality instead of trying to implement messages over
> > the "streams" API. This simplified the code. I also did some
> > refactoring along the lines of Yibo's prototype to share more code
> > between the gRPC and UCX implementations. Here are some benchmark
> > numbers:
> >
> > For IPC (server/client on the same machine): UCX with shared memory
> > handily beats gRPC here. UCX with TCP isn't quite up to par, though.
> >
> > gRPC:
> > 128KiB batches: 4463 MiB/s
> > 2MiB batches:   3537 MiB/s
> > 32MiB batches:  1828 MiB/s
> >
> > UCX (shared memory):
> > 128KiB batches: 6500 MiB/s
> > 2MiB batches:  13879 MiB/s
> > 32MiB batches:  9045 MiB/s
> >
> > UCX (TCP):
> > 128KiB batches: 1069 MiB/s
> > 2MiB batches:   1735 MiB/s
> > 32MiB batches:  1602 MiB/s
> >
> > For RPC (server/client on different machines): Two t3.xlarge (4 vCPU,
> > 16 GiB memory) machines were used in AWS EC2. These have "up to" 5Gbps
> > bandwidth. This isn't really a scenario where UCX is expected to
> > shine; however, UCX performs comparably to gRPC here.
> >
> > gRPC:
> > 128 KiB batches: 554 MiB/s
> > 2 MiB batches:   575 MiB/s
> >
> > UCX:
> > 128 KiB batches: 546 MiB/s
> > 2 MiB batches:   567 MiB/s
> >
> > Raw test logs can be found here:
> > https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
> >
> > For IPC, the shared memory results are promising in that it could be
> > feasible to expose a library purely over Flight without worrying about
> > FFI bindings. Also, it seems results are roughly comparable to what
> > Yibo observed in ARROW-15282 [1], meaning UCX will get us both a
> > performant shared memory transport and support for more exotic
> > hardware.
> >
> > There's still much work to be done; at this point, I'd like to start
> > implementing the rest of the Flight methods, fixing up the many TODOs
> > scattered around, refactoring more things to share code between
> > gRPC/UCX, and finding and benchmarking some hardware that UCX has a fast
> > path for.
> >
> > [1]: https://issues.apache.org/jira/browse/ARROW-15282
> >
> > -David
> >
> > On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
> > > Some updates.
> > >
> > > > I tested David's UCX transport patch over a 100Gb network. FlightRPC over
> > > > UCX/RDMA improves throughput by about 50%, with lower and flat latency.
> > > > And I think there is room to improve further. See test report [1].
> > > >
> > > > For the data plane approach, the PoC shared memory data plane also
> > > > brings a significant performance boost. Details at [2].
> > > >
> > > > Glad to see there is big potential to improve FlightRPC performance.
> > >
> > > [1] https://issues.apache.org/jira/browse/ARROW-15229
> > > [2] https://issues.apache.org/jira/browse/ARROW-15282
> > >
> > > On 12/30/21 11:57 PM, David Li wrote:
> > > > Ah, I see.
> > > >
> > > > > I think both projects can proceed as well. At some point we will have
> > > > > to figure out how to merge them, but I think it's too early to see how
> > > > > exactly we will want to refactor things.
> > > > >
> > > > > I looked over the code and I don't have any important comments for
> > > > > now. Looking forward to reviewing when it's ready.
> > > >
> > > > -David
> > > >
> > > > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
> > > >>
> > > >>
> > > >> On 12/29/21 11:03 PM, David Li wrote:
> > > >>> Awesome, thanks for sharing this too!
> > > >>>
> > > > >>> The refactoring you have with DataClientStream is what I would like to
> > > > >>> do as well - I think much of the existing code can be adapted to be more
> > > > >>> transport-agnostic and then it will be easier to support new transports
> > > > >>> (whether data-only or for all methods).
> > > > >>>
> > > > >>> Where do you see the gaps between gRPC and this? I think what would
> > > > >>> happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3)
> > > > >>> client sees the unfamiliar prefix and creates a new client for the DoGet
> > > > >>> call (it would have to do this anyways if, for instance, the GetFlightInfo
> > > > >>> call returned the address of a different server).
> > > >>>
> > > >>
> > > > >> I mean implementation details. Some unit tests run longer than
> > > > >> expected (data plane timeouts reading from an ended stream). It looks
> > > > >> like the gRPC stream finish message is not correctly intercepted and
> > > > >> forwarded to the data plane. I don't think it's a big problem; it just
> > > > >> needs some time to debug.
> > > >>
> > > > >>> I also wonder how this stacks up to UCX's shared memory backend (I
> > > > >>> did not test this though).
> > > >>>
> > > >>
> > > > >> I implemented a shared memory data plane only to verify and
> > > > >> consolidate the data plane design, as it's the easiest (and a useful)
> > > > >> driver. I also plan to implement a socket-based data plane, not useful
> > > > >> in practice, only to make sure the design works OK across the network.
> > > > >> Then we can add more useful drivers like UCX or DPDK (the benefit of
> > > > >> DPDK is that it works on commodity hardware, unlike UCX/RDMA, which
> > > > >> requires expensive equipment).
> > > >>
> > > > >>> I would like to be able to support entire new transports for certain
> > > > >>> cases (namely browser support - though perhaps one of the gRPC proxies
> > > > >>> would suffice there), but even in that case, we could make it so that a new
> > > > >>> transport only needs to implement the data plane methods. Only having to
> > > > >>> support the data plane methods would save significant implementation effort
> > > > >>> for all non-browser cases so I think it's a worthwhile approach.
> > > >>>
> > > >>
> > > > >> Thanks for being interested in this approach. My current plan is to
> > > > >> first refactor the shared memory data plane to verify it beats gRPC in
> > > > >> local RPC by a considerable margin; otherwise there must be big mistakes
> > > > >> in my design. After that I will fix the unit test issues and deliver it
> > > > >> for community review.
> > > > >>
> > > > >> Anyway, don't let me block your implementations. And if you think it's
> > > > >> useful, I can push the current code for more detailed discussion.
> > > >>
> > > >>> -David
> > > >>>
> > > >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> > > > >>>> Thanks David for initiating the UCX integration, great work!
> > > > >>>> I think a 5Gbps network is too limited for performance evaluation. I
> > > > >>>> will try the patch on a 100Gb RDMA network; hopefully we can see some
> > > > >>>> improvements.
> > > > >>>> I once benchmarked Flight over a 100Gb network [1]: gRPC-based
> > > > >>>> throughput is 2.4GB/s for one thread and 8.8GB/s for six threads, with
> > > > >>>> about 60us latency. I also benchmarked raw RDMA performance (same batch
> > > > >>>> sizes as Flight): one thread can achieve 9GB/s with 12us latency. Of
> > > > >>>> course the comparison is not fair. With David's patch, we can get a more
> > > > >>>> realistic comparison.
> > > >>>>
> > > > >>>> I'm implementing a data plane approach in the hope that we can adopt new
> > > > >>>> data acceleration methods easily. My approach is to replace only the
> > > > >>>> FlightData transmission of DoGet/Put/Exchange with data plane drivers;
> > > > >>>> gRPC is still used for all RPC calls.
> > > > >>>> Code is at my GitHub repo [2]. Besides the framework, I have only
> > > > >>>> implemented a shared memory data plane driver as a PoC. Get/Put/Exchange
> > > > >>>> unit tests pass, TestCancel hangs, and some unit tests run longer than
> > > > >>>> expected; I'm still debugging. The shared memory data plane performance
> > > > >>>> is pretty bad now, due to repeated map/unmap for each read/write.
> > > > >>>> Pre-allocated pages should improve it considerably; I'm still
> > > > >>>> experimenting.
> > > >>>>
> > > >>>> Would like to hear community comments.
> > > >>>>
> > > > >>>> My personal opinion is that the data plane approach reuses the gRPC
> > > > >>>> control plane and may make it easier to add new data acceleration
> > > > >>>> methods, but it needs to fit into gRPC seamlessly (there are still
> > > > >>>> unresolved gaps). A new transport requires much more initial effort, but
> > > > >>>> may pay off later. And it looks like these two approaches don't conflict
> > > > >>>> with each other.
> > > >>>>
> > > >>>> [1] Test environment
> > > >>>> nics: mellanox connectx5
> > > >>>> hosts: client (neoverse n1), server (xeon gold 5218)
> > > >>>> os: ubuntu 20.04, linux kernel 5.4
> > > >>>> test case: 128k batch size, DoGet
> > > >>>>
> > > >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> > > >>>>
> > > >>>> ________________________________
> > > >>>> From: David Li <lidav...@apache.org>
> > > >>>> Sent: Wednesday, December 29, 2021 3:09 AM
> > > >>>> To: dev@arrow.apache.org <dev@arrow.apache.org>
> > > >>>> Subject: Re: Arrow in HPC
> > > >>>>
> > > > >>>> I ended up drafting an implementation of Flight based on UCX, and
> > > > >>>> doing some of the necessary refactoring to support additional backends
> > > > >>>> in the future. It can run the Flight benchmark, and performance is about
> > > > >>>> comparable to gRPC, as tested on AWS EC2.
> > > > >>>>
> > > > >>>> The implementation is based on the UCP streams API. It's extremely
> > > > >>>> bare-bones and is really only a proof of concept; a good amount of work
> > > > >>>> is needed to turn it into a usable implementation. I had hoped it would
> > > > >>>> perform markedly better than gRPC, at least in this early test, but this
> > > > >>>> seems not to be the case. That said: I am likely not using UCX properly,
> > > > >>>> UCX would still open up support for additional hardware, and this work
> > > > >>>> should allow other backends to be implemented more easily.
> > > >>>>
> > > >>>> The branch can be viewed at
> > > >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
> > > >>>>
> > > >>>> I've attached the benchmark output at the end.
> > > >>>>
> > > > >>>> There are still quite a few TODOs and things that need investigating:
> > > > >>>>
> > > > >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
> > > > >>>> - Concurrent requests are not supported, or even making more than one
> > > > >>>>     request on a connection, nor does the server support concurrent
> > > > >>>>     clients. We also need to decide whether to even support concurrent
> > > > >>>>     requests, and how (e.g. pooling multiple connections, or implementing
> > > > >>>>     a gRPC/HTTP2 style protocol, or even possibly implementing HTTP2).
> > > > >>>> - We need to make sure we properly handle errors, etc. everywhere.
> > > > >>>> - Are we using UCX in a performant and idiomatic manner? Will the
> > > > >>>>     implementation work well on RDMA and other specialized hardware?
> > > > >>>> - Do we also need to support the UCX tag API?
> > > > >>>> - Can we refactor out interfaces that allow sharing more of the
> > > > >>>>     client/server implementation between different backends?
> > > > >>>> - Are the abstractions sufficient to support other potential backends
> > > > >>>>     like MPI, libfabric, or WebSockets?
> > > >>>>
> > > > >>>> If anyone has experience with UCX, I'd appreciate any feedback.
> > > > >>>> Otherwise, I'm hoping to plan out and try to tackle some of the TODOs
> > > > >>>> above, and figure out how this effort can proceed.
> > > > >>>>
> > > > >>>> Antoine/Micah raised the possibility of extending gRPC instead. That
> > > > >>>> would be preferable, frankly, given that otherwise we might have to
> > > > >>>> re-implement a lot of what gRPC and HTTP2 provide by ourselves. However,
> > > > >>>> the necessary proposal stalled and was dropped without much discussion:
> > > >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> > > >>>>
> > > >>>> Benchmark results (also uploaded at
> > > >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> > > >>>>
> > > >>>> Testing was done between two t3.xlarge instances in the same zone.
> > > >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> > > >>>>
> > > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
> > > > >>>> Testing method: DoGet
> > > > >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627  UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > > >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627  UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 131072
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 1310720000
> > > >>>> Nanos: 2165862969
> > > >>>> Speed: 577.137 MB/s
> > > >>>> Throughput: 4617.1 batches/s
> > > >>>> Latency mean: 214 us
> > > >>>> Latency quantile=0.5: 209 us
> > > >>>> Latency quantile=0.95: 340 us
> > > >>>> Latency quantile=0.99: 409 us
> > > >>>> Latency max: 6350 us
> > > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
> > > > >>>> Testing method: DoGet
> > > > >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627  UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > > >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627  UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 2097152
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 20971520000
> > > >>>> Nanos: 34184175236
> > > >>>> Speed: 585.066 MB/s
> > > >>>> Throughput: 292.533 batches/s
> > > >>>> Latency mean: 3415 us
> > > >>>> Latency quantile=0.5: 3408 us
> > > >>>> Latency quantile=0.95: 3549 us
> > > >>>> Latency quantile=0.99: 3800 us
> > > >>>> Latency max: 20236 us
> > > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
> > > >>>> Testing method: DoGet
> > > >>>> Using standalone TCP server
> > > >>>> Server host: 172.31.34.4
> > > >>>> Server port: 31337
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 131072
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 1310720000
> > > >>>> Nanos: 2375289668
> > > >>>> Speed: 526.252 MB/s
> > > >>>> Throughput: 4210.01 batches/s
> > > >>>> Latency mean: 235 us
> > > >>>> Latency quantile=0.5: 203 us
> > > >>>> Latency quantile=0.95: 328 us
> > > >>>> Latency quantile=0.99: 1377 us
> > > >>>> Latency max: 17860 us
> > > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
> > > >>>> Testing method: DoGet
> > > >>>> Using standalone TCP server
> > > >>>> Server host: 172.31.34.4
> > > >>>> Server port: 31337
> > > >>>> Number of perf runs: 1
> > > >>>> Number of concurrent gets/puts: 1
> > > >>>> Batch size: 2097152
> > > >>>> Batches read: 10000
> > > >>>> Bytes read: 20971520000
> > > >>>> Nanos: 34202704498
> > > >>>> Speed: 584.749 MB/s
> > > >>>> Throughput: 292.375 batches/s
> > > >>>> Latency mean: 3416 us
> > > >>>> Latency quantile=0.5: 3406 us
> > > >>>> Latency quantile=0.95: 3548 us
> > > >>>> Latency quantile=0.99: 3764 us
> > > >>>> Latency max: 17086 us
> > > > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 1M
> > > > >>>> Connecting to host 172.31.34.4, port 1337
> > > > >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
> > > > >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> > > > >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
> > > > >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
> > > > >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
> > > > >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
> > > > >>>> [ ID] Interval           Transfer     Bitrate         Retr
> > > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             sender
> > > > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  receiver
> > > >>>>
> > > >>>> iperf Done.
> > > >>>>
> > > >>>> Best,
> > > >>>> David
> > > >>>>
> > > >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> > > >>>>> "David Li" <lidav...@apache.org> writes:
> > > >>>>>
> > > > >>>>>> Thanks for the clarification Yibo, looking forward to the
> > > > >>>>>> results. Even if it is a very hacky PoC it will be interesting to see how
> > > > >>>>>> it affects performance, though as Keith points out there are benefits in
> > > > >>>>>> general to UCX (or similar library), and we can work out the implementation
> > > > >>>>>> plan from there.
> > > > >>>>>>
> > > > >>>>>> To Benson's point - the work done to get UCX supported would pave
> > > > >>>>>> the way to supporting other backends as well. I'm personally not familiar
> > > > >>>>>> with UCX, MPI, etc. so is MPI here more about playing well with established
> > > > >>>>>> practices or does it also offer potential hardware support/performance
> > > > >>>>>> improvements like UCX would?
> > > >>>>>
> > > > >>>>> There are two main implementations of MPI, MPICH and Open MPI,
> > > > >>>>> both of which are permissively licensed open source community projects.
> > > > >>>>> Both have direct support for UCX and unless your needs are very specific,
> > > > >>>>> the overhead of going through MPI is likely to be negligible. Both also
> > > > >>>>> have proprietary derivatives, such as Cray MPI (MPICH derivative) and
> > > > >>>>> Spectrum MPI (Open MPI derivative), which may have optimizations for
> > > > >>>>> proprietary networks. Both MPICH and Open MPI can be built without UCX, and
> > > > >>>>> this is often easier (UCX 'master' is more volatile in my experience).
> > > > >>>>>
> > > > >>>>> The vast majority of distributed memory scientific applications
> > > > >>>>> use MPI or higher level libraries, rather than writing directly to UCX
> > > > >>>>> (which provides less coverage of HPC networks). I think MPI compatibility
> > > > >>>>> is important.
> > > >>>>>
> > > >>>>>   From way up-thread (sorry):
> > > >>>>>
> > > > >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As another
> > > > >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with the HPC
> > > > >>>>>>>>>>>> space.
> > > >>>>>
> > > > >>>>> MPI has collective operations like MPI_Allreduce (perform a
> > > > >>>>> reduction and give every process the result; these run in log(P) or better
> > > > >>>>> time with small constants -- 15 microseconds is typical for a cheap
> > > > >>>>> reduction operation on a million processes). MPI supports user-defined
> > > > >>>>> operations for reductions and prefix-scan operations. If we defined MPI_Ops
> > > > >>>>> for Arrow types, we could compute summary statistics and other algorithmic
> > > > >>>>> building blocks fast at arbitrary scale.
> > > > >>>>>
> > > > >>>>> The collective execution model might not be everyone's bag, but
> > > > >>>>> MPI_Op can also be used in one-sided operations (MPI_Accumulate and
> > > > >>>>> MPI_Fetch_and_op) and dropping into collective mode has big advantages for
> > > > >>>>> certain algorithms in computational statistics/machine learning.
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
> 
> 
> -- 
> Supun Kamburugamuve
> 
