milenkovicm opened a new issue, #1315: URL: https://github.com/apache/datafusion-ballista/issues/1315
## Executive Summary

By optimizing the Ballista Arrow Flight server and Ballista client implementations, the time required to transfer a 5.5 GB shuffle file can be reduced from 61 seconds to 11 seconds. This is nearly a sixfold improvement in transfer speed and roughly a 2.5-fold reduction in the amount of data transmitted.

## Is your feature request related to a problem or challenge? Please describe what you are trying to do

Ballista's `ShuffleWriterExec` writes shuffle files as compressed Arrow IPC streams (`CompressionType::LZ4_FRAME`). Shuffle files are read by `ShuffleReaderExec` either locally, via a local file read, or remotely, via the Ballista flight server. Local reads are performed with `StreamReader`. Remote reads are performed with `BallistaClient`, which uses an Arrow Flight stream to fetch data from an Arrow Flight server endpoint.

The `do_get` implementation on the flight server side reads the Arrow stream file, decompresses and decodes it (splitting it into batches), then re-encodes, re-compresses, and ships the data over gRPC. All of this takes time, with compression/decompression taking most of it, as illustrated in the [flight server flamegraph](https://github.com/user-attachments/assets/065b0e1f-e2fa-4d89-aae9-3ce595409b76). See the current Ballista [flight server implementation](https://github.com/apache/datafusion-ballista/blob/4c9b360b51afd3f29614b43b4f3727c473e515a5/ballista/executor/src/flight_service.rs#L88:L120) for details.

On the client side we see a similar flamegraph, where decompression takes most of the time: [ballista flight client flamegraph](https://github.com/user-attachments/assets/3a3f53bd-b4f2-4da9-badf-5888a895879b)

The question to ask is: why do we need to do all this work on the server side when the source file is already an Arrow stream file? One possible answer is: "to comply with the semantics of the `do_get` method".

## Describe the solution you'd like

Modify the flight server implementation to eliminate shuffle partition decoding and decompression. Instead, the server should divide the file into smaller, fixed-size chunks (for example, 4 MB byte arrays) and transmit these chunks sequentially over the network. This avoids unnecessary decoding and leverages efficient bulk data compression.

Adopting this proposal alters/replaces the existing method, where each transmitted chunk is directly convertible to a `RecordBatch`. In the revised approach, each chunk may correspond to zero or more `RecordBatch` instances, depending on the data boundaries within the chunk. Transmitting file chunks rather than serialized record batches is expected to improve compression efficiency, as the compression algorithm can operate on larger, contiguous data segments.

On the client side, implement the data-fetching stream logic and decode the incoming chunks using a custom `StreamDecoder` (see the sketch below). This frees the Arrow Flight server from the very heavy compression/decompression work, making it more scalable.

[block transfer flight server flamegraph](https://github.com/user-attachments/assets/554305f7-6087-4724-a03c-994649e64ed0)

As we can see, the server just moves data from the file to the socket.
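As a concrete illustration of the client-side decoding described above, here is a minimal sketch built around the `StreamDecoder` from arrow-rs (`arrow::ipc::reader::StreamDecoder`). The `decode_chunk` helper and its signature are assumptions made for this sketch, not the actual Ballista code:

```rust
use arrow::array::RecordBatch;
use arrow::buffer::Buffer;
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamDecoder;
use bytes::Bytes;

/// Hypothetical helper: feed one raw file chunk received from the server
/// into the decoder, collecting every `RecordBatch` that becomes complete.
/// A chunk boundary need not align with an IPC message boundary; the
/// decoder buffers partial messages internally, so a single chunk may
/// yield zero, one, or many batches.
fn decode_chunk(
    decoder: &mut StreamDecoder,
    chunk: Bytes,
    batches: &mut Vec<RecordBatch>,
) -> Result<(), ArrowError> {
    let mut buffer = Buffer::from(chunk.to_vec());
    while !buffer.is_empty() {
        if let Some(batch) = decoder.decode(&mut buffer)? {
            batches.push(batch);
        }
    }
    // After the last chunk, the caller should invoke `decoder.finish()`
    // to verify that no trailing partial message remains.
    Ok(())
}
```

Decompression of the `LZ4_FRAME`-compressed batches still happens inside the decoder (assuming the arrow IPC compression feature is enabled), but it now happens exactly once, on the client, instead of once on the server and again on the client.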
On the client side the flamegraph is similar to the previous client implementation: decompression is still the most expensive operation, though it takes somewhat less time than before.

[block transfer flight client](https://github.com/user-attachments/assets/8d96fe6e-fec4-4b66-a979-18b7f4744170)

### Experimental Results

A compressed stream file of 5945958784 bytes (~5.5 GB) was created to act as the shuffle file (~60M rows, 59986051 to be precise). The old Ballista setup takes 61.67 s to transfer the whole file, but the total transferred is 14802700720 bytes (~13.8 GB), so compression is clearly not helping much. The new implementation takes 10.75 s and transfers 5945958784 bytes (the actual file size)!

|                    | Time (sec) | Bytes transferred |
| ------------------ | ---------: | ----------------: |
| Old implementation |         61 |       14802700720 |
| New implementation |         11 |        5945958784 |

- ***NOTE 1: this approach uses `LZ4_FRAME` compression.***
- ***NOTE 2: all tests use the local network interface.***

### Implementation Details

We have two options for implementing this: extend `do_get(...)` or implement it as a `do_action(...)`.

`do_get(...)` returns a stream of `FlightData`, which is declared as:

```proto
/*
 * A batch of Arrow data as part of a stream of batches.
 */
message FlightData {
  /* [removed] ... */
  FlightDescriptor flight_descriptor = 1;

  /* [removed] ... */
  bytes data_header = 2;

  /* [removed] ... */
  bytes app_metadata = 3;

  /*
   * The actual batch of Arrow data. [removed] ...
   */
  bytes data_body = 1000;
}
```

The `data_body` field has to contain "the actual batch of Arrow data". Our proposal clearly does not provide such a guarantee, so extending `do_get` may break the semantics of the interface.

`do_action`, on the other hand, can be extended in any way needed:

```rust
///
/// Flight services can support an arbitrary number of simple actions in
/// addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
/// operations that are potentially available. DoAction allows a flight client
/// to do a specific action against a flight service. An action includes
/// opaque request and response objects that are specific to the type action
/// being undertaken.
async fn do_action(
    &self,
    request: tonic::Request<super::Action>,
) -> std::result::Result<tonic::Response<Self::DoActionStream>, tonic::Status>;

/// Server streaming response type for the ListActions method.
type ListActionsStream: tonic::codegen::tokio_stream::Stream<
        Item = std::result::Result<super::ActionType, tonic::Status>,
    > + std::marker::Send
    + 'static;
```

and it returns a type which may be a perfect fit in our case:

```rust
///
/// An opaque result returned after executing an action.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Result {
    #[prost(bytes = "bytes", tag = "1")]
    pub body: ::prost::bytes::Bytes,
}
```

In addition to `do_action`, we would need to implement `list_actions`:

```rust
///
/// A flight service exposes all of the available action types that it has
/// along with descriptions. This allows different flight consumers to
/// understand the capabilities of the flight service.
async fn list_actions(
    &self,
    request: tonic::Request<super::Empty>,
) -> std::result::Result<
    tonic::Response<Self::ListActionsStream>,
    tonic::Status,
>;
```

A minimal sketch of such a chunk-streaming `do_action` handler is included at the end of this issue.

## Describe alternatives you've considered

## Additional context

The proposed "block" file transport mechanism should be introduced as a configurable option. It remains to be determined whether this configuration should be applied at compile time or at runtime. Further evaluation is required to select the most appropriate approach, ensuring flexibility and compatibility with existing deployment workflows.
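For illustration, here is a minimal sketch of the chunk-streaming `do_action` handler referenced under Implementation Details, written as a free function for brevity (the real handler would live in Ballista's `FlightService` implementation). The action name `FetchPartition`, the path-in-body convention, and the use of `tokio_util::io::ReaderStream` are assumptions made for this sketch:

```rust
use arrow_flight::{Action, Result as FlightResult};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use tokio_util::io::ReaderStream;
use tonic::{Request, Response, Status};

/// Hypothetical action type the client uses to request a shuffle file.
const FETCH_PARTITION_ACTION: &str = "FetchPartition";

/// Chunk size proposed in this issue.
const CHUNK_SIZE: usize = 4 * 1024 * 1024;

type DoActionStream = BoxStream<'static, Result<FlightResult, Status>>;

async fn do_action(
    request: Request<Action>,
) -> Result<Response<DoActionStream>, Status> {
    let action = request.into_inner();
    if action.r#type != FETCH_PARTITION_ACTION {
        return Err(Status::unimplemented(action.r#type));
    }
    // For this sketch the action body simply carries the shuffle file path.
    let path = String::from_utf8(action.body.to_vec())
        .map_err(|e| Status::invalid_argument(e.to_string()))?;
    let file = tokio::fs::File::open(&path)
        .await
        .map_err(|e| Status::internal(e.to_string()))?;
    // Stream the file as raw ~4 MB chunks wrapped in opaque `Result`
    // bodies: no decoding, no recompression, the server just moves bytes
    // from the file to the socket.
    let chunks = ReaderStream::with_capacity(file, CHUNK_SIZE)
        .map_ok(|bytes| FlightResult { body: bytes })
        .map_err(|e| Status::internal(e.to_string()));
    Ok(Response::new(chunks.boxed()))
}
```

A matching `list_actions` implementation would simply advertise `FetchPartition`, and the client would drive the returned stream through a `StreamDecoder`, as in the earlier sketch.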