milenkovicm opened a new issue, #1315:
URL: https://github.com/apache/datafusion-ballista/issues/1315

   ## Executive Summary
   
   By optimizing the Ballista Arrow Flight Server and ballista client 
implementation, the time required to transfer a 5.5 GB shuffle file can be 
reduced from 61 seconds to 11 seconds. This represents nearly a sixfold 
improvement in transfer speed and roughly a 2.5x reduction in the amount of data transmitted.
   
   ## Is your feature request related to a problem or challenge? Please 
describe what you are trying to do
   
Ballista `ShuffleWriterExec` writes shuffle files as a compressed Arrow IPC stream (`CompressionType::LZ4_FRAME`). Shuffle files are read by `ShuffleReaderExec` either locally, using a local file read, or remotely, using the Ballista flight server.
   
Local reads are performed using `StreamReader`.
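   For illustration, a local read amounts to roughly the following sketch built on arrow-ipc's `StreamReader` (the crate's `lz4` feature is assumed, so the LZ4_FRAME-compressed buffers can be decompressed):
   
   ```rust
   use std::fs::File;
   use std::io::BufReader;
   
   use arrow_array::RecordBatch;
   use arrow_ipc::reader::StreamReader;
   
   /// Read an LZ4_FRAME-compressed Arrow IPC stream file batch by batch.
   fn read_shuffle_file(path: &str) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
       let file = File::open(path)?;
       // `StreamReader` decodes the IPC stream and decompresses each batch.
       let reader = StreamReader::try_new(BufReader::new(file), None)?;
       Ok(reader.collect::<Result<_, _>>()?)
   }
   ```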
   
Remote reads are performed using `BallistaClient`, which uses an Arrow Flight stream to fetch data from an Arrow Flight server endpoint.
   
   
![Image](https://github.com/user-attachments/assets/3ba0f323-4c3d-4306-acce-ccaf6c0b569e)
   
   The `do_get` method implementation on the flight server side reads the Arrow stream file, decompresses it, decodes it (splits it into batches), then re-encodes, re-compresses, and ships the data over gRPC. All of this takes time, with compression/decompression taking most of it, as illustrated in the flamegraph below:
   
   ![flight server flamegraph](https://github.com/user-attachments/assets/065b0e1f-e2fa-4d89-aae9-3ce595409b76)
   
   See the actual Ballista [flight server implementation](https://github.com/apache/datafusion-ballista/blob/4c9b360b51afd3f29614b43b4f3727c473e515a5/ballista/executor/src/flight_service.rs#L88:L120) for details.
   
   On the client side we see a similar flamegraph, where decompression takes most of the time:
   
   ![ballista flight client flamegraph](https://github.com/user-attachments/assets/3a3f53bd-b4f2-4da9-badf-5888a895879b)
   
   The question to ask is: why do we need to do all of this work on the server side when the source file is already an Arrow stream file? One possible answer is: "to comply with `do_get` method semantics".
   
   ## Describe the solution you'd like
   
   Modify the Flight server implementation to eliminate shuffle partition decoding and decompression. Instead, the server should divide the file into smaller, fixed-size chunks (for example, 4 MB byte arrays) and transmit these chunks sequentially over the network. This approach avoids unnecessary decoding and leverages efficient bulk data compression; a sketch of the server side follows the diagram below.
   
   Adopting this proposal will alter/replace the existing method, where each 
transmitted chunk is directly convertible to a `RecordBatch`. In the revised 
approach, each chunk may correspond to zero or more `RecordBatch` instances, 
depending on the data boundaries within the chunk. Transmitting file chunks 
rather than serialized record batches is expected to enhance compression 
efficiency, as the compression algorithm can operate on larger, contiguous data 
segments.
   
   
![Image](https://github.com/user-attachments/assets/a001fcc4-d411-4e4f-a0bc-f798f8eb5f62)
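   As a rough sketch (illustrative only, not the actual Ballista code), the server-side work reduces to something like the following, assuming the `async-stream` crate; `chunk_stream` is a hypothetical helper:
   
   ```rust
   // Illustrative sketch only: stream a shuffle file back as fixed-size,
   // still-compressed byte chunks instead of decoding record batches.
   use bytes::Bytes;
   use futures::stream::BoxStream;
   use tokio::{fs::File, io::AsyncReadExt};
   use tonic::Status;
   
   const CHUNK_SIZE: usize = 4 * 1024 * 1024; // e.g. 4 MB per chunk
   
   /// Yield the raw bytes of `path` in `CHUNK_SIZE` pieces. No decompression,
   /// no IPC decoding: the file content is forwarded as-is.
   fn chunk_stream(path: String) -> BoxStream<'static, Result<Bytes, Status>> {
       Box::pin(async_stream::try_stream! {
           let mut file = File::open(&path)
               .await
               .map_err(|e| Status::internal(e.to_string()))?;
           let mut buf = vec![0u8; CHUNK_SIZE];
           loop {
               let n = file
                   .read(&mut buf)
                   .await
                   .map_err(|e| Status::internal(e.to_string()))?;
               if n == 0 {
                   break; // EOF: the whole file has been shipped
               }
               // A chunk may cut across record batch boundaries; the client
               // reassembles batches with a streaming decoder.
               yield Bytes::copy_from_slice(&buf[..n]);
           }
       })
   }
   ```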
   
   On the client side, implement the data-fetching stream logic and decode the incoming bytes using a custom `StreamDecoder`.
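   A minimal sketch of such a decoder, built on arrow-ipc's `StreamDecoder` (again assuming the `lz4` feature for the compressed buffers):
   
   ```rust
   use arrow_array::RecordBatch;
   use arrow_buffer::Buffer;
   use arrow_ipc::reader::StreamDecoder;
   use arrow_schema::ArrowError;
   
   /// Feed raw chunks, as received from the server, into the decoder and
   /// collect every record batch that becomes complete. Chunk boundaries are
   /// arbitrary: one chunk may complete zero, one, or several batches.
   fn decode_chunks(chunks: Vec<Vec<u8>>) -> Result<Vec<RecordBatch>, ArrowError> {
       let mut decoder = StreamDecoder::new();
       let mut batches = Vec::new();
       for chunk in chunks {
           let mut buffer = Buffer::from(chunk);
           // A chunk may hold partial messages; keep decoding until the
           // decoder has consumed all of this chunk's bytes.
           while !buffer.is_empty() {
               if let Some(batch) = decoder.decode(&mut buffer)? {
                   batches.push(batch);
               }
           }
       }
       decoder.finish()?; // errors if the stream ended mid-message
       Ok(batches)
   }
   ```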
   
   This would free the Arrow Flight server from the very heavy compression/decompression work, making it more scalable.
   
   ![block transfer flight server flamegraph](https://github.com/user-attachments/assets/554305f7-6087-4724-a03c-994649e64ed0)
   
   As we can see, the server just moves data from the file to the socket.
   
   On the client side we see a flamegraph similar to the previous client implementation's: decompression is still the operation taking the most time, but a bit less than in the previous case.
   
   ![block transfer flight client](https://github.com/user-attachments/assets/8d96fe6e-fec4-4b66-a979-18b7f4744170)
   
   
   ### Experimental Results
   
   A compressed stream file of 5945958784 bytes (~5.5 GiB) was created to act as the shuffle file (~60M rows; 59986051 to be precise).
   
   Running the old Ballista setup takes 61.67 s to transfer the whole file, but the total number of bytes transferred is 14802700720 (~13.8 GiB); it looks like compression is not helping much.
   
   The new implementation, on the other hand, takes 10.75 s and transfers 5945958784 bytes (the actual file size)!
   
   |                      | Time (sec) | Bytes Transferred  |
   | -------------------- |-----------:|-------------------:|
   | Old Implementation   | 61         |  14802700720       |
   | New Implementation   | 11         |  5945958784        |
   
   - ***NOTE 1: this approach uses `LZ4_FRAME` compression.***
   - ***NOTE 2: all tests use local network interface.***
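   In terms of effective throughput over the source file, these numbers work out to roughly 5945958784 B / 61.67 s ≈ 96 MB/s for the old implementation versus 5945958784 B / 10.75 s ≈ 553 MB/s for the block transfer.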
   
   ### Implementation Details
   
   We have two options to implement it: extend `do_get(...)` or implement it as `do_action(...)`.
   
   `do_get(...)` returns a stream of `FlightData`, which is declared as:
   
   ```grpc
   /*
    * A batch of Arrow data as part of a stream of batches.
    */
   message FlightData {
     /* [removed] ... */
     FlightDescriptor flight_descriptor = 1;
     /* [removed] ... */
     bytes data_header = 2;
     /* [removed] ... */
     bytes app_metadata = 3;
     /*
      * The actual batch of Arrow data. [removed] ...
      */
     bytes data_body = 1000;
   }
   ```
   
   The `data_body` field has to contain "the actual batch of Arrow data"; our proposal clearly does not provide such a guarantee, so extending `do_get` may break the semantics of the interface.
   
   `do_action`, on the other hand, can be extended in any way needed:
   
   ```rust
   ///
   /// Flight services can support an arbitrary number of simple actions in
   /// addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
   /// operations that are potentially available. DoAction allows a flight client
   /// to do a specific action against a flight service. An action includes
   /// opaque request and response objects that are specific to the type of action
   /// being undertaken.
   async fn do_action(
       &self,
       request: tonic::Request<super::Action>,
   ) -> std::result::Result<tonic::Response<Self::DoActionStream>, tonic::Status>;
   /// Server streaming response type for the DoAction method.
   type DoActionStream: tonic::codegen::tokio_stream::Stream<
           Item = std::result::Result<super::Result, tonic::Status>,
       >
       + std::marker::Send
       + 'static;
   ```
   
   The stream items are of a type which may be the perfect fit in our case:
   
   ```rust
   ///
   /// An opaque result returned after executing an action.
   #[derive(Clone, PartialEq, ::prost::Message)]
   pub struct Result {
       #[prost(bytes = "bytes", tag = "1")]
       pub body: ::prost::bytes::Bytes,
   }
   ```
   
   In addition to `do_action`, we would need to implement `list_actions`:
   
   ```rust
   ///
   /// A flight service exposes all of the available action types that it has
   /// along with descriptions. This allows different flight consumers to
   /// understand the capabilities of the flight service.
   async fn list_actions(
       &self,
       request: tonic::Request<super::Empty>,
   ) -> std::result::Result<
       tonic::Response<Self::ListActionsStream>,
       tonic::Status,
   >;
   ```
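   Put together, a hypothetical client call could look like the sketch below; the action type `"fetch_partition"` and its payload are illustrative, not an existing Ballista API:
   
   ```rust
   // Hypothetical client-side sketch using arrow-flight's `FlightClient`.
   use arrow_flight::{error::FlightError, Action, FlightClient};
   use bytes::Bytes;
   use futures::TryStreamExt;
   use tonic::transport::Channel;
   
   /// Fetch all raw chunks of a shuffle partition; feed them into the
   /// `StreamDecoder` shown earlier to recover record batches.
   async fn fetch_partition_bytes(
       channel: Channel,
       partition_path: &str,
   ) -> Result<Vec<Bytes>, FlightError> {
       let mut client = FlightClient::new(channel);
       let action = Action::new("fetch_partition", partition_path.to_string());
       client.do_action(action).await?.try_collect().await
   }
   ```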
   
   ## Describe alternatives you've considered
   
   ## Additional context
   
   The proposed "block" file transport mechanism should be introduced as a 
configurable option. It remains to be determined whether this configuration 
will be applied at compile time or runtime. Further evaluation is required to 
select the most appropriate approach, ensuring flexibility and compatibility 
with existing deployment workflows.
   

