This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4a10094 perf(reader): Improve the performance of parquet reader (#230)
4a10094 is described below
commit 4a10094a91f1a3e96015996bcce06d4849686237
Author: umi <[email protected]>
AuthorDate: Sat Apr 11 17:49:44 2026 +0800
perf(reader): Improve the performance of parquet reader (#230)
---
crates/paimon/src/arrow/format/parquet.rs | 365 +++++++++++++++++++++++++++++-
1 file changed, 364 insertions(+), 1 deletion(-)
diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs
index 915bd56..b0aa0ec 100644
--- a/crates/paimon/src/arrow/format/parquet.rs
+++ b/crates/paimon/src/arrow/format/parquet.rs
@@ -33,7 +33,7 @@ use arrow_schema::ArrowError;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
-use futures::{StreamExt, TryFutureExt};
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter,
RowSelection, RowSelector,
};
@@ -785,6 +785,18 @@ struct ArrowFileReader {
r: Box<dyn FileRead>,
}
+/// Coalesce threshold: ranges separated by at most 1 MiB are merged.
+const RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
+/// Maximum number of concurrent range fetches.
+const RANGE_FETCH_CONCURRENCY: usize = 10;
+/// Metadata prefetch hint: 512 KiB.
+const METADATA_SIZE_HINT: usize = 512 * 1024;
+/// IO block size: 4 MiB. Used for split alignment and as the minimum
+/// split granularity. Ranges smaller than this will not be split further,
+/// to avoid excessive small IO requests whose per-request overhead
+/// dominates.
+const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024;
+
impl ArrowFileReader {
fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
Self { file_size, r }
@@ -809,14 +821,128 @@ impl AsyncFileReader for ArrowFileReader {
self.read_bytes(range)
}
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<u64>>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+ let coalesce_bytes = RANGE_COALESCE_BYTES;
+ let concurrency = RANGE_FETCH_CONCURRENCY;
+
+ async move {
+ if ranges.is_empty() {
+ return Ok(vec![]);
+ }
+
+            // Two-phase range optimization:
+            // Phase 1: Merge nearby ranges based on the coalesce threshold.
+            let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
+            // Phase 2: Split large merged ranges to utilize concurrency,
+            // aligning split boundaries to IO_BLOCK_SIZE.
+            let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency);
+
+ // Fetch merged ranges concurrently.
+ let r = &self.r;
+ let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
+                // All ranges fit within the concurrency limit — fire them all at once.
+ futures::future::try_join_all(fetch_ranges.iter().map(|range| {
+ r.read(range.clone())
+                        .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
+ }))
+ .await?
+ } else {
+ // More ranges than concurrency slots — use buffered stream.
+ futures::stream::iter(fetch_ranges.iter().cloned())
+ .map(|range| async move {
+ r.read(range).await.map_err(|e| {
+                            parquet::errors::ParquetError::External(format!("{e}").into())
+ })
+ })
+ .buffered(concurrency)
+ .try_collect()
+ .await?
+ };
+
+ // Slice the fetched data back into the originally requested
+ // ranges. A single original range may span multiple fetch
+ // chunks, so we copy from as many chunks as needed.
+ let result: parquet::errors::Result<Vec<Bytes>> = ranges
+ .iter()
+ .map(|range| {
+                    // Find the first fetch chunk whose end is past range.start.
+                    let first = fetch_ranges.partition_point(|v| v.end <= range.start);
+ if first >= fetch_ranges.len() {
+                        return Err(parquet::errors::ParquetError::General(format!(
+ "No fetch range covers requested range {}..{}",
+ range.start, range.end
+ )));
+ }
+
+ let need = (range.end - range.start) as usize;
+
+ // Fast path: the original range fits entirely within one
+ // fetch chunk — zero-copy slice.
+ let fr = &fetch_ranges[first];
+ if range.end <= fr.end {
+ let start = (range.start - fr.start) as usize;
+ let end = (range.end - fr.start) as usize;
+ return Ok(fetched[first].slice(start..end));
+ }
+
+ // Slow path: the original range spans multiple fetch
+ // chunks — copy pieces into a new buffer (mirrors Java's
+ // copyMultiBytesToBytes).
+ let mut buf = Vec::with_capacity(need);
+ let mut pos = range.start;
+ for i in first..fetch_ranges.len() {
+ if pos >= range.end {
+ break;
+ }
+ let fr = &fetch_ranges[i];
+ let chunk = &fetched[i];
+ let src_start = (pos - fr.start) as usize;
+                        let src_end = ((range.end.min(fr.end)) - fr.start) as usize;
+ if src_end > chunk.len() {
+                            return Err(parquet::errors::ParquetError::General(format!(
+                                "Fetched data too short for range {}..{}: \
+                                 chunk {}..{} has {} bytes, need up to offset {}",
+ range.start,
+ range.end,
+ fr.start,
+ fr.end,
+ chunk.len(),
+ src_end,
+ )));
+ }
+ buf.extend_from_slice(&chunk[src_start..src_end]);
+ pos = fr.end;
+ }
+ if buf.len() != need {
+                        return Err(parquet::errors::ParquetError::General(format!(
+ "Assembled {} bytes for range {}..{}, expected {}",
+ buf.len(),
+ range.start,
+ range.end,
+ need,
+ )));
+ }
+ Ok(Bytes::from(buf))
+ })
+ .collect();
+ result
+ }
+ .boxed()
+ }
+
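For readers tracing get_byte_ranges, here is a worked example of the two
phases and the slice-back step; the request values are invented for
exposition and do not come from this patch:

    // Caller asks for three column chunks:
    //   [0..100_000, 150_000..250_000, 40_000_000..48_000_000]
    // Phase 1 (merge, gap <= RANGE_COALESCE_BYTES = 1 MiB):
    //   [0..250_000, 40_000_000..48_000_000]
    // Phase 2 (split, concurrency 10, 4 MiB blocks):
    //   0..250_000 stays whole (shorter than min_tail_size).
    //   40_000_000..48_000_000 starts mid-block, so it becomes a short
    //   head up to the next 4 MiB boundary (40_000_000..41_943_040)
    //   plus a tail (41_943_040..48_000_000, under 8 MiB, not split).
    // Slice-back: the first two requests are zero-copy slices of the
    // first fetched chunk (fast path); the third spans both chunks of
    // its merged range and is reassembled by copy (slow path).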
fn get_metadata(
&mut self,
options: Option<&ArrowReaderOptions>,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
let metadata_opts = options.map(|o| o.metadata_options().clone());
+        let prefetch_hint = Some(METADATA_SIZE_HINT);
Box::pin(async move {
let file_size = self.file_size;
let metadata = ParquetMetaDataReader::new()
+ .with_prefetch_hint(prefetch_hint)
.with_metadata_options(metadata_opts)
.load_and_finish(self, file_size)
.await?;
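The prefetch hint matters because, without one, ParquetMetaDataReader
broadly needs two round trips: an 8-byte footer read to learn the
metadata length, then a second read for the metadata itself. With a
512 KiB hint it speculatively reads the file tail once, and only
footers larger than the hint need a further read. A minimal sketch of
the same call outside this struct (the names `reader` and `file_size`
are assumed):

    let metadata = ParquetMetaDataReader::new()
        // One speculative tail read instead of footer + metadata reads.
        .with_prefetch_hint(Some(512 * 1024))
        .load_and_finish(reader, file_size)
        .await?;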
@@ -825,6 +951,98 @@ impl AsyncFileReader for ArrowFileReader {
}
}
+// ---------------------------------------------------------------------------
+// Range coalescing
+// ---------------------------------------------------------------------------
+
+/// Merge nearby byte ranges to reduce the number of requests.
+///
+/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range.
+/// The input does not need to be sorted.
+fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
+ if ranges.is_empty() {
+ return vec![];
+ }
+
+ let mut sorted = ranges.to_vec();
+ sorted.sort_unstable_by_key(|r| r.start);
+
+ let mut merged = Vec::with_capacity(sorted.len());
+ let mut start_idx = 0;
+ let mut end_idx = 1;
+
+ while start_idx != sorted.len() {
+ let mut range_end = sorted[start_idx].end;
+
+ while end_idx != sorted.len()
+ && sorted[end_idx]
+ .start
+ .checked_sub(range_end)
+ .map(|delta| delta <= coalesce)
+ .unwrap_or(true)
+ {
+ range_end = range_end.max(sorted[end_idx].end);
+ end_idx += 1;
+ }
+
+ merged.push(sorted[start_idx].start..range_end);
+ start_idx = end_idx;
+ end_idx += 1;
+ }
+
+ merged
+}
+
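One subtlety in the inner loop: checked_sub returns None when the next
range starts before the current merged end, and unwrap_or(true) treats
that overlap as mergeable regardless of the threshold. An illustrative
call (values invented for exposition):

    // Overlapping and touching ranges merge even with coalesce = 0;
    // only the 1800-byte gap keeps the last range separate.
    let merged = merge_byte_ranges(&[0..100, 50..150, 150..200, 2000..2100], 0);
    assert_eq!(merged, vec![0..200, 2000..2100]);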
+/// Split merged ranges into fixed-size chunks to utilize concurrency.
+/// Each merged range is divided into chunks of `expected_size`, with the
+/// final chunk absorbing the remainder once it drops below `min_tail_size`.
+/// Ranges shorter than `2 * IO_BLOCK_SIZE` are kept whole (apart from an
+/// alignment head) to avoid excessive small IO requests.
+fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, concurrency: usize) -> Vec<Range<u64>> {
+ if merged.is_empty() || concurrency <= 1 {
+ return merged;
+ }
+
+ let mut result = Vec::with_capacity(merged.len());
+
+ for range in &merged {
+ let length = range.end - range.start;
+ let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64));
+ // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that
+ // every split boundary is 4 MB-aligned relative to the range start.
+ let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE;
+ let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2);
+
+ let mut offset = range.start;
+ let end = range.end;
+
+ // Align the first split boundary: if `offset` is not 4 MB-aligned,
+ // emit a short head chunk so that all subsequent chunks start on a
+ // 4 MB boundary.
+ let misalign = offset % IO_BLOCK_SIZE;
+ if misalign != 0 {
+ let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end);
+ result.push(offset..first_end);
+ offset = first_end;
+ }
+
+ loop {
+ if offset >= end {
+ break;
+ }
+ if end - offset < min_tail_size {
+ result.push(offset..end);
+ break;
+ } else {
+ result.push(offset..offset + expected_size);
+ offset += expected_size;
+ }
+ }
+ }
+
+ result
+}
+
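A quick trace of the sizing arithmetic, with illustrative numbers not
taken from the patch's own tests:

    // 0..100 MiB, concurrency = 10:
    //   raw_size      = max(4 MiB, ceil(100 MiB / 10)) = 10 MiB
    //   expected_size = ceil(10 MiB / 4 MiB) * 4 MiB   = 12 MiB
    //   min_tail_size = max(12 MiB, 8 MiB)             = 12 MiB
    // => eight 12 MiB chunks plus a 4 MiB tail: nine requests for
    //    ten concurrency slots.
    let mb = 1024 * 1024u64;
    #[allow(clippy::single_range_in_vec_init)]
    let chunks = split_ranges_for_concurrency(vec![0..100 * mb], 10);
    assert_eq!(chunks.len(), 9);
    assert_eq!(chunks[0], 0..12 * mb);
    assert_eq!(chunks[8], 96 * mb..100 * mb);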
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
@@ -878,4 +1096,149 @@ mod tests {
assert!(row_filter.is_some());
}
+
+ // -----------------------------------------------------------------------
+ // merge_byte_ranges tests
+ // -----------------------------------------------------------------------
+
+ #[test]
+ fn test_merge_byte_ranges_empty() {
+ assert_eq!(
+ super::merge_byte_ranges(&[], 1024),
+ Vec::<std::ops::Range<u64>>::new()
+ );
+ }
+
+ #[test]
+ fn test_merge_byte_ranges_no_coalesce() {
+ // Ranges far apart should not be merged
+ let ranges = vec![0..100, 1_000_000..1_000_100];
+ let merged = super::merge_byte_ranges(&ranges, 1024);
+ assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
+ }
+
+ #[test]
+ fn test_merge_byte_ranges_coalesce() {
+ // Ranges within the gap threshold should be merged
+ let ranges = vec![0..100, 200..300, 500..600];
+ let merged = super::merge_byte_ranges(&ranges, 1024);
+ assert_eq!(merged, vec![0..600]);
+ }
+
+ #[test]
+ fn test_merge_byte_ranges_zero_coalesce_gap() {
+ // With coalesce=0, ranges with a 1-byte gap should NOT merge
+ let ranges = vec![0..100, 101..200];
+ let merged = super::merge_byte_ranges(&ranges, 0);
+ assert_eq!(merged, vec![0..100, 101..200]);
+ }
+
+ // -----------------------------------------------------------------------
+ // split_ranges_for_concurrency tests
+ // -----------------------------------------------------------------------
+
+ #[test]
+ fn test_split_aligned_range_0_to_20mb() {
+ // 0..20MB, concurrency=4:
+        // raw_size = max(4MB, ceil(20MB/4)) = 5MB
+        // expected_size = ceil(5MB/4MB)*4MB = 8MB
+ // min_tail_size = max(8MB, 8MB) = 8MB
+ // No misalign. Chunks: [0..8, 8..16, 16..20]
+ let mb = 1024 * 1024u64;
+ #[allow(clippy::single_range_in_vec_init)]
+ let merged = vec![0..20 * mb];
+ let result = super::split_ranges_for_concurrency(merged, 4);
+ assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]);
+ }
+
+ #[test]
+ fn test_split_unaligned_start_6_to_14mb() {
+ // 6MB..14MB, concurrency=4:
+        // raw_size = max(4MB, ceil(8MB/4)) = 4MB
+        // expected_size = 4MB, min_tail_size = 8MB
+        // Head: 6..8MB. Loop: 14-8=6 < 8 → tail 8..14.
+ // Result: [6..8, 8..14]
+ let mb = 1024 * 1024u64;
+ #[allow(clippy::single_range_in_vec_init)]
+ let merged = vec![6 * mb..14 * mb];
+ let result = super::split_ranges_for_concurrency(merged, 4);
+ assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]);
+ }
+
+ #[test]
+ fn test_split_unaligned_start_6_to_22mb() {
+ // 6MB..22MB, concurrency=4:
+ // raw_size = max(4MB, ceil(16MB/4)) = 4MB
+ // expected_size = ceil(4MB/4MB)*4MB = 4MB
+ // min_tail_size = max(4MB, 8MB) = 8MB
+ // Head: 6..8MB (misalign=2MB).
+    //   Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 16..22.
+ // Result: [6..8, 8..12, 12..16, 16..22]
+ let mb = 1024 * 1024u64;
+ #[allow(clippy::single_range_in_vec_init)]
+ let merged = vec![6 * mb..22 * mb];
+ let result = super::split_ranges_for_concurrency(merged, 4);
+ assert_eq!(
+ result,
+ vec![
+ 6 * mb..8 * mb,
+ 8 * mb..12 * mb,
+ 12 * mb..16 * mb,
+ 16 * mb..22 * mb,
+ ]
+ );
+ }
+
+ #[test]
+ fn test_split_already_aligned_8_to_24mb() {
+ // 8MB..24MB, concurrency=4:
+ // raw_size = max(4MB, ceil(16MB/4)) = 4MB
+ // expected_size = 4MB, min_tail_size = 8MB
+ // No misalign.
+    //   Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20;
+    //   24-20=4<8 → tail 20..24.
+ // Result: [8..12, 12..16, 16..20, 20..24]
+ let mb = 1024 * 1024u64;
+ #[allow(clippy::single_range_in_vec_init)]
+ let merged = vec![8 * mb..24 * mb];
+ let result = super::split_ranges_for_concurrency(merged, 4);
+ assert_eq!(
+ result,
+ vec![
+ 8 * mb..12 * mb,
+ 12 * mb..16 * mb,
+ 16 * mb..20 * mb,
+ 20 * mb..24 * mb,
+ ]
+ );
+ }
+
+ #[test]
+ fn test_split_multiple_ranges() {
+ // [0..20MB, 24..44MB], concurrency=4:
+ // Range 0..20MB → [0..8, 8..16, 16..20] (same as test above)
+        // Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign.
+        // 24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44.
+ // Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44]
+ let mb = 1024 * 1024u64;
+ let merged = vec![0..20 * mb, 24 * mb..44 * mb];
+ let result = super::split_ranges_for_concurrency(merged, 4);
+ assert_eq!(
+ result,
+ vec![
+ 0..8 * mb,
+ 8 * mb..16 * mb,
+ 16 * mb..20 * mb,
+ 24 * mb..32 * mb,
+ 32 * mb..40 * mb,
+ 40 * mb..44 * mb,
+ ]
+ );
+ }
+
+ #[test]
+ fn test_split_empty() {
+ let merged: Vec<std::ops::Range<u64>> = vec![];
+ let result = super::split_ranges_for_concurrency(merged, 4);
+ assert!(result.is_empty());
+ }
}
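The slice-back logic in get_byte_ranges relies on the splitter tiling
each merged range exactly, with no gaps or overlaps between chunks. A
hypothetical extra test, not part of this patch, that pins that
invariant down:

    #[test]
    fn test_split_tiles_merged_ranges_exactly() {
        let mb = 1024 * 1024u64;
        let merged = vec![3 * mb..50 * mb, 64 * mb..65 * mb];
        let chunks = super::split_ranges_for_concurrency(merged.clone(), 10);
        let mut it = chunks.iter();
        for m in &merged {
            // Chunks must cover m.start..m.end contiguously, in order.
            let mut pos = m.start;
            while pos < m.end {
                let c = it.next().expect("missing chunk");
                assert_eq!(c.start, pos);
                assert!(c.end <= m.end);
                pos = c.end;
            }
            assert_eq!(pos, m.end);
        }
        assert!(it.next().is_none());
    }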