This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a10094  perf(reader): Improve the performance of parquet reader (#230)
4a10094 is described below

commit 4a10094a91f1a3e96015996bcce06d4849686237
Author: umi <[email protected]>
AuthorDate: Sat Apr 11 17:49:44 2026 +0800

    perf(reader): Improve the performance of parquet reader (#230)
---
 crates/paimon/src/arrow/format/parquet.rs | 365 +++++++++++++++++++++++++++++-
 1 file changed, 364 insertions(+), 1 deletion(-)

diff --git a/crates/paimon/src/arrow/format/parquet.rs 
b/crates/paimon/src/arrow/format/parquet.rs
index 915bd56..b0aa0ec 100644
--- a/crates/paimon/src/arrow/format/parquet.rs
+++ b/crates/paimon/src/arrow/format/parquet.rs
@@ -33,7 +33,7 @@ use arrow_schema::ArrowError;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::BoxFuture;
-use futures::{StreamExt, TryFutureExt};
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, 
RowSelection, RowSelector,
 };
@@ -785,6 +785,18 @@ struct ArrowFileReader {
     r: Box<dyn FileRead>,
 }
 
+/// coalesce threshold: 1 MiB.
+const RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
+/// concurrent range fetches.
+const RANGE_FETCH_CONCURRENCY: usize = 10;
+/// metadata prefetch hint: 512 KiB.
+const METADATA_SIZE_HINT: usize = 512 * 1024;
+/// Minimum range size for splitting: 4 MiB.
+/// The block size used for split alignment and as the minimum split
+/// granularity.  Ranges smaller than this will not be split further to
+/// avoid excessive small IO requests whose per-request overhead dominates.
+const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024;
+
 impl ArrowFileReader {
     fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
         Self { file_size, r }
@@ -809,14 +821,128 @@ impl AsyncFileReader for ArrowFileReader {
         self.read_bytes(range)
     }
 
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let coalesce_bytes = RANGE_COALESCE_BYTES;
+        let concurrency = RANGE_FETCH_CONCURRENCY;
+
+        async move {
+            if ranges.is_empty() {
+                return Ok(vec![]);
+            }
+
+            // Two-phase range optimization:
+            // Phase 1: Merge nearby ranges based on coalesce threshold.
+            let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
+            // Phase 2: Split large merged ranges to utilize concurrency,
+            // but only at original range boundaries.
+            let fetch_ranges = split_ranges_for_concurrency(coalesced, 
concurrency);
+
+            // Fetch merged ranges concurrently.
+            let r = &self.r;
+            let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
+                // All ranges fit within the concurrency limit — fire them all 
at once.
+                futures::future::try_join_all(fetch_ranges.iter().map(|range| {
+                    r.read(range.clone())
+                        .map_err(|e| 
parquet::errors::ParquetError::External(format!("{e}").into()))
+                }))
+                .await?
+            } else {
+                // More ranges than concurrency slots — use buffered stream.
+                futures::stream::iter(fetch_ranges.iter().cloned())
+                    .map(|range| async move {
+                        r.read(range).await.map_err(|e| {
+                            
parquet::errors::ParquetError::External(format!("{e}").into())
+                        })
+                    })
+                    .buffered(concurrency)
+                    .try_collect()
+                    .await?
+            };
+
+            // Slice the fetched data back into the originally requested
+            // ranges.  A single original range may span multiple fetch
+            // chunks, so we copy from as many chunks as needed.
+            let result: parquet::errors::Result<Vec<Bytes>> = ranges
+                .iter()
+                .map(|range| {
+                    // Find the first fetch chunk whose end is past 
range.start.
+                    let first = fetch_ranges.partition_point(|v| v.end <= 
range.start);
+                    if first >= fetch_ranges.len() {
+                        return 
Err(parquet::errors::ParquetError::General(format!(
+                            "No fetch range covers requested range {}..{}",
+                            range.start, range.end
+                        )));
+                    }
+
+                    let need = (range.end - range.start) as usize;
+
+                    // Fast path: the original range fits entirely within one
+                    // fetch chunk — zero-copy slice.
+                    let fr = &fetch_ranges[first];
+                    if range.end <= fr.end {
+                        let start = (range.start - fr.start) as usize;
+                        let end = (range.end - fr.start) as usize;
+                        return Ok(fetched[first].slice(start..end));
+                    }
+
+                    // Slow path: the original range spans multiple fetch
+                    // chunks — copy pieces into a new buffer (mirrors Java's
+                    // copyMultiBytesToBytes).
+                    let mut buf = Vec::with_capacity(need);
+                    let mut pos = range.start;
+                    for i in first..fetch_ranges.len() {
+                        if pos >= range.end {
+                            break;
+                        }
+                        let fr = &fetch_ranges[i];
+                        let chunk = &fetched[i];
+                        let src_start = (pos - fr.start) as usize;
+                        let src_end = ((range.end.min(fr.end)) - fr.start) as 
usize;
+                        if src_end > chunk.len() {
+                            return 
Err(parquet::errors::ParquetError::General(format!(
+                                "Fetched data too short for range {}..{}: \
+                                 chunk {}..{} has {} bytes, need up to offset 
{}",
+                                range.start,
+                                range.end,
+                                fr.start,
+                                fr.end,
+                                chunk.len(),
+                                src_end,
+                            )));
+                        }
+                        buf.extend_from_slice(&chunk[src_start..src_end]);
+                        pos = fr.end;
+                    }
+                    if buf.len() != need {
+                        return 
Err(parquet::errors::ParquetError::General(format!(
+                            "Assembled {} bytes for range {}..{}, expected {}",
+                            buf.len(),
+                            range.start,
+                            range.end,
+                            need,
+                        )));
+                    }
+                    Ok(Bytes::from(buf))
+                })
+                .collect();
+            result
+        }
+        .boxed()
+    }
+
     fn get_metadata(
         &mut self,
         options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         let metadata_opts = options.map(|o| o.metadata_options().clone());
+        let prefetch_hint = Some(METADATA_SIZE_HINT);
         Box::pin(async move {
             let file_size = self.file_size;
             let metadata = ParquetMetaDataReader::new()
+                .with_prefetch_hint(prefetch_hint)
                 .with_metadata_options(metadata_opts)
                 .load_and_finish(self, file_size)
                 .await?;
@@ -825,6 +951,98 @@ impl AsyncFileReader for ArrowFileReader {
     }
 }
 
+// ---------------------------------------------------------------------------
+// Range coalescing
+// ---------------------------------------------------------------------------
+
+/// Merge nearby byte ranges to reduce the number of requests.
+///
+/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range.
+/// The input does not need to be sorted.
+fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
+    if ranges.is_empty() {
+        return vec![];
+    }
+
+    let mut sorted = ranges.to_vec();
+    sorted.sort_unstable_by_key(|r| r.start);
+
+    let mut merged = Vec::with_capacity(sorted.len());
+    let mut start_idx = 0;
+    let mut end_idx = 1;
+
+    while start_idx != sorted.len() {
+        let mut range_end = sorted[start_idx].end;
+
+        while end_idx != sorted.len()
+            && sorted[end_idx]
+                .start
+                .checked_sub(range_end)
+                .map(|delta| delta <= coalesce)
+                .unwrap_or(true)
+        {
+            range_end = range_end.max(sorted[end_idx].end);
+            end_idx += 1;
+        }
+
+        merged.push(sorted[start_idx].start..range_end);
+        start_idx = end_idx;
+        end_idx += 1;
+    }
+
+    merged
+}
+
+/// Split merged ranges into fixed-size batches to utilize concurrency,
+/// Each merged range is divided into chunks of `expected_size`,
+/// with the last chunk taking whatever remains.
+/// Ranges smaller than `2 * IO_BLOCK_SIZE` are kept as-is to
+/// avoid excessive small IO requests.
+fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, concurrency: usize) 
-> Vec<Range<u64>> {
+    if merged.is_empty() || concurrency <= 1 {
+        return merged;
+    }
+
+    let mut result = Vec::with_capacity(merged.len());
+
+    for range in &merged {
+        let length = range.end - range.start;
+        let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64));
+        // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that
+        // every split boundary is 4 MB-aligned relative to the range start.
+        let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE;
+        let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2);
+
+        let mut offset = range.start;
+        let end = range.end;
+
+        // Align the first split boundary: if `offset` is not 4 MB-aligned,
+        // emit a short head chunk so that all subsequent chunks start on a
+        // 4 MB boundary.
+        let misalign = offset % IO_BLOCK_SIZE;
+        if misalign != 0 {
+            let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end);
+            result.push(offset..first_end);
+            offset = first_end;
+        }
+
+        loop {
+            if offset >= end {
+                break;
+            }
+            if end - offset < min_tail_size {
+                result.push(offset..end);
+                break;
+            } else {
+                result.push(offset..offset + expected_size);
+                offset += expected_size;
+            }
+        }
+    }
+
+    result
+}
+
 // ---------------------------------------------------------------------------
 // Tests
 // ---------------------------------------------------------------------------
