This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new cd3670c  feat: support data evolution table mode (#193)
cd3670c is described below

commit cd3670cd4bf9137b998315c199e2d1c54100c85c
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 2 21:48:52 2026 +0800

    feat: support data evolution table mode (#193)
    
    * feat: support data evolution table mode
---
 Cargo.toml                                    |   2 +
 crates/integration_tests/tests/read_tables.rs |  94 +++++++
 crates/paimon/Cargo.toml                      |   2 +
 crates/paimon/src/arrow/reader.rs             | 379 +++++++++++++++++++++-----
 crates/paimon/src/spec/core_options.rs        |   8 +
 crates/paimon/src/spec/data_file.rs           |  23 +-
 crates/paimon/src/spec/objects_file.rs        |   4 +
 crates/paimon/src/table/bin_pack.rs           |   2 +
 crates/paimon/src/table/read_builder.rs       |   8 +-
 crates/paimon/src/table/source.rs             |  17 ++
 crates/paimon/src/table/table_scan.rs         | 222 ++++++++++++++-
 dev/spark/provision.py                        |  64 +++++
 12 files changed, 749 insertions(+), 76 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6282c09..2367c90 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,5 +29,7 @@ rust-version = "1.86.0"
 
 [workspace.dependencies]
 arrow-array = { version = "57.0", features = ["ffi"] }
+arrow-schema = "57.0"
+arrow-select = "57.0"
 parquet = "57.0"
 tokio = "1.39.2"
\ No newline at end of file
diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 9d7e891..090197d 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -853,3 +853,97 @@ async fn test_rest_catalog_read_append_table() {
         "REST catalog append table rows should match expected values"
     );
 }
+
+// ---------------------------------------------------------------------------
+// Data Evolution integration tests
+// ---------------------------------------------------------------------------
+
+/// Test reading a data-evolution enabled append-only table.
+///
+/// The table is provisioned by Spark with `data-evolution.enabled=true` and
+/// `row-tracking.enabled=true`. Multiple inserts produce files with 
`first_row_id`
+/// set, exercising the data evolution scan and read path.
+#[tokio::test]
+async fn test_read_data_evolution_table() {
+    let (plan, batches) = 
scan_and_read_with_fs_catalog("data_evolution_table", None).await;
+
+    assert!(
+        !plan.splits().is_empty(),
+        "Data evolution table should have at least one split"
+    );
+
+    let mut rows: Vec<(i32, String, i32)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("value");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), name.value(i).to_string(), 
value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice-v2".into(), 100),
+            (2, "bob".into(), 200),
+            (3, "carol-v2".into(), 300),
+            (4, "dave".into(), 400),
+            (5, "eve".into(), 500),
+        ],
+        "Data evolution table should return merged rows after MERGE INTO"
+    );
+}
+
+/// Test reading a data-evolution table with column projection.
+#[tokio::test]
+async fn test_read_data_evolution_table_with_projection() {
+    let (_, batches) =
+        scan_and_read_with_fs_catalog("data_evolution_table", Some(&["value", 
"id"])).await;
+
+    for batch in &batches {
+        let schema = batch.schema();
+        let field_names: Vec<&str> = schema.fields().iter().map(|f| 
f.name().as_str()).collect();
+        assert_eq!(
+            field_names,
+            vec!["value", "id"],
+            "Projection order should be preserved"
+        );
+        assert!(
+            batch.column_by_name("name").is_none(),
+            "Non-projected column 'name' should be absent"
+        );
+    }
+
+    let mut rows: Vec<(i32, i32)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("value");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![(1, 100), (2, 200), (3, 300), (4, 400), (5, 500)],
+        "Projected data evolution read should return correct values"
+    );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index ab96fec..878d1c5 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -55,6 +55,8 @@ apache-avro = { version = "0.17", features = ["snappy", 
"zstandard"] }
 indexmap = "2.5.0"
 roaring = "0.11"
 arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
 futures = "0.3"
 parquet = { workspace = true, features = ["async", "zstd"] }
 async-stream = "0.3.6"
diff --git a/crates/paimon/src/arrow/reader.rs 
b/crates/paimon/src/arrow/reader.rs
index 693aaf8..7d57243 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -17,9 +17,12 @@
 
 use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
 use crate::io::{FileIO, FileRead, FileStatus};
-use crate::spec::DataField;
+use crate::spec::{DataField, DataFileMeta};
 use crate::table::ArrowRecordBatchStream;
 use crate::{DataSplit, Error};
+use arrow_array::RecordBatch;
+use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
+
 use async_stream::try_stream;
 use bytes::Bytes;
 use futures::future::BoxFuture;
@@ -29,6 +32,7 @@ use parquet::arrow::async_reader::{AsyncFileReader, 
MetadataFetch};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::ParquetMetaDataReader;
 use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use std::collections::HashMap;
 use std::ops::Range;
 use std::sync::Arc;
 use tokio::try_join;
