This is an automated email from the ASF dual-hosted git repository.
hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new cd3670c feat: support data evolution table mode (#193)
cd3670c is described below
commit cd3670cd4bf9137b998315c199e2d1c54100c85c
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 2 21:48:52 2026 +0800
feat: support data evolution table mode (#193)
* feat: support data evolution table mode
---
Cargo.toml | 2 +
crates/integration_tests/tests/read_tables.rs | 94 +++++++
crates/paimon/Cargo.toml | 2 +
crates/paimon/src/arrow/reader.rs | 379 +++++++++++++++++++++-----
crates/paimon/src/spec/core_options.rs | 8 +
crates/paimon/src/spec/data_file.rs | 23 +-
crates/paimon/src/spec/objects_file.rs | 4 +
crates/paimon/src/table/bin_pack.rs | 2 +
crates/paimon/src/table/read_builder.rs | 8 +-
crates/paimon/src/table/source.rs | 17 ++
crates/paimon/src/table/table_scan.rs | 222 ++++++++++++++-
dev/spark/provision.py | 64 +++++
12 files changed, 749 insertions(+), 76 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 6282c09..2367c90 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,5 +29,7 @@ rust-version = "1.86.0"
[workspace.dependencies]
arrow-array = { version = "57.0", features = ["ffi"] }
+arrow-schema = "57.0"
+arrow-select = "57.0"
parquet = "57.0"
tokio = "1.39.2"
\ No newline at end of file
diff --git a/crates/integration_tests/tests/read_tables.rs
b/crates/integration_tests/tests/read_tables.rs
index 9d7e891..090197d 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -853,3 +853,97 @@ async fn test_rest_catalog_read_append_table() {
"REST catalog append table rows should match expected values"
);
}
+
+// ---------------------------------------------------------------------------
+// Data Evolution integration tests
+// ---------------------------------------------------------------------------
+
+/// Test reading a data-evolution enabled append-only table.
+///
+/// The table is provisioned by Spark with `data-evolution.enabled=true` and
+/// `row-tracking.enabled=true`. Multiple inserts produce files with
+/// `first_row_id` set, exercising the data evolution scan and read path.
+#[tokio::test]
+async fn test_read_data_evolution_table() {
+ let (plan, batches) =
scan_and_read_with_fs_catalog("data_evolution_table", None).await;
+
+ assert!(
+ !plan.splits().is_empty(),
+ "Data evolution table should have at least one split"
+ );
+
+ let mut rows: Vec<(i32, String, i32)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ let value = batch
+ .column_by_name("value")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("value");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), name.value(i).to_string(),
value.value(i)));
+ }
+ }
+ rows.sort_by_key(|(id, _, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice-v2".into(), 100),
+ (2, "bob".into(), 200),
+ (3, "carol-v2".into(), 300),
+ (4, "dave".into(), 400),
+ (5, "eve".into(), 500),
+ ],
+ "Data evolution table should return merged rows after MERGE INTO"
+ );
+}
+
+/// Test reading a data-evolution table with column projection.
+#[tokio::test]
+async fn test_read_data_evolution_table_with_projection() {
+ let (_, batches) =
+ scan_and_read_with_fs_catalog("data_evolution_table", Some(&["value",
"id"])).await;
+
+ for batch in &batches {
+ let schema = batch.schema();
+ let field_names: Vec<&str> = schema.fields().iter().map(|f|
f.name().as_str()).collect();
+ assert_eq!(
+ field_names,
+ vec!["value", "id"],
+ "Projection order should be preserved"
+ );
+ assert!(
+ batch.column_by_name("name").is_none(),
+ "Non-projected column 'name' should be absent"
+ );
+ }
+
+ let mut rows: Vec<(i32, i32)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let value = batch
+ .column_by_name("value")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("value");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), value.value(i)));
+ }
+ }
+ rows.sort_by_key(|(id, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![(1, 100), (2, 200), (3, 300), (4, 400), (5, 500)],
+ "Projected data evolution read should return correct values"
+ );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index ab96fec..878d1c5 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -55,6 +55,8 @@ apache-avro = { version = "0.17", features = ["snappy",
"zstandard"] }
indexmap = "2.5.0"
roaring = "0.11"
arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
futures = "0.3"
parquet = { workspace = true, features = ["async", "zstd"] }
async-stream = "0.3.6"
diff --git a/crates/paimon/src/arrow/reader.rs
b/crates/paimon/src/arrow/reader.rs
index 693aaf8..7d57243 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -17,9 +17,12 @@
use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
use crate::io::{FileIO, FileRead, FileStatus};
-use crate::spec::DataField;
+use crate::spec::{DataField, DataFileMeta};
use crate::table::ArrowRecordBatchStream;
use crate::{DataSplit, Error};
+use arrow_array::RecordBatch;
+use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
+
use async_stream::try_stream;
use bytes::Bytes;
use futures::future::BoxFuture;
@@ -29,6 +32,7 @@ use parquet::arrow::async_reader::{AsyncFileReader,
MetadataFetch};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use tokio::try_join;
@@ -82,7 +86,6 @@ impl ArrowReader {
pub fn read(self, data_splits: &[DataSplit]) ->
crate::Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
- // Owned list of splits so the stream does not hold references.
let splits: Vec<DataSplit> = data_splits.to_vec();
let read_type = self.read_type;
let projected_column_names: Vec<String> = read_type
@@ -92,7 +95,6 @@ impl ArrowReader {
Ok(try_stream! {
for split in splits {
// Create DV factory for this split only (like Java
createReader(partition, bucket, files, deletionFiles)).
- let core_data_files = split.data_files();
let dv_factory = if split
.data_deletion_files()
.is_some_and(|files| files.iter().any(Option::is_some))
@@ -100,7 +102,7 @@ impl ArrowReader {
Some(
DeletionVectorFactory::new(
&file_io,
- core_data_files,
+ split.data_files(),
split.data_deletion_files(),
)
.await?,
@@ -109,81 +111,326 @@ impl ArrowReader {
None
};
- for file_meta in core_data_files {
- let path_to_read = split.data_file_path(file_meta);
- if
!path_to_read.to_ascii_lowercase().ends_with(".parquet") {
- Err(Error::Unsupported {
- message: format!(
- "unsupported file format: only .parquet is
supported, got: {path_to_read}"
- ),
- })?
- }
+ for file_meta in split.data_files().to_vec() {
let dv = dv_factory
.as_ref()
- .and_then(|factory|
factory.get_deletion_vector(&file_meta.file_name));
+ .and_then(|factory|
factory.get_deletion_vector(&file_meta.file_name))
+ .cloned();
- let parquet_file = file_io.new_input(&path_to_read)?;
- let (parquet_metadata, parquet_reader) = try_join!(
- parquet_file.metadata(),
- parquet_file.reader()
+ let mut stream = read_single_file_stream(
+ file_io.clone(),
+ split.clone(),
+ file_meta,
+ projected_column_names.clone(),
+ batch_size,
+ dv,
)?;
- let arrow_file_reader =
ArrowFileReader::new(parquet_metadata, parquet_reader);
-
- let mut batch_stream_builder =
- ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
- .await?;
- // ProjectionMask preserves parquet-schema order;
read_type order is restored below.
- let mask = {
- let parquet_schema =
batch_stream_builder.parquet_schema();
- ProjectionMask::columns(
- parquet_schema,
- projected_column_names.iter().map(String::as_str),
- )
- };
- batch_stream_builder =
batch_stream_builder.with_projection(mask);
-
- if let Some(dv) = dv {
- if !dv.is_empty() {
- let row_selection =
-
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
- batch_stream_builder =
batch_stream_builder.with_row_selection(row_selection);
+ while let Some(batch) = stream.next().await {
+ yield batch?;
+ }
+ }
+ }
+ }
+ .boxed())
+ }
+
+    /// Read data files in data evolution mode, merging columns from files
+    /// that share the same row ID range.
+    ///
+    /// Each DataSplit contains files grouped by `first_row_id`. Files within
+    /// a split may contain different columns for the same logical rows. This
+    /// method reads each file and merges them column-wise, respecting
+    /// `max_sequence_number` for conflict resolution.
+    ///
+    /// `table_fields` is the full table schema fields, used to determine
+    /// which columns each file provides when `write_cols` is not set.
+ pub fn read_data_evolution(
+ self,
+ data_splits: &[DataSplit],
+ table_fields: &[DataField],
+ ) -> crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size;
+ let splits: Vec<DataSplit> = data_splits.to_vec();
+ let read_type = self.read_type;
+ let table_field_names: Vec<String> =
+ table_fields.iter().map(|f| f.name().to_string()).collect();
+ let projected_column_names: Vec<String> = read_type
+ .iter()
+ .map(|field| field.name().to_string())
+ .collect();
+
+ Ok(try_stream! {
+ for split in splits {
+ if split.raw_convertible() || split.data_files().len() == 1 {
+                    // Single file or raw convertible — stream lazily without
+                    // loading all into memory.
+ for file_meta in split.data_files().to_vec() {
+ let mut stream = read_single_file_stream(
+ file_io.clone(), split.clone(), file_meta,
projected_column_names.clone(), batch_size, None,
+ )?;
+ while let Some(batch) = stream.next().await {
+ yield batch?;
}
}
- if let Some(size) = batch_size {
- batch_stream_builder =
batch_stream_builder.with_batch_size(size);
+ } else {
+                    // Multiple files need column-wise merge — also streamed
+                    // lazily.
+ let mut merge_stream = merge_files_by_columns(
+ &file_io,
+ &split,
+ &projected_column_names,
+ &table_field_names,
+ batch_size,
+ )?;
+ while let Some(batch) = merge_stream.next().await {
+ yield batch?;
+ }
+ }
+ }
+ }
+ .boxed())
+ }
+}
+
+/// Read a single parquet file from a split, returning a lazy stream of
+/// batches. Optionally applies a deletion vector.
+fn read_single_file_stream(
+ file_io: FileIO,
+ split: DataSplit,
+ file_meta: DataFileMeta,
+ projected_column_names: Vec<String>,
+ batch_size: Option<usize>,
+ dv: Option<Arc<DeletionVector>>,
+) -> crate::Result<ArrowRecordBatchStream> {
+ Ok(try_stream! {
+ let path_to_read = split.data_file_path(&file_meta);
+ if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
+ Err(Error::Unsupported {
+ message: format!(
+ "unsupported file format: only .parquet is supported, got:
{path_to_read}"
+ ),
+ })?
+ }
+
+ let parquet_file = file_io.new_input(&path_to_read)?;
+ let (parquet_metadata, parquet_reader) =
+ try_join!(parquet_file.metadata(), parquet_file.reader())?;
+ let arrow_file_reader = ArrowFileReader::new(parquet_metadata,
parquet_reader);
+
+ let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+ // Only project columns that exist in this file.
+ let parquet_schema = batch_stream_builder.parquet_schema().clone();
+ let file_column_names: Vec<&str> =
parquet_schema.columns().iter().map(|c| c.name()).collect();
+ let available_columns: Vec<&str> = projected_column_names
+ .iter()
+ .filter(|name| file_column_names.contains(&name.as_str()))
+ .map(String::as_str)
+ .collect();
+
+ let mask = ProjectionMask::columns(&parquet_schema,
available_columns.iter().copied());
+ batch_stream_builder = batch_stream_builder.with_projection(mask);
+
+ if let Some(ref dv) = dv {
+ if !dv.is_empty() {
+ let row_selection =
+
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
+ batch_stream_builder =
batch_stream_builder.with_row_selection(row_selection);
+ }
+ }
+ if let Some(size) = batch_size {
+ batch_stream_builder = batch_stream_builder.with_batch_size(size);
+ }
+
+ let mut batch_stream = batch_stream_builder.build()?;
+ while let Some(batch) = batch_stream.next().await {
+ let batch = batch?;
+        // Reorder columns from parquet-schema order to
+        // projected_column_names order, consistent with the normal read()
+        // path.
+ let reorder_indices: Vec<usize> = projected_column_names
+ .iter()
+ .filter_map(|name| batch.schema().index_of(name).ok())
+ .collect();
+ if reorder_indices.len() == batch.num_columns() {
+ yield batch.project(&reorder_indices).map_err(|e| {
+ Error::UnexpectedError {
+ message: "Failed to reorder projected
columns".to_string(),
+ source: Some(Box::new(e)),
}
- let mut batch_stream = batch_stream_builder.build()?;
-
- while let Some(batch) = batch_stream.next().await {
- let batch = batch?;
- // Reorder columns from parquet-schema order to
read_type order.
- // Every projected column must exist in the batch; a
missing
- // column indicates schema mismatch and must not be
silenced.
- let reorder_indices: Vec<usize> =
projected_column_names
- .iter()
- .map(|name| {
- batch.schema().index_of(name).map_err(|_| {
- Error::UnexpectedError {
- message: format!(
- "Projected column '{name}' not
found in Parquet batch schema of file {path_to_read}"
- ),
- source: None,
- }
- })
- })
- .collect::<crate::Result<Vec<_>>>()?;
- yield batch.project(&reorder_indices).map_err(|e| {
- Error::UnexpectedError {
- message: "Failed to reorder projected
columns".to_string(),
- source: Some(Box::new(e)),
+ })?;
+ } else {
+            // Not all projected columns exist in this file (data evolution
+            // case), return as-is; the caller (merge_files_by_columns)
+            // handles missing columns.
+ yield batch;
+ }
+ }
+ }
+ .boxed())
+}
+
+/// Merge multiple files column-wise for data evolution, streaming with
+/// bounded memory.
+///
+/// Opens all file readers simultaneously and maintains a cursor (current
+/// batch + offset) per file. Each poll slices up to `batch_size` rows from
+/// each file's current batch, assembles columns from the winning files, and
+/// yields the merged batch. When a file's current batch is exhausted, the
+/// next batch is read from its stream on demand.
+fn merge_files_by_columns(
+ file_io: &FileIO,
+ split: &DataSplit,
+ projected_column_names: &[String],
+ table_field_names: &[String],
+ batch_size: Option<usize>,
+) -> crate::Result<ArrowRecordBatchStream> {
+ let data_files = split.data_files();
+ if data_files.is_empty() {
+ return Ok(futures::stream::empty().boxed());
+ }
+
+    // Determine which columns each file provides and resolve conflicts by
+    // max_sequence_number. column_name -> (file_index, max_sequence_number)
+ let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let file_columns: Vec<String> = if let Some(ref wc) =
file_meta.write_cols {
+ wc.clone()
+ } else {
+ table_field_names.to_vec()
+ };
+
+ for col in &file_columns {
+ let entry = column_source
+ .entry(col.clone())
+ .or_insert((file_idx, i64::MIN));
+ if file_meta.max_sequence_number > entry.1 {
+ *entry = (file_idx, file_meta.max_sequence_number);
+ }
+ }
+ }
+
+ // For each file, determine which projected columns to read from it.
+ // file_index -> Vec<column_name>
+ let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
+ for col_name in projected_column_names {
+ if let Some(&(file_idx, _)) = column_source.get(col_name) {
+ file_read_columns
+ .entry(file_idx)
+ .or_default()
+ .push(col_name.clone());
+ }
+ }
+
+    // For each projected column, record (file_index, column_name) for
+    // assembly.
+ let column_plan: Vec<(Option<usize>, String)> = projected_column_names
+ .iter()
+ .map(|col_name| {
+ let file_idx = column_source.get(col_name).map(|&(idx, _)| idx);
+ (file_idx, col_name.clone())
+ })
+ .collect();
+
+ // Collect which file indices we need to open streams for.
+ let active_file_indices: Vec<usize> =
file_read_columns.keys().copied().collect();
+
+ // Build owned data for the stream closure.
+ let file_io = file_io.clone();
+ let split = split.clone();
+ let data_files: Vec<DataFileMeta> = data_files.to_vec();
+ let projected_column_names = projected_column_names.to_vec();
+ let output_batch_size = batch_size.unwrap_or(1024);
+
+ Ok(try_stream! {
+ // Open a stream for each active file.
+ let mut file_streams: HashMap<usize, ArrowRecordBatchStream> =
HashMap::new();
+ for &file_idx in &active_file_indices {
+ let stream = read_single_file_stream(
+ file_io.clone(),
+ split.clone(),
+ data_files[file_idx].clone(),
+ projected_column_names.clone(),
+ batch_size,
+ None,
+ )?;
+ file_streams.insert(file_idx, stream);
+ }
+
+ // Per-file cursor: current batch + offset within it.
+ let mut file_cursors: HashMap<usize, (RecordBatch, usize)> =
HashMap::new();
+
+ loop {
+            // Ensure each active file has a current batch. If a file's
+            // cursor is exhausted or not yet initialized, read the next
+            // batch from its stream.
+ for &file_idx in &active_file_indices {
+ let needs_next = match file_cursors.get(&file_idx) {
+ None => true,
+ Some((batch, offset)) => *offset >= batch.num_rows(),
+ };
+ if needs_next {
+ file_cursors.remove(&file_idx);
+ if let Some(stream) = file_streams.get_mut(&file_idx) {
+ if let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ if batch.num_rows() > 0 {
+ file_cursors.insert(file_idx, (batch, 0));
}
- })?;
+ }
+ }
+ }
+ }
+
+ // All active files must have a cursor to assemble a valid row.
+ // If any file has no cursor (stream exhausted), we're done.
+ if active_file_indices.iter().any(|idx|
!file_cursors.contains_key(idx)) {
+ break;
+ }
+
+            // Determine how many rows we can emit: min of remaining rows
+            // across all files.
+ let remaining: usize = active_file_indices
+ .iter()
+ .map(|idx| {
+ let (batch, offset) = file_cursors.get(idx).unwrap();
+ batch.num_rows() - offset
+ })
+ .min()
+ .unwrap_or(0);
+
+ if remaining == 0 {
+ break;
+ }
+
+ let rows_to_emit = remaining.min(output_batch_size);
+
+ // Slice each file's current batch and assemble columns.
+ let mut columns: Vec<Arc<dyn arrow_array::Array>> =
+ Vec::with_capacity(column_plan.len());
+ let mut schema_fields: Vec<ArrowField> =
Vec::with_capacity(column_plan.len());
+
+ for (file_idx_opt, col_name) in &column_plan {
+ if let Some(file_idx) = file_idx_opt {
+ if let Some((batch, offset)) = file_cursors.get(file_idx) {
+ if let Ok(col_idx) = batch.schema().index_of(col_name)
{
+ let col = batch.column(col_idx).slice(*offset,
rows_to_emit);
+ columns.push(col);
+
schema_fields.push(batch.schema().field(col_idx).clone());
+ }
}
}
}
+
+ // Advance all cursors.
+ for &file_idx in &active_file_indices {
+ if let Some((_, ref mut offset)) =
file_cursors.get_mut(&file_idx) {
+ *offset += rows_to_emit;
+ }
+ }
+
+ if !columns.is_empty() {
+ let schema = Arc::new(ArrowSchema::new(schema_fields));
+ let merged = RecordBatch::try_new(schema, columns).map_err(|e|
Error::UnexpectedError {
+ message: format!("Failed to build merged RecordBatch:
{e}"),
+ source: Some(Box::new(e)),
+ })?;
+ yield merged;
+ }
}
- .boxed())
}
+ .boxed())
}
/// Builds a Parquet [RowSelection] from deletion vector.
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index f6164d0..0f15922 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -18,6 +18,7 @@
use std::collections::HashMap;
const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
+const DATA_EVOLUTION_ENABLED_OPTION: &str = "data-evolution.enabled";
const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
@@ -46,6 +47,13 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(false)
}
+ pub fn data_evolution_enabled(&self) -> bool {
+ self.options
+ .get(DATA_EVOLUTION_ENABLED_OPTION)
+ .map(|value| value.eq_ignore_ascii_case("true"))
+ .unwrap_or(false)
+ }
+
pub fn source_split_target_size(&self) -> i64 {
self.options
.get(SOURCE_SPLIT_TARGET_SIZE_OPTION)
diff --git a/crates/paimon/src/spec/data_file.rs
b/crates/paimon/src/spec/data_file.rs
index 8733b1d..48bfe78 100644
--- a/crates/paimon/src/spec/data_file.rs
+++ b/crates/paimon/src/spec/data_file.rs
@@ -396,6 +396,22 @@ pub struct DataFileMeta {
// file index filter bytes, if it is small, store in data file meta
#[serde(rename = "_EMBEDDED_FILE_INDEX", with = "serde_bytes")]
pub embedded_index: Option<Vec<u8>>,
+
+ /// The starting row ID for this file's data (used in data evolution mode).
+ #[serde(
+ rename = "_FIRST_ROW_ID",
+ default,
+ skip_serializing_if = "Option::is_none"
+ )]
+ pub first_row_id: Option<i64>,
+
+ /// Which table columns this file contains (used in data evolution mode).
+ #[serde(
+ rename = "_WRITE_COLS",
+ default,
+ skip_serializing_if = "Option::is_none"
+ )]
+ pub write_cols: Option<Vec<String>>,
}
impl Display for DataFileMeta {
@@ -405,7 +421,12 @@ impl Display for DataFileMeta {
}
#[allow(dead_code)]
-impl DataFileMeta {}
+impl DataFileMeta {
+    /// Returns the row ID range `[first_row_id, first_row_id + row_count - 1]`
+    /// if `first_row_id` is set.
+ pub fn row_id_range(&self) -> Option<(i64, i64)> {
+ self.first_row_id.map(|fid| (fid, fid + self.row_count - 1))
+ }
+}
#[cfg(test)]
mod tests {
diff --git a/crates/paimon/src/spec/objects_file.rs
b/crates/paimon/src/spec/objects_file.rs
index fde8e20..9685f23 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -120,6 +120,8 @@ mod tests {
.unwrap(),
delete_row_count: Some(0),
embedded_index: None,
+ first_row_id: None,
+ write_cols: None,
},
2
),
@@ -154,6 +156,8 @@ mod tests {
.unwrap(),
delete_row_count: Some(1),
embedded_index: None,
+ first_row_id: None,
+ write_cols: None,
},
2
),
diff --git a/crates/paimon/src/table/bin_pack.rs
b/crates/paimon/src/table/bin_pack.rs
index 2d64e6b..6a567cd 100644
--- a/crates/paimon/src/table/bin_pack.rs
+++ b/crates/paimon/src/table/bin_pack.rs
@@ -96,6 +96,8 @@ mod tests {
creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
delete_row_count: None,
embedded_index: None,
+ first_row_id: None,
+ write_cols: None,
}
}
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index e0aae28..65c89b8 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -155,6 +155,7 @@ impl<'a> TableRead<'a> {
let has_primary_keys = !self.table.schema.primary_keys().is_empty();
let core_options = CoreOptions::new(self.table.schema.options());
let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
+ let data_evolution = core_options.data_evolution_enabled();
if has_primary_keys && !deletion_vectors_enabled {
return Err(Error::Unsupported {
@@ -167,6 +168,11 @@ impl<'a> TableRead<'a> {
let reader =
ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec());
- reader.read(data_splits)
+
+ if data_evolution {
+ reader.read_data_evolution(data_splits, self.table.schema.fields())
+ } else {
+ reader.read(data_splits)
+ }
}
}
diff --git a/crates/paimon/src/table/source.rs
b/crates/paimon/src/table/source.rs
index ca9ba9b..7416ca9 100644
--- a/crates/paimon/src/table/source.rs
+++ b/crates/paimon/src/table/source.rs
@@ -108,6 +108,10 @@ pub struct DataSplit {
/// Deletion file for each data file, same order as `data_files`.
/// `None` at index `i` means no deletion file for `data_files[i]`
(matches Java getDeletionFiles() / List<DeletionFile> with null elements).
data_deletion_files: Option<Vec<Option<DeletionFile>>>,
+ /// Whether this split can be read file-by-file without merging.
+ /// `false` when files need column-wise merge (e.g. data evolution) or
+ /// key-value merge (e.g. primary key tables without deletion vectors).
+ raw_convertible: bool,
}
impl DataSplit {
@@ -136,6 +140,11 @@ impl DataSplit {
self.data_deletion_files.as_deref()
}
+ /// Whether this split can be read without column-wise merging.
+ pub fn raw_convertible(&self) -> bool {
+ self.raw_convertible
+ }
+
    /// Returns the deletion file for the data file at the given index, if
    /// any. `None` at that index means no deletion file.
pub fn deletion_file_for_data_file_index(&self, index: usize) ->
Option<&DeletionFile> {
self.data_deletion_files
@@ -194,6 +203,7 @@ pub struct DataSplitBuilder {
data_files: Option<Vec<DataFileMeta>>,
/// Same length as data_files; `None` at index i = no deletion file for
data_files[i].
data_deletion_files: Option<Vec<Option<DeletionFile>>>,
+ raw_convertible: bool,
}
impl DataSplitBuilder {
@@ -206,6 +216,7 @@ impl DataSplitBuilder {
total_buckets: -1,
data_files: None,
data_deletion_files: None,
+ raw_convertible: false,
}
}
@@ -243,6 +254,11 @@ impl DataSplitBuilder {
self
}
+ pub fn with_raw_convertible(mut self, raw_convertible: bool) -> Self {
+ self.raw_convertible = raw_convertible;
+ self
+ }
+
pub fn build(self) -> crate::Result<DataSplit> {
if self.snapshot_id == -1 {
return Err(crate::Error::UnexpectedError {
@@ -294,6 +310,7 @@ impl DataSplitBuilder {
total_buckets: self.total_buckets,
data_files,
data_deletion_files: self.data_deletion_files,
+ raw_convertible: self.raw_convertible,
})
}
}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 031d15f..9992e0b 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -23,8 +23,8 @@
use super::Table;
use crate::io::FileIO;
use crate::spec::{
- eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions, FileKind,
IndexManifest,
- ManifestEntry, PartitionComputer, Predicate, Snapshot,
+ eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions,
DataFileMeta, FileKind,
+ IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot,
};
use crate::table::bin_pack::split_for_batch;
use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket,
Plan};
@@ -166,6 +166,59 @@ fn partition_matches_predicate(
}
}
+/// Groups data files by overlapping `row_id_range` for data evolution.
+///
+/// Files are sorted by `(first_row_id, -max_sequence_number)`. Files whose
+/// row ID ranges overlap are merged into the same group (they contain
+/// different columns for the same rows). Files without `first_row_id` become
+/// their own group.
+///
+/// Reference: [DataEvolutionSplitGenerator](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitGenerator.java)
+fn group_by_overlapping_row_id(mut files: Vec<DataFileMeta>) ->
Vec<Vec<DataFileMeta>> {
+ files.sort_by(|a, b| {
+ let a_row_id = a.first_row_id.unwrap_or(i64::MIN);
+ let b_row_id = b.first_row_id.unwrap_or(i64::MIN);
+ a_row_id
+ .cmp(&b_row_id)
+ .then_with(|| b.max_sequence_number.cmp(&a.max_sequence_number))
+ });
+
+ let mut result: Vec<Vec<DataFileMeta>> = Vec::new();
+ let mut current_group: Vec<DataFileMeta> = Vec::new();
+ // Track the end of the current merged row_id range.
+ let mut current_range_end: i64 = i64::MIN;
+
+ for file in files {
+ match file.row_id_range() {
+ None => {
+ // Files without first_row_id become their own group.
+ if !current_group.is_empty() {
+ result.push(std::mem::take(&mut current_group));
+ current_range_end = i64::MIN;
+ }
+ result.push(vec![file]);
+ }
+ Some((start, end)) => {
+ if current_group.is_empty() || start <= current_range_end {
+ // Overlaps with current range — merge into current group.
+ if end > current_range_end {
+ current_range_end = end;
+ }
+ current_group.push(file);
+ } else {
+ // No overlap — start a new group.
+ result.push(std::mem::take(&mut current_group));
+ current_range_end = end;
+ current_group.push(file);
+ }
+ }
+ }
+ }
+ if !current_group.is_empty() {
+ result.push(current_group);
+ }
+ result
+}
+
/// TableScan for full table scan (no incremental, no predicate).
///
/// Reference:
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
@@ -198,6 +251,7 @@ impl<'a> TableScan<'a> {
let table_path = self.table.location();
let core_options = CoreOptions::new(self.table.schema().options());
let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
+ let data_evolution_enabled = core_options.data_evolution_enabled();
let target_split_size = core_options.source_split_target_size();
let open_file_cost = core_options.source_split_open_file_cost();
let entries = read_all_manifest_entries(file_io, table_path,
&snapshot).await?;
@@ -321,8 +375,35 @@ impl<'a> TableScan<'a> {
.as_ref()
.and_then(|map| map.get(&PartitionBucket::new(partition,
bucket)));
- let file_groups = split_for_batch(data_files, target_split_size,
open_file_cost);
- for file_group in file_groups {
+            // Split files into groups: data evolution merges overlapping
+            // row_id ranges; multi-file groups need column-wise merge,
+            // single-file groups can be bin-packed.
+ let file_groups_with_raw: Vec<(Vec<DataFileMeta>, bool)> = if
data_evolution_enabled {
+ let row_id_groups = group_by_overlapping_row_id(data_files);
+ let (singles, multis): (Vec<_>, Vec<_>) =
+ row_id_groups.into_iter().partition(|g| g.len() == 1);
+
+ let mut result: Vec<(Vec<DataFileMeta>, bool)> = Vec::new();
+
+            // Multi-file groups: each becomes its own split,
+            // raw_convertible=false
+ for group in multis {
+ result.push((group, false));
+ }
+
+            // Single-file groups: flatten and bin-pack, raw_convertible=true
+ let single_files: Vec<DataFileMeta> =
singles.into_iter().flatten().collect();
+ for file_group in split_for_batch(single_files,
target_split_size, open_file_cost) {
+ result.push((file_group, true));
+ }
+
+ result
+ } else {
+ split_for_batch(data_files, target_split_size, open_file_cost)
+ .into_iter()
+ .map(|g| (g, true))
+ .collect()
+ };
+
+ for (file_group, raw_convertible) in file_groups_with_raw {
let data_deletion_files =
per_bucket_deletion_map.map(|per_bucket| {
file_group
.iter()
@@ -336,7 +417,8 @@ impl<'a> TableScan<'a> {
.with_bucket(bucket)
.with_bucket_path(bucket_path.clone())
.with_total_buckets(total_buckets)
- .with_data_files(file_group);
+ .with_data_files(file_group)
+ .with_raw_convertible(raw_convertible);
if let Some(files) = data_deletion_files {
builder = builder.with_data_deletion_files(files);
}
@@ -349,12 +431,49 @@ impl<'a> TableScan<'a> {
#[cfg(test)]
mod tests {
- use super::partition_matches_predicate;
+ use super::{group_by_overlapping_row_id, partition_matches_predicate};
use crate::spec::{
- ArrayType, DataField, DataType, Datum, IntType, Predicate,
PredicateBuilder,
- PredicateOperator, VarCharType,
+ stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType,
Datum, IntType,
+ Predicate, PredicateBuilder, PredicateOperator, VarCharType,
};
use crate::Error;
+ use chrono::{DateTime, Utc};
+
+ /// Helper to build a DataFileMeta with data evolution fields.
+ fn make_evo_file(
+ name: &str,
+ file_size: i64,
+ row_count: i64,
+ max_seq: i64,
+ first_row_id: Option<i64>,
+ ) -> DataFileMeta {
+ DataFileMeta {
+ file_name: name.to_string(),
+ file_size,
+ row_count,
+ min_key: Vec::new(),
+ max_key: Vec::new(),
+ key_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
+ value_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
+ min_sequence_number: 0,
+ max_sequence_number: max_seq,
+ schema_id: 0,
+ level: 0,
+ extra_files: Vec::new(),
+ creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
+ delete_row_count: None,
+ embedded_index: None,
+ first_row_id,
+ write_cols: None,
+ }
+ }
+
+ fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
+ groups
+ .iter()
+ .map(|g| g.iter().map(|f| f.file_name.as_str()).collect())
+ .collect()
+ }
struct SerializedBinaryRowBuilder {
arity: i32,
@@ -432,4 +551,91 @@ mod tests {
"Expected extract_datum unsupported error, got: {err:?}"
);
}
+
+    // ==================== group_by_overlapping_row_id tests ====================
+
+ #[test]
+ fn test_group_by_overlapping_row_id_empty() {
+ let result = group_by_overlapping_row_id(vec![]);
+ assert!(result.is_empty());
+ }
+
+ #[test]
+ fn test_group_by_overlapping_row_id_no_row_ids() {
+ // Files without first_row_id each become their own group.
+ // Sorted by (i64::MIN, -max_seq), so b(seq=2) before a(seq=1).
+ let files = vec![
+ make_evo_file("a", 10, 100, 1, None),
+ make_evo_file("b", 10, 100, 2, None),
+ ];
+ let groups = group_by_overlapping_row_id(files);
+ assert_eq!(file_names(&groups), vec![vec!["b"], vec!["a"]]);
+ }
+
+ #[test]
+ fn test_group_by_overlapping_row_id_same_range() {
+        // Two files with the same first_row_id and row_count → same range → one group.
+ let files = vec![
+ make_evo_file("a", 10, 100, 2, Some(0)),
+ make_evo_file("b", 10, 100, 1, Some(0)),
+ ];
+ let groups = group_by_overlapping_row_id(files);
+ assert_eq!(groups.len(), 1);
+ assert_eq!(file_names(&groups), vec![vec!["a", "b"]]);
+ }
+
+ #[test]
+ fn test_group_by_overlapping_row_id_overlapping_ranges() {
+        // File a: rows [0, 99], file b: rows [50, 149] → overlapping → one group.
+ let files = vec![
+ make_evo_file("a", 10, 100, 1, Some(0)),
+ make_evo_file("b", 10, 100, 2, Some(50)),
+ ];
+ let groups = group_by_overlapping_row_id(files);
+ assert_eq!(groups.len(), 1);
+ assert_eq!(file_names(&groups), vec![vec!["a", "b"]]);
+ }
+
+ #[test]
+ fn test_group_by_overlapping_row_id_non_overlapping() {
+        // File a: rows [0, 99], file b: rows [100, 199] → no overlap → two groups.
+ let files = vec![
+ make_evo_file("a", 10, 100, 1, Some(0)),
+ make_evo_file("b", 10, 100, 2, Some(100)),
+ ];
+ let groups = group_by_overlapping_row_id(files);
+ assert_eq!(groups.len(), 2);
+ assert_eq!(file_names(&groups), vec![vec!["a"], vec!["b"]]);
+ }
+
+ #[test]
+ fn test_group_by_overlapping_row_id_mixed() {
+ // a: [0,99], b: [0,99] (overlap), c: None (own group), d: [200,299]
+ // After sort: c(None→MIN) comes first, then b(seq=2), a(seq=1), d.
+ let files = vec![
+ make_evo_file("a", 10, 100, 1, Some(0)),
+ make_evo_file("b", 10, 100, 2, Some(0)),
+ make_evo_file("c", 10, 100, 3, None),
+ make_evo_file("d", 10, 100, 4, Some(200)),
+ ];
+ let groups = group_by_overlapping_row_id(files);
+ assert_eq!(
+ file_names(&groups),
+ vec![vec!["c"], vec!["b", "a"], vec!["d"]]
+ );
+ }
+
+ #[test]
+ fn test_group_by_overlapping_row_id_sorted_by_seq() {
+        // Within a group, files are sorted by (first_row_id, -max_sequence_number).
+ let files = vec![
+ make_evo_file("a", 10, 100, 1, Some(0)),
+ make_evo_file("b", 10, 100, 3, Some(0)),
+ make_evo_file("c", 10, 100, 2, Some(0)),
+ ];
+ let groups = group_by_overlapping_row_id(files);
+ assert_eq!(groups.len(), 1);
+ // Sorted by descending max_sequence_number: b(3), c(2), a(1)
+ assert_eq!(file_names(&groups), vec![vec!["b", "c", "a"]]);
+ }
}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index a813238..cdc538a 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -196,6 +196,70 @@ def main():
"""
)
+ # ===== Data Evolution table: append-only with row tracking =====
+    # data-evolution.enabled + row-tracking.enabled allows partial column updates
+    # via MERGE INTO. This produces files with different write_cols covering the
+ # same row ID ranges, exercising the column-wise merge read path.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_table (
+ id INT,
+ name STRING,
+ value INT
+ ) USING paimon
+ TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true'
+ )
+ """
+ )
+
+ # First batch: rows with row_id 0, 1, 2 — all columns written
+ spark.sql(
+ """
+ INSERT INTO data_evolution_table VALUES
+ (1, 'alice', 100),
+ (2, 'bob', 200),
+ (3, 'carol', 300)
+ """
+ )
+
+ # Second batch: rows with row_id 3, 4
+ spark.sql(
+ """
+ INSERT INTO data_evolution_table VALUES
+ (4, 'dave', 400),
+ (5, 'eve', 500)
+ """
+ )
+
+ # MERGE INTO: partial column update on existing rows.
+ # This writes new files containing only the updated column (name) with the
+ # same first_row_id, so the reader must merge columns from multiple files.
+ # Paimon 1.3.1 requires the source table to be a Paimon table.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_updates (
+ id INT,
+ name STRING
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ """
+    INSERT INTO data_evolution_updates VALUES (1, 'alice-v2'), (3, 'carol-v2')
+ """
+ )
+ spark.sql(
+ """
+ MERGE INTO data_evolution_table t
+ USING data_evolution_updates s
+ ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.name = s.name
+ """
+ )
+ spark.sql("DROP TABLE data_evolution_updates")
+
if __name__ == "__main__":
main()