This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ff1c11  feat: support schema evolution read with SchemaManager (#197)
4ff1c11 is described below

commit 4ff1c119ebbe9b26546dda6667e8534b89ff53e7
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 3 22:06:41 2026 +0800

    feat: support schema evolution read with SchemaManager (#197)
    
    * feat(arrow): support schema evolution read with SchemaManager and 
field-ID-based index mapping
---
 Cargo.toml                                         |   1 +
 crates/integration_tests/tests/read_tables.rs      | 221 ++++++++++-
 crates/integrations/datafusion/src/lib.rs          |   2 -
 crates/integrations/datafusion/src/schema.rs       | 116 ------
 crates/integrations/datafusion/src/table/mod.rs    |   4 +-
 .../integrations/datafusion/tests/read_tables.rs   |  57 ++-
 crates/paimon/Cargo.toml                           |   1 +
 crates/paimon/src/arrow/mod.rs                     | 123 ++++++
 crates/paimon/src/arrow/reader.rs                  | 427 +++++++++++++++------
 crates/paimon/src/arrow/schema_evolution.rs        | 151 ++++++++
 crates/paimon/src/lib.rs                           |   2 +-
 crates/paimon/src/spec/types.rs                    |  16 +
 crates/paimon/src/table/mod.rs                     |  11 +
 crates/paimon/src/table/read_builder.rs            |   8 +-
 crates/paimon/src/table/schema_manager.rs          | 103 +++++
 dev/spark/provision.py                             | 223 +++++++++++
 16 files changed, 1231 insertions(+), 235 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d7ebd00..f1f7295 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ rust-version = "1.86.0"
 [workspace.dependencies]
 arrow-array = { version = "57.0", features = ["ffi"] }
 arrow-schema = "57.0"
+arrow-cast = "57.0"
 arrow-select = "57.0"
 parquet = "57.0"
 tokio = "1.39.2"
\ No newline at end of file
diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 7027110..2775978 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -17,7 +17,7 @@
 
 //! Integration tests for reading Paimon tables provisioned by Spark.
 
-use arrow_array::{Int32Array, RecordBatch, StringArray};
+use arrow_array::{Array, ArrowPrimitiveType, Int32Array, Int64Array, 
RecordBatch, StringArray};
 use futures::TryStreamExt;
 use paimon::api::ConfigResponse;
 use paimon::catalog::{Identifier, RESTCatalog};
@@ -999,3 +999,222 @@ async fn test_limit_pushdown() {
         );
     }
 }
+
+// ---------------------------------------------------------------------------
+// Schema Evolution integration tests
+// ---------------------------------------------------------------------------
+
+/// Test reading a table after ALTER TABLE ADD COLUMNS.
+/// Old data files lack the new column; reader should fill nulls.
+#[tokio::test]
+async fn test_read_schema_evolution_add_column() {
+    let (_, batches) = 
scan_and_read_with_fs_catalog("schema_evolution_add_column", None).await;
+
+    let mut rows: Vec<(i32, String, Option<i32>)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let age = batch
+            .column_by_name("age")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("age");
+        for i in 0..batch.num_rows() {
+            let age_val = if age.is_null(i) {
+                None
+            } else {
+                Some(age.value(i))
+            };
+            rows.push((id.value(i), name.value(i).to_string(), age_val));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), None),
+            (2, "bob".into(), None),
+            (3, "carol".into(), Some(30)),
+            (4, "dave".into(), Some(40)),
+        ],
+        "Old rows should have null for added column 'age'"
+    );
+}
+
+/// Test reading a table after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
+/// Old data files have INT; reader should cast to BIGINT.
+#[tokio::test]
+async fn test_read_schema_evolution_type_promotion() {
+    let (_, batches) = 
scan_and_read_with_fs_catalog("schema_evolution_type_promotion", None).await;
+
+    // Verify the value column is Int64 (BIGINT) in all batches
+    for batch in &batches {
+        let value_col = batch.column_by_name("value").expect("value column");
+        assert_eq!(
+            value_col.data_type(),
+            &arrow_array::types::Int64Type::DATA_TYPE,
+            "value column should be Int64 (BIGINT) after type promotion"
+        );
+    }
+
+    let mut rows: Vec<(i32, i64)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+            .expect("value as Int64Array");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![(1, 100i64), (2, 200i64), (3, 3_000_000_000i64)],
+        "INT values should be promoted to BIGINT, including values > INT_MAX"
+    );
+}
+
+/// Test reading a data-evolution table after ALTER TABLE ADD COLUMNS.
+/// Old files lack the new column; reader should fill nulls even in data 
evolution mode.
+#[tokio::test]
+async fn test_read_data_evolution_add_column() {
+    let (_, batches) = 
scan_and_read_with_fs_catalog("data_evolution_add_column", None).await;
+
+    let mut rows: Vec<(i32, String, i32, Option<String>)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("value");
+        let extra = batch
+            .column_by_name("extra")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("extra");
+        for i in 0..batch.num_rows() {
+            let extra_val = if extra.is_null(i) {
+                None
+            } else {
+                Some(extra.value(i).to_string())
+            };
+            rows.push((
+                id.value(i),
+                name.value(i).to_string(),
+                value.value(i),
+                extra_val,
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice-v2".into(), 100, None),
+            (2, "bob".into(), 200, None),
+            (3, "carol".into(), 300, Some("new".into())),
+            (4, "dave".into(), 400, Some("new".into())),
+        ],
+        "Data evolution + add column: old rows should have null for 'extra', 
MERGE INTO updates name"
+    );
+}
+
+/// Test reading a data-evolution table after ALTER TABLE ALTER COLUMN TYPE 
(INT -> BIGINT).
+/// Old files have INT; reader should cast to BIGINT in data evolution mode.
+#[tokio::test]
+async fn test_read_data_evolution_type_promotion() {
+    let (_, batches) = 
scan_and_read_with_fs_catalog("data_evolution_type_promotion", None).await;
+
+    // Verify the value column is Int64 (BIGINT) in all batches
+    for batch in &batches {
+        let value_col = batch.column_by_name("value").expect("value column");
+        assert_eq!(
+            value_col.data_type(),
+            &arrow_array::types::Int64Type::DATA_TYPE,
+            "value column should be Int64 (BIGINT) after type promotion in 
data evolution mode"
+        );
+    }
+
+    let mut rows: Vec<(i32, i64)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+            .expect("value as Int64Array");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![(1, 999i64), (2, 200i64), (3, 3_000_000_000i64)],
+        "Data evolution + type promotion: INT should be cast to BIGINT, MERGE 
INTO updates value"
+    );
+}
+
+/// Test reading a table after ALTER TABLE DROP COLUMN.
+/// Old data files have the dropped column; reader should ignore it.
+#[tokio::test]
+async fn test_read_schema_evolution_drop_column() {
+    let (_, batches) = 
scan_and_read_with_fs_catalog("schema_evolution_drop_column", None).await;
+
+    // Verify the dropped column 'score' is not present in the output.
+    for batch in &batches {
+        assert!(
+            batch.column_by_name("score").is_none(),
+            "Dropped column 'score' should not appear in output"
+        );
+    }
+
+    let mut rows: Vec<(i32, String)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), name.value(i).to_string()));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into()),
+            (2, "bob".into()),
+            (3, "carol".into()),
+            (4, "dave".into()),
+        ],
+        "Old rows should be readable after DROP COLUMN, with only remaining 
columns"
+    );
+}
diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
index cd73831..7aab461 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -41,12 +41,10 @@ mod error;
 mod filter_pushdown;
 mod physical_plan;
 mod relation_planner;
-mod schema;
 mod table;
 
 pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
 pub use error::to_datafusion_error;
 pub use physical_plan::PaimonTableScan;
 pub use relation_planner::PaimonRelationPlanner;
-pub use schema::paimon_schema_to_arrow;
 pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/schema.rs 