@@ -878,4 +1096,149 @@ mod tests {
 
         assert!(row_filter.is_some());
     }
+
+    // -----------------------------------------------------------------------
+    // merge_byte_ranges tests
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn test_merge_byte_ranges_empty() {
+        assert_eq!(
+            super::merge_byte_ranges(&[], 1024),
+            Vec::<std::ops::Range<u64>>::new()
+        );
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_no_coalesce() {
+        // Ranges far apart should not be merged
+        let ranges = vec![0..100, 1_000_000..1_000_100];
+        let merged = super::merge_byte_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_coalesce() {
+        // Ranges within the gap threshold should be merged
+        let ranges = vec![0..100, 200..300, 500..600];
+        let merged = super::merge_byte_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..600]);
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_zero_coalesce_gap() {
+        // With coalesce=0, ranges with a 1-byte gap should NOT merge
+        let ranges = vec![0..100, 101..200];
+        let merged = super::merge_byte_ranges(&ranges, 0);
+        assert_eq!(merged, vec![0..100, 101..200]);
+    }
+
+    // -----------------------------------------------------------------------
+    // split_ranges_for_concurrency tests
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn test_split_aligned_range_0_to_20mb() {
+        // 0..20MB, concurrency=4:
+        //   raw_size = max(4MB, 5MB+1) = 5MB+1
+        //   expected_size = ceil((5MB+1)/4MB)*4MB = 8MB
+        //   min_tail_size = max(8MB, 8MB) = 8MB
+        //   No misalign. Chunks: [0..8, 8..16, 16..20]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![0..20 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]);
+    }
+
+    #[test]
+    fn test_split_unaligned_start_6_to_14mb() {
+        // 6MB..14MB, concurrency=4:
+        //   raw_size = max(4MB, 2MB+1) = 4MB
+        //   expected_size = 4MB, min_tail_size = 8MB
+        //   Head: 6..8MB. Loop: 8+8=16 > 14 → tail 8..14.
+        //   Result: [6..8, 8..14]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![6 * mb..14 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]);
+    }
+
+    #[test]
+    fn test_split_unaligned_start_6_to_22mb() {
+        // 6MB..22MB, concurrency=4:
+        //   raw_size = max(4MB, ceil(16MB/4)) = 4MB
+        //   expected_size = ceil(4MB/4MB)*4MB = 4MB
+        //   min_tail_size = max(4MB, 8MB) = 8MB
+        //   Head: 6..8MB (misalign=2MB).
+        //   Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 
16..22.
+        //   Result: [6..8, 8..12, 12..16, 16..22]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![6 * mb..22 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(
+            result,
+            vec![
+                6 * mb..8 * mb,
+                8 * mb..12 * mb,
+                12 * mb..16 * mb,
+                16 * mb..22 * mb,
+            ]
+        );
+    }
+
+    #[test]
+    fn test_split_already_aligned_8_to_24mb() {
+        // 8MB..24MB, concurrency=4:
+        //   raw_size = max(4MB, ceil(16MB/4)) = 4MB
+        //   expected_size = 4MB, min_tail_size = 8MB
+        //   No misalign.
+        //   Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20; 
24-20=4<8 → tail 20..24.
+        //   Result: [8..12, 12..16, 16..20, 20..24]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![8 * mb..24 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(
+            result,
+            vec![
+                8 * mb..12 * mb,
+                12 * mb..16 * mb,
+                16 * mb..20 * mb,
+                20 * mb..24 * mb,
+            ]
+        );
+    }
+
+    #[test]
+    fn test_split_multiple_ranges() {
+        // [0..20MB, 24..44MB], concurrency=4:
+        //   Range 0..20MB → [0..8, 8..16, 16..20] (same as test above)
+        //   Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no 
misalign.
+        //     24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → 
tail 40..44.
+        //   Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44]
+        let mb = 1024 * 1024u64;
+        let merged = vec![0..20 * mb, 24 * mb..44 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(
+            result,
+            vec![
+                0..8 * mb,
+                8 * mb..16 * mb,
+                16 * mb..20 * mb,
+                24 * mb..32 * mb,
+                32 * mb..40 * mb,
+                40 * mb..44 * mb,
+            ]
+        );
+    }
+
+    #[test]
+    fn test_split_empty() {
+        let merged: Vec<std::ops::Range<u64>> = vec![];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert!(result.is_empty());
+    }
 }

Reply via email to