This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fa8091e refactor(read): split ArrowReader into DataFileReader, DataEvolutionReader and TableRead (#246)
fa8091e is described below
commit fa8091eca2d4e07011a9a9287045828d3e49f94d
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 15 15:16:38 2026 +0800
refactor(read): split ArrowReader into DataFileReader, DataEvolutionReader and TableRead (#246)
Move the monolithic arrow/reader.rs into three focused modules under table/:
- data_file_reader.rs: regular Parquet file reading with schema evolution
- data_evolution_reader.rs: column-merge reading for data evolution mode
- table_read.rs: TableRead extracted from read_builder.rs
Also replaces assert! with proper error returns and adds bounds checking
in attach_row_id.
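
For orientation, a rough sketch of how the three new pieces compose. The
constructor signatures below match the modules added in this diff; the
dispatch itself lives in table/table_read.rs (not quoted in this message),
so `is_data_evolution` is a hypothetical stand-in for whatever condition
TableRead actually checks:

    // Sketch only, not the code in table_read.rs.
    let stream = if is_data_evolution {
        // Column-merge reading for files sharing a row ID range.
        DataEvolutionReader::new(file_io, schema_manager, schema_id, table_fields, read_type)?
            .read(&splits)?
    } else {
        // Plain Parquet reading with schema evolution and predicate pushdown.
        DataFileReader::new(file_io, schema_manager, schema_id, table_fields, read_type, predicates)
            .read(&splits)?
    };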
---
crates/paimon/src/arrow/mod.rs | 3 -
crates/paimon/src/arrow/reader.rs | 945 -----------------------
crates/paimon/src/table/data_evolution_reader.rs | 455 +++++++++++
crates/paimon/src/table/data_file_reader.rs | 457 +++++++++++
crates/paimon/src/table/mod.rs | 6 +-
crates/paimon/src/table/read_builder.rs | 105 +--
crates/paimon/src/table/table_read.rs | 110 +++
crates/paimon/src/table/write_builder.rs | 2 +-
8 files changed, 1036 insertions(+), 1047 deletions(-)
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index e2f60fc..37f907b 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -17,11 +17,8 @@
pub(crate) mod filtering;
pub(crate) mod format;
-mod reader;
pub(crate) mod schema_evolution;
-pub use crate::arrow::reader::ArrowReaderBuilder;
-
use crate::spec::{
ArrayType, BigIntType, BooleanType, DataField, DataType as PaimonDataType, DateType,
DecimalType, DoubleType, FloatType, IntType, LocalZonedTimestampType, MapType, RowType,
diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs
deleted file mode 100644
index 3f19bd8..0000000
--- a/crates/paimon/src/arrow/reader.rs
+++ /dev/null
@@ -1,945 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::arrow::build_target_arrow_schema;
-use crate::arrow::format::create_format_reader;
-use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX};
-use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
-use crate::io::FileIO;
-use crate::spec::{DataField, DataFileMeta, Predicate, ROW_ID_FIELD_NAME};
-use crate::table::schema_manager::SchemaManager;
-use crate::table::ArrowRecordBatchStream;
-use crate::table::RowRange;
-use crate::{DataSplit, Error};
-use arrow_array::{Array, Int64Array, RecordBatch};
-use arrow_cast::cast;
-
-use async_stream::try_stream;
-use futures::StreamExt;
-use std::collections::HashMap;
-use std::sync::Arc;
-
-/// Builder to create ArrowReader
-pub struct ArrowReaderBuilder {
- batch_size: Option<usize>,
- file_io: FileIO,
- schema_manager: SchemaManager,
- table_schema_id: i64,
- predicates: Vec<Predicate>,
- table_fields: Vec<DataField>,
-}
-
-impl ArrowReaderBuilder {
- /// Create a new ArrowReaderBuilder
- pub(crate) fn new(
- file_io: FileIO,
- schema_manager: SchemaManager,
- table_schema_id: i64,
- ) -> Self {
- ArrowReaderBuilder {
- batch_size: None,
- file_io,
- schema_manager,
- table_schema_id,
- predicates: Vec::new(),
- table_fields: Vec::new(),
- }
- }
-
- /// Set data predicates used for Parquet row-group pruning and partial
- /// decode-time filtering.
- pub(crate) fn with_predicates(mut self, predicates: Vec<Predicate>) -> Self {
- self.predicates = predicates;
- self
- }
-
- /// Set the full table schema fields used for filter-to-file field mapping.
- pub(crate) fn with_table_fields(mut self, table_fields: Vec<DataField>) -> Self {
- self.table_fields = table_fields;
- self
- }
-
- /// Build the ArrowReader with the given read type (logical row type or projected subset).
- /// Used to clip Parquet schema to requested columns only.
- pub fn build(self, read_type: Vec<DataField>) -> ArrowReader {
- ArrowReader {
- batch_size: self.batch_size,
- file_io: self.file_io,
- schema_manager: self.schema_manager,
- table_schema_id: self.table_schema_id,
- predicates: self.predicates,
- table_fields: self.table_fields,
- read_type,
- }
- }
-}
-
-/// Reads data from Parquet files
-#[derive(Clone)]
-pub struct ArrowReader {
- batch_size: Option<usize>,
- file_io: FileIO,
- schema_manager: SchemaManager,
- table_schema_id: i64,
- predicates: Vec<Predicate>,
- table_fields: Vec<DataField>,
- read_type: Vec<DataField>,
-}
-
-impl ArrowReader {
- /// Take a stream of DataSplits and read every data file in each split.
- /// Returns a stream of Arrow RecordBatches from all files.
- ///
- /// Uses SchemaManager to load the data file's schema (via `DataFileMeta.schema_id`)
- /// and computes field-ID-based index mapping for schema evolution (added columns,
- /// type promotion, column reordering).
- ///
- /// Matches [RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java).
- pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
- let file_io = self.file_io.clone();
- let batch_size = self.batch_size;
- let splits: Vec<DataSplit> = data_splits.to_vec();
- let read_type = self.read_type;
- let predicates = self.predicates;
- let table_fields = self.table_fields;
- let schema_manager = self.schema_manager;
- let table_schema_id = self.table_schema_id;
- Ok(try_stream! {
- for split in splits {
- // Create DV factory for this split only.
- let dv_factory = if split
- .data_deletion_files()
- .is_some_and(|files| files.iter().any(Option::is_some))
- {
- Some(
- DeletionVectorFactory::new(
- &file_io,
- split.data_files(),
- split.data_deletion_files(),
- )
- .await?,
- )
- } else {
- None
- };
-
- for file_meta in split.data_files().to_vec() {
- let dv = dv_factory
- .as_ref()
- .and_then(|factory| factory.get_deletion_vector(&file_meta.file_name))
- .cloned();
-
- // Load data file's schema if it differs from the table schema.
- let data_fields: Option<Vec<DataField>> = if file_meta.schema_id != table_schema_id {
- let data_schema = schema_manager.schema(file_meta.schema_id).await?;
- Some(data_schema.fields().to_vec())
- } else {
- None
- };
-
- let mut stream = read_single_file_stream(
- file_io.clone(),
- SingleFileReadRequest {
- split: split.clone(),
- file_meta,
- read_type: read_type.clone(),
- table_fields: table_fields.clone(),
- data_fields,
- predicates: predicates.clone(),
- batch_size,
- dv,
- row_ranges: None,
- },
- )?;
- while let Some(batch) = stream.next().await {
- yield batch?;
- }
- }
- }
- }
- .boxed())
- }
-
- /// Read data files in data evolution mode, merging columns from files that share the same row ID range.
- ///
- /// Each DataSplit contains files grouped by `first_row_id`. Files within a split may contain
- /// different columns for the same logical rows. This method reads each file and merges them
- /// column-wise, respecting `max_sequence_number` for conflict resolution.
- ///
- /// `table_fields` is the full table schema fields, used to determine which columns each file
- /// provides when `write_cols` is not set.
- pub fn read_data_evolution(
- self,
- data_splits: &[DataSplit],
- table_fields: &[DataField],
- ) -> crate::Result<ArrowRecordBatchStream> {
- let file_io = self.file_io.clone();
- let batch_size = self.batch_size;
- let splits: Vec<DataSplit> = data_splits.to_vec();
- let read_type = self.read_type;
- let table_fields: Vec<DataField> = table_fields.to_vec();
- let schema_manager = self.schema_manager;
- let table_schema_id = self.table_schema_id;
-
- let row_id_index = read_type.iter().position(|f| f.name() == ROW_ID_FIELD_NAME);
- let file_read_type: Vec<DataField> = read_type
- .iter()
- .filter(|f| f.name() != ROW_ID_FIELD_NAME)
- .cloned()
- .collect();
- let output_schema = build_target_arrow_schema(&read_type)?;
-
- Ok(try_stream! {
- for split in splits {
- let row_ranges = split.row_ranges().map(|r| r.to_vec());
-
- if split.raw_convertible() || split.data_files().len() == 1 {
- for file_meta in split.data_files().to_vec() {
- let data_fields: Option<Vec<DataField>> = if file_meta.schema_id != table_schema_id {
- let data_schema = schema_manager.schema(file_meta.schema_id).await?;
- Some(data_schema.fields().to_vec())
- } else {
- None
- };
-
- let has_row_id = file_meta.first_row_id.is_some();
- let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None };
-
- let selected_row_ids = if row_id_index.is_some() && has_row_id {
- effective_row_ranges.as_ref().map(|ranges| {
- expand_selected_row_ids(
- file_meta.first_row_id.unwrap(),
- file_meta.row_count,
- ranges,
- )
- })
- } else {
- None
- };
- let file_base_row_id = file_meta.first_row_id.unwrap_or(0);
- let mut row_id_cursor = file_base_row_id;
- let mut row_id_offset: usize = 0;
-
- let mut stream = read_single_file_stream(
- file_io.clone(),
- SingleFileReadRequest {
- split: split.clone(),
- file_meta,
- read_type: file_read_type.clone(),
- table_fields: table_fields.clone(),
- data_fields,
- predicates: Vec::new(),
- batch_size,
- dv: None,
- row_ranges: effective_row_ranges,
- },
- )?;
- while let Some(batch) = stream.next().await {
- let batch = batch?;
- let num_rows = batch.num_rows();
- if let Some(idx) = row_id_index {
- if !has_row_id {
- yield append_null_row_id_column(batch, idx, &output_schema)?;
- } else if let Some(ref ids) = selected_row_ids {
- yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?;
- } else {
- let row_ids: Vec<i64> = (row_id_cursor..row_id_cursor + num_rows as i64).collect();
- row_id_cursor += num_rows as i64;
- let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(row_ids));
- yield insert_column_at(batch, array, idx, &output_schema)?;
- }
- } else {
- yield batch;
- }
- }
- }
- } else {
- let files = split.data_files();
- assert!(
- files.iter().all(|f| f.first_row_id.is_some()),
- "All files in a field merge split should have first_row_id"
- );
- assert!(
- files.iter().all(|f| f.row_count == files[0].row_count),
- "All files in a field merge split should have the same row count"
- );
- assert!(
- files.iter().all(|f| f.first_row_id == files[0].first_row_id),
- "All files in a field merge split should have the same first row id"
- );
-
- let group_base_row_id = files
- .iter()
- .filter_map(|f| f.first_row_id)
- .min();
- let has_group_row_id = group_base_row_id.is_some();
- let group_row_count = files.iter().map(|f| f.row_count).max().unwrap_or(0);
- let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None };
-
- let selected_row_ids = if row_id_index.is_some() && has_group_row_id {
- effective_row_ranges.as_ref().map(|ranges| {
- expand_selected_row_ids(
- group_base_row_id.unwrap(),
- group_row_count,
- ranges,
- )
- })
- } else {
- None
- };
- let mut row_id_cursor = group_base_row_id.unwrap_or(0);
- let mut row_id_offset: usize = 0;
-
- let mut merge_stream = merge_files_by_columns(
- &file_io,
- &split,
- &file_read_type,
- &table_fields,
- schema_manager.clone(),
- table_schema_id,
- batch_size,
- effective_row_ranges,
- )?;
- while let Some(batch) = merge_stream.next().await {
- let batch = batch?;
- let num_rows = batch.num_rows();
- if let Some(idx) = row_id_index {
- if !has_group_row_id {
- yield append_null_row_id_column(batch, idx, &output_schema)?;
- } else if let Some(ref ids) = selected_row_ids {
- yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?;
- } else {
- let row_ids: Vec<i64> = (row_id_cursor..row_id_cursor + num_rows as i64).collect();
- row_id_cursor += num_rows as i64;
- let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(row_ids));
- yield insert_column_at(batch, array, idx, &output_schema)?;
- }
- } else {
- yield batch;
- }
- }
- }
- }
- }
- .boxed())
- }
-}
-
-struct SingleFileReadRequest {
- split: DataSplit,
- file_meta: DataFileMeta,
- read_type: Vec<DataField>,
- table_fields: Vec<DataField>,
- data_fields: Option<Vec<DataField>>,
- predicates: Vec<Predicate>,
- batch_size: Option<usize>,
- dv: Option<Arc<DeletionVector>>,
- row_ranges: Option<Vec<RowRange>>,
-}
-
-/// Read a single parquet file from a split, returning a lazy stream of batches.
-/// Optionally applies a deletion vector.
-///
-/// Handles schema evolution using field-ID-based index mapping:
-/// - `data_fields`: if `Some`, the fields from the data file's schema (loaded via SchemaManager).
-/// Used to compute index mapping between `read_type` and data fields by field ID.
-/// - Columns missing from the file are filled with null arrays.
-/// - Columns whose Arrow type differs from the target type are cast (type promotion).
-///
-/// Reference: [RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java)
-fn read_single_file_stream(
- file_io: FileIO,
- request: SingleFileReadRequest,
-) -> crate::Result<ArrowRecordBatchStream> {
- let SingleFileReadRequest {
- split,
- file_meta,
- read_type,
- table_fields,
- data_fields,
- predicates,
- batch_size,
- dv,
- row_ranges,
- } = request;
-
- let target_schema = build_target_arrow_schema(&read_type)?;
- let file_fields = data_fields.clone().unwrap_or_else(|| table_fields.clone());
-
- // Compute index mapping and determine which columns to read from the file.
- // If data_fields is provided, use field-ID-based mapping; otherwise use read_type names directly.
- let (projected_read_fields, index_mapping) = if let Some(ref df) = data_fields {
- let mapping = create_index_mapping(&read_type, df);
- match mapping {
- Some(ref idx_map) => {
- // Only read data fields that are referenced by the index mapping.
- // Dedup by data field index to avoid duplicate column projections.
- let mut seen = std::collections::HashSet::new();
- let fields_to_read: Vec<DataField> = idx_map
- .iter()
- .filter(|&&idx| idx != NULL_FIELD_INDEX && seen.insert(idx))
- .map(|&idx| df[idx as usize].clone())
- .collect();
- (fields_to_read, Some(idx_map.clone()))
- }
- None => {
- // Identity mapping — read data fields in order.
- (df.clone(), None)
- }
- }
- } else {
- // No schema evolution — read by read_type names.
- (read_type.clone(), None)
- };
-
- // Remap predicates from table-level to file-level indices.
- let file_predicates = {
- let remapped = crate::arrow::filtering::remap_predicates_to_file(
- &predicates,
- &table_fields,
- &file_fields,
- );
- if remapped.is_empty() {
- None
- } else {
- Some(crate::arrow::format::FilePredicates {
- predicates: remapped,
- file_fields: file_fields.clone(),
- })
- }
- };
-
- Ok(try_stream! {
- let path_to_read = split.data_file_path(&file_meta);
- let format_reader = create_format_reader(&path_to_read)?;
- let input_file = file_io.new_input(&path_to_read)?;
- let file_reader = input_file.reader().await?;
- let local_ranges = row_ranges.as_ref().map(|ranges| {
- to_local_row_ranges(ranges, file_meta.first_row_id.unwrap_or(0), file_meta.row_count)
- });
-
- let row_selection = merge_row_selection(
- file_meta.row_count,
- dv.as_deref(),
- local_ranges.as_deref(),
- );
-
- let mut batch_stream = format_reader.read_batch_stream(
- Box::new(file_reader),
- file_meta.file_size as u64,
- &projected_read_fields,
- file_predicates.as_ref(),
- batch_size,
- row_selection,
- ).await?;
-
- while let Some(batch) = batch_stream.next().await {
- let batch = batch?;
- let num_rows = batch.num_rows();
- let batch_schema = batch.schema();
-
- // Build output columns using index mapping (field-ID-based) or by name.
- let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::with_capacity(target_schema.fields().len());
- for (i, target_field) in target_schema.fields().iter().enumerate() {
- let source_col = if let Some(ref idx_map) = index_mapping {
- let data_idx = idx_map[i];
- if data_idx == NULL_FIELD_INDEX {
- None
- } else {
- // Find the column in the batch by the data field's name.
- let data_field = &data_fields.as_ref().unwrap()[data_idx as usize];
- batch_schema
- .index_of(data_field.name())
- .ok()
- .map(|col_idx| batch.column(col_idx))
- }
- } else if let Some(ref df) = data_fields {
- // Identity mapping with data_fields present (e.g. renamed column).
- // Use data field name (old name in parquet) at the same position.
- batch_schema
- .index_of(df[i].name())
- .ok()
- .map(|col_idx| batch.column(col_idx))
- } else {
- // No schema evolution — look up by target field name.
- batch_schema
- .index_of(target_field.name())
- .ok()
- .map(|col_idx| batch.column(col_idx))
- };
-
- match source_col {
- Some(col) => {
- if col.data_type() == target_field.data_type() {
- columns.push(col.clone());
- } else {
- // Type promotion: cast to target type.
- let casted = cast(col, target_field.data_type()).map_err(|e| {
- Error::UnexpectedError {
- message: format!(
- "Failed to cast column '{}' from {:?} to {:?}: {e}",
- target_field.name(),
- col.data_type(),
- target_field.data_type()
- ),
- source: Some(Box::new(e)),
- }
- })?;
- columns.push(casted);
- }
- }
- None => {
- // Column missing from file: fill with nulls.
- let null_array = arrow_array::new_null_array(target_field.data_type(), num_rows);
- columns.push(null_array);
- }
- }
- }
-
- let result = if columns.is_empty() {
- RecordBatch::try_new_with_options(
- target_schema.clone(),
- columns,
- &arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
- )
- } else {
- RecordBatch::try_new(target_schema.clone(), columns)
- }
- .map_err(|e| {
- Error::UnexpectedError {
- message: format!("Failed to build schema-evolved
RecordBatch: {e}"),
- source: Some(Box::new(e)),
- }
- })?;
- yield result;
- }
- }
- .boxed())
-}
-
-/// Merge multiple files column-wise for data evolution, streaming with bounded memory.
-///
-/// Uses field IDs (not column names) to resolve which file provides which column,
-/// ensuring correctness across schema evolution (column rename, add, drop).
-///
-/// Opens all file readers simultaneously and maintains a cursor (current batch + offset)
-/// per file. Each poll slices up to `batch_size` rows from each file's current batch,
-/// assembles columns from the winning files, and yields the merged batch. When a file's
-/// current batch is exhausted, the next batch is read from its stream on demand.
-#[allow(clippy::too_many_arguments)]
-fn merge_files_by_columns(
- file_io: &FileIO,
- split: &DataSplit,
- read_type: &[DataField],
- table_fields: &[DataField],
- schema_manager: SchemaManager,
- table_schema_id: i64,
- batch_size: Option<usize>,
- row_ranges: Option<Vec<RowRange>>,
-) -> crate::Result<ArrowRecordBatchStream> {
- let data_files = split.data_files();
- if data_files.is_empty() {
- return Ok(futures::stream::empty().boxed());
- }
-
- // Build owned data for the stream closure.
- let file_io = file_io.clone();
- let split = split.clone();
- let data_files: Vec<DataFileMeta> = data_files.to_vec();
- let read_type = read_type.to_vec();
- let table_fields = table_fields.to_vec();
- let output_batch_size = batch_size.unwrap_or(1024);
- let target_schema = build_target_arrow_schema(&read_type)?;
-
- Ok(try_stream! {
- // Pre-load schemas and collect field IDs + data_fields per file.
- // file_idx -> (field_ids, Option<Vec<DataField>>)
- let mut file_info: HashMap<usize, (Vec<i32>, Option<Vec<DataField>>)> = HashMap::new();
-
- for (file_idx, file_meta) in data_files.iter().enumerate() {
- let (field_ids, data_fields) = if file_meta.schema_id != table_schema_id {
- let file_schema = schema_manager.schema(file_meta.schema_id).await?;
- let file_fields = file_schema.fields();
-
- let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols {
- // write_cols names are from the file's schema at write time.
- wc.iter()
- .filter_map(|name| file_fields.iter().find(|f| f.name() == name).map(|f| f.id()))
- .collect()
- } else {
- file_fields.iter().map(|f| f.id()).collect()
- };
-
- (ids, Some(file_fields.to_vec()))
- } else {
- let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols {
- // write_cols names are from the current table schema.
- wc.iter()
- .filter_map(|name| table_fields.iter().find(|f| f.name() == name).map(|f| f.id()))
- .collect()
- } else {
- table_fields.iter().map(|f| f.id()).collect()
- };
-
- (ids, None)
- };
-
- file_info.insert(file_idx, (field_ids, data_fields));
- }
-
- // Determine which file provides each field ID, resolving conflicts by max_sequence_number.
- // field_id -> (file_index, max_sequence_number)
- let mut field_id_source: HashMap<i32, (usize, i64)> = HashMap::new();
- for (file_idx, file_meta) in data_files.iter().enumerate() {
- let (ref field_ids, _) = file_info[&file_idx];
- for &fid in field_ids {
- let entry = field_id_source
- .entry(fid)
- .or_insert((file_idx, i64::MIN));
- if file_meta.max_sequence_number > entry.1 {
- *entry = (file_idx, file_meta.max_sequence_number);
- }
- }
- }
-
- // For each projected field, determine which file provides it (by field ID).
- // file_index -> Vec<column_name> (target column names)
- let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
- for field in &read_type {
- if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) {
- file_read_columns
- .entry(file_idx)
- .or_default()
- .push(field.name().to_string());
- }
- }
-
- // For each projected field, record (file_index, target_column_name) for assembly.
- let column_plan: Vec<(Option<usize>, String)> = read_type
- .iter()
- .map(|field| {
- let file_idx = field_id_source.get(&field.id()).map(|&(idx, _)| idx);
- (file_idx, field.name().to_string())
- })
- .collect();
-
- // Collect which file indices we need to open streams for.
- let active_file_indices: Vec<usize> = file_read_columns.keys().copied().collect();
-
- // Edge case: if no file provides any projected column (e.g. SELECT on a newly added
- // column that no file contains yet), we still need to emit NULL-filled rows to
- // preserve the correct row count.
- if active_file_indices.is_empty() {
- let first_row_id = data_files[0].first_row_id.unwrap_or(0);
- let file_row_count = data_files[0].row_count;
- let total_rows = match &row_ranges {
- Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, ranges).len(),
- None => file_row_count as usize,
- };
- let mut emitted = 0;
- while emitted < total_rows {
- let rows_to_emit = (total_rows - emitted).min(output_batch_size);
- let columns: Vec<Arc<dyn arrow_array::Array>> = target_schema
- .fields()
- .iter()
- .map(|f| arrow_array::new_null_array(f.data_type(), rows_to_emit))
- .collect();
- let batch = if columns.is_empty() {
- RecordBatch::try_new_with_options(
- target_schema.clone(),
- columns,
- &arrow_array::RecordBatchOptions::new().with_row_count(Some(rows_to_emit)),
- )
- } else {
- RecordBatch::try_new(target_schema.clone(), columns)
- }
- .map_err(|e| Error::UnexpectedError {
- message: format!("Failed to build NULL-filled RecordBatch:
{e}"),
- source: Some(Box::new(e)),
- })?;
- emitted += rows_to_emit;
- yield batch;
- }
- } else {
-
- // Open a stream for each active file.
- // Build per-file read_type: only the DataFields this file is responsible for.
- let mut file_streams: HashMap<usize, ArrowRecordBatchStream> = HashMap::new();
- for &file_idx in &active_file_indices {
- let file_cols = file_read_columns.get(&file_idx).cloned().unwrap_or_default();
- let file_read_type: Vec<DataField> = file_cols
- .iter()
- .filter_map(|col_name| read_type.iter().find(|f| f.name() == col_name).cloned())
- .collect();
-
- let (_, ref data_fields) = file_info[&file_idx];
-
- let stream = read_single_file_stream(
- file_io.clone(),
- SingleFileReadRequest {
- split: split.clone(),
- file_meta: data_files[file_idx].clone(),
- read_type: file_read_type,
- table_fields: table_fields.clone(),
- data_fields: data_fields.clone(),
- predicates: Vec::new(),
- batch_size,
- dv: None,
- row_ranges: row_ranges.clone(),
- },
- )?;
- file_streams.insert(file_idx, stream);
- }
-
- // Per-file cursor: current batch + offset within it.
- let mut file_cursors: HashMap<usize, (RecordBatch, usize)> = HashMap::new();
-
- loop {
- // Ensure each active file has a current batch. If a file's cursor is exhausted
- // or not yet initialized, read the next batch from its stream.
- for &file_idx in &active_file_indices {
- let needs_next = match file_cursors.get(&file_idx) {
- None => true,
- Some((batch, offset)) => *offset >= batch.num_rows(),
- };
- if needs_next {
- file_cursors.remove(&file_idx);
- if let Some(stream) = file_streams.get_mut(&file_idx) {
- if let Some(batch_result) = stream.next().await {
- let batch = batch_result?;
- if batch.num_rows() > 0 {
- file_cursors.insert(file_idx, (batch, 0));
- }
- }
- }
- }
- }
-
- // All active files must have a cursor to assemble a valid row.
- // If any file has no cursor (stream exhausted), we're done.
- if active_file_indices.iter().any(|idx| !file_cursors.contains_key(idx)) {
- break;
- }
-
- // Determine how many rows we can emit: min of remaining rows across all files.
- let remaining: usize = active_file_indices
- .iter()
- .map(|idx| {
- let (batch, offset) = file_cursors.get(idx).unwrap();
- batch.num_rows() - offset
- })
- .min()
- .unwrap_or(0);
-
- if remaining == 0 {
- break;
- }
-
- let rows_to_emit = remaining.min(output_batch_size);
-
- // Slice each file's current batch and assemble columns.
- // Use the target schema so that missing columns are null-filled.
- let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::with_capacity(column_plan.len());
-
- for (i, (file_idx_opt, col_name)) in column_plan.iter().enumerate() {
- let target_field = &target_schema.fields()[i];
- let col = file_idx_opt
- .and_then(|file_idx| file_cursors.get(&file_idx))
- .and_then(|(batch, offset)| {
- batch
- .schema()
- .index_of(col_name)
- .ok()
- .map(|col_idx| batch.column(col_idx).slice(*offset, rows_to_emit))
- });
-
- columns.push(col.unwrap_or_else(|| {
- arrow_array::new_null_array(target_field.data_type(), rows_to_emit)
- }));
- }
-
- // Advance all cursors.
- for &file_idx in &active_file_indices {
- if let Some((_, ref mut offset)) = file_cursors.get_mut(&file_idx) {
- *offset += rows_to_emit;
- }
- }
-
- let merged = RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::UnexpectedError {
- message: format!("Failed to build merged RecordBatch: {e}"),
- source: Some(Box::new(e)),
- })?;
- yield merged;
- }
- } // end else (active_file_indices non-empty)
- }
- .boxed())
-}
-
-/// Convert absolute RowRanges to file-local 0-based ranges.
-fn to_local_row_ranges(
- row_ranges: &[RowRange],
- first_row_id: i64,
- row_count: i64,
-) -> Vec<RowRange> {
- let file_end = first_row_id + row_count - 1;
- row_ranges
- .iter()
- .filter_map(|r| {
- if r.to() < first_row_id || r.from() > file_end {
- return None;
- }
- let local_from = (r.from() - first_row_id).max(0);
- let local_to = (r.to() - first_row_id).min(row_count - 1);
- Some(RowRange::new(local_from, local_to))
- })
- .collect()
-}
-
-/// Merge DV and row_ranges into a unified list of 0-based inclusive RowRanges.
-/// Returns `None` if no filtering is needed (no DV and no ranges).
-///
-/// Complexity: O(D + R) where D = number of deleted rows, R = number of ranges.
-fn merge_row_selection(
- row_count: i64,
- dv: Option<&DeletionVector>,
- row_ranges: Option<&[RowRange]>,
-) -> Option<Vec<RowRange>> {
- let has_dv = dv.is_some_and(|d| !d.is_empty());
- let has_ranges = row_ranges.is_some();
- if !has_dv && !has_ranges {
- return None;
- }
-
- // Fast path: no DV, just return row_ranges as-is.
- if !has_dv {
- return row_ranges.map(|r| r.to_vec());
- }
-
- // Build non-deleted ranges from DV (sorted iterator).
- let dv_ranges = dv_to_non_deleted_ranges(dv.unwrap(), row_count);
-
- match row_ranges {
- Some(ranges) => Some(intersect_sorted_ranges(&dv_ranges, ranges)),
- None => Some(dv_ranges),
- }
-}
-
-/// Convert a DeletionVector into sorted non-deleted inclusive RowRanges.
-/// The DV iterator yields sorted deleted positions.
-fn dv_to_non_deleted_ranges(dv: &DeletionVector, row_count: i64) -> Vec<RowRange> {
- let mut result = Vec::new();
- let mut cursor: i64 = 0;
- for deleted in dv.iter() {
- let del = deleted as i64;
- if del >= row_count {
- break;
- }
- if del > cursor {
- result.push(RowRange::new(cursor, del - 1));
- }
- cursor = del + 1;
- }
- if cursor < row_count {
- result.push(RowRange::new(cursor, row_count - 1));
- }
- result
-}
-
-/// Intersect two sorted lists of inclusive RowRanges using a merge-style scan.
-fn intersect_sorted_ranges(a: &[RowRange], b: &[RowRange]) -> Vec<RowRange> {
- let mut result = Vec::new();
- let (mut i, mut j) = (0, 0);
- while i < a.len() && j < b.len() {
- let from = a[i].from().max(b[j].from());
- let to = a[i].to().min(b[j].to());
- if from <= to {
- result.push(RowRange::new(from, to));
- }
- // Advance the range that ends first.
- if a[i].to() < b[j].to() {
- i += 1;
- } else {
- j += 1;
- }
- }
- result
-}
-
-/// Expand row_ranges into a flat sequence of selected row IDs for a file.
-fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec<i64> {
- if row_count == 0 {
- return Vec::new();
- }
- let file_end = first_row_id + row_count - 1;
- let mut ids = Vec::new();
- for r in row_ranges {
- let from = r.from().max(first_row_id);
- let to = r.to().min(file_end);
- for id in from..=to {
- ids.push(id);
- }
- }
- ids
-}
-
-fn attach_row_id(
- batch: RecordBatch,
- row_id_index: usize,
- selected_row_ids: &[i64],
- row_id_offset: &mut usize,
- output_schema: &Arc<arrow_schema::Schema>,
-) -> crate::Result<RecordBatch> {
- let num_rows = batch.num_rows();
- let batch_ids = &selected_row_ids[*row_id_offset..*row_id_offset + num_rows];
- *row_id_offset += num_rows;
- let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(batch_ids.to_vec()));
- insert_column_at(batch, array, row_id_index, output_schema)
-}
-
-fn insert_column_at(
- batch: RecordBatch,
- column: Arc<dyn arrow_array::Array>,
- insert_index: usize,
- output_schema: &Arc<arrow_schema::Schema>,
-) -> crate::Result<RecordBatch> {
- let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::with_capacity(batch.num_columns() + 1);
- for (i, col) in batch.columns().iter().enumerate() {
- if i == insert_index {
- columns.push(column.clone());
- }
- columns.push(col.clone());
- }
- if insert_index >= batch.num_columns() {
- columns.push(column);
- }
- RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError {
- message: format!("Failed to insert column into RecordBatch: {e}"),
- source: Some(Box::new(e)),
- })
-}
-
-/// Append a null `_ROW_ID` column for files without `first_row_id`.
-fn append_null_row_id_column(
- batch: RecordBatch,
- insert_index: usize,
- output_schema: &Arc<arrow_schema::Schema>,
-) -> crate::Result<RecordBatch> {
- let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::new_null(batch.num_rows()));
- insert_column_at(batch, array, insert_index, output_schema)
-}
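
The row-selection helpers deleted above (dv_to_non_deleted_ranges and
intersect_sorted_ranges) reappear unchanged in data_file_reader.rs below.
Their behavior is easiest to see on a toy input; a minimal standalone sketch
using plain (i64, i64) inclusive pairs in place of the crate's RowRange type:

    // For a 10-row file with rows 3 and 4 deleted, the non-deleted ranges
    // are [(0, 2), (5, 9)]. Intersecting with a requested range [(2, 6)]
    // yields [(2, 2), (5, 6)] via the same merge-style scan.
    fn intersect(a: &[(i64, i64)], b: &[(i64, i64)]) -> Vec<(i64, i64)> {
        let (mut i, mut j, mut out) = (0, 0, Vec::new());
        while i < a.len() && j < b.len() {
            let from = a[i].0.max(b[j].0);
            let to = a[i].1.min(b[j].1);
            if from <= to {
                out.push((from, to));
            }
            // Advance whichever range ends first.
            if a[i].1 < b[j].1 { i += 1 } else { j += 1 }
        }
        out
    }

    fn main() {
        assert_eq!(intersect(&[(0, 2), (5, 9)], &[(2, 6)]), vec![(2, 2), (5, 6)]);
    }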
diff --git a/crates/paimon/src/table/data_evolution_reader.rs b/crates/paimon/src/table/data_evolution_reader.rs
new file mode 100644
index 0000000..ec06e14
--- /dev/null
+++ b/crates/paimon/src/table/data_evolution_reader.rs
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::data_file_reader::{
+ append_null_row_id_column, attach_row_id, expand_selected_row_ids, insert_column_at,
+ DataFileReader,
+};
+use crate::arrow::build_target_arrow_schema;
+use crate::io::FileIO;
+use crate::spec::{DataField, DataFileMeta, ROW_ID_FIELD_NAME};
+use crate::table::schema_manager::SchemaManager;
+use crate::table::ArrowRecordBatchStream;
+use crate::table::RowRange;
+use crate::{DataSplit, Error};
+use arrow_array::{Array, Int64Array, RecordBatch};
+
+use async_stream::try_stream;
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Reads data files in data evolution mode, merging columns from files
+/// that share the same row ID range.
+pub(crate) struct DataEvolutionReader {
+ file_io: FileIO,
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
+ table_fields: Vec<DataField>,
+ /// read_type with _ROW_ID filtered out — used for file reads.
+ file_read_type: Vec<DataField>,
+ /// Position of _ROW_ID in the original read_type, if requested.
+ row_id_index: Option<usize>,
+ /// Arrow schema for the full output (including _ROW_ID if requested).
+ output_schema: Arc<arrow_schema::Schema>,
+}
+
+impl DataEvolutionReader {
+ pub(crate) fn new(
+ file_io: FileIO,
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
+ table_fields: Vec<DataField>,
+ read_type: Vec<DataField>,
+ ) -> crate::Result<Self> {
+ let row_id_index = read_type.iter().position(|f| f.name() == ROW_ID_FIELD_NAME);
+ let file_read_type: Vec<DataField> = read_type
+ .iter()
+ .filter(|f| f.name() != ROW_ID_FIELD_NAME)
+ .cloned()
+ .collect();
+ let output_schema = build_target_arrow_schema(&read_type)?;
+
+ Ok(Self {
+ file_io,
+ schema_manager,
+ table_schema_id,
+ table_fields,
+ file_read_type,
+ row_id_index,
+ output_schema,
+ })
+ }
+
+ /// Read data files in data evolution mode.
+ ///
+ /// Each DataSplit contains files grouped by `first_row_id`. Files within a split may contain
+ /// different columns for the same logical rows. This method reads each file and merges them
+ /// column-wise, respecting `max_sequence_number` for conflict resolution.
+ pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
+ let splits: Vec<DataSplit> = data_splits.to_vec();
+
+ Ok(try_stream! {
+ let file_reader = DataFileReader::new(
+ self.file_io.clone(),
+ self.schema_manager.clone(),
+ self.table_schema_id,
+ self.table_fields.clone(),
+ self.file_read_type.clone(),
+ Vec::new(),
+ );
+
+ for split in splits {
+ let row_ranges = split.row_ranges().map(|r| r.to_vec());
+
+ if split.raw_convertible() || split.data_files().len() == 1 {
+ for file_meta in split.data_files().to_vec() {
+ let data_fields: Option<Vec<DataField>> = if file_meta.schema_id != self.table_schema_id {
+ let data_schema = self.schema_manager.schema(file_meta.schema_id).await?;
+ Some(data_schema.fields().to_vec())
+ } else {
+ None
+ };
+
+ let has_row_id = file_meta.first_row_id.is_some();
+ let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None };
+
+ let selected_row_ids = if self.row_id_index.is_some() && has_row_id {
+ effective_row_ranges.as_ref().map(|ranges| {
+ expand_selected_row_ids(
+ file_meta.first_row_id.unwrap(),
+ file_meta.row_count,
+ ranges,
+ )
+ })
+ } else {
+ None
+ };
+ let file_base_row_id = file_meta.first_row_id.unwrap_or(0);
+ let mut row_id_cursor = file_base_row_id;
+ let mut row_id_offset: usize = 0;
+
+ let mut stream = file_reader.read_single_file_stream(
+ &split,
+ file_meta,
+ data_fields,
+ None,
+ effective_row_ranges,
+ )?;
+ while let Some(batch) = stream.next().await {
+ let batch = batch?;
+ let num_rows = batch.num_rows();
+ if let Some(idx) = self.row_id_index {
+ if !has_row_id {
+ yield append_null_row_id_column(batch, idx, &self.output_schema)?;
+ } else if let Some(ref ids) = selected_row_ids {
+ yield attach_row_id(batch, idx, ids, &mut row_id_offset, &self.output_schema)?;
+ } else {
+ let row_ids: Vec<i64> = (row_id_cursor..row_id_cursor + num_rows as i64).collect();
+ row_id_cursor += num_rows as i64;
+ let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(row_ids));
+ yield insert_column_at(batch, array, idx, &self.output_schema)?;
+ }
+ } else {
+ yield batch;
+ }
+ }
+ }
+ } else {
+ let files = split.data_files();
+ if !files.iter().all(|f| f.first_row_id.is_some()) {
+ Err(Error::UnexpectedError {
+ message: "All files in a field merge split should have first_row_id".to_string(),
+ source: None,
+ })?;
+ }
+ if !files.iter().all(|f| f.row_count == files[0].row_count) {
+ Err(Error::UnexpectedError {
+ message: "All files in a field merge split should have the same row count".to_string(),
+ source: None,
+ })?;
+ }
+ if !files.iter().all(|f| f.first_row_id == files[0].first_row_id) {
+ Err(Error::UnexpectedError {
+ message: "All files in a field merge split should have the same first row id".to_string(),
+ source: None,
+ })?;
+ }
+
+ let group_base_row_id = files[0].first_row_id;
+ let has_group_row_id = group_base_row_id.is_some();
+ let group_row_count = files[0].row_count;
+ let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None };
+
+ let selected_row_ids = if self.row_id_index.is_some() && has_group_row_id {
+ effective_row_ranges.as_ref().map(|ranges| {
+ expand_selected_row_ids(
+ group_base_row_id.unwrap(),
+ group_row_count,
+ ranges,
+ )
+ })
+ } else {
+ None
+ };
+ let mut row_id_cursor = group_base_row_id.unwrap_or(0);
+ let mut row_id_offset: usize = 0;
+
+ let mut merge_stream = self.merge_files_by_columns(
+ &split,
+ effective_row_ranges,
+ )?;
+ while let Some(batch) = merge_stream.next().await {
+ let batch = batch?;
+ let num_rows = batch.num_rows();
+ if let Some(idx) = self.row_id_index {
+ if !has_group_row_id {
+ yield append_null_row_id_column(batch, idx, &self.output_schema)?;
+ } else if let Some(ref ids) = selected_row_ids {
+ yield attach_row_id(batch, idx, ids, &mut row_id_offset, &self.output_schema)?;
+ } else {
+ let row_ids: Vec<i64> = (row_id_cursor..row_id_cursor + num_rows as i64).collect();
+ row_id_cursor += num_rows as i64;
+ let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(row_ids));
+ yield insert_column_at(batch, array, idx, &self.output_schema)?;
+ }
+ } else {
+ yield batch;
+ }
+ }
+ }
+ }
+ }
+ .boxed())
+ }
+
+ /// Merge multiple files column-wise for data evolution, streaming with bounded memory.
+ ///
+ /// Uses field IDs (not column names) to resolve which file provides which column,
+ /// ensuring correctness across schema evolution (column rename, add, drop).
+ fn merge_files_by_columns(
+ &self,
+ split: &DataSplit,
+ row_ranges: Option<Vec<RowRange>>,
+ ) -> crate::Result<ArrowRecordBatchStream> {
+ let data_files = split.data_files();
+ if data_files.is_empty() {
+ return Ok(futures::stream::empty().boxed());
+ }
+
+ let file_io = self.file_io.clone();
+ let schema_manager = self.schema_manager.clone();
+ let table_schema_id = self.table_schema_id;
+ let split = split.clone();
+ let data_files: Vec<DataFileMeta> = data_files.to_vec();
+ let read_type = self.file_read_type.clone();
+ let table_fields = self.table_fields.clone();
+ // Batch size for column-merge output. Matches the default Parquet reader batch size.
+ const MERGE_BATCH_SIZE: usize = 1024;
+ let output_batch_size: usize = MERGE_BATCH_SIZE;
+ let target_schema = build_target_arrow_schema(&read_type)?;
+
+ Ok(try_stream! {
+ // Pre-load schemas and collect field IDs + data_fields per file.
+ let mut file_info: HashMap<usize, (Vec<i32>, Option<Vec<DataField>>)> = HashMap::new();
+
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let (field_ids, data_fields) = if file_meta.schema_id != table_schema_id {
+ let file_schema = schema_manager.schema(file_meta.schema_id).await?;
+ let file_fields = file_schema.fields();
+
+ let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols {
+ wc.iter()
+ .filter_map(|name| file_fields.iter().find(|f| f.name() == name).map(|f| f.id()))
+ .collect()
+ } else {
+ file_fields.iter().map(|f| f.id()).collect()
+ };
+
+ (ids, Some(file_fields.to_vec()))
+ } else {
+ let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols {
+ wc.iter()
+ .filter_map(|name| table_fields.iter().find(|f| f.name() == name).map(|f| f.id()))
+ .collect()
+ } else {
+ table_fields.iter().map(|f| f.id()).collect()
+ };
+
+ (ids, None)
+ };
+
+ file_info.insert(file_idx, (field_ids, data_fields));
+ }
+
+ // Determine which file provides each field ID, resolving conflicts by max_sequence_number.
+ let mut field_id_source: HashMap<i32, (usize, i64)> = HashMap::new();
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let (ref field_ids, _) = file_info[&file_idx];
+ for &fid in field_ids {
+ let entry = field_id_source
+ .entry(fid)
+ .or_insert((file_idx, i64::MIN));
+ if file_meta.max_sequence_number > entry.1 {
+ *entry = (file_idx, file_meta.max_sequence_number);
+ }
+ }
+ }
+
+ // For each projected field, determine which file provides it (by field ID).
+ let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
+ for field in &read_type {
+ if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) {
+ file_read_columns
+ .entry(file_idx)
+ .or_default()
+ .push(field.name().to_string());
+ }
+ }
+
+ let column_plan: Vec<(Option<usize>, String)> = read_type
+ .iter()
+ .map(|field| {
+ let file_idx = field_id_source.get(&field.id()).map(|&(idx, _)| idx);
+ (file_idx, field.name().to_string())
+ })
+ .collect();
+
+ let active_file_indices: Vec<usize> = file_read_columns.keys().copied().collect();
+
+ // Edge case: no file provides any projected column.
+ if active_file_indices.is_empty() {
+ let first_row_id = data_files[0].first_row_id.unwrap_or(0);
+ let file_row_count = data_files[0].row_count;
+ let total_rows = match &row_ranges {
+ Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, ranges).len(),
+ None => file_row_count as usize,
+ };
+ let mut emitted = 0;
+ while emitted < total_rows {
+ let rows_to_emit = (total_rows - emitted).min(output_batch_size);
+ let columns: Vec<Arc<dyn arrow_array::Array>> = target_schema
+ .fields()
+ .iter()
+ .map(|f| arrow_array::new_null_array(f.data_type(), rows_to_emit))
+ .collect();
+ let batch = if columns.is_empty() {
+ RecordBatch::try_new_with_options(
+ target_schema.clone(),
+ columns,
+ &arrow_array::RecordBatchOptions::new().with_row_count(Some(rows_to_emit)),
+ )
+ } else {
+ RecordBatch::try_new(target_schema.clone(), columns)
+ }
+ .map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to build NULL-filled
RecordBatch: {e}"),
+ source: Some(Box::new(e)),
+ })?;
+ emitted += rows_to_emit;
+ yield batch;
+ }
+ } else {
+ // Open a stream for each active file via DataFileReader.
+ let mut file_streams: HashMap<usize, ArrowRecordBatchStream> = HashMap::new();
+ for &file_idx in &active_file_indices {
+ let file_cols = file_read_columns.get(&file_idx).cloned().unwrap_or_default();
+ let file_read_type: Vec<DataField> = file_cols
+ .iter()
+ .filter_map(|col_name| read_type.iter().find(|f| f.name() == col_name).cloned())
+ .collect();
+
+ let (_, ref data_fields) = file_info[&file_idx];
+
+ let file_reader = DataFileReader::new(
+ file_io.clone(),
+ schema_manager.clone(),
+ table_schema_id,
+ table_fields.clone(),
+ file_read_type,
+ Vec::new(),
+ );
+ let stream = file_reader.read_single_file_stream(
+ &split,
+ data_files[file_idx].clone(),
+ data_fields.clone(),
+ None,
+ row_ranges.clone(),
+ )?;
+ file_streams.insert(file_idx, stream);
+ }
+
+ // Per-file cursor: current batch + offset within it.
+ let mut file_cursors: HashMap<usize, (RecordBatch, usize)> = HashMap::new();
+
+ loop {
+ for &file_idx in &active_file_indices {
+ let needs_next = match file_cursors.get(&file_idx) {
+ None => true,
+ Some((batch, offset)) => *offset >= batch.num_rows(),
+ };
+ if needs_next {
+ file_cursors.remove(&file_idx);
+ if let Some(stream) = file_streams.get_mut(&file_idx) {
+ if let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ if batch.num_rows() > 0 {
+ file_cursors.insert(file_idx, (batch, 0));
+ }
+ }
+ }
+ }
+ }
+
+ // All files in a merge group have the same row count (validated above),
+ // so any file stream exhausting means all streams are done.
+ if active_file_indices.iter().any(|idx| !file_cursors.contains_key(idx)) {
+ break;
+ }
+
+ let remaining: usize = active_file_indices
+ .iter()
+ .map(|idx| {
+ let (batch, offset) = file_cursors.get(idx).unwrap();
+ batch.num_rows() - offset
+ })
+ .min()
+ .unwrap_or(0);
+
+ if remaining == 0 {
+ break;
+ }
+
+ let rows_to_emit = remaining.min(output_batch_size);
+
+ let mut columns: Vec<Arc<dyn arrow_array::Array>> =
+ Vec::with_capacity(column_plan.len());
+
+ for (i, (file_idx_opt, col_name)) in column_plan.iter().enumerate() {
+ let target_field = &target_schema.fields()[i];
+ let col = file_idx_opt
+ .and_then(|file_idx| file_cursors.get(&file_idx))
+ .and_then(|(batch, offset)| {
+ batch
+ .schema()
+ .index_of(col_name)
+ .ok()
+ .map(|col_idx| batch.column(col_idx).slice(*offset, rows_to_emit))
+ });
+
+ columns.push(col.unwrap_or_else(|| {
+ arrow_array::new_null_array(target_field.data_type(), rows_to_emit)
+ }));
+ }
+
+ for &file_idx in &active_file_indices {
+ if let Some((_, ref mut offset)) = file_cursors.get_mut(&file_idx) {
+ *offset += rows_to_emit;
+ }
+ }
+
+ let merged = RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to build merged RecordBatch: {e}"),
+ source: Some(Box::new(e)),
+ })?;
+ yield merged;
+ }
+ }
+ }
+ .boxed())
+ }
+}
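
The loop at the heart of merge_files_by_columns above is a cursor-per-stream
merge: every round emits min(remaining) rows across all current batches,
advances every cursor by that amount, and refills a cursor from its stream
once its batch is used up. A minimal sketch of just that scheduling, with
batch sizes standing in for real RecordBatch streams (illustrative code, not
the crate API):

    // sources[i] = sizes of the batches that file i's stream yields.
    fn merge_round_sizes(sources: &[Vec<usize>]) -> Vec<usize> {
        let mut cursors = vec![(0usize, 0usize); sources.len()]; // (batch_idx, offset)
        let mut rounds = Vec::new();
        loop {
            // Refill: move to the next batch when the current one is used up.
            for (i, cur) in cursors.iter_mut().enumerate() {
                while cur.0 < sources[i].len() && cur.1 >= sources[i][cur.0] {
                    *cur = (cur.0 + 1, 0);
                }
            }
            // Any exhausted stream ends the merge (all files share a row count).
            if cursors.iter().enumerate().any(|(i, c)| c.0 >= sources[i].len()) {
                break;
            }
            // Emit the minimum rows remaining across all current batches.
            let emit = cursors
                .iter()
                .enumerate()
                .map(|(i, c)| sources[i][c.0] - c.1)
                .min()
                .unwrap();
            for cur in cursors.iter_mut() {
                cur.1 += emit;
            }
            rounds.push(emit);
        }
        rounds
    }

    fn main() {
        // File A yields batches of 3+3 rows, file B batches of 2+2+2 rows:
        // the merged output is emitted in rounds of 2, 1, 1, 2 rows.
        assert_eq!(merge_round_sizes(&[vec![3, 3], vec![2, 2, 2]]), vec![2, 1, 1, 2]);
    }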
diff --git a/crates/paimon/src/table/data_file_reader.rs b/crates/paimon/src/table/data_file_reader.rs
new file mode 100644
index 0000000..b5f1680
--- /dev/null
+++ b/crates/paimon/src/table/data_file_reader.rs
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::build_target_arrow_schema;
+use crate::arrow::format::create_format_reader;
+use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX};
+use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
+use crate::io::FileIO;
+use crate::spec::{DataField, DataFileMeta, Predicate};
+use crate::table::schema_manager::SchemaManager;
+use crate::table::ArrowRecordBatchStream;
+use crate::table::RowRange;
+use crate::{DataSplit, Error};
+use arrow_array::{Array, Int64Array, RecordBatch};
+use arrow_cast::cast;
+
+use async_stream::try_stream;
+use futures::StreamExt;
+use std::sync::Arc;
+
+/// Reads data from Parquet files.
+#[derive(Clone)]
+pub(crate) struct DataFileReader {
+ file_io: FileIO,
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
+ table_fields: Vec<DataField>,
+ read_type: Vec<DataField>,
+ predicates: Vec<Predicate>,
+}
+
+impl DataFileReader {
+ pub(crate) fn new(
+ file_io: FileIO,
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
+ table_fields: Vec<DataField>,
+ read_type: Vec<DataField>,
+ predicates: Vec<Predicate>,
+ ) -> Self {
+ Self {
+ file_io,
+ schema_manager,
+ table_schema_id,
+ table_fields,
+ read_type,
+ predicates,
+ }
+ }
+
+ /// Take a stream of DataSplits and read every data file in each split.
+ /// Returns a stream of Arrow RecordBatches from all files.
+ ///
+ /// Uses SchemaManager to load the data file's schema (via `DataFileMeta.schema_id`)
+ /// and computes field-ID-based index mapping for schema evolution (added columns,
+ /// type promotion, column reordering).
+ ///
+ /// Matches [RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java).
+ pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
+ let splits: Vec<DataSplit> = data_splits.to_vec();
+ let reader = self;
+ Ok(try_stream! {
+ for split in splits {
+ // Create DV factory for this split only.
+ let dv_factory = if split
+ .data_deletion_files()
+ .is_some_and(|files| files.iter().any(Option::is_some))
+ {
+ Some(
+ DeletionVectorFactory::new(
+ &reader.file_io,
+ split.data_files(),
+ split.data_deletion_files(),
+ )
+ .await?,
+ )
+ } else {
+ None
+ };
+
+ for file_meta in split.data_files().to_vec() {
+ let dv = dv_factory
+ .as_ref()
+ .and_then(|factory| factory.get_deletion_vector(&file_meta.file_name))
+ .cloned();
+
+ // Load data file's schema if it differs from the table schema.
+ let data_fields: Option<Vec<DataField>> = if file_meta.schema_id != reader.table_schema_id {
+ let data_schema = reader.schema_manager.schema(file_meta.schema_id).await?;
+ Some(data_schema.fields().to_vec())
+ } else {
+ None
+ };
+
+ let mut stream = reader.read_single_file_stream(
+ &split,
+ file_meta,
+ data_fields,
+ dv,
+ None,
+ )?;
+ while let Some(batch) = stream.next().await {
+ yield batch?;
+ }
+ }
+ }
+ }
+ .boxed())
+ }
+
+ /// Read a single parquet file from a split, returning a lazy stream of batches.
+ /// Optionally applies a deletion vector.
+ ///
+ /// Handles schema evolution using field-ID-based index mapping:
+ /// - `data_fields`: if `Some`, the fields from the data file's schema (loaded via SchemaManager).
+ /// Used to compute index mapping between `read_type` and data fields by field ID.
+ /// - Columns missing from the file are filled with null arrays.
+ /// - Columns whose Arrow type differs from the target type are cast (type promotion).
+ ///
+ /// Reference: [RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java)
+ pub(super) fn read_single_file_stream(
+ &self,
+ split: &DataSplit,
+ file_meta: DataFileMeta,
+ data_fields: Option<Vec<DataField>>,
+ dv: Option<Arc<DeletionVector>>,
+ row_ranges: Option<Vec<RowRange>>,
+ ) -> crate::Result<ArrowRecordBatchStream> {
+ let read_type = self.read_type.clone();
+ let table_fields = self.table_fields.clone();
+ let predicates = self.predicates.clone();
+ let file_io = self.file_io.clone();
+ let split = split.clone();
+
+ let target_schema = build_target_arrow_schema(&read_type)?;
+ let file_fields = data_fields.clone().unwrap_or_else(|| table_fields.clone());
+
+ // Compute index mapping and determine which columns to read from the file.
+ let (projected_read_fields, index_mapping) = if let Some(ref df) = data_fields {
+ let mapping = create_index_mapping(&read_type, df);
+ match mapping {
+ Some(ref idx_map) => {
+ let mut seen = std::collections::HashSet::new();
+ let fields_to_read: Vec<DataField> = idx_map
+ .iter()
+ .filter(|&&idx| idx != NULL_FIELD_INDEX && seen.insert(idx))
+ .map(|&idx| df[idx as usize].clone())
+ .collect();
+ (fields_to_read, Some(idx_map.clone()))
+ }
+ None => (df.clone(), None),
+ }
+ } else {
+ (read_type.clone(), None)
+ };
+
+ // Remap predicates from table-level to file-level indices.
+ let file_predicates = {
+ let remapped = crate::arrow::filtering::remap_predicates_to_file(
+ &predicates,
+ &table_fields,
+ &file_fields,
+ );
+ if remapped.is_empty() {
+ None
+ } else {
+ Some(crate::arrow::format::FilePredicates {
+ predicates: remapped,
+ file_fields: file_fields.clone(),
+ })
+ }
+ };
+
+ Ok(try_stream! {
+ let path_to_read = split.data_file_path(&file_meta);
+ let format_reader = create_format_reader(&path_to_read)?;
+ let input_file = file_io.new_input(&path_to_read)?;
+ let file_reader = input_file.reader().await?;
+ let local_ranges = row_ranges.as_ref().map(|ranges| {
+ to_local_row_ranges(ranges, file_meta.first_row_id.unwrap_or(0), file_meta.row_count)
+ });
+
+ let row_selection = merge_row_selection(
+ file_meta.row_count,
+ dv.as_deref(),
+ local_ranges.as_deref(),
+ );
+
+ let mut batch_stream = format_reader.read_batch_stream(
+ Box::new(file_reader),
+ file_meta.file_size as u64,
+ &projected_read_fields,
+ file_predicates.as_ref(),
+ None,
+ row_selection,
+ ).await?;
+
+ while let Some(batch) = batch_stream.next().await {
+ let batch = batch?;
+ let num_rows = batch.num_rows();
+ let batch_schema = batch.schema();
+
+ // Build output columns using index mapping (field-ID-based) or by name.
+ let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::with_capacity(target_schema.fields().len());
+ for (i, target_field) in target_schema.fields().iter().enumerate() {
+ let source_col = if let Some(ref idx_map) = index_mapping {
+ let data_idx = idx_map[i];
+ if data_idx == NULL_FIELD_INDEX {
+ None
+ } else {
+ let data_field = &data_fields.as_ref().unwrap()[data_idx as usize];
+ batch_schema
+ .index_of(data_field.name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ }
+ } else if let Some(ref df) = data_fields {
+ batch_schema
+ .index_of(df[i].name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ } else {
+ batch_schema
+ .index_of(target_field.name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ };
+
+ match source_col {
+ Some(col) => {
+ if col.data_type() == target_field.data_type() {
+ columns.push(col.clone());
+ } else {
+ let casted = cast(col, target_field.data_type()).map_err(|e| {
+ Error::UnexpectedError {
+ message: format!(
+ "Failed to cast column '{}' from
{:?} to {:?}: {e}",
+ target_field.name(),
+ col.data_type(),
+ target_field.data_type()
+ ),
+ source: Some(Box::new(e)),
+ }
+ })?;
+ columns.push(casted);
+ }
+ }
+ None => {
+ let null_array = arrow_array::new_null_array(target_field.data_type(), num_rows);
+ columns.push(null_array);
+ }
+ }
+ }
+
+ let result = if columns.is_empty() {
+ RecordBatch::try_new_with_options(
+ target_schema.clone(),
+ columns,
+ &arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
+ )
+ } else {
+ RecordBatch::try_new(target_schema.clone(), columns)
+ }
+ .map_err(|e| {
+ Error::UnexpectedError {
+ message: format!("Failed to build schema-evolved
RecordBatch: {e}"),
+ source: Some(Box::new(e)),
+ }
+ })?;
+ yield result;
+ }
+ }
+ .boxed())
+ }
+}
+
+/// Convert absolute RowRanges to file-local 0-based ranges.
+fn to_local_row_ranges(
+ row_ranges: &[RowRange],
+ first_row_id: i64,
+ row_count: i64,
+) -> Vec<RowRange> {
+ let file_end = first_row_id + row_count - 1;
+ row_ranges
+ .iter()
+ .filter_map(|r| {
+ if r.to() < first_row_id || r.from() > file_end {
+ return None;
+ }
+ let local_from = (r.from() - first_row_id).max(0);
+ let local_to = (r.to() - first_row_id).min(row_count - 1);
+ Some(RowRange::new(local_from, local_to))
+ })
+ .collect()
+}
+
+/// Merge DV and row_ranges into a unified list of 0-based inclusive RowRanges.
+/// Returns `None` if no filtering is needed (no DV and no ranges).
+///
+/// Complexity: O(D + R) where D = number of deleted rows, R = number of ranges.
+fn merge_row_selection(
+ row_count: i64,
+ dv: Option<&DeletionVector>,
+ row_ranges: Option<&[RowRange]>,
+) -> Option<Vec<RowRange>> {
+ let has_dv = dv.is_some_and(|d| !d.is_empty());
+ let has_ranges = row_ranges.is_some();
+ if !has_dv && !has_ranges {
+ return None;
+ }
+
+ if !has_dv {
+ return row_ranges.map(|r| r.to_vec());
+ }
+
+ let dv_ranges = dv_to_non_deleted_ranges(dv.unwrap(), row_count);
+
+ match row_ranges {
+ Some(ranges) => Some(intersect_sorted_ranges(&dv_ranges, ranges)),
+ None => Some(dv_ranges),
+ }
+}
+
+/// Convert a DeletionVector into sorted non-deleted inclusive RowRanges.
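+///
+/// # Example (illustrative sketch; `dv` construction elided)
+///
+/// ```ignore
+/// // row_count = 10 with rows {0, 4, 5} deleted yields [1, 3] and [6, 9].
+/// let kept = dv_to_non_deleted_ranges(&dv, 10);
+/// ```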
+fn dv_to_non_deleted_ranges(dv: &DeletionVector, row_count: i64) -> Vec<RowRange> {
+ let mut result = Vec::new();
+ let mut cursor: i64 = 0;
+ for deleted in dv.iter() {
+ let del = deleted as i64;
+ if del >= row_count {
+ break;
+ }
+ if del > cursor {
+ result.push(RowRange::new(cursor, del - 1));
+ }
+ cursor = del + 1;
+ }
+ if cursor < row_count {
+ result.push(RowRange::new(cursor, row_count - 1));
+ }
+ result
+}
+
+/// Intersect two sorted lists of inclusive RowRanges using a merge-style scan.
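+///
+/// # Example (illustrative sketch)
+///
+/// ```ignore
+/// // [0, 4] and [8, 9] intersected with [3, 8] yields [3, 4] and [8, 8].
+/// let out = intersect_sorted_ranges(&[RowRange::new(0, 4), RowRange::new(8, 9)], &[RowRange::new(3, 8)]);
+/// ```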
+fn intersect_sorted_ranges(a: &[RowRange], b: &[RowRange]) -> Vec<RowRange> {
+ let mut result = Vec::new();
+ let (mut i, mut j) = (0, 0);
+ while i < a.len() && j < b.len() {
+ let from = a[i].from().max(b[j].from());
+ let to = a[i].to().min(b[j].to());
+ if from <= to {
+ result.push(RowRange::new(from, to));
+ }
+ if a[i].to() < b[j].to() {
+ i += 1;
+ } else {
+ j += 1;
+ }
+ }
+ result
+}
+
+/// Expand row_ranges into a flat sequence of selected row IDs for a file.
+/// Intended for per-batch _ROW_ID attachment — callers should not pass
+/// whole-file ranges with millions of rows, as this allocates a Vec<i64>
+/// proportional to the selected range size.
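+///
+/// # Example (illustrative sketch)
+///
+/// ```ignore
+/// // A file covering row IDs 100..=104: [98, 101] and [103, 110] clamp to the
+/// // file and expand to the flat sequence [100, 101, 103, 104].
+/// let ids = expand_selected_row_ids(100, 5, &[RowRange::new(98, 101), RowRange::new(103, 110)]);
+/// ```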
+pub(super) fn expand_selected_row_ids(
+ first_row_id: i64,
+ row_count: i64,
+ row_ranges: &[RowRange],
+) -> Vec<i64> {
+ if row_count == 0 {
+ return Vec::new();
+ }
+ let file_end = first_row_id + row_count - 1;
+ let mut ids = Vec::new();
+ for r in row_ranges {
+ let from = r.from().max(first_row_id);
+ let to = r.to().min(file_end);
+ for id in from..=to {
+ ids.push(id);
+ }
+ }
+ ids
+}
+
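+/// Attach a `_ROW_ID` column at `row_id_index`, consuming the next
+/// `batch.num_rows()` entries of `selected_row_ids` starting at `*row_id_offset`
+/// and advancing the offset past them. Returns an error instead of panicking
+/// when fewer IDs remain than the batch needs.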
+pub(super) fn attach_row_id(
+ batch: RecordBatch,
+ row_id_index: usize,
+ selected_row_ids: &[i64],
+ row_id_offset: &mut usize,
+ output_schema: &Arc<arrow_schema::Schema>,
+) -> crate::Result<RecordBatch> {
+ let num_rows = batch.num_rows();
+ let end = *row_id_offset + num_rows;
+ if end > selected_row_ids.len() {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Row ID offset out of bounds: need {}..{} but selected_row_ids
has {} entries",
+ *row_id_offset,
+ end,
+ selected_row_ids.len()
+ ),
+ source: None,
+ });
+ }
+ let batch_ids = &selected_row_ids[*row_id_offset..end];
+ *row_id_offset = end;
+ let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(batch_ids.to_vec()));
+ insert_column_at(batch, array, row_id_index, output_schema)
+}
+
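+/// Rebuild `batch` against `output_schema` with `column` inserted at
+/// `insert_index`; an index at or past the current column count appends it.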
+pub(super) fn insert_column_at(
+ batch: RecordBatch,
+ column: Arc<dyn arrow_array::Array>,
+ insert_index: usize,
+ output_schema: &Arc<arrow_schema::Schema>,
+) -> crate::Result<RecordBatch> {
+ let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::with_capacity(batch.num_columns() + 1);
+ for (i, col) in batch.columns().iter().enumerate() {
+ if i == insert_index {
+ columns.push(column.clone());
+ }
+ columns.push(col.clone());
+ }
+ if insert_index >= batch.num_columns() {
+ columns.push(column);
+ }
+ RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to insert column into RecordBatch: {e}"),
+ source: Some(Box::new(e)),
+ })
+}
+
+/// Append a null `_ROW_ID` column for files without `first_row_id`.
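+/// The column is an all-null `Int64Array` sized to the batch, inserted via
+/// [`insert_column_at`].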
+pub(super) fn append_null_row_id_column(
+ batch: RecordBatch,
+ insert_index: usize,
+ output_schema: &Arc<arrow_schema::Schema>,
+) -> crate::Result<RecordBatch> {
+ let array: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::new_null(batch.num_rows()));
+ insert_column_at(batch, array, insert_index, output_schema)
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 431857f..2ec5cd9 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -20,7 +20,9 @@
pub(crate) mod bin_pack;
mod bucket_filter;
mod commit_message;
+mod data_evolution_reader;
pub mod data_evolution_writer;
+mod data_file_reader;
mod data_file_writer;
#[cfg(feature = "fulltext")]
mod full_text_search_builder;
@@ -34,6 +36,7 @@ mod snapshot_manager;
mod source;
mod stats_filter;
pub(crate) mod table_commit;
+mod table_read;
mod table_scan;
pub(crate) mod table_write;
mod tag_manager;
@@ -46,7 +49,7 @@ pub use data_evolution_writer::DataEvolutionWriter;
#[cfg(feature = "fulltext")]
pub use full_text_search_builder::FullTextSearchBuilder;
use futures::stream::BoxStream;
-pub use read_builder::{ReadBuilder, TableRead};
+pub use read_builder::ReadBuilder;
pub use rest_env::RESTEnv;
pub use schema_manager::SchemaManager;
pub use snapshot_commit::{RESTSnapshotCommit, RenamingSnapshotCommit, SnapshotCommit};
@@ -55,6 +58,7 @@ pub use source::{
merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange,
};
pub use table_commit::TableCommit;
+pub use table_read::TableRead;
pub use table_scan::TableScan;
pub use table_write::TableWrite;
pub use tag_manager::TagManager;
diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs
index 63d9917..3daa24a 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -21,13 +21,12 @@
//! and [TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java).
use super::bucket_filter::{extract_predicate_for_keys, split_partition_and_data_predicates};
-use super::{ArrowRecordBatchStream, Table, TableScan};
+use super::table_read::TableRead;
+use super::{Table, TableScan};
use crate::arrow::filtering::reader_pruning_predicates;
-use crate::arrow::ArrowReaderBuilder;
use crate::spec::{CoreOptions, DataField, Predicate};
use crate::table::source::RowRange;
-use crate::Result;
-use crate::{DataSplit, Error};
+use crate::{Error, Result};
use std::collections::{HashMap, HashSet};
#[derive(Debug, Clone, Default)]
@@ -37,7 +36,10 @@ struct NormalizedFilter {
bucket_predicate: Option<Predicate>,
}
-fn split_scan_predicates(table: &Table, filter: Predicate) -> (Option<Predicate>, Vec<Predicate>) {
+pub(super) fn split_scan_predicates(
+ table: &Table,
+ filter: Predicate,
+) -> (Option<Predicate>, Vec<Predicate>) {
let partition_keys = table.schema().partition_keys();
if partition_keys.is_empty() {
(None, filter.split_and())
@@ -91,11 +93,6 @@ fn normalize_filter(table: &Table, filter: Predicate) -> NormalizedFilter {
}
}
-fn read_data_predicates(table: &Table, filter: Predicate) -> Vec<Predicate> {
- let (_, data_predicates) = split_scan_predicates(table, filter);
- reader_pruning_predicates(data_predicates)
-}
-
/// Builder for table scan and table read (new_scan, new_read).
///
/// Rust keeps a names-based projection API for ergonomics, while aligning the
@@ -262,95 +259,9 @@ impl<'a> ReadBuilder<'a> {
}
}
-/// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
-///
-/// Reference: [pypaimon.read.table_read.TableRead](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_read.py)
-#[derive(Debug, Clone)]
-pub struct TableRead<'a> {
- table: &'a Table,
- read_type: Vec<DataField>,
- data_predicates: Vec<Predicate>,
-}
-
-impl<'a> TableRead<'a> {
- /// Create a new TableRead with a specific read type (projected fields).
- pub fn new(
- table: &'a Table,
- read_type: Vec<DataField>,
- data_predicates: Vec<Predicate>,
- ) -> Self {
- Self {
- table,
- read_type,
- data_predicates,
- }
- }
-
- /// Schema (fields) that this read will produce.
- pub fn read_type(&self) -> &[DataField] {
- &self.read_type
- }
-
- /// Data predicates for read-side pruning.
- pub fn data_predicates(&self) -> &[Predicate] {
- &self.data_predicates
- }
-
- /// Table for this read.
- pub fn table(&self) -> &Table {
- self.table
- }
-
- /// Set a filter predicate for conservative read-side pruning.
- ///
- /// This is the direct-`TableRead` equivalent of [`ReadBuilder::with_filter`].
- /// Supported non-partition data predicates may be used only on the regular
- /// Parquet read path for row-group pruning and native Parquet row
- /// filtering. Callers should still keep residual filtering at the query
- /// layer for unsupported predicates, non-Parquet files, and data-evolution
- /// reads.
- pub fn with_filter(mut self, filter: Predicate) -> Self {
- self.data_predicates = read_data_predicates(self.table, filter);
- self
- }
-
- /// Returns an [`ArrowRecordBatchStream`].
- pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
- // todo: consider get read batch size from table
- let has_primary_keys = !self.table.schema.primary_keys().is_empty();
- let core_options = CoreOptions::new(self.table.schema.options());
- let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
- let data_evolution = core_options.data_evolution_enabled();
-
- if has_primary_keys && !deletion_vectors_enabled {
- return Err(Error::Unsupported {
- message: format!(
- "Reading primary-key tables without deletion vectors is
not yet supported. Primary keys: {:?}",
- self.table.schema.primary_keys()
- ),
- });
- }
-
- let reader = ArrowReaderBuilder::new(
- self.table.file_io.clone(),
- self.table.schema_manager().clone(),
- self.table.schema().id(),
- )
- .with_predicates(self.data_predicates.clone())
- .with_table_fields(self.table.schema.fields().to_vec())
- .build(self.read_type().to_vec());
-
- if data_evolution {
- reader.read_data_evolution(data_splits, self.table.schema.fields())
- } else {
- reader.read(data_splits)
- }
- }
-}
-
#[cfg(test)]
mod tests {
- use super::TableRead;
+ use crate::table::TableRead;
mod test_utils {
include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../test_utils.rs"));
}
diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs
new file mode 100644
index 0000000..cab22f9
--- /dev/null
+++ b/crates/paimon/src/table/table_read.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::data_evolution_reader::DataEvolutionReader;
+use super::data_file_reader::DataFileReader;
+use super::read_builder::split_scan_predicates;
+use super::{ArrowRecordBatchStream, Table};
+use crate::arrow::filtering::reader_pruning_predicates;
+use crate::spec::{CoreOptions, DataField, Predicate};
+use crate::{DataSplit, Error};
+
+/// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
+///
+/// Reference: [pypaimon.read.table_read.TableRead](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_read.py)
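+///
+/// Typical flow (illustrative sketch; how `read_builder` and `data_splits` are
+/// obtained is assumed here, see [`ReadBuilder`] and [`TableScan::plan`]):
+///
+/// ```ignore
+/// let read = read_builder.new_read();
+/// let stream = read.to_arrow(&data_splits)?;
+/// // `stream` yields schema-evolved Arrow RecordBatches for the splits.
+/// ```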
+#[derive(Debug, Clone)]
+pub struct TableRead<'a> {
+ table: &'a Table,
+ read_type: Vec<DataField>,
+ data_predicates: Vec<Predicate>,
+}
+
+impl<'a> TableRead<'a> {
+ /// Create a new TableRead with a specific read type (projected fields).
+ pub fn new(
+ table: &'a Table,
+ read_type: Vec<DataField>,
+ data_predicates: Vec<Predicate>,
+ ) -> Self {
+ Self {
+ table,
+ read_type,
+ data_predicates,
+ }
+ }
+
+ /// Schema (fields) that this read will produce.
+ pub fn read_type(&self) -> &[DataField] {
+ &self.read_type
+ }
+
+ /// Data predicates for read-side pruning.
+ pub fn data_predicates(&self) -> &[Predicate] {
+ &self.data_predicates
+ }
+
+ /// Table for this read.
+ pub fn table(&self) -> &Table {
+ self.table
+ }
+
+ /// Set a filter predicate for conservative read-side pruning.
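+ ///
+ /// Supported non-partition data predicates are used only on the regular
+ /// Parquet read path for row-group pruning and native Parquet row filtering;
+ /// callers should keep residual filtering at the query layer for unsupported
+ /// predicates, non-Parquet files, and data-evolution reads.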
+ pub fn with_filter(mut self, filter: Predicate) -> Self {
+ let (_, data_predicates) = split_scan_predicates(self.table, filter);
+ self.data_predicates = reader_pruning_predicates(data_predicates);
+ self
+ }
+
+ /// Returns an [`ArrowRecordBatchStream`].
+ pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
+ let has_primary_keys = !self.table.schema.primary_keys().is_empty();
+ let core_options = CoreOptions::new(self.table.schema.options());
+ let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
+ let data_evolution = core_options.data_evolution_enabled();
+
+ if has_primary_keys && !deletion_vectors_enabled {
+ return Err(Error::Unsupported {
+ message: format!(
+ "Reading primary-key tables without deletion vectors is
not yet supported. Primary keys: {:?}",
+ self.table.schema.primary_keys()
+ ),
+ });
+ }
+
+ if data_evolution {
+ // TODO: data evolution mode does not support read-side predicate pruning yet.
+ let reader = DataEvolutionReader::new(
+ self.table.file_io.clone(),
+ self.table.schema_manager().clone(),
+ self.table.schema().id(),
+ self.table.schema.fields().to_vec(),
+ self.read_type().to_vec(),
+ )?;
+ reader.read(data_splits)
+ } else {
+ let reader = DataFileReader::new(
+ self.table.file_io.clone(),
+ self.table.schema_manager().clone(),
+ self.table.schema().id(),
+ self.table.schema.fields().to_vec(),
+ self.read_type().to_vec(),
+ self.data_predicates.clone(),
+ );
+ reader.read(data_splits)
+ }
+ }
+}
diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs
index 45db333..6feda23 100644
--- a/crates/paimon/src/table/write_builder.rs
+++ b/crates/paimon/src/table/write_builder.rs
@@ -24,7 +24,7 @@ use uuid::Uuid;
/// Builder for creating table writers and committers.
///
-/// Provides `new_write` (TODO) and `new_commit` methods, with optional
+/// Provides `new_write` and `new_commit` methods, with optional
/// `overwrite` support for partition-level overwrites.
pub struct WriteBuilder<'a> {
table: &'a Table,