b/crates/integrations/datafusion/src/schema.rs
deleted file mode 100644
index 231431b..0000000
--- a/crates/integrations/datafusion/src/schema.rs
+++ /dev/null
@@ -1,116 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::common::DataFusionError;
-use datafusion::common::Result as DFResult;
-use std::sync::Arc;
-
-use paimon::spec::{DataField, DataType as PaimonDataType};
-
-/// Converts Paimon table schema (logical row type fields) to DataFusion Arrow 
schema.
-pub fn paimon_schema_to_arrow(fields: &[DataField]) -> DFResult<Arc<Schema>> {
-    let arrow_fields: Vec<Field> = fields
-        .iter()
-        .map(|f| {
-            let arrow_type = paimon_data_type_to_arrow(f.data_type())?;
-            Ok(Field::new(
-                f.name(),
-                arrow_type,
-                f.data_type().is_nullable(),
-            ))
-        })
-        .collect::<DFResult<Vec<_>>>()?;
-    Ok(Arc::new(Schema::new(arrow_fields)))
-}
-
-fn paimon_data_type_to_arrow(dt: &PaimonDataType) -> DFResult<DataType> {
-    use datafusion::arrow::datatypes::TimeUnit;
-
-    Ok(match dt {
-        PaimonDataType::Boolean(_) => DataType::Boolean,
-        PaimonDataType::TinyInt(_) => DataType::Int8,
-        PaimonDataType::SmallInt(_) => DataType::Int16,
-        PaimonDataType::Int(_) => DataType::Int32,
-        PaimonDataType::BigInt(_) => DataType::Int64,
-        PaimonDataType::Float(_) => DataType::Float32,
-        PaimonDataType::Double(_) => DataType::Float64,
-        PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => DataType::Utf8,
-        PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => 
DataType::Binary,
-        PaimonDataType::Date(_) => DataType::Date32,
-        PaimonDataType::Time(t) => match t.precision() {
-            // `read.to_arrow(...)` goes through the Parquet Arrow reader, 
which exposes INT32
-            // TIME values as millisecond precision only. Mirror that here so 
provider schema and
-            // runtime RecordBatch schema stay identical.
-            0..=3 => DataType::Time32(TimeUnit::Millisecond),
-            4..=6 => DataType::Time64(TimeUnit::Microsecond),
-            7..=9 => DataType::Time64(TimeUnit::Nanosecond),
-            precision => {
-                return Err(DataFusionError::Internal(format!(
-                    "Unsupported TIME precision {precision}"
-                )));
-            }
-        },
-        PaimonDataType::Timestamp(t) => {
-            DataType::Timestamp(timestamp_time_unit(t.precision())?, None)
-        }
-        PaimonDataType::LocalZonedTimestamp(t) => {
-            DataType::Timestamp(timestamp_time_unit(t.precision())?, 
Some("UTC".into()))
-        }
-        PaimonDataType::Decimal(d) => {
-            let p = u8::try_from(d.precision()).map_err(|_| {
-                DataFusionError::Internal("Decimal precision exceeds 
u8".to_string())
-            })?;
-            let s = i8::try_from(d.scale() as i32).map_err(|_| {
-                DataFusionError::Internal("Decimal scale out of i8 
range".to_string())
-            })?;
-            match d.precision() {
-                // The Parquet Arrow reader normalizes DECIMAL columns to 
Decimal128 regardless of
-                // Parquet physical storage width. Mirror that here to avoid 
DataFusion schema
-                // mismatch between `TableProvider::schema()` and `execute()` 
output.
-                1..=38 => DataType::Decimal128(p, s),
-                precision => {
-                    return Err(DataFusionError::Internal(format!(
-                        "Unsupported DECIMAL precision {precision}"
-                    )));
-                }
-            }
-        }
-        PaimonDataType::Array(_)
-        | PaimonDataType::Map(_)
-        | PaimonDataType::Multiset(_)
-        | PaimonDataType::Row(_) => {
-            return Err(DataFusionError::NotImplemented(
-                "Paimon DataFusion integration does not yet support nested 
types (Array/Map/Row)"
-                    .to_string(),
-            ));
-        }
-    })
-}
-
-fn timestamp_time_unit(precision: u32) -> 
DFResult<datafusion::arrow::datatypes::TimeUnit> {
-    use datafusion::arrow::datatypes::TimeUnit;
-
-    match precision {
-        0..=3 => Ok(TimeUnit::Millisecond),
-        4..=6 => Ok(TimeUnit::Microsecond),
-        7..=9 => Ok(TimeUnit::Nanosecond),
-        _ => Err(DataFusionError::Internal(format!(
-            "Unsupported TIMESTAMP precision {precision}"
-        ))),
-    }
-}
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
index 73763e5..26a50c9 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -32,7 +32,6 @@ use paimon::table::Table;
 use crate::error::to_datafusion_error;
 use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown};
 use crate::physical_plan::PaimonTableScan;
-use crate::schema::paimon_schema_to_arrow;
 
 /// Read-only table provider for a Paimon table.
 ///
@@ -50,7 +49,8 @@ impl PaimonTableProvider {
     /// Loads the table schema and converts it to Arrow for DataFusion.
     pub fn try_new(table: Table) -> DFResult<Self> {
         let fields = table.schema().fields();
-        let schema = paimon_schema_to_arrow(fields)?;
+        let schema =
+            
paimon::arrow::build_target_arrow_schema(fields).map_err(to_datafusion_error)?;
         Ok(Self { table, schema })
     }
 
diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs
index 6a3c995..4a65a10 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -17,7 +17,7 @@
 
 use std::sync::Arc;
 
-use datafusion::arrow::array::{Int32Array, StringArray};
+use datafusion::arrow::array::{Array, Int32Array, StringArray};
 use datafusion::catalog::CatalogProvider;
 use datafusion::datasource::TableProvider;
 use datafusion::logical_expr::{col, lit, TableProviderFilterPushDown};
@@ -468,3 +468,58 @@ async fn test_time_travel_by_tag_name() {
         "Tag 'snapshot2' should contain all rows"
     );
 }
+
+/// Verifies that data evolution merge correctly NULL-fills columns that no 
file in a
+/// merge group provides (e.g. a newly added column after MERGE INTO on old 
rows).
+/// Without the fix, `active_file_indices` would be empty and rows would be 
silently lost.
+#[tokio::test]
+async fn test_data_evolution_drop_column_null_fill() {
+    let batches = collect_query(
+        "data_evolution_drop_column",
+        "SELECT id, name, extra FROM data_evolution_drop_column",
+    )
+    .await
+    .expect("data_evolution_drop_column query should succeed");
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(
+        total_rows, 3,
+        "Should return 3 rows (not silently drop rows from merge groups 
missing the new column)"
+    );
+
+    let mut rows: Vec<(i32, String, Option<String>)> = Vec::new();
+    for batch in &batches {
+        let id_array = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("Expected Int32Array for id");
+        let name_array = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("Expected StringArray for name");
+        let extra_array = batch
+            .column_by_name("extra")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("Expected StringArray for extra");
+
+        for i in 0..batch.num_rows() {
+            let extra = if extra_array.is_null(i) {
+                None
+            } else {
+                Some(extra_array.value(i).to_string())
+            };
+            rows.push((id_array.value(i), name_array.value(i).to_string(), 
extra));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice-v2".to_string(), None),
+            (2, "bob".to_string(), None),
+            (3, "carol".to_string(), Some("new".to_string())),
+        ],
+        "Old rows should have extra=NULL, new row should have extra='new'"
+    );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 878d1c5..b1677cf 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -55,6 +55,7 @@ apache-avro = { version = "0.17", features = ["snappy", 
"zstandard"] }
 indexmap = "2.5.0"
 roaring = "0.11"
 arrow-array = { workspace = true }
+arrow-cast = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 futures = "0.3"
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index 0ed18b8..e823c90 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -16,5 +16,128 @@
 // under the License.
 
 mod reader;
+pub(crate) mod schema_evolution;
 
 pub use crate::arrow::reader::ArrowReaderBuilder;