@@ -82,7 +86,6 @@ impl ArrowReader {
     pub fn read(self, data_splits: &[DataSplit]) -> 
crate::Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
-        // Owned list of splits so the stream does not hold references.
         let splits: Vec<DataSplit> = data_splits.to_vec();
         let read_type = self.read_type;
         let projected_column_names: Vec<String> = read_type
@@ -92,7 +95,6 @@ impl ArrowReader {
         Ok(try_stream! {
             for split in splits {
                 // Create DV factory for this split only (like Java 
createReader(partition, bucket, files, deletionFiles)).
-                let core_data_files = split.data_files();
                 let dv_factory = if split
                     .data_deletion_files()
                     .is_some_and(|files| files.iter().any(Option::is_some))
@@ -100,7 +102,7 @@ impl ArrowReader {
                     Some(
                         DeletionVectorFactory::new(
                             &file_io,
-                            core_data_files,
+                            split.data_files(),
                             split.data_deletion_files(),
                         )
                         .await?,
@@ -109,81 +111,326 @@ impl ArrowReader {
                     None
                 };
 
-                for file_meta in core_data_files {
-                    let path_to_read = split.data_file_path(file_meta);
-                    if 
!path_to_read.to_ascii_lowercase().ends_with(".parquet") {
-                        Err(Error::Unsupported {
-                            message: format!(
-                                "unsupported file format: only .parquet is 
supported, got: {path_to_read}"
-                            ),
-                        })?
-                    }
+                for file_meta in split.data_files().to_vec() {
                     let dv = dv_factory
                         .as_ref()
-                        .and_then(|factory| 
factory.get_deletion_vector(&file_meta.file_name));
+                        .and_then(|factory| 
factory.get_deletion_vector(&file_meta.file_name))
+                        .cloned();
 
-                    let parquet_file = file_io.new_input(&path_to_read)?;
-                    let (parquet_metadata, parquet_reader) = try_join!(
-                        parquet_file.metadata(),
-                        parquet_file.reader()
+                    let mut stream = read_single_file_stream(
+                        file_io.clone(),
+                        split.clone(),
+                        file_meta,
+                        projected_column_names.clone(),
+                        batch_size,
+                        dv,
                     )?;
-                    let arrow_file_reader = 
ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-                    let mut batch_stream_builder =
-                        ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
-                    // ProjectionMask preserves parquet-schema order; 
read_type order is restored below.
-                    let mask = {
-                        let parquet_schema = 
batch_stream_builder.parquet_schema();
-                        ProjectionMask::columns(
-                            parquet_schema,
-                            projected_column_names.iter().map(String::as_str),
-                        )
-                    };
-                    batch_stream_builder = 
batch_stream_builder.with_projection(mask);
-
-                    if let Some(dv) = dv {
-                        if !dv.is_empty() {
-                            let row_selection =
-                                
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
-                            batch_stream_builder = 
batch_stream_builder.with_row_selection(row_selection);
+                    while let Some(batch) = stream.next().await {
+                        yield batch?;
+                    }
+                }
+            }
+        }
+        .boxed())
+    }
+
+    /// Read data files in data evolution mode, merging columns from files 
that share the same row ID range.
+    ///
+    /// Each DataSplit contains files grouped by `first_row_id`. Files within 
a split may contain
+    /// different columns for the same logical rows. This method reads each 
file and merges them
+    /// column-wise, respecting `max_sequence_number` for conflict resolution.
+    ///
+    /// `table_fields` is the full table schema fields, used to determine 
which columns each file
+    /// provides when `write_cols` is not set.
+    pub fn read_data_evolution(
+        self,
+        data_splits: &[DataSplit],
+        table_fields: &[DataField],
+    ) -> crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let splits: Vec<DataSplit> = data_splits.to_vec();
+        let read_type = self.read_type;
+        let table_field_names: Vec<String> =
+            table_fields.iter().map(|f| f.name().to_string()).collect();
+        let projected_column_names: Vec<String> = read_type
+            .iter()
+            .map(|field| field.name().to_string())
+            .collect();
+
+        Ok(try_stream! {
+            for split in splits {
+                if split.raw_convertible() || split.data_files().len() == 1 {
+                    // Single file or raw convertible — stream lazily without 
loading all into memory.
+                    for file_meta in split.data_files().to_vec() {
+                        let mut stream = read_single_file_stream(
+                            file_io.clone(), split.clone(), file_meta, 
projected_column_names.clone(), batch_size, None,
+                        )?;
+                        while let Some(batch) = stream.next().await {
+                            yield batch?;
                         }
                     }
-                    if let Some(size) = batch_size {
-                        batch_stream_builder = 
batch_stream_builder.with_batch_size(size);
+                } else {
+                    // Multiple files need column-wise merge — also streamed 
lazily.
+                    let mut merge_stream = merge_files_by_columns(
+                        &file_io,
+                        &split,
+                        &projected_column_names,
+                        &table_field_names,
+                        batch_size,
+                    )?;
+                    while let Some(batch) = merge_stream.next().await {
+                        yield batch?;
+                    }
+                }
+            }
+        }
+        .boxed())
+    }
+}
+
+/// Read a single parquet file from a split, returning a lazy stream of 
batches.
+/// Optionally applies a deletion vector.
+fn read_single_file_stream(
+    file_io: FileIO,
+    split: DataSplit,
+    file_meta: DataFileMeta,
+    projected_column_names: Vec<String>,
+    batch_size: Option<usize>,
+    dv: Option<Arc<DeletionVector>>,
+) -> crate::Result<ArrowRecordBatchStream> {
+    Ok(try_stream! {
+        let path_to_read = split.data_file_path(&file_meta);
+        if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
+            Err(Error::Unsupported {
+                message: format!(
+                    "unsupported file format: only .parquet is supported, got: 
{path_to_read}"
+                ),
+            })?
+        }
+
+        let parquet_file = file_io.new_input(&path_to_read)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, 
parquet_reader);
+
+        let mut batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+        // Only project columns that exist in this file.
+        let parquet_schema = batch_stream_builder.parquet_schema().clone();
+        let file_column_names: Vec<&str> = 
parquet_schema.columns().iter().map(|c| c.name()).collect();
+        let available_columns: Vec<&str> = projected_column_names
+            .iter()
+            .filter(|name| file_column_names.contains(&name.as_str()))
+            .map(String::as_str)
+            .collect();
+
+        let mask = ProjectionMask::columns(&parquet_schema, 
available_columns.iter().copied());
+        batch_stream_builder = batch_stream_builder.with_projection(mask);
+
+        if let Some(ref dv) = dv {
+            if !dv.is_empty() {
+                let row_selection =
+                    
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
+                batch_stream_builder = 
batch_stream_builder.with_row_selection(row_selection);
+            }
+        }
+        if let Some(size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(size);
+        }
+
+        let mut batch_stream = batch_stream_builder.build()?;
+        while let Some(batch) = batch_stream.next().await {
+            let batch = batch?;
+            // Reorder columns from parquet-schema order to 
projected_column_names order,
+            // consistent with the normal read() path.
+            let reorder_indices: Vec<usize> = projected_column_names
+                .iter()
+                .filter_map(|name| batch.schema().index_of(name).ok())
+                .collect();
+            if reorder_indices.len() == batch.num_columns() {
+                yield batch.project(&reorder_indices).map_err(|e| {
+                    Error::UnexpectedError {
+                        message: "Failed to reorder projected 
columns".to_string(),
+                        source: Some(Box::new(e)),
                     }
-                    let mut batch_stream = batch_stream_builder.build()?;
-
-                    while let Some(batch) = batch_stream.next().await {
-                        let batch = batch?;
-                        // Reorder columns from parquet-schema order to 
read_type order.
-                        // Every projected column must exist in the batch; a 
missing
-                        // column indicates schema mismatch and must not be 
silenced.
-                        let reorder_indices: Vec<usize> = 
projected_column_names
-                            .iter()
-                            .map(|name| {
-                                batch.schema().index_of(name).map_err(|_| {
-                                    Error::UnexpectedError {
-                                        message: format!(
-                                            "Projected column '{name}' not 
found in Parquet batch schema of file {path_to_read}"
-                                        ),
-                                        source: None,
-                                    }
-                                })
-                            })
-                            .collect::<crate::Result<Vec<_>>>()?;
-                        yield batch.project(&reorder_indices).map_err(|e| {
-                            Error::UnexpectedError {
-                                message: "Failed to reorder projected 
columns".to_string(),
-                                source: Some(Box::new(e)),
+                })?;
+            } else {
+                // Not all projected columns exist in this file (data 
evolution case),
+                // return as-is; the caller (merge_files_by_columns) handles 
missing columns.
+                yield batch;
+            }
+        }
+    }
+    .boxed())
+}
+
+/// Merge multiple files column-wise for data evolution, streaming with 
bounded memory.
+///
+/// Opens all file readers simultaneously and maintains a cursor (current 
batch + offset)
+/// per file. Each poll slices up to `batch_size` rows from each file's 
current batch,
+/// assembles columns from the winning files, and yields the merged batch. 
When a file's
+/// current batch is exhausted, the next batch is read from its stream on 
demand.
+fn merge_files_by_columns(
+    file_io: &FileIO,
+    split: &DataSplit,
+    projected_column_names: &[String],
+    table_field_names: &[String],
+    batch_size: Option<usize>,
+) -> crate::Result<ArrowRecordBatchStream> {
+    let data_files = split.data_files();
+    if data_files.is_empty() {
+        return Ok(futures::stream::empty().boxed());
+    }
+
+    // Determine which columns each file provides and resolve conflicts by 
max_sequence_number.
+    // column_name -> (file_index, max_sequence_number)
+    let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+
+    for (file_idx, file_meta) in data_files.iter().enumerate() {
+        let file_columns: Vec<String> = if let Some(ref wc) = 
file_meta.write_cols {
+            wc.clone()
+        } else {
+            table_field_names.to_vec()
+        };
+
+        for col in &file_columns {
+            let entry = column_source
+                .entry(col.clone())
+                .or_insert((file_idx, i64::MIN));
+            if file_meta.max_sequence_number > entry.1 {
+                *entry = (file_idx, file_meta.max_sequence_number);
+            }
+        }
+    }
+
+    // For each file, determine which projected columns to read from it.
+    // file_index -> Vec<column_name>
+    let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
+    for col_name in projected_column_names {
+        if let Some(&(file_idx, _)) = column_source.get(col_name) {
+            file_read_columns
+                .entry(file_idx)
+                .or_default()
+                .push(col_name.clone());
+        }
+    }
+
+    // For each projected column, record (file_index, column_name) for 
assembly.
+    let column_plan: Vec<(Option<usize>, String)> = projected_column_names
+        .iter()
+        .map(|col_name| {
+            let file_idx = column_source.get(col_name).map(|&(idx, _)| idx);
+            (file_idx, col_name.clone())
+        })
+        .collect();
+
+    // Collect which file indices we need to open streams for.
+    let active_file_indices: Vec<usize> = 
file_read_columns.keys().copied().collect();
+
+    // Build owned data for the stream closure.
+    let file_io = file_io.clone();
+    let split = split.clone();
+    let data_files: Vec<DataFileMeta> = data_files.to_vec();
+    let projected_column_names = projected_column_names.to_vec();
+    let output_batch_size = batch_size.unwrap_or(1024);
+
+    Ok(try_stream! {
+        // Open a stream for each active file.
+        let mut file_streams: HashMap<usize, ArrowRecordBatchStream> = 
HashMap::new();
+        for &file_idx in &active_file_indices {
+            let stream = read_single_file_stream(
+                file_io.clone(),
+                split.clone(),
+                data_files[file_idx].clone(),
+                projected_column_names.clone(),
+                batch_size,
+                None,
+            )?;
+            file_streams.insert(file_idx, stream);
+        }
+
+        // Per-file cursor: current batch + offset within it.
+        let mut file_cursors: HashMap<usize, (RecordBatch, usize)> = 
HashMap::new();
+
+        loop {
+            // Ensure each active file has a current batch. If a file's cursor 
is exhausted
+            // or not yet initialized, read the next batch from its stream.
+            for &file_idx in &active_file_indices {
+                let needs_next = match file_cursors.get(&file_idx) {
+                    None => true,
+                    Some((batch, offset)) => *offset >= batch.num_rows(),
+                };
+                if needs_next {
+                    file_cursors.remove(&file_idx);
+                    if let Some(stream) = file_streams.get_mut(&file_idx) {
+                        if let Some(batch_result) = stream.next().await {
+                            let batch = batch_result?;
+                            if batch.num_rows() > 0 {
+                                file_cursors.insert(file_idx, (batch, 0));
                             }
-                        })?;
+                        }
+                    }
+                }
+            }
+
+            // All active files must have a cursor to assemble a valid row.
+            // If any file has no cursor (stream exhausted), we're done.
+            if active_file_indices.iter().any(|idx| 
!file_cursors.contains_key(idx)) {
+                break;
+            }
+
+            // Determine how many rows we can emit: min of remaining rows 
across all files.
+            let remaining: usize = active_file_indices
+                .iter()
+                .map(|idx| {
+                    let (batch, offset) = file_cursors.get(idx).unwrap();
+                    batch.num_rows() - offset
+                })
+                .min()
+                .unwrap_or(0);
+
+            if remaining == 0 {
+                break;
+            }
+
+            let rows_to_emit = remaining.min(output_batch_size);
+
+            // Slice each file's current batch and assemble columns.
+            let mut columns: Vec<Arc<dyn arrow_array::Array>> =
+                Vec::with_capacity(column_plan.len());
+            let mut schema_fields: Vec<ArrowField> = 
Vec::with_capacity(column_plan.len());
+
+            for (file_idx_opt, col_name) in &column_plan {
+                if let Some(file_idx) = file_idx_opt {
+                    if let Some((batch, offset)) = file_cursors.get(file_idx) {
+                        if let Ok(col_idx) = batch.schema().index_of(col_name) 
{
+                            let col = batch.column(col_idx).slice(*offset, 
rows_to_emit);
+                            columns.push(col);
+                            
schema_fields.push(batch.schema().field(col_idx).clone());
+                        }
                     }
                 }
             }
+
+            // Advance all cursors.
+            for &file_idx in &active_file_indices {
+                if let Some((_, ref mut offset)) = 
file_cursors.get_mut(&file_idx) {
+                    *offset += rows_to_emit;
+                }
+            }
+
+            if !columns.is_empty() {
+                let schema = Arc::new(ArrowSchema::new(schema_fields));
+                let merged = RecordBatch::try_new(schema, columns).map_err(|e| 
Error::UnexpectedError {
+                    message: format!("Failed to build merged RecordBatch: 
{e}"),
+                    source: Some(Box::new(e)),
+                })?;
+                yield merged;
+            }
         }
-            .boxed())
     }
+    .boxed())
 }
 
 /// Builds a Parquet [RowSelection] from deletion vector.
diff --git a/crates/paimon/src/spec/core_options.rs 
b/crates/paimon/src/spec/core_options.rs
index f6164d0..0f15922 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -18,6 +18,7 @@
 use std::collections::HashMap;
 
 const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
+const DATA_EVOLUTION_ENABLED_OPTION: &str = "data-evolution.enabled";
 const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
 const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
 const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
@@ -46,6 +47,13 @@ impl<'a> CoreOptions<'a> {
             .unwrap_or(false)
     }
 
+    pub fn data_evolution_enabled(&self) -> bool {
+        self.options
+            .get(DATA_EVOLUTION_ENABLED_OPTION)
+            .map(|value| value.eq_ignore_ascii_case("true"))
+            .unwrap_or(false)
+    }
+
     pub fn source_split_target_size(&self) -> i64 {
         self.options
             .get(SOURCE_SPLIT_TARGET_SIZE_OPTION)
diff --git a/crates/paimon/src/spec/data_file.rs 
b/crates/paimon/src/spec/data_file.rs
index 8733b1d..48bfe78 100644
--- a/crates/paimon/src/spec/data_file.rs
+++ b/crates/paimon/src/spec/data_file.rs
@@ -396,6 +396,22 @@ pub struct DataFileMeta {
     // file index filter bytes, if it is small, store in data file meta
     #[serde(rename = "_EMBEDDED_FILE_INDEX", with = "serde_bytes")]
     pub embedded_index: Option<Vec<u8>>,
+
+    /// The starting row ID for this file's data (used in data evolution mode).
+    #[serde(
+        rename = "_FIRST_ROW_ID",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub first_row_id: Option<i64>,
+
+    /// Which table columns this file contains (used in data evolution mode).
+    #[serde(
+        rename = "_WRITE_COLS",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub write_cols: Option<Vec<String>>,
 }
 
 impl Display for DataFileMeta {
@@ -405,7 +421,12 @@ impl Display for DataFileMeta {
 }
 
 #[allow(dead_code)]
-impl DataFileMeta {}
+impl DataFileMeta {
+    /// Returns the row ID range `[first_row_id, first_row_id + row_count - 
1]` if `first_row_id` is set.
+    pub fn row_id_range(&self) -> Option<(i64, i64)> {
+        self.first_row_id.map(|fid| (fid, fid + self.row_count - 1))
+    }
+}
 
 #[cfg(test)]
 mod tests {
diff --git a/crates/paimon/src/spec/objects_file.rs 
b/crates/paimon/src/spec/objects_file.rs
index fde8e20..9685f23 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -120,6 +120,8 @@ mod tests {
                             .unwrap(),
                         delete_row_count: Some(0),
                         embedded_index: None,
+                        first_row_id: None,
+                        write_cols: None,
                     },
                     2
                 ),
@@ -154,6 +156,8 @@ mod tests {
                             .unwrap(),
                         delete_row_count: Some(1),
                         embedded_index: None,
+                        first_row_id: None,
+                        write_cols: None,
                     },
                     2
                 ),
diff --git a/crates/paimon/src/table/bin_pack.rs 
b/crates/paimon/src/table/bin_pack.rs
index 2d64e6b..6a567cd 100644
--- a/crates/paimon/src/table/bin_pack.rs
+++ b/crates/paimon/src/table/bin_pack.rs
@@ -96,6 +96,8 @@ mod tests {
             creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
             delete_row_count: None,
             embedded_index: None,
+            first_row_id: None,
+            write_cols: None,
         }
     }
 
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index e0aae28..65c89b8 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -155,6 +155,7 @@ impl<'a> TableRead<'a> {
         let has_primary_keys = !self.table.schema.primary_keys().is_empty();
         let core_options = CoreOptions::new(self.table.schema.options());
         let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
+        let data_evolution = core_options.data_evolution_enabled();
 
         if has_primary_keys && !deletion_vectors_enabled {
             return Err(Error::Unsupported {
@@ -167,6 +168,11 @@ impl<'a> TableRead<'a> {
 
         let reader =
             
ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec());
-        reader.read(data_splits)
+
+        if data_evolution {
+            reader.read_data_evolution(data_splits, self.table.schema.fields())
+        } else {
+            reader.read(data_splits)
+        }
     }
 }
diff --git a/crates/paimon/src/table/source.rs 
b/crates/paimon/src/table/source.rs
index ca9ba9b..7416ca9 100644
--- a/crates/paimon/src/table/source.rs
+++ b/crates/paimon/src/table/source.rs
@@ -108,6 +108,10 @@ pub struct DataSplit {
     /// Deletion file for each data file, same order as `data_files`.
     /// `None` at index `i` means no deletion file for `data_files[i]` 
(matches Java getDeletionFiles() / List<DeletionFile> with null elements).
     data_deletion_files: Option<Vec<Option<DeletionFile>>>,
+    /// Whether this split can be read file-by-file without merging.
+    /// `false` when files need column-wise merge (e.g. data evolution) or
+    /// key-value merge (e.g. primary key tables without deletion vectors).
+    raw_convertible: bool,
 }
 
 impl DataSplit {
@@ -136,6 +140,11 @@ impl DataSplit {
         self.data_deletion_files.as_deref()
     }
 
+    /// Whether this split can be read without column-wise merging.
+    pub fn raw_convertible(&self) -> bool {
+        self.raw_convertible
+    }
+
     /// Returns the deletion file for the data file at the given index, if 
any. `None` at that index means no deletion file.
     pub fn deletion_file_for_data_file_index(&self, index: usize) -> 
Option<&DeletionFile> {
         self.data_deletion_files
@@ -194,6 +203,7 @@ pub struct DataSplitBuilder {
     data_files: Option<Vec<DataFileMeta>>,
     /// Same length as data_files; `None` at index i = no deletion file for 
data_files[i].
     data_deletion_files: Option<Vec<Option<DeletionFile>>>,
+    raw_convertible: bool,
 }
 
 impl DataSplitBuilder {
@@ -206,6 +216,7 @@ impl DataSplitBuilder {
             total_buckets: -1,
             data_files: None,
             data_deletion_files: None,
+            raw_convertible: false,
         }
     }
 
@@ -243,6 +254,11 @@ impl DataSplitBuilder {
         self
     }
 
+    pub fn with_raw_convertible(mut self, raw_convertible: bool) -> Self {
+        self.raw_convertible = raw_convertible;
+        self
+    }
+
     pub fn build(self) -> crate::Result<DataSplit> {
         if self.snapshot_id == -1 {
             return Err(crate::Error::UnexpectedError {
@@ -294,6 +310,7 @@ impl DataSplitBuilder {
             total_buckets: self.total_buckets,
             data_files,
             data_deletion_files: self.data_deletion_files,
+            raw_convertible: self.raw_convertible,
         })
     }
 }
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 031d15f..9992e0b 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -23,8 +23,8 @@
 use super::Table;
 use crate::io::FileIO;
 use crate::spec::{
-    eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions, FileKind, 
IndexManifest,
-    ManifestEntry, PartitionComputer, Predicate, Snapshot,
+    eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions, 
DataFileMeta, FileKind,
+    IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot,
 };
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket, 
Plan};
@@ -166,6 +166,59 @@ fn partition_matches_predicate(
     }
 }
 
+/// Groups data files by overlapping `row_id_range` for data evolution.
+///
+/// Files are sorted by `(first_row_id, -max_sequence_number)`. Files whose 
row ID ranges
+/// overlap are merged into the same group (they contain different columns for 
the same rows).
+/// Files without `first_row_id` become their own group.
+///
+/// Reference: 
[DataEvolutionSplitGenerator](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitGenerator.java)
+fn group_by_overlapping_row_id(mut files: Vec<DataFileMeta>) -> 
Vec<Vec<DataFileMeta>> {
+    files.sort_by(|a, b| {
+        let a_row_id = a.first_row_id.unwrap_or(i64::MIN);
+        let b_row_id = b.first_row_id.unwrap_or(i64::MIN);
+        a_row_id
+            .cmp(&b_row_id)
+            .then_with(|| b.max_sequence_number.cmp(&a.max_sequence_number))
+    });
+
+    let mut result: Vec<Vec<DataFileMeta>> = Vec::new();
+    let mut current_group: Vec<DataFileMeta> = Vec::new();
+    // Track the end of the current merged row_id range.
+    let mut current_range_end: i64 = i64::MIN;
+
+    for file in files {
+        match file.row_id_range() {
+            None => {
+                // Files without first_row_id become their own group.
+                if !current_group.is_empty() {
+                    result.push(std::mem::take(&mut current_group));
+                    current_range_end = i64::MIN;
+                }
+                result.push(vec![file]);
+            }
+            Some((start, end)) => {
+                if current_group.is_empty() || start <= current_range_end {
+                    // Overlaps with current range — merge into current group.
+                    if end > current_range_end {
+                        current_range_end = end;
+                    }
+                    current_group.push(file);
+                } else {
+                    // No overlap — start a new group.
+                    result.push(std::mem::take(&mut current_group));
+                    current_range_end = end;
+                    current_group.push(file);
+                }
+            }
+        }
+    }
+    if !current_group.is_empty() {
+        result.push(current_group);
+    }
+    result
+}
+
 /// TableScan for full table scan (no incremental, no predicate).
 ///
 /// Reference: 
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
@@ -198,6 +251,7 @@ impl<'a> TableScan<'a> {
         let table_path = self.table.location();
         let core_options = CoreOptions::new(self.table.schema().options());
         let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
+        let data_evolution_enabled = core_options.data_evolution_enabled();
         let target_split_size = core_options.source_split_target_size();
         let open_file_cost = core_options.source_split_open_file_cost();
         let entries = read_all_manifest_entries(file_io, table_path, 
&snapshot).await?;
@@ -321,8 +375,35 @@ impl<'a> TableScan<'a> {
                 .as_ref()
                 .and_then(|map| map.get(&PartitionBucket::new(partition, 
bucket)));
 
-            let file_groups = split_for_batch(data_files, target_split_size, 
open_file_cost);
-            for file_group in file_groups {
+            // Split files into groups: data evolution merges overlapping 
row_id ranges;
+            // multi-file groups need column-wise merge, single-file groups 
can be bin-packed.
+            let file_groups_with_raw: Vec<(Vec<DataFileMeta>, bool)> = if 
data_evolution_enabled {
+                let row_id_groups = group_by_overlapping_row_id(data_files);
+                let (singles, multis): (Vec<_>, Vec<_>) =
+                    row_id_groups.into_iter().partition(|g| g.len() == 1);
+
+                let mut result: Vec<(Vec<DataFileMeta>, bool)> = Vec::new();
+
+                // Multi-file groups: each becomes its own split, 
raw_convertible=false
+                for group in multis {
+                    result.push((group, false));
+                }
+
+                // Single-file groups: flatten and bin-pack, 
raw_convertible=true
+                let single_files: Vec<DataFileMeta> = 
singles.into_iter().flatten().collect();
+                for file_group in split_for_batch(single_files, 
target_split_size, open_file_cost) {
+                    result.push((file_group, true));
+                }
+
+                result
+            } else {
+                split_for_batch(data_files, target_split_size, open_file_cost)
+                    .into_iter()
+                    .map(|g| (g, true))
+                    .collect()
+            };
+
+            for (file_group, raw_convertible) in file_groups_with_raw {
                 let data_deletion_files = 
per_bucket_deletion_map.map(|per_bucket| {
                     file_group
                         .iter()
@@ -336,7 +417,8 @@ impl<'a> TableScan<'a> {
                     .with_bucket(bucket)
                     .with_bucket_path(bucket_path.clone())
                     .with_total_buckets(total_buckets)
-                    .with_data_files(file_group);
+                    .with_data_files(file_group)
+                    .with_raw_convertible(raw_convertible);
                 if let Some(files) = data_deletion_files {
                     builder = builder.with_data_deletion_files(files);
                 }
@@ -349,12 +431,49 @@ impl<'a> TableScan<'a> {
 
 #[cfg(test)]
 mod tests {
-    use super::partition_matches_predicate;
+    use super::{group_by_overlapping_row_id, partition_matches_predicate};
     use crate::spec::{
-        ArrayType, DataField, DataType, Datum, IntType, Predicate, 
PredicateBuilder,
-        PredicateOperator, VarCharType,
+        stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType, 
Datum, IntType,
+        Predicate, PredicateBuilder, PredicateOperator, VarCharType,
     };
     use crate::Error;
+    use chrono::{DateTime, Utc};
+
+    /// Helper to build a DataFileMeta with data evolution fields.
+    fn make_evo_file(
+        name: &str,
+        file_size: i64,
+        row_count: i64,
+        max_seq: i64,
+        first_row_id: Option<i64>,
+    ) -> DataFileMeta {
+        DataFileMeta {
+            file_name: name.to_string(),
+            file_size,
+            row_count,
+            min_key: Vec::new(),
+            max_key: Vec::new(),
+            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
+            value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
+            min_sequence_number: 0,
+            max_sequence_number: max_seq,
+            schema_id: 0,
+            level: 0,
+            extra_files: Vec::new(),
+            creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
+            delete_row_count: None,
+            embedded_index: None,
+            first_row_id,
+            write_cols: None,
+        }
+    }
+
+    fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
+        groups
+            .iter()
+            .map(|g| g.iter().map(|f| f.file_name.as_str()).collect())
+            .collect()
+    }
 
     struct SerializedBinaryRowBuilder {
         arity: i32,
@@ -432,4 +551,91 @@ mod tests {
             "Expected extract_datum unsupported error, got: {err:?}"
         );
     }
+
+    // ==================== group_by_overlapping_row_id tests 
====================
+
+    #[test]
+    fn test_group_by_overlapping_row_id_empty() {
+        let result = group_by_overlapping_row_id(vec![]);
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_group_by_overlapping_row_id_no_row_ids() {
+        // Files without first_row_id each become their own group.
+        // Sorted by (i64::MIN, -max_seq), so b(seq=2) before a(seq=1).
+        let files = vec![
+            make_evo_file("a", 10, 100, 1, None),
+            make_evo_file("b", 10, 100, 2, None),
+        ];
+        let groups = group_by_overlapping_row_id(files);
+        assert_eq!(file_names(&groups), vec![vec!["b"], vec!["a"]]);
+    }
+
+    #[test]
+    fn test_group_by_overlapping_row_id_same_range() {
+        // Two files with the same first_row_id and row_count → same range → 
one group.
+        let files = vec![
+            make_evo_file("a", 10, 100, 2, Some(0)),
+            make_evo_file("b", 10, 100, 1, Some(0)),
+        ];
+        let groups = group_by_overlapping_row_id(files);
+        assert_eq!(groups.len(), 1);
+        assert_eq!(file_names(&groups), vec![vec!["a", "b"]]);
+    }
+
+    #[test]
+    fn test_group_by_overlapping_row_id_overlapping_ranges() {
+        // File a: rows [0, 99], file b: rows [50, 149] → overlapping → one 
group.
+        let files = vec![
+            make_evo_file("a", 10, 100, 1, Some(0)),
+            make_evo_file("b", 10, 100, 2, Some(50)),
+        ];
+        let groups = group_by_overlapping_row_id(files);
+        assert_eq!(groups.len(), 1);
+        assert_eq!(file_names(&groups), vec![vec!["a", "b"]]);
+    }
+
+    #[test]
+    fn test_group_by_overlapping_row_id_non_overlapping() {
+        // File a: rows [0, 99], file b: rows [100, 199] → no overlap → two 
groups.
+        let files = vec![
+            make_evo_file("a", 10, 100, 1, Some(0)),
+            make_evo_file("b", 10, 100, 2, Some(100)),
+        ];
+        let groups = group_by_overlapping_row_id(files);
+        assert_eq!(groups.len(), 2);
+        assert_eq!(file_names(&groups), vec![vec!["a"], vec!["b"]]);
+    }
+
+    #[test]
+    fn test_group_by_overlapping_row_id_mixed() {
+        // a: [0,99], b: [0,99] (overlap), c: None (own group), d: [200,299]
+        // After sort: c(None→MIN) comes first, then b(seq=2), a(seq=1), d.
+        let files = vec![
+            make_evo_file("a", 10, 100, 1, Some(0)),
+            make_evo_file("b", 10, 100, 2, Some(0)),
+            make_evo_file("c", 10, 100, 3, None),
+            make_evo_file("d", 10, 100, 4, Some(200)),
+        ];
+        let groups = group_by_overlapping_row_id(files);
+        assert_eq!(
+            file_names(&groups),
+            vec![vec!["c"], vec!["b", "a"], vec!["d"]]
+        );
+    }
+
+    #[test]
+    fn test_group_by_overlapping_row_id_sorted_by_seq() {
+        // Within a group, files are sorted by (first_row_id, 
-max_sequence_number).
+        let files = vec![
+            make_evo_file("a", 10, 100, 1, Some(0)),
+            make_evo_file("b", 10, 100, 3, Some(0)),
+            make_evo_file("c", 10, 100, 2, Some(0)),
+        ];
+        let groups = group_by_overlapping_row_id(files);
+        assert_eq!(groups.len(), 1);
+        // Sorted by descending max_sequence_number: b(3), c(2), a(1)
+        assert_eq!(file_names(&groups), vec![vec!["b", "c", "a"]]);
+    }
 }
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index a813238..cdc538a 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -196,6 +196,70 @@ def main():
         """
     )
 
+    # ===== Data Evolution table: append-only with row tracking =====
+    # data-evolution.enabled + row-tracking.enabled allows partial column 
updates
+    # via MERGE INTO. This produces files with different write_cols covering 
the
+    # same row ID ranges, exercising the column-wise merge read path.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_table (
+            id INT,
+            name STRING,
+            value INT
+        ) USING paimon
+        TBLPROPERTIES (
+            'row-tracking.enabled' = 'true',
+            'data-evolution.enabled' = 'true'
+        )
+        """
+    )
+
+    # First batch: rows with row_id 0, 1, 2 — all columns written
+    spark.sql(
+        """
+        INSERT INTO data_evolution_table VALUES
+            (1, 'alice', 100),
+            (2, 'bob', 200),
+            (3, 'carol', 300)
+        """
+    )
+
+    # Second batch: rows with row_id 3, 4
+    spark.sql(
+        """
+        INSERT INTO data_evolution_table VALUES
+            (4, 'dave', 400),
+            (5, 'eve', 500)
+        """
+    )
+
+    # MERGE INTO: partial column update on existing rows.
+    # This writes new files containing only the updated column (name) with the
+    # same first_row_id, so the reader must merge columns from multiple files.
+    # Paimon 1.3.1 requires the source table to be a Paimon table.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_updates (
+            id INT,
+            name STRING
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO data_evolution_updates VALUES (1, 'alice-v2'), (3, 
'carol-v2')
+        """
+    )
+    spark.sql(
+        """
+        MERGE INTO data_evolution_table t
+        USING data_evolution_updates s
+        ON t.id = s.id
+        WHEN MATCHED THEN UPDATE SET t.name = s.name
+        """
+    )
+    spark.sql("DROP TABLE data_evolution_updates")
+
 
 if __name__ == "__main__":
     main()


Reply via email to