+
+use crate::spec::{DataField, DataType as PaimonDataType};
+use arrow_schema::DataType as ArrowDataType;
+use arrow_schema::{Field as ArrowField, Schema as ArrowSchema, TimeUnit};
+use std::sync::Arc;
+
+/// Converts a Paimon [`DataType`](PaimonDataType) to an Arrow 
[`DataType`](ArrowDataType).
+pub fn paimon_type_to_arrow(dt: &PaimonDataType) -> 
crate::Result<ArrowDataType> {
+    Ok(match dt {
+        PaimonDataType::Boolean(_) => ArrowDataType::Boolean,
+        PaimonDataType::TinyInt(_) => ArrowDataType::Int8,
+        PaimonDataType::SmallInt(_) => ArrowDataType::Int16,
+        PaimonDataType::Int(_) => ArrowDataType::Int32,
+        PaimonDataType::BigInt(_) => ArrowDataType::Int64,
+        PaimonDataType::Float(_) => ArrowDataType::Float32,
+        PaimonDataType::Double(_) => ArrowDataType::Float64,
+        PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => 
ArrowDataType::Utf8,
+        PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => 
ArrowDataType::Binary,
+        PaimonDataType::Date(_) => ArrowDataType::Date32,
+        PaimonDataType::Time(_) => 
ArrowDataType::Time32(TimeUnit::Millisecond),
+        PaimonDataType::Timestamp(t) => {
+            ArrowDataType::Timestamp(timestamp_time_unit(t.precision())?, None)
+        }
+        PaimonDataType::LocalZonedTimestamp(t) => {
+            ArrowDataType::Timestamp(timestamp_time_unit(t.precision())?, 
Some("UTC".into()))
+        }
+        PaimonDataType::Decimal(d) => {
+            let p = u8::try_from(d.precision()).map_err(|_| 
crate::Error::Unsupported {
+                message: "Decimal precision exceeds u8".to_string(),
+            })?;
+            let s = i8::try_from(d.scale() as i32).map_err(|_| 
crate::Error::Unsupported {
+                message: "Decimal scale out of i8 range".to_string(),
+            })?;
+            ArrowDataType::Decimal128(p, s)
+        }
+        PaimonDataType::Array(a) => {
+            let element_type = paimon_type_to_arrow(a.element_type())?;
+            ArrowDataType::List(Arc::new(ArrowField::new(
+                "element",
+                element_type,
+                a.element_type().is_nullable(),
+            )))
+        }
+        PaimonDataType::Map(m) => {
+            let key_type = paimon_type_to_arrow(m.key_type())?;
+            let value_type = paimon_type_to_arrow(m.value_type())?;
+            ArrowDataType::Map(
+                Arc::new(ArrowField::new(
+                    "entries",
+                    ArrowDataType::Struct(
+                        vec![
+                            ArrowField::new("key", key_type, false),
+                            ArrowField::new("value", value_type, 
m.value_type().is_nullable()),
+                        ]
+                        .into(),
+                    ),
+                    false,
+                )),
+                false,
+            )
+        }
+        PaimonDataType::Multiset(m) => {
+            let element_type = paimon_type_to_arrow(m.element_type())?;
+            ArrowDataType::Map(
+                Arc::new(ArrowField::new(
+                    "entries",
+                    ArrowDataType::Struct(
+                        vec![
+                            ArrowField::new("key", element_type, 
m.element_type().is_nullable()),
+                            ArrowField::new("value", ArrowDataType::Int32, 
false),
+                        ]
+                        .into(),
+                    ),
+                    false,
+                )),
+                false,
+            )
+        }
+        PaimonDataType::Row(r) => {
+            let fields: Vec<ArrowField> = r
+                .fields()
+                .iter()
+                .map(|f| {
+                    let arrow_type = paimon_type_to_arrow(f.data_type())?;
+                    Ok(ArrowField::new(
+                        f.name(),
+                        arrow_type,
+                        f.data_type().is_nullable(),
+                    ))
+                })
+                .collect::<crate::Result<Vec<_>>>()?;
+            ArrowDataType::Struct(fields.into())
+        }
+    })
+}
+
+fn timestamp_time_unit(precision: u32) -> crate::Result<TimeUnit> {
+    match precision {
+        0..=3 => Ok(TimeUnit::Millisecond),
+        4..=6 => Ok(TimeUnit::Microsecond),
+        7..=9 => Ok(TimeUnit::Nanosecond),
+        _ => Err(crate::Error::Unsupported {
+            message: format!("Unsupported TIMESTAMP precision {precision}"),
+        }),
+    }
+}
+
+/// Build an Arrow [`Schema`](ArrowSchema) from Paimon [`DataField`]s.
+pub fn build_target_arrow_schema(fields: &[DataField]) -> 
crate::Result<Arc<ArrowSchema>> {
+    let arrow_fields: Vec<ArrowField> = fields
+        .iter()
+        .map(|f| {
+            let arrow_type = paimon_type_to_arrow(f.data_type())?;
+            Ok(ArrowField::new(
+                f.name(),
+                arrow_type,
+                f.data_type().is_nullable(),
+            ))
+        })
+        .collect::<crate::Result<Vec<_>>>()?;
+    Ok(Arc::new(ArrowSchema::new(arrow_fields)))
+}
diff --git a/crates/paimon/src/arrow/reader.rs 
b/crates/paimon/src/arrow/reader.rs
index 7d57243..212f32f 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::build_target_arrow_schema;
+use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX};
 use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
 use crate::io::{FileIO, FileRead, FileStatus};
 use crate::spec::{DataField, DataFileMeta};
+use crate::table::schema_manager::SchemaManager;
 use crate::table::ArrowRecordBatchStream;
 use crate::{DataSplit, Error};
 use arrow_array::RecordBatch;
-use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
+use arrow_cast::cast;
 
 use async_stream::try_stream;
 use bytes::Bytes;
@@ -41,14 +44,22 @@ use tokio::try_join;
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    schema_manager: SchemaManager,
+    table_schema_id: i64,
 }
 
 impl ArrowReaderBuilder {
     /// Create a new ArrowReaderBuilder
-    pub(crate) fn new(file_io: FileIO) -> Self {
+    pub(crate) fn new(
+        file_io: FileIO,
+        schema_manager: SchemaManager,
+        table_schema_id: i64,
+    ) -> Self {
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            schema_manager,
+            table_schema_id,
         }
     }
 
@@ -58,6 +69,8 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            schema_manager: self.schema_manager,
+            table_schema_id: self.table_schema_id,
             read_type,
         }
     }
@@ -68,33 +81,30 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    schema_manager: SchemaManager,
+    table_schema_id: i64,
     read_type: Vec<DataField>,
 }
 
 impl ArrowReader {
     /// Take a stream of DataSplits and read every data file in each split.
     /// Returns a stream of Arrow RecordBatches from all files.
-    /// When a split has deletion files (see 
[DataSplit::data_deletion_files]), the corresponding
-    /// deletion vectors are loaded and applied so that deleted rows are 
filtered out from the stream.
-    /// Row positions are 0-based within each data file, matching Java's 
ApplyDeletionVectorReader.
     ///
-    /// Matches 
[RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java):
-    /// one DV factory per DataSplit (created from that split's data files and 
deletion files).
+    /// Uses SchemaManager to load the data file's schema (via 
`DataFileMeta.schema_id`)
+    /// and computes field-ID-based index mapping for schema evolution (added 
columns,
+    /// type promotion, column reordering).
     ///
-    /// Parquet schema is clipped to this reader's read type (column names 
from [DataField]s).
-    /// File-only columns are not read. See 
[ParquetReaderFactory.clipParquetSchema](https://github.com/apache/paimon/blob/master/paimon-format/paimon-format-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java).
+    /// Matches 
[RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java).
     pub fn read(self, data_splits: &[DataSplit]) -> 
crate::Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
         let batch_size = self.batch_size;
         let splits: Vec<DataSplit> = data_splits.to_vec();
         let read_type = self.read_type;
-        let projected_column_names: Vec<String> = read_type
-            .iter()
-            .map(|field| field.name().to_string())
-            .collect();
+        let schema_manager = self.schema_manager;
+        let table_schema_id = self.table_schema_id;
         Ok(try_stream! {
             for split in splits {
-                // Create DV factory for this split only (like Java 
createReader(partition, bucket, files, deletionFiles)).
+                // Create DV factory for this split only.
                 let dv_factory = if split
                     .data_deletion_files()
                     .is_some_and(|files| files.iter().any(Option::is_some))
@@ -117,11 +127,20 @@ impl ArrowReader {
                         .and_then(|factory| 
factory.get_deletion_vector(&file_meta.file_name))
                         .cloned();
 
+                    // Load data file's schema if it differs from the table 
schema.
+                    let data_fields: Option<Vec<DataField>> = if 
file_meta.schema_id != table_schema_id {
+                        let data_schema = 
schema_manager.schema(file_meta.schema_id).await?;
+                        Some(data_schema.fields().to_vec())
+                    } else {
+                        None
+                    };
+
                     let mut stream = read_single_file_stream(
                         file_io.clone(),
                         split.clone(),
                         file_meta,
-                        projected_column_names.clone(),
+                        read_type.clone(),
+                        data_fields,
                         batch_size,
                         dv,
                     )?;
@@ -151,32 +170,38 @@ impl ArrowReader {
         let batch_size = self.batch_size;
         let splits: Vec<DataSplit> = data_splits.to_vec();
         let read_type = self.read_type;
-        let table_field_names: Vec<String> =
-            table_fields.iter().map(|f| f.name().to_string()).collect();
-        let projected_column_names: Vec<String> = read_type
-            .iter()
-            .map(|field| field.name().to_string())
-            .collect();
+        let table_fields: Vec<DataField> = table_fields.to_vec();
+        let schema_manager = self.schema_manager;
+        let table_schema_id = self.table_schema_id;
 
         Ok(try_stream! {
             for split in splits {
                 if split.raw_convertible() || split.data_files().len() == 1 {
-                    // Single file or raw convertible — stream lazily without 
loading all into memory.
                     for file_meta in split.data_files().to_vec() {
+                        let data_fields: Option<Vec<DataField>> = if 
file_meta.schema_id != table_schema_id {
+                            let data_schema = 
schema_manager.schema(file_meta.schema_id).await?;
+                            Some(data_schema.fields().to_vec())
+                        } else {
+                            None
+                        };
+
                         let mut stream = read_single_file_stream(
-                            file_io.clone(), split.clone(), file_meta, 
projected_column_names.clone(), batch_size, None,
+                            file_io.clone(), split.clone(), file_meta, 
read_type.clone(),
+                            data_fields, batch_size, None,
                         )?;
                         while let Some(batch) = stream.next().await {
                             yield batch?;
                         }
                     }
                 } else {
-                    // Multiple files need column-wise merge — also streamed 
lazily.
+                    // Multiple files need column-wise merge.
                     let mut merge_stream = merge_files_by_columns(
                         &file_io,
                         &split,
-                        &projected_column_names,
-                        &table_field_names,
+                        &read_type,
+                        &table_fields,
+                        schema_manager.clone(),
+                        table_schema_id,
                         batch_size,
                     )?;
                     while let Some(batch) = merge_stream.next().await {
@@ -191,14 +216,56 @@ impl ArrowReader {
 
 /// Read a single parquet file from a split, returning a lazy stream of 
batches.
 /// Optionally applies a deletion vector.
+///
+/// Handles schema evolution using field-ID-based index mapping:
+/// - `data_fields`: if `Some`, the fields from the data file's schema (loaded 
via SchemaManager).
+///   Used to compute index mapping between `read_type` and data fields by 
field ID.
+/// - Columns missing from the file are filled with null arrays.
+/// - Columns whose Arrow type differs from the target type are cast (type 
promotion).
+///
+/// Reference: 
[RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java)
 fn read_single_file_stream(
     file_io: FileIO,
     split: DataSplit,
     file_meta: DataFileMeta,
-    projected_column_names: Vec<String>,
+    read_type: Vec<DataField>,
+    data_fields: Option<Vec<DataField>>,
     batch_size: Option<usize>,
     dv: Option<Arc<DeletionVector>>,
 ) -> crate::Result<ArrowRecordBatchStream> {
+    let target_schema = build_target_arrow_schema(&read_type)?;
+
+    // Compute index mapping and determine which columns to read from the 
parquet file.
+    // If data_fields is provided, use field-ID-based mapping; otherwise use 
read_type names directly.
+    let (parquet_read_fields, index_mapping) = if let Some(ref df) = 
data_fields {
+        let mapping = create_index_mapping(&read_type, df);
+        match mapping {
+            Some(ref idx_map) => {
+                // Only read data fields that are referenced by the index 
mapping.
+                // Dedup by data field index to avoid duplicate parquet column 
projections.
+                let mut seen = std::collections::HashSet::new();
+                let fields_to_read: Vec<DataField> = idx_map
+                    .iter()
+                    .filter(|&&idx| idx != NULL_FIELD_INDEX && 
seen.insert(idx))
+                    .map(|&idx| df[idx as usize].clone())
+                    .collect();
+                (fields_to_read, Some(idx_map.clone()))
+            }
+            None => {
+                // Identity mapping — read data fields in order.
+                (df.clone(), None)
+            }
+        }
+    } else {
+        // No schema evolution — read by read_type names.
+        (read_type.clone(), None)
+    };
+
+    let parquet_column_names: Vec<String> = parquet_read_fields
+        .iter()
+        .map(|f| f.name().to_string())
+        .collect();
+
     Ok(try_stream! {
         let path_to_read = split.data_file_path(&file_meta);
         if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
@@ -219,7 +286,7 @@ fn read_single_file_stream(
         // Only project columns that exist in this file.
         let parquet_schema = batch_stream_builder.parquet_schema().clone();
         let file_column_names: Vec<&str> = 
parquet_schema.columns().iter().map(|c| c.name()).collect();
-        let available_columns: Vec<&str> = projected_column_names
+        let available_columns: Vec<&str> = parquet_column_names
             .iter()
             .filter(|name| file_column_names.contains(&name.as_str()))
             .map(String::as_str)
@@ -242,24 +309,83 @@ fn read_single_file_stream(
         let mut batch_stream = batch_stream_builder.build()?;
         while let Some(batch) = batch_stream.next().await {
             let batch = batch?;
-            // Reorder columns from parquet-schema order to 
projected_column_names order,
-            // consistent with the normal read() path.
-            let reorder_indices: Vec<usize> = projected_column_names
-                .iter()
-                .filter_map(|name| batch.schema().index_of(name).ok())
-                .collect();
-            if reorder_indices.len() == batch.num_columns() {
-                yield batch.project(&reorder_indices).map_err(|e| {
-                    Error::UnexpectedError {
-                        message: "Failed to reorder projected 
columns".to_string(),
-                        source: Some(Box::new(e)),
+            let num_rows = batch.num_rows();
+            let batch_schema = batch.schema();
+
+            // Build output columns using index mapping (field-ID-based) or by 
name.
+            let mut columns: Vec<Arc<dyn arrow_array::Array>> = 
Vec::with_capacity(target_schema.fields().len());
+            for (i, target_field) in target_schema.fields().iter().enumerate() 
{
+                let source_col = if let Some(ref idx_map) = index_mapping {
+                    let data_idx = idx_map[i];
+                    if data_idx == NULL_FIELD_INDEX {
+                        None
+                    } else {
+                        // Find the column in the batch by the data field's 
name.
+                        let data_field = 
&data_fields.as_ref().unwrap()[data_idx as usize];
+                        batch_schema
+                            .index_of(data_field.name())
+                            .ok()
+                            .map(|col_idx| batch.column(col_idx))
                     }
-                })?;
+                } else if let Some(ref df) = data_fields {
+                    // Identity mapping with data_fields present (e.g. renamed 
column).
+                    // Use data field name (old name in parquet) at the same 
position.
+                    batch_schema
+                        .index_of(df[i].name())
+                        .ok()
+                        .map(|col_idx| batch.column(col_idx))
+                } else {
+                    // No schema evolution — look up by target field name.
+                    batch_schema
+                        .index_of(target_field.name())
+                        .ok()
+                        .map(|col_idx| batch.column(col_idx))
+                };
+
+                match source_col {
+                    Some(col) => {
+                        if col.data_type() == target_field.data_type() {
+                            columns.push(col.clone());
+                        } else {
+                            // Type promotion: cast to target type.
+                            let casted = cast(col, 
target_field.data_type()).map_err(|e| {
+                                Error::UnexpectedError {
+                                    message: format!(
+                                        "Failed to cast column '{}' from {:?} 
to {:?}: {e}",
+                                        target_field.name(),
+                                        col.data_type(),
+                                        target_field.data_type()
+                                    ),
+                                    source: Some(Box::new(e)),
+                                }
+                            })?;
+                            columns.push(casted);
+                        }
+                    }
+                    None => {
+                        // Column missing from file: fill with nulls.
+                        let null_array = 
arrow_array::new_null_array(target_field.data_type(), num_rows);
+                        columns.push(null_array);
+                    }
+                }
+            }
+
+            let result = if columns.is_empty() {
+                RecordBatch::try_new_with_options(
+                    target_schema.clone(),
+                    columns,
+                    
&arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
+                )
             } else {
-                // Not all projected columns exist in this file (data 
evolution case),
-                // return as-is; the caller (merge_files_by_columns) handles 
missing columns.
-                yield batch;
+                RecordBatch::try_new(target_schema.clone(), columns)
             }
+            .map_err(|e| {
+                Error::UnexpectedError {
+                    message: format!("Failed to build schema-evolved 
RecordBatch: {e}"),
+                    source: Some(Box::new(e)),
+                }
+            })?;
+            yield result;
         }
     }
     .boxed())
@@ -267,6 +393,9 @@ fn read_single_file_stream(
 
 /// Merge multiple files column-wise for data evolution, streaming with 
bounded memory.
 ///
+/// Uses field IDs (not column names) to resolve which file provides which 
column,
+/// ensuring correctness across schema evolution (column rename, add, drop).
+///
 /// Opens all file readers simultaneously and maintains a cursor (current 
batch + offset)
 /// per file. Each poll slices up to `batch_size` rows from each file's 
current batch,
 /// assembles columns from the winning files, and yields the merged batch. 
When a file's
@@ -274,8 +403,10 @@ fn read_single_file_stream(
 fn merge_files_by_columns(
     file_io: &FileIO,
     split: &DataSplit,
-    projected_column_names: &[String],
-    table_field_names: &[String],
+    read_type: &[DataField],
+    table_fields: &[DataField],
+    schema_manager: SchemaManager,
+    table_schema_id: i64,
     batch_size: Option<usize>,
 ) -> crate::Result<ArrowRecordBatchStream> {
     let data_files = split.data_files();
@@ -283,67 +414,140 @@ fn merge_files_by_columns(
         return Ok(futures::stream::empty().boxed());
     }
 
-    // Determine which columns each file provides and resolve conflicts by 
max_sequence_number.
-    // column_name -> (file_index, max_sequence_number)
-    let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+    // Build owned data for the stream closure.
+    let file_io = file_io.clone();
+    let split = split.clone();
+    let data_files: Vec<DataFileMeta> = data_files.to_vec();
+    let read_type = read_type.to_vec();
+    let table_fields = table_fields.to_vec();
+    let output_batch_size = batch_size.unwrap_or(1024);
+    let target_schema = build_target_arrow_schema(&read_type)?;
 
-    for (file_idx, file_meta) in data_files.iter().enumerate() {
-        let file_columns: Vec<String> = if let Some(ref wc) = 
file_meta.write_cols {
-            wc.clone()
-        } else {
-            table_field_names.to_vec()
-        };
+    Ok(try_stream! {
+        // Pre-load schemas and collect field IDs + data_fields per file.
+        // file_idx -> (field_ids, Option<Vec<DataField>>)
+        let mut file_info: HashMap<usize, (Vec<i32>, Option<Vec<DataField>>)> 
= HashMap::new();
+
+        for (file_idx, file_meta) in data_files.iter().enumerate() {
+            let (field_ids, data_fields) = if file_meta.schema_id != 
table_schema_id {
+                let file_schema = 
schema_manager.schema(file_meta.schema_id).await?;
+                let file_fields = file_schema.fields();
+
+                let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols 
{
+                    // write_cols names are from the file's schema at write 
time.
+                    wc.iter()
+                        .filter_map(|name| file_fields.iter().find(|f| 
f.name() == name).map(|f| f.id()))
+                        .collect()
+                } else {
+                    file_fields.iter().map(|f| f.id()).collect()
+                };
 
-        for col in &file_columns {
-            let entry = column_source
-                .entry(col.clone())
-                .or_insert((file_idx, i64::MIN));
-            if file_meta.max_sequence_number > entry.1 {
-                *entry = (file_idx, file_meta.max_sequence_number);
-            }
+                (ids, Some(file_fields.to_vec()))
+            } else {
+                let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols 
{
+                    // write_cols names are from the current table schema.
+                    wc.iter()
+                        .filter_map(|name| table_fields.iter().find(|f| 
f.name() == name).map(|f| f.id()))
+                        .collect()
+                } else {
+                    table_fields.iter().map(|f| f.id()).collect()
+                };
+
+                (ids, None)
+            };
+
+            file_info.insert(file_idx, (field_ids, data_fields));
         }
-    }
 
-    // For each file, determine which projected columns to read from it.
-    // file_index -> Vec<column_name>
-    let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
-    for col_name in projected_column_names {
-        if let Some(&(file_idx, _)) = column_source.get(col_name) {
-            file_read_columns
-                .entry(file_idx)
-                .or_default()
-                .push(col_name.clone());
+        // Determine which file provides each field ID, resolving conflicts by 
max_sequence_number.
+        // field_id -> (file_index, max_sequence_number)
+        let mut field_id_source: HashMap<i32, (usize, i64)> = HashMap::new();
+        for (file_idx, file_meta) in data_files.iter().enumerate() {
+            let (ref field_ids, _) = file_info[&file_idx];
+            for &fid in field_ids {
+                let entry = field_id_source
+                    .entry(fid)
+                    .or_insert((file_idx, i64::MIN));
+                if file_meta.max_sequence_number > entry.1 {
+                    *entry = (file_idx, file_meta.max_sequence_number);
+                }
+            }
         }
-    }
 
-    // For each projected column, record (file_index, column_name) for 
assembly.
-    let column_plan: Vec<(Option<usize>, String)> = projected_column_names
-        .iter()
-        .map(|col_name| {
-            let file_idx = column_source.get(col_name).map(|&(idx, _)| idx);
-            (file_idx, col_name.clone())
-        })
-        .collect();
+        // For each projected field, determine which file provides it (by 
field ID).
+        // file_index -> Vec<column_name>  (target column names)
+        let mut file_read_columns: HashMap<usize, Vec<String>> = 
HashMap::new();
+        for field in &read_type {
+            if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) {
+                file_read_columns
+                    .entry(file_idx)
+                    .or_default()
+                    .push(field.name().to_string());
+            }
+        }
 
-    // Collect which file indices we need to open streams for.
-    let active_file_indices: Vec<usize> = 
file_read_columns.keys().copied().collect();
+        // For each projected field, record (file_index, target_column_name) 
for assembly.
+        let column_plan: Vec<(Option<usize>, String)> = read_type
+            .iter()
+            .map(|field| {
+                let file_idx = field_id_source.get(&field.id()).map(|&(idx, 
_)| idx);
+                (file_idx, field.name().to_string())
+            })
+            .collect();
 
-    // Build owned data for the stream closure.
-    let file_io = file_io.clone();
-    let split = split.clone();
-    let data_files: Vec<DataFileMeta> = data_files.to_vec();
-    let projected_column_names = projected_column_names.to_vec();
-    let output_batch_size = batch_size.unwrap_or(1024);
+        // Collect which file indices we need to open streams for.
+        let active_file_indices: Vec<usize> = 
file_read_columns.keys().copied().collect();
+
+        // Edge case: if no file provides any projected column (e.g. SELECT on 
a newly added
+        // column that no file contains yet), we still need to emit 
NULL-filled rows to
+        // preserve the correct row count.
+        if active_file_indices.is_empty() {
+            // All files in a merge group cover the same rows; use the first 
file's row_count.
+            let total_rows = data_files[0].row_count as usize;
+            let mut emitted = 0;
+            while emitted < total_rows {
+                let rows_to_emit = (total_rows - 
emitted).min(output_batch_size);
+                let columns: Vec<Arc<dyn arrow_array::Array>> = target_schema
+                    .fields()
+                    .iter()
+                    .map(|f| arrow_array::new_null_array(f.data_type(), 
rows_to_emit))
+                    .collect();
+                let batch = if columns.is_empty() {
+                    RecordBatch::try_new_with_options(
+                        target_schema.clone(),
+                        columns,
+                        
&arrow_array::RecordBatchOptions::new().with_row_count(Some(rows_to_emit)),
+                    )
+                } else {
+                    RecordBatch::try_new(target_schema.clone(), columns)
+                }
+                .map_err(|e| Error::UnexpectedError {
+                    message: format!("Failed to build NULL-filled RecordBatch: 
{e}"),
+                    source: Some(Box::new(e)),
+                })?;
+                emitted += rows_to_emit;
+                yield batch;
+            }
+        } else {
 
-    Ok(try_stream! {
         // Open a stream for each active file.
+        // Build per-file read_type: only the DataFields this file is 
responsible for.
         let mut file_streams: HashMap<usize, ArrowRecordBatchStream> = 
HashMap::new();
         for &file_idx in &active_file_indices {
+            let file_cols = 
file_read_columns.get(&file_idx).cloned().unwrap_or_default();
+            let file_read_type: Vec<DataField> = file_cols
+                .iter()
+                .filter_map(|col_name| read_type.iter().find(|f| f.name() == 
col_name).cloned())
+                .collect();
+
+            let (_, ref data_fields) = file_info[&file_idx];
+
             let stream = read_single_file_stream(
                 file_io.clone(),
                 split.clone(),
                 data_files[file_idx].clone(),
-                projected_column_names.clone(),
+                file_read_type,
+                data_fields.clone(),
                 batch_size,
                 None,
             )?;
@@ -397,20 +601,25 @@ fn merge_files_by_columns(
             let rows_to_emit = remaining.min(output_batch_size);
 
             // Slice each file's current batch and assemble columns.
+            // Use the target schema so that missing columns are null-filled.
             let mut columns: Vec<Arc<dyn arrow_array::Array>> =
                 Vec::with_capacity(column_plan.len());
-            let mut schema_fields: Vec<ArrowField> = 
Vec::with_capacity(column_plan.len());
-
-            for (file_idx_opt, col_name) in &column_plan {
-                if let Some(file_idx) = file_idx_opt {
-                    if let Some((batch, offset)) = file_cursors.get(file_idx) {
-                        if let Ok(col_idx) = batch.schema().index_of(col_name) 
{
-                            let col = batch.column(col_idx).slice(*offset, 
rows_to_emit);
-                            columns.push(col);
-                            
schema_fields.push(batch.schema().field(col_idx).clone());
-                        }
-                    }
-                }
+
+            for (i, (file_idx_opt, col_name)) in 
column_plan.iter().enumerate() {
+                let target_field = &target_schema.fields()[i];
+                let col = file_idx_opt
+                    .and_then(|file_idx| file_cursors.get(&file_idx))
+                    .and_then(|(batch, offset)| {
+                        batch
+                            .schema()
+                            .index_of(col_name)
+                            .ok()
+                            .map(|col_idx| 
batch.column(col_idx).slice(*offset, rows_to_emit))
+                    });
+
+                columns.push(col.unwrap_or_else(|| {
+                    arrow_array::new_null_array(target_field.data_type(), 
rows_to_emit)
+                }));
             }
 
             // Advance all cursors.
@@ -420,15 +629,13 @@ fn merge_files_by_columns(
                 }
             }
 
-            if !columns.is_empty() {
-                let schema = Arc::new(ArrowSchema::new(schema_fields));
-                let merged = RecordBatch::try_new(schema, columns).map_err(|e| 
Error::UnexpectedError {
-                    message: format!("Failed to build merged RecordBatch: 
{e}"),
-                    source: Some(Box::new(e)),
-                })?;
-                yield merged;
-            }
+            let merged = RecordBatch::try_new(target_schema.clone(), 
columns).map_err(|e| Error::UnexpectedError {
+                message: format!("Failed to build merged RecordBatch: {e}"),
+                source: Some(Box::new(e)),
+            })?;
+            yield merged;
         }
+        } // end else (active_file_indices non-empty)
     }
     .boxed())
 }
diff --git a/crates/paimon/src/arrow/schema_evolution.rs 
b/crates/paimon/src/arrow/schema_evolution.rs
new file mode 100644
index 0000000..ea9450b
--- /dev/null
+++ b/crates/paimon/src/arrow/schema_evolution.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema evolution utilities for mapping between table schema and data file 
schema.
+//!
+//! Reference: 
[org.apache.paimon.schema.SchemaEvolutionUtil](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java)
+
+use crate::spec::DataField;
+use std::collections::HashMap;
+
+/// Sentinel value indicating a field does not exist in the data schema.
+pub const NULL_FIELD_INDEX: i32 = -1;
+
+/// Create index mapping from table fields to underlying data fields using 
field IDs.
+///
+/// For example, the table and data fields are as follows:
+/// - table fields: `1->c, 6->b, 3->a`
+/// - data fields: `1->a, 3->c`
+///
+/// We get the index mapping `[0, -1, 1]`, where:
+/// - `0` is the index of table field `1->c` in data fields
+/// - `-1` means field `6->b` does not exist in data fields
+/// - `1` is the index of table field `3->a` in data fields
+///
+/// Returns `None` if the mapping is identity (no evolution needed).
+///
+/// Reference: 
[SchemaEvolutionUtil.createIndexMapping](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java)
+pub fn create_index_mapping(
+    table_fields: &[DataField],
+    data_fields: &[DataField],
+) -> Option<Vec<i32>> {
+    let mut field_id_to_index: HashMap<i32, i32> = 
HashMap::with_capacity(data_fields.len());
+    for (i, field) in data_fields.iter().enumerate() {
+        field_id_to_index.insert(field.id(), i as i32);
+    }
+
+    let mut index_mapping = Vec::with_capacity(table_fields.len());
+    for field in table_fields {
+        let data_index = field_id_to_index
+            .get(&field.id())
+            .copied()
+            .unwrap_or(NULL_FIELD_INDEX);
+        index_mapping.push(data_index);
+    }
+
+    // Check if mapping is identity (no evolution needed).
+    let is_identity = index_mapping.len() == data_fields.len()
+        && index_mapping
+            .iter()
+            .enumerate()
+            .all(|(i, &idx)| idx == i as i32);
+
+    if is_identity {
+        None
+    } else {
+        Some(index_mapping)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::spec::{DataType, IntType, VarCharType};
+
+    fn field(id: i32, name: &str) -> DataField {
+        DataField::new(id, name.to_string(), DataType::Int(IntType::new()))
+    }
+
+    #[test]
+    fn test_identity_mapping() {
+        let table_fields = vec![field(0, "a"), field(1, "b"), field(2, "c")];
+        let data_fields = vec![field(0, "a"), field(1, "b"), field(2, "c")];
+        assert_eq!(create_index_mapping(&table_fields, &data_fields), None);
+    }
+
+    #[test]
+    fn test_added_column() {
+        // Table has 3 fields, data file only has the first 2
+        let table_fields = vec![field(0, "a"), field(1, "b"), field(2, "c")];
+        let data_fields = vec![field(0, "a"), field(1, "b")];
+        assert_eq!(
+            create_index_mapping(&table_fields, &data_fields),
+            Some(vec![0, 1, -1])
+        );
+    }
+
+    #[test]
+    fn test_reordered_fields() {
+        let table_fields = vec![field(1, "c"), field(6, "b"), field(3, "a")];
+        let data_fields = vec![field(1, "a"), field(3, "c")];
+        assert_eq!(
+            create_index_mapping(&table_fields, &data_fields),
+            Some(vec![0, -1, 1])
+        );
+    }
+
+    #[test]
+    fn test_renamed_column() {
+        // Field ID stays the same even if name changed
+        let table_fields = vec![field(0, "id"), field(1, "new_name")];
+        let data_fields = vec![field(0, "id"), field(1, "old_name")];
+        // Identity mapping since field IDs match positionally
+        assert_eq!(create_index_mapping(&table_fields, &data_fields), None);
+    }
+
+    #[test]
+    fn test_empty_data_fields() {
+        let table_fields = vec![field(0, "a"), field(1, "b")];
+        let data_fields: Vec<DataField> = vec![];
+        assert_eq!(
+            create_index_mapping(&table_fields, &data_fields),
+            Some(vec![-1, -1])
+        );
+    }
+
+    #[test]
+    fn test_type_promotion_same_mapping() {
+        // Type promotion doesn't affect index mapping — only field IDs matter
+        let table_fields = vec![
+            DataField::new(0, "id".to_string(), DataType::Int(IntType::new())),
+            DataField::new(
+                1,
+                "name".to_string(),
+                DataType::VarChar(VarCharType::string_type()),
+            ),
+        ];
+        let data_fields = vec![
+            DataField::new(0, "id".to_string(), DataType::Int(IntType::new())),
+            DataField::new(
+                1,
+                "name".to_string(),
+                DataType::VarChar(VarCharType::string_type()),
+            ),
+        ];
+        assert_eq!(create_index_mapping(&table_fields, &data_fields), None);
+    }
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index c5961dd..082f8f9 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -25,7 +25,7 @@ pub use common::{CatalogOptions, Options};
 pub mod api;
 pub use api::rest_api::RESTApi;
 
-mod arrow;
+pub mod arrow;
 pub mod catalog;
 mod deletion_vector;
 pub mod file_index;
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index e0ba04a..1bb4566 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -227,6 +227,10 @@ impl ArrayType {
     pub fn family(&self) -> DataTypeFamily {
         DataTypeFamily::CONSTRUCTED | DataTypeFamily::COLLECTION
     }
+
+    pub fn element_type(&self) -> &DataType {
+        &self.element_type
+    }
 }
 
 /// BigIntType for paimon.
@@ -1348,6 +1352,14 @@ impl MapType {
     pub fn family(&self) -> DataTypeFamily {
         DataTypeFamily::CONSTRUCTED | DataTypeFamily::COLLECTION
     }
+
+    pub fn key_type(&self) -> &DataType {
+        &self.key_type
+    }
+
+    pub fn value_type(&self) -> &DataType {
+        &self.value_type
+    }
 }
 
 /// MultisetType for paimon.
@@ -1381,6 +1393,10 @@ impl MultisetType {
     pub fn family(&self) -> DataTypeFamily {
         DataTypeFamily::CONSTRUCTED | DataTypeFamily::COLLECTION
     }
+
+    pub fn element_type(&self) -> &DataType {
+        &self.element_type
+    }
 }
 
 /// RowType for paimon.
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 32f755a..bda5673 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -19,6 +19,7 @@
 
 pub(crate) mod bin_pack;
 mod read_builder;
+pub(crate) mod schema_manager;
 mod snapshot_manager;
 mod source;
 mod table_scan;
@@ -28,6 +29,7 @@ use crate::Result;
 use arrow_array::RecordBatch;
 use futures::stream::BoxStream;
 pub use read_builder::{ReadBuilder, TableRead};
+pub use schema_manager::SchemaManager;
 pub use snapshot_manager::SnapshotManager;
 pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, 
Plan};
 pub use table_scan::TableScan;
@@ -45,6 +47,7 @@ pub struct Table {
     identifier: Identifier,
     location: String,
     schema: TableSchema,
+    schema_manager: SchemaManager,
 }
 
 #[allow(dead_code)]
@@ -56,11 +59,13 @@ impl Table {
         location: String,
         schema: TableSchema,
     ) -> Self {
+        let schema_manager = SchemaManager::new(file_io.clone(), 
location.clone());
         Self {
             file_io,
             identifier,
             location,
             schema,
+            schema_manager,
         }
     }
 
@@ -84,6 +89,11 @@ impl Table {
         &self.file_io
     }
 
+    /// Get the SchemaManager for this table.
+    ///
+    /// The manager loads versioned schema files on demand and shares its
+    /// cache across clones of this table.
+    pub fn schema_manager(&self) -> &SchemaManager {
+        &self.schema_manager
+    }
+
     /// Create a read builder for scan/read.
     ///
     /// Reference: [pypaimon 
FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/table/file_store_table.py).
@@ -98,6 +108,7 @@ impl Table {
             identifier: self.identifier.clone(),
             location: self.location.clone(),
             schema: self.schema.copy_with_options(extra),
+            schema_manager: self.schema_manager.clone(),
         }
     }
 }
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index 884d7e4..db5c42c 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -181,8 +181,12 @@ impl<'a> TableRead<'a> {
             });
         }
 
-        let reader =
-            
ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec());
+        let reader = ArrowReaderBuilder::new(
+            self.table.file_io.clone(),
+            self.table.schema_manager().clone(),
+            self.table.schema().id(),
+        )
+        .build(self.read_type().to_vec());
 
         if data_evolution {
             reader.read_data_evolution(data_splits, self.table.schema.fields())
diff --git a/crates/paimon/src/table/schema_manager.rs 
b/crates/paimon/src/table/schema_manager.rs
new file mode 100644
index 0000000..057dc3f
--- /dev/null
+++ b/crates/paimon/src/table/schema_manager.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema manager for reading versioned table schemas.
+//!
+//! Reference: 
[org.apache.paimon.schema.SchemaManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java)
+
+use crate::io::FileIO;
+use crate::spec::TableSchema;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+const SCHEMA_DIR: &str = "schema";
+const SCHEMA_PREFIX: &str = "schema-";
+
+/// Manager for versioned table schema files.
+///
+/// Each table stores schema versions as JSON files under 
`{table_path}/schema/schema-{id}`.
+/// When a schema evolution occurs (e.g. ADD COLUMN, ALTER COLUMN TYPE), a new 
schema file
+/// is written with an incremented ID. Data files record which schema they 
were written with
+/// via `DataFileMeta.schema_id`.
+///
+/// The schema cache is shared across clones via `Arc`, so multiple readers
+/// (e.g. parallel split streams) benefit from a single cache.
+///
+/// Reference: 
[org.apache.paimon.schema.SchemaManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java)
+#[derive(Debug, Clone)]
+pub struct SchemaManager {
+    /// File IO used to read schema JSON files from storage.
+    file_io: FileIO,
+    /// Root path of the table; schema files live under `{table_path}/schema`.
+    table_path: String,
+    /// Shared cache of loaded schemas by ID.
+    cache: Arc<Mutex<HashMap<i64, Arc<TableSchema>>>>,
+}
+
+impl SchemaManager {
+    /// Create a new `SchemaManager` rooted at `table_path`, starting with an
+    /// empty schema cache that is shared by all clones of this instance.
+    pub fn new(file_io: FileIO, table_path: String) -> Self {
+        Self {
+            file_io,
+            table_path,
+            cache: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    /// Path to the schema directory (e.g. `{table_path}/schema`).
+    fn schema_directory(&self) -> String {
+        format!("{}/{}", self.table_path.trim_end_matches('/'), SCHEMA_DIR)
+    }
+
+    /// Path to a specific schema file (e.g. `{table_path}/schema/schema-0`).
+    fn schema_path(&self, schema_id: i64) -> String {
+        format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id)
+    }
+
+    /// Load a schema by ID. Returns cached version if available.
+    ///
+    /// The cache is shared across all clones of this `SchemaManager`, so 
loading
+    /// a schema in one stream makes it available to all other streams reading
+    /// from the same table.
+    ///
+    /// Reference: 
[SchemaManager.schema(long)](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java)
+    pub async fn schema(&self, schema_id: i64) -> 
crate::Result<Arc<TableSchema>> {
+        // Fast path: check cache under a short lock.
+        {
+            let cache = self.cache.lock().unwrap();
+            if let Some(schema) = cache.get(&schema_id) {
+                return Ok(schema.clone());
+            }
+        }
+
+        // Cache miss — load from file (no lock held during I/O).
+        let path = self.schema_path(schema_id);
+        let input = self.file_io.new_input(&path)?;
+        let bytes = input.read().await?;
+        let schema: TableSchema =
+            serde_json::from_slice(&bytes).map_err(|e| 
crate::Error::DataInvalid {
+                message: format!("Failed to parse schema file: {path}"),
+                source: Some(Box::new(e)),
+            })?;
+        let schema = Arc::new(schema);
+
+        // Insert into shared cache (short lock).
+        {
+            let mut cache = self.cache.lock().unwrap();
+            // If a concurrent task raced us and already cached this ID, keep
+            // the first entry; both are parses of the same schema file.
+            cache.entry(schema_id).or_insert_with(|| schema.clone());
+        }
+
+        Ok(schema)
+    }
+}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 0a5e13c..7bca4d2 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -292,6 +292,229 @@ def main():
     spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot1', 
1)")
     spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot2', 
2)")
 
+    # ===== Schema Evolution: Add Column =====
+    # Old files have (id, name); after ALTER TABLE ADD COLUMNS, new files have 
(id, name, age).
+    # Reader must fill nulls for 'age' when reading old files.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS schema_evolution_add_column (
+            id INT,
+            name STRING
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_add_column VALUES (1, 'alice'), (2, 
'bob')"
+    )
+    # The ALTER writes a new versioned schema file; files written before this
+    # point keep referencing the older schema ID.
+    spark.sql("ALTER TABLE schema_evolution_add_column ADD COLUMNS (age INT)")
+    spark.sql(
+        "INSERT INTO schema_evolution_add_column VALUES (3, 'carol', 30), (4, 
'dave', 40)"
+    )
+
+    # ===== Schema Evolution: Type Promotion (INT -> BIGINT) =====
+    # Old files have value as INT; after ALTER TABLE, new files have value as 
BIGINT.
+    # Reader must cast INT to BIGINT when reading old files.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS schema_evolution_type_promotion (
+            id INT,
+            value INT
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_type_promotion VALUES (1, 100), (2, 200)"
+    )
+    spark.sql(
+        "ALTER TABLE schema_evolution_type_promotion ALTER COLUMN value TYPE 
BIGINT"
+    )
+    # 3000000000 exceeds the 32-bit INT range, so this row is only
+    # representable after the promotion to BIGINT.
+    spark.sql(
+        "INSERT INTO schema_evolution_type_promotion VALUES (3, 3000000000)"
+    )
+
+    # ===== Data Evolution + Schema Evolution: Add Column =====
+    # Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD 
COLUMNS.
+    # Old files lack the new column; MERGE INTO produces partial-column files.
+    # Reader must fill nulls for missing columns AND merge columns across 
files.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_add_column (
+            id INT,
+            name STRING,
+            value INT
+        ) USING paimon
+        TBLPROPERTIES (
+            'row-tracking.enabled' = 'true',
+            'data-evolution.enabled' = 'true'
+        )
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO data_evolution_add_column VALUES
+            (1, 'alice', 100),
+            (2, 'bob', 200)
+        """
+    )
+    spark.sql("ALTER TABLE data_evolution_add_column ADD COLUMNS (extra 
STRING)")
+    spark.sql(
+        """
+        INSERT INTO data_evolution_add_column VALUES
+            (3, 'carol', 300, 'new'),
+            (4, 'dave', 400, 'new')
+        """
+    )
+    # MERGE INTO to trigger merge_files_by_columns with schema evolution.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_add_column_updates (
+            id INT,
+            name STRING
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        "INSERT INTO data_evolution_add_column_updates VALUES (1, 'alice-v2')"
+    )
+    spark.sql(
+        """
+        MERGE INTO data_evolution_add_column t
+        USING data_evolution_add_column_updates s
+        ON t.id = s.id
+        WHEN MATCHED THEN UPDATE SET t.name = s.name
+        """
+    )
+    # The staging table only exists to drive the MERGE; drop it afterwards.
+    spark.sql("DROP TABLE data_evolution_add_column_updates")
+
+    # ===== Data Evolution + Schema Evolution: Type Promotion =====
+    # Combines data-evolution with ALTER TABLE ALTER COLUMN TYPE (INT -> 
BIGINT).
+    # Old files have INT; new files have BIGINT. MERGE INTO updates some rows.
+    # Reader must cast old INT columns to BIGINT AND merge columns across 
files.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_type_promotion (
+            id INT,
+            value INT
+        ) USING paimon
+        TBLPROPERTIES (
+            'row-tracking.enabled' = 'true',
+            'data-evolution.enabled' = 'true'
+        )
+        """
+    )
+    spark.sql(
+        "INSERT INTO data_evolution_type_promotion VALUES (1, 100), (2, 200)"
+    )
+    spark.sql(
+        "ALTER TABLE data_evolution_type_promotion ALTER COLUMN value TYPE 
BIGINT"
+    )
+    # 3000000000 exceeds the 32-bit INT range, so it requires BIGINT.
+    spark.sql(
+        "INSERT INTO data_evolution_type_promotion VALUES (3, 3000000000)"
+    )
+    # MERGE INTO to trigger merge_files_by_columns with type promotion.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_type_promotion_updates (
+            id INT,
+            value BIGINT
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        "INSERT INTO data_evolution_type_promotion_updates VALUES (1, 999)"
+    )
+    spark.sql(
+        """
+        MERGE INTO data_evolution_type_promotion t
+        USING data_evolution_type_promotion_updates s
+        ON t.id = s.id
+        WHEN MATCHED THEN UPDATE SET t.value = s.value
+        """
+    )
+    # The staging table only exists to drive the MERGE; drop it afterwards.
+    spark.sql("DROP TABLE data_evolution_type_promotion_updates")
+
+    # ===== Data Evolution + Drop Column: tests NULL-fill when no file 
provides a column =====
+    # After MERGE INTO on old rows, the merge group files all predate ADD 
COLUMN.
+    # SELECT on the new column should return NULLs for old rows (not silently 
drop them).
+    # NOTE(review): despite the table name, no DROP COLUMN is executed here;
+    # the scenario exercises NULL-fill after a later ADD COLUMNS.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_drop_column (
+            id INT,
+            name STRING,
+            value INT
+        ) USING paimon
+        TBLPROPERTIES (
+            'row-tracking.enabled' = 'true',
+            'data-evolution.enabled' = 'true'
+        )
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO data_evolution_drop_column VALUES
+            (1, 'alice', 100),
+            (2, 'bob', 200)
+        """
+    )
+    # MERGE INTO to create a partial-column file in the same row_id range.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS data_evolution_drop_column_updates (
+            id INT,
+            name STRING
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        "INSERT INTO data_evolution_drop_column_updates VALUES (1, 'alice-v2')"
+    )
+    spark.sql(
+        """
+        MERGE INTO data_evolution_drop_column t
+        USING data_evolution_drop_column_updates s
+        ON t.id = s.id
+        WHEN MATCHED THEN UPDATE SET t.name = s.name
+        """
+    )
+    spark.sql("DROP TABLE data_evolution_drop_column_updates")
+    # Add a new column that no existing file contains.
+    spark.sql("ALTER TABLE data_evolution_drop_column ADD COLUMNS (extra 
STRING)")
+    # Insert new rows that DO have the extra column.
+    spark.sql(
+        """
+        INSERT INTO data_evolution_drop_column VALUES
+            (3, 'carol', 300, 'new')
+        """
+    )
+
+    # ===== Schema Evolution: Drop Column =====
+    # Old files have (id, name, score); after ALTER TABLE DROP COLUMN, table 
has (id, name).
+    # Reader should ignore the dropped column when reading old files.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS schema_evolution_drop_column (
+            id INT,
+            name STRING,
+            score INT
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO schema_evolution_drop_column VALUES
+            (1, 'alice', 100),
+            (2, 'bob', 200)
+        """
+    )
+    # Files written before this point still physically contain 'score'.
+    spark.sql("ALTER TABLE schema_evolution_drop_column DROP COLUMN score")
+    spark.sql(
+        """
+        INSERT INTO schema_evolution_drop_column VALUES
+            (3, 'carol'),
+            (4, 'dave')
+        """
+    )
+
 
 if __name__ == "__main__":
     main()

Reply via email to