This is an automated email from the ASF dual-hosted git repository.
hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4ff1c11 feat: support schema evolution read with SchemaManager (#197)
4ff1c11 is described below
commit 4ff1c119ebbe9b26546dda6667e8534b89ff53e7
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 3 22:06:41 2026 +0800
feat: support schema evolution read with SchemaManager (#197)
* feat(arrow): support schema evolution read with SchemaManager and
field-ID-based index mapping
---
Cargo.toml | 1 +
crates/integration_tests/tests/read_tables.rs | 221 ++++++++++-
crates/integrations/datafusion/src/lib.rs | 2 -
crates/integrations/datafusion/src/schema.rs | 116 ------
crates/integrations/datafusion/src/table/mod.rs | 4 +-
.../integrations/datafusion/tests/read_tables.rs | 57 ++-
crates/paimon/Cargo.toml | 1 +
crates/paimon/src/arrow/mod.rs | 123 ++++++
crates/paimon/src/arrow/reader.rs | 427 +++++++++++++++------
crates/paimon/src/arrow/schema_evolution.rs | 151 ++++++++
crates/paimon/src/lib.rs | 2 +-
crates/paimon/src/spec/types.rs | 16 +
crates/paimon/src/table/mod.rs | 11 +
crates/paimon/src/table/read_builder.rs | 8 +-
crates/paimon/src/table/schema_manager.rs | 103 +++++
dev/spark/provision.py | 223 +++++++++++
16 files changed, 1231 insertions(+), 235 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index d7ebd00..f1f7295 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ rust-version = "1.86.0"
[workspace.dependencies]
arrow-array = { version = "57.0", features = ["ffi"] }
arrow-schema = "57.0"
+arrow-cast = "57.0"
arrow-select = "57.0"
parquet = "57.0"
tokio = "1.39.2"
\ No newline at end of file
diff --git a/crates/integration_tests/tests/read_tables.rs
b/crates/integration_tests/tests/read_tables.rs
index 7027110..2775978 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -17,7 +17,7 @@
//! Integration tests for reading Paimon tables provisioned by Spark.
-use arrow_array::{Int32Array, RecordBatch, StringArray};
+use arrow_array::{Array, ArrowPrimitiveType, Int32Array, Int64Array,
RecordBatch, StringArray};
use futures::TryStreamExt;
use paimon::api::ConfigResponse;
use paimon::catalog::{Identifier, RESTCatalog};
@@ -999,3 +999,222 @@ async fn test_limit_pushdown() {
);
}
}
+
+// ---------------------------------------------------------------------------
+// Schema Evolution integration tests
+// ---------------------------------------------------------------------------
+
+/// Test reading a table after ALTER TABLE ADD COLUMNS.
+/// Old data files lack the new column; reader should fill nulls.
+#[tokio::test]
+async fn test_read_schema_evolution_add_column() {
+ let (_, batches) =
scan_and_read_with_fs_catalog("schema_evolution_add_column", None).await;
+
+ let mut rows: Vec<(i32, String, Option<i32>)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ let age = batch
+ .column_by_name("age")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("age");
+ for i in 0..batch.num_rows() {
+ let age_val = if age.is_null(i) {
+ None
+ } else {
+ Some(age.value(i))
+ };
+ rows.push((id.value(i), name.value(i).to_string(), age_val));
+ }
+ }
+ rows.sort_by_key(|(id, _, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice".into(), None),
+ (2, "bob".into(), None),
+ (3, "carol".into(), Some(30)),
+ (4, "dave".into(), Some(40)),
+ ],
+ "Old rows should have null for added column 'age'"
+ );
+}
+
+/// Test reading a table after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
+/// Old data files have INT; reader should cast to BIGINT.
+#[tokio::test]
+async fn test_read_schema_evolution_type_promotion() {
+ let (_, batches) =
scan_and_read_with_fs_catalog("schema_evolution_type_promotion", None).await;
+
+ // Verify the value column is Int64 (BIGINT) in all batches
+ for batch in &batches {
+ let value_col = batch.column_by_name("value").expect("value column");
+ assert_eq!(
+ value_col.data_type(),
+ &arrow_array::types::Int64Type::DATA_TYPE,
+ "value column should be Int64 (BIGINT) after type promotion"
+ );
+ }
+
+ let mut rows: Vec<(i32, i64)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let value = batch
+ .column_by_name("value")
+ .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+ .expect("value as Int64Array");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), value.value(i)));
+ }
+ }
+ rows.sort_by_key(|(id, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![(1, 100i64), (2, 200i64), (3, 3_000_000_000i64)],
+ "INT values should be promoted to BIGINT, including values > INT_MAX"
+ );
+}
+
+/// Test reading a data-evolution table after ALTER TABLE ADD COLUMNS.
+/// Old files lack the new column; reader should fill nulls even in data
evolution mode.
+#[tokio::test]
+async fn test_read_data_evolution_add_column() {
+ let (_, batches) =
scan_and_read_with_fs_catalog("data_evolution_add_column", None).await;
+
+ let mut rows: Vec<(i32, String, i32, Option<String>)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ let value = batch
+ .column_by_name("value")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("value");
+ let extra = batch
+ .column_by_name("extra")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("extra");
+ for i in 0..batch.num_rows() {
+ let extra_val = if extra.is_null(i) {
+ None
+ } else {
+ Some(extra.value(i).to_string())
+ };
+ rows.push((
+ id.value(i),
+ name.value(i).to_string(),
+ value.value(i),
+ extra_val,
+ ));
+ }
+ }
+ rows.sort_by_key(|(id, _, _, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice-v2".into(), 100, None),
+ (2, "bob".into(), 200, None),
+ (3, "carol".into(), 300, Some("new".into())),
+ (4, "dave".into(), 400, Some("new".into())),
+ ],
+ "Data evolution + add column: old rows should have null for 'extra',
MERGE INTO updates name"
+ );
+}
+
+/// Test reading a data-evolution table after ALTER TABLE ALTER COLUMN TYPE
(INT -> BIGINT).
+/// Old files have INT; reader should cast to BIGINT in data evolution mode.
+#[tokio::test]
+async fn test_read_data_evolution_type_promotion() {
+ let (_, batches) =
scan_and_read_with_fs_catalog("data_evolution_type_promotion", None).await;
+
+ // Verify the value column is Int64 (BIGINT) in all batches
+ for batch in &batches {
+ let value_col = batch.column_by_name("value").expect("value column");
+ assert_eq!(
+ value_col.data_type(),
+ &arrow_array::types::Int64Type::DATA_TYPE,
+ "value column should be Int64 (BIGINT) after type promotion in
data evolution mode"
+ );
+ }
+
+ let mut rows: Vec<(i32, i64)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let value = batch
+ .column_by_name("value")
+ .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+ .expect("value as Int64Array");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), value.value(i)));
+ }
+ }
+ rows.sort_by_key(|(id, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![(1, 999i64), (2, 200i64), (3, 3_000_000_000i64)],
+ "Data evolution + type promotion: INT should be cast to BIGINT, MERGE
INTO updates value"
+ );
+}
+
+/// Test reading a table after ALTER TABLE DROP COLUMN.
+/// Old data files have the dropped column; reader should ignore it.
+#[tokio::test]
+async fn test_read_schema_evolution_drop_column() {
+ let (_, batches) =
scan_and_read_with_fs_catalog("schema_evolution_drop_column", None).await;
+
+ // Verify the dropped column 'score' is not present in the output.
+ for batch in &batches {
+ assert!(
+ batch.column_by_name("score").is_none(),
+ "Dropped column 'score' should not appear in output"
+ );
+ }
+
+ let mut rows: Vec<(i32, String)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let name = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("name");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), name.value(i).to_string()));
+ }
+ }
+ rows.sort_by_key(|(id, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice".into()),
+ (2, "bob".into()),
+ (3, "carol".into()),
+ (4, "dave".into()),
+ ],
+ "Old rows should be readable after DROP COLUMN, with only remaining
columns"
+ );
+}
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index cd73831..7aab461 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -41,12 +41,10 @@ mod error;
mod filter_pushdown;
mod physical_plan;
mod relation_planner;
-mod schema;
mod table;
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
pub use error::to_datafusion_error;
pub use physical_plan::PaimonTableScan;
pub use relation_planner::PaimonRelationPlanner;
-pub use schema::paimon_schema_to_arrow;
pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/schema.rs
b/crates/integrations/datafusion/src/schema.rs
deleted file mode 100644
index 231431b..0000000
--- a/crates/integrations/datafusion/src/schema.rs
+++ /dev/null
@@ -1,116 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::common::DataFusionError;
-use datafusion::common::Result as DFResult;
-use std::sync::Arc;
-
-use paimon::spec::{DataField, DataType as PaimonDataType};
-
-/// Converts Paimon table schema (logical row type fields) to DataFusion Arrow
schema.
-pub fn paimon_schema_to_arrow(fields: &[DataField]) -> DFResult<Arc<Schema>> {
- let arrow_fields: Vec<Field> = fields
- .iter()
- .map(|f| {
- let arrow_type = paimon_data_type_to_arrow(f.data_type())?;
- Ok(Field::new(
- f.name(),
- arrow_type,
- f.data_type().is_nullable(),
- ))
- })
- .collect::<DFResult<Vec<_>>>()?;
- Ok(Arc::new(Schema::new(arrow_fields)))
-}
-
-fn paimon_data_type_to_arrow(dt: &PaimonDataType) -> DFResult<DataType> {
- use datafusion::arrow::datatypes::TimeUnit;
-
- Ok(match dt {
- PaimonDataType::Boolean(_) => DataType::Boolean,
- PaimonDataType::TinyInt(_) => DataType::Int8,
- PaimonDataType::SmallInt(_) => DataType::Int16,
- PaimonDataType::Int(_) => DataType::Int32,
- PaimonDataType::BigInt(_) => DataType::Int64,
- PaimonDataType::Float(_) => DataType::Float32,
- PaimonDataType::Double(_) => DataType::Float64,
- PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => DataType::Utf8,
- PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) =>
DataType::Binary,
- PaimonDataType::Date(_) => DataType::Date32,
- PaimonDataType::Time(t) => match t.precision() {
- // `read.to_arrow(...)` goes through the Parquet Arrow reader,
which exposes INT32
- // TIME values as millisecond precision only. Mirror that here so
provider schema and
- // runtime RecordBatch schema stay identical.
- 0..=3 => DataType::Time32(TimeUnit::Millisecond),
- 4..=6 => DataType::Time64(TimeUnit::Microsecond),
- 7..=9 => DataType::Time64(TimeUnit::Nanosecond),
- precision => {
- return Err(DataFusionError::Internal(format!(
- "Unsupported TIME precision {precision}"
- )));
- }
- },
- PaimonDataType::Timestamp(t) => {
- DataType::Timestamp(timestamp_time_unit(t.precision())?, None)
- }
- PaimonDataType::LocalZonedTimestamp(t) => {
- DataType::Timestamp(timestamp_time_unit(t.precision())?,
Some("UTC".into()))
- }
- PaimonDataType::Decimal(d) => {
- let p = u8::try_from(d.precision()).map_err(|_| {
- DataFusionError::Internal("Decimal precision exceeds
u8".to_string())
- })?;
- let s = i8::try_from(d.scale() as i32).map_err(|_| {
- DataFusionError::Internal("Decimal scale out of i8
range".to_string())
- })?;
- match d.precision() {
- // The Parquet Arrow reader normalizes DECIMAL columns to
Decimal128 regardless of
- // Parquet physical storage width. Mirror that here to avoid
DataFusion schema
- // mismatch between `TableProvider::schema()` and `execute()`
output.
- 1..=38 => DataType::Decimal128(p, s),
- precision => {
- return Err(DataFusionError::Internal(format!(
- "Unsupported DECIMAL precision {precision}"
- )));
- }
- }
- }
- PaimonDataType::Array(_)
- | PaimonDataType::Map(_)
- | PaimonDataType::Multiset(_)
- | PaimonDataType::Row(_) => {
- return Err(DataFusionError::NotImplemented(
- "Paimon DataFusion integration does not yet support nested
types (Array/Map/Row)"
- .to_string(),
- ));
- }
- })
-}
-
-fn timestamp_time_unit(precision: u32) ->
DFResult<datafusion::arrow::datatypes::TimeUnit> {
- use datafusion::arrow::datatypes::TimeUnit;
-
- match precision {
- 0..=3 => Ok(TimeUnit::Millisecond),
- 4..=6 => Ok(TimeUnit::Microsecond),
- 7..=9 => Ok(TimeUnit::Nanosecond),
- _ => Err(DataFusionError::Internal(format!(
- "Unsupported TIMESTAMP precision {precision}"
- ))),
- }
-}
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
index 73763e5..26a50c9 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -32,7 +32,6 @@ use paimon::table::Table;
use crate::error::to_datafusion_error;
use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown};
use crate::physical_plan::PaimonTableScan;
-use crate::schema::paimon_schema_to_arrow;
/// Read-only table provider for a Paimon table.
///
@@ -50,7 +49,8 @@ impl PaimonTableProvider {
/// Loads the table schema and converts it to Arrow for DataFusion.
pub fn try_new(table: Table) -> DFResult<Self> {
let fields = table.schema().fields();
- let schema = paimon_schema_to_arrow(fields)?;
+ let schema =
+
paimon::arrow::build_target_arrow_schema(fields).map_err(to_datafusion_error)?;
Ok(Self { table, schema })
}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs
b/crates/integrations/datafusion/tests/read_tables.rs
index 6a3c995..4a65a10 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -17,7 +17,7 @@
use std::sync::Arc;
-use datafusion::arrow::array::{Int32Array, StringArray};
+use datafusion::arrow::array::{Array, Int32Array, StringArray};
use datafusion::catalog::CatalogProvider;
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::{col, lit, TableProviderFilterPushDown};
@@ -468,3 +468,58 @@ async fn test_time_travel_by_tag_name() {
"Tag 'snapshot2' should contain all rows"
);
}
+
+/// Verifies that data evolution merge correctly NULL-fills columns that no
file in a
+/// merge group provides (e.g. a newly added column after MERGE INTO on old
rows).
+/// Without the fix, `active_file_indices` would be empty and rows would be
silently lost.
+#[tokio::test]
+async fn test_data_evolution_drop_column_null_fill() {
+ let batches = collect_query(
+ "data_evolution_drop_column",
+ "SELECT id, name, extra FROM data_evolution_drop_column",
+ )
+ .await
+ .expect("data_evolution_drop_column query should succeed");
+
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(
+ total_rows, 3,
+ "Should return 3 rows (not silently drop rows from merge groups
missing the new column)"
+ );
+
+ let mut rows: Vec<(i32, String, Option<String>)> = Vec::new();
+ for batch in &batches {
+ let id_array = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("Expected Int32Array for id");
+ let name_array = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("Expected StringArray for name");
+ let extra_array = batch
+ .column_by_name("extra")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("Expected StringArray for extra");
+
+ for i in 0..batch.num_rows() {
+ let extra = if extra_array.is_null(i) {
+ None
+ } else {
+ Some(extra_array.value(i).to_string())
+ };
+ rows.push((id_array.value(i), name_array.value(i).to_string(),
extra));
+ }
+ }
+ rows.sort_by_key(|(id, _, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice-v2".to_string(), None),
+ (2, "bob".to_string(), None),
+ (3, "carol".to_string(), Some("new".to_string())),
+ ],
+ "Old rows should have extra=NULL, new row should have extra='new'"
+ );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 878d1c5..b1677cf 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -55,6 +55,7 @@ apache-avro = { version = "0.17", features = ["snappy",
"zstandard"] }
indexmap = "2.5.0"
roaring = "0.11"
arrow-array = { workspace = true }
+arrow-cast = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
futures = "0.3"
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index 0ed18b8..e823c90 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -16,5 +16,128 @@
// under the License.
mod reader;
+pub(crate) mod schema_evolution;
pub use crate::arrow::reader::ArrowReaderBuilder;
+
+use crate::spec::{DataField, DataType as PaimonDataType};
+use arrow_schema::DataType as ArrowDataType;
+use arrow_schema::{Field as ArrowField, Schema as ArrowSchema, TimeUnit};
+use std::sync::Arc;
+
+/// Converts a Paimon [`DataType`](PaimonDataType) to an Arrow
[`DataType`](ArrowDataType).
+pub fn paimon_type_to_arrow(dt: &PaimonDataType) ->
crate::Result<ArrowDataType> {
+ Ok(match dt {
+ PaimonDataType::Boolean(_) => ArrowDataType::Boolean,
+ PaimonDataType::TinyInt(_) => ArrowDataType::Int8,
+ PaimonDataType::SmallInt(_) => ArrowDataType::Int16,
+ PaimonDataType::Int(_) => ArrowDataType::Int32,
+ PaimonDataType::BigInt(_) => ArrowDataType::Int64,
+ PaimonDataType::Float(_) => ArrowDataType::Float32,
+ PaimonDataType::Double(_) => ArrowDataType::Float64,
+ PaimonDataType::VarChar(_) | PaimonDataType::Char(_) =>
ArrowDataType::Utf8,
+ PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) =>
ArrowDataType::Binary,
+ PaimonDataType::Date(_) => ArrowDataType::Date32,
+ PaimonDataType::Time(_) =>
ArrowDataType::Time32(TimeUnit::Millisecond),
+ PaimonDataType::Timestamp(t) => {
+ ArrowDataType::Timestamp(timestamp_time_unit(t.precision())?, None)
+ }
+ PaimonDataType::LocalZonedTimestamp(t) => {
+ ArrowDataType::Timestamp(timestamp_time_unit(t.precision())?,
Some("UTC".into()))
+ }
+ PaimonDataType::Decimal(d) => {
+ let p = u8::try_from(d.precision()).map_err(|_|
crate::Error::Unsupported {
+ message: "Decimal precision exceeds u8".to_string(),
+ })?;
+ let s = i8::try_from(d.scale() as i32).map_err(|_|
crate::Error::Unsupported {
+ message: "Decimal scale out of i8 range".to_string(),
+ })?;
+ ArrowDataType::Decimal128(p, s)
+ }
+ PaimonDataType::Array(a) => {
+ let element_type = paimon_type_to_arrow(a.element_type())?;
+ ArrowDataType::List(Arc::new(ArrowField::new(
+ "element",
+ element_type,
+ a.element_type().is_nullable(),
+ )))
+ }
+ PaimonDataType::Map(m) => {
+ let key_type = paimon_type_to_arrow(m.key_type())?;
+ let value_type = paimon_type_to_arrow(m.value_type())?;
+ ArrowDataType::Map(
+ Arc::new(ArrowField::new(
+ "entries",
+ ArrowDataType::Struct(
+ vec![
+ ArrowField::new("key", key_type, false),
+ ArrowField::new("value", value_type,
m.value_type().is_nullable()),
+ ]
+ .into(),
+ ),
+ false,
+ )),
+ false,
+ )
+ }
+ PaimonDataType::Multiset(m) => {
+ let element_type = paimon_type_to_arrow(m.element_type())?;
+ ArrowDataType::Map(
+ Arc::new(ArrowField::new(
+ "entries",
+ ArrowDataType::Struct(
+ vec![
+ ArrowField::new("key", element_type,
m.element_type().is_nullable()),
+ ArrowField::new("value", ArrowDataType::Int32,
false),
+ ]
+ .into(),
+ ),
+ false,
+ )),
+ false,
+ )
+ }
+ PaimonDataType::Row(r) => {
+ let fields: Vec<ArrowField> = r
+ .fields()
+ .iter()
+ .map(|f| {
+ let arrow_type = paimon_type_to_arrow(f.data_type())?;
+ Ok(ArrowField::new(
+ f.name(),
+ arrow_type,
+ f.data_type().is_nullable(),
+ ))
+ })
+ .collect::<crate::Result<Vec<_>>>()?;
+ ArrowDataType::Struct(fields.into())
+ }
+ })
+}
+
+fn timestamp_time_unit(precision: u32) -> crate::Result<TimeUnit> {
+ match precision {
+ 0..=3 => Ok(TimeUnit::Millisecond),
+ 4..=6 => Ok(TimeUnit::Microsecond),
+ 7..=9 => Ok(TimeUnit::Nanosecond),
+ _ => Err(crate::Error::Unsupported {
+ message: format!("Unsupported TIMESTAMP precision {precision}"),
+ }),
+ }
+}
+
+/// Build an Arrow [`Schema`](ArrowSchema) from Paimon [`DataField`]s.
+pub fn build_target_arrow_schema(fields: &[DataField]) ->
crate::Result<Arc<ArrowSchema>> {
+ let arrow_fields: Vec<ArrowField> = fields
+ .iter()
+ .map(|f| {
+ let arrow_type = paimon_type_to_arrow(f.data_type())?;
+ Ok(ArrowField::new(
+ f.name(),
+ arrow_type,
+ f.data_type().is_nullable(),
+ ))
+ })
+ .collect::<crate::Result<Vec<_>>>()?;
+ Ok(Arc::new(ArrowSchema::new(arrow_fields)))
+}
diff --git a/crates/paimon/src/arrow/reader.rs
b/crates/paimon/src/arrow/reader.rs
index 7d57243..212f32f 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+use crate::arrow::build_target_arrow_schema;
+use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX};
use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
use crate::io::{FileIO, FileRead, FileStatus};
use crate::spec::{DataField, DataFileMeta};
+use crate::table::schema_manager::SchemaManager;
use crate::table::ArrowRecordBatchStream;
use crate::{DataSplit, Error};
use arrow_array::RecordBatch;
-use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
+use arrow_cast::cast;
use async_stream::try_stream;
use bytes::Bytes;
@@ -41,14 +44,22 @@ use tokio::try_join;
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
}
impl ArrowReaderBuilder {
/// Create a new ArrowReaderBuilder
- pub(crate) fn new(file_io: FileIO) -> Self {
+ pub(crate) fn new(
+ file_io: FileIO,
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
+ ) -> Self {
ArrowReaderBuilder {
batch_size: None,
file_io,
+ schema_manager,
+ table_schema_id,
}
}
@@ -58,6 +69,8 @@ impl ArrowReaderBuilder {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
+ schema_manager: self.schema_manager,
+ table_schema_id: self.table_schema_id,
read_type,
}
}
@@ -68,33 +81,30 @@ impl ArrowReaderBuilder {
pub struct ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
read_type: Vec<DataField>,
}
impl ArrowReader {
/// Take a stream of DataSplits and read every data file in each split.
/// Returns a stream of Arrow RecordBatches from all files.
- /// When a split has deletion files (see
[DataSplit::data_deletion_files]), the corresponding
- /// deletion vectors are loaded and applied so that deleted rows are
filtered out from the stream.
- /// Row positions are 0-based within each data file, matching Java's
ApplyDeletionVectorReader.
///
- /// Matches
[RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java):
- /// one DV factory per DataSplit (created from that split's data files and
deletion files).
+ /// Uses SchemaManager to load the data file's schema (via
`DataFileMeta.schema_id`)
+ /// and computes field-ID-based index mapping for schema evolution (added
columns,
+ /// type promotion, column reordering).
///
- /// Parquet schema is clipped to this reader's read type (column names
from [DataField]s).
- /// File-only columns are not read. See
[ParquetReaderFactory.clipParquetSchema](https://github.com/apache/paimon/blob/master/paimon-format/paimon-format-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java).
+ /// Matches
[RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java).
pub fn read(self, data_splits: &[DataSplit]) ->
crate::Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let splits: Vec<DataSplit> = data_splits.to_vec();
let read_type = self.read_type;
- let projected_column_names: Vec<String> = read_type
- .iter()
- .map(|field| field.name().to_string())
- .collect();
+ let schema_manager = self.schema_manager;
+ let table_schema_id = self.table_schema_id;
Ok(try_stream! {
for split in splits {
- // Create DV factory for this split only (like Java
createReader(partition, bucket, files, deletionFiles)).
+ // Create DV factory for this split only.
let dv_factory = if split
.data_deletion_files()
.is_some_and(|files| files.iter().any(Option::is_some))
@@ -117,11 +127,20 @@ impl ArrowReader {
.and_then(|factory|
factory.get_deletion_vector(&file_meta.file_name))
.cloned();
+ // Load data file's schema if it differs from the table
schema.
+ let data_fields: Option<Vec<DataField>> = if
file_meta.schema_id != table_schema_id {
+ let data_schema =
schema_manager.schema(file_meta.schema_id).await?;
+ Some(data_schema.fields().to_vec())
+ } else {
+ None
+ };
+
let mut stream = read_single_file_stream(
file_io.clone(),
split.clone(),
file_meta,
- projected_column_names.clone(),
+ read_type.clone(),
+ data_fields,
batch_size,
dv,
)?;
@@ -151,32 +170,38 @@ impl ArrowReader {
let batch_size = self.batch_size;
let splits: Vec<DataSplit> = data_splits.to_vec();
let read_type = self.read_type;
- let table_field_names: Vec<String> =
- table_fields.iter().map(|f| f.name().to_string()).collect();
- let projected_column_names: Vec<String> = read_type
- .iter()
- .map(|field| field.name().to_string())
- .collect();
+ let table_fields: Vec<DataField> = table_fields.to_vec();
+ let schema_manager = self.schema_manager;
+ let table_schema_id = self.table_schema_id;
Ok(try_stream! {
for split in splits {
if split.raw_convertible() || split.data_files().len() == 1 {
- // Single file or raw convertible — stream lazily without
loading all into memory.
for file_meta in split.data_files().to_vec() {
+ let data_fields: Option<Vec<DataField>> = if
file_meta.schema_id != table_schema_id {
+ let data_schema =
schema_manager.schema(file_meta.schema_id).await?;
+ Some(data_schema.fields().to_vec())
+ } else {
+ None
+ };
+
let mut stream = read_single_file_stream(
- file_io.clone(), split.clone(), file_meta,
projected_column_names.clone(), batch_size, None,
+ file_io.clone(), split.clone(), file_meta,
read_type.clone(),
+ data_fields, batch_size, None,
)?;
while let Some(batch) = stream.next().await {
yield batch?;
}
}
} else {
- // Multiple files need column-wise merge — also streamed
lazily.
+ // Multiple files need column-wise merge.
let mut merge_stream = merge_files_by_columns(
&file_io,
&split,
- &projected_column_names,
- &table_field_names,
+ &read_type,
+ &table_fields,
+ schema_manager.clone(),
+ table_schema_id,
batch_size,
)?;
while let Some(batch) = merge_stream.next().await {
@@ -191,14 +216,56 @@ impl ArrowReader {
/// Read a single parquet file from a split, returning a lazy stream of
batches.
/// Optionally applies a deletion vector.
+///
+/// Handles schema evolution using field-ID-based index mapping:
+/// - `data_fields`: if `Some`, the fields from the data file's schema (loaded
via SchemaManager).
+/// Used to compute index mapping between `read_type` and data fields by
field ID.
+/// - Columns missing from the file are filled with null arrays.
+/// - Columns whose Arrow type differs from the target type are cast (type
promotion).
+///
+/// Reference:
[RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java)
fn read_single_file_stream(
file_io: FileIO,
split: DataSplit,
file_meta: DataFileMeta,
- projected_column_names: Vec<String>,
+ read_type: Vec<DataField>,
+ data_fields: Option<Vec<DataField>>,
batch_size: Option<usize>,
dv: Option<Arc<DeletionVector>>,
) -> crate::Result<ArrowRecordBatchStream> {
+ let target_schema = build_target_arrow_schema(&read_type)?;
+
+ // Compute index mapping and determine which columns to read from the
parquet file.
+ // If data_fields is provided, use field-ID-based mapping; otherwise use
read_type names directly.
+ let (parquet_read_fields, index_mapping) = if let Some(ref df) =
data_fields {
+ let mapping = create_index_mapping(&read_type, df);
+ match mapping {
+ Some(ref idx_map) => {
+ // Only read data fields that are referenced by the index
mapping.
+ // Dedup by data field index to avoid duplicate parquet column
projections.
+ let mut seen = std::collections::HashSet::new();
+ let fields_to_read: Vec<DataField> = idx_map
+ .iter()
+ .filter(|&&idx| idx != NULL_FIELD_INDEX &&
seen.insert(idx))
+ .map(|&idx| df[idx as usize].clone())
+ .collect();
+ (fields_to_read, Some(idx_map.clone()))
+ }
+ None => {
+ // Identity mapping — read data fields in order.
+ (df.clone(), None)
+ }
+ }
+ } else {
+ // No schema evolution — read by read_type names.
+ (read_type.clone(), None)
+ };
+
+ let parquet_column_names: Vec<String> = parquet_read_fields
+ .iter()
+ .map(|f| f.name().to_string())
+ .collect();
+
Ok(try_stream! {
let path_to_read = split.data_file_path(&file_meta);
if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
@@ -219,7 +286,7 @@ fn read_single_file_stream(
// Only project columns that exist in this file.
let parquet_schema = batch_stream_builder.parquet_schema().clone();
let file_column_names: Vec<&str> =
parquet_schema.columns().iter().map(|c| c.name()).collect();
- let available_columns: Vec<&str> = projected_column_names
+ let available_columns: Vec<&str> = parquet_column_names
.iter()
.filter(|name| file_column_names.contains(&name.as_str()))
.map(String::as_str)
@@ -242,24 +309,83 @@ fn read_single_file_stream(
let mut batch_stream = batch_stream_builder.build()?;
while let Some(batch) = batch_stream.next().await {
let batch = batch?;
- // Reorder columns from parquet-schema order to
projected_column_names order,
- // consistent with the normal read() path.
- let reorder_indices: Vec<usize> = projected_column_names
- .iter()
- .filter_map(|name| batch.schema().index_of(name).ok())
- .collect();
- if reorder_indices.len() == batch.num_columns() {
- yield batch.project(&reorder_indices).map_err(|e| {
- Error::UnexpectedError {
- message: "Failed to reorder projected
columns".to_string(),
- source: Some(Box::new(e)),
+ let num_rows = batch.num_rows();
+ let batch_schema = batch.schema();
+
+ // Build output columns using index mapping (field-ID-based) or by
name.
+ let mut columns: Vec<Arc<dyn arrow_array::Array>> =
Vec::with_capacity(target_schema.fields().len());
+ for (i, target_field) in target_schema.fields().iter().enumerate()
{
+ let source_col = if let Some(ref idx_map) = index_mapping {
+ let data_idx = idx_map[i];
+ if data_idx == NULL_FIELD_INDEX {
+ None
+ } else {
+ // Find the column in the batch by the data field's
name.
+ let data_field =
&data_fields.as_ref().unwrap()[data_idx as usize];
+ batch_schema
+ .index_of(data_field.name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
}
- })?;
+ } else if let Some(ref df) = data_fields {
+ // Identity mapping with data_fields present (e.g. renamed
column).
+ // Use data field name (old name in parquet) at the same
position.
+ batch_schema
+ .index_of(df[i].name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ } else {
+ // No schema evolution — look up by target field name.
+ batch_schema
+ .index_of(target_field.name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ };
+
+ match source_col {
+ Some(col) => {
+ if col.data_type() == target_field.data_type() {
+ columns.push(col.clone());
+ } else {
+ // Type promotion: cast to target type.
+ let casted = cast(col,
target_field.data_type()).map_err(|e| {
+ Error::UnexpectedError {
+ message: format!(
+ "Failed to cast column '{}' from {:?}
to {:?}: {e}",
+ target_field.name(),
+ col.data_type(),
+ target_field.data_type()
+ ),
+ source: Some(Box::new(e)),
+ }
+ })?;
+ columns.push(casted);
+ }
+ }
+ None => {
+ // Column missing from file: fill with nulls.
+ let null_array =
arrow_array::new_null_array(target_field.data_type(), num_rows);
+ columns.push(null_array);
+ }
+ }
+ }
+
+ let result = if columns.is_empty() {
+ RecordBatch::try_new_with_options(
+ target_schema.clone(),
+ columns,
+
&arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
+ )
} else {
- // Not all projected columns exist in this file (data
evolution case),
- // return as-is; the caller (merge_files_by_columns) handles
missing columns.
- yield batch;
+ RecordBatch::try_new(target_schema.clone(), columns)
}
+ .map_err(|e| {
+ Error::UnexpectedError {
+ message: format!("Failed to build schema-evolved
RecordBatch: {e}"),
+ source: Some(Box::new(e)),
+ }
+ })?;
+ yield result;
}
}
.boxed())
@@ -267,6 +393,9 @@ fn read_single_file_stream(
/// Merge multiple files column-wise for data evolution, streaming with
bounded memory.
///
+/// Uses field IDs (not column names) to resolve which file provides which
column,
+/// ensuring correctness across schema evolution (column rename, add, drop).
+///
/// Opens all file readers simultaneously and maintains a cursor (current
batch + offset)
/// per file. Each poll slices up to `batch_size` rows from each file's
current batch,
/// assembles columns from the winning files, and yields the merged batch.
When a file's
@@ -274,8 +403,10 @@ fn read_single_file_stream(
fn merge_files_by_columns(
file_io: &FileIO,
split: &DataSplit,
- projected_column_names: &[String],
- table_field_names: &[String],
+ read_type: &[DataField],
+ table_fields: &[DataField],
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
batch_size: Option<usize>,
) -> crate::Result<ArrowRecordBatchStream> {
let data_files = split.data_files();
@@ -283,67 +414,140 @@ fn merge_files_by_columns(
return Ok(futures::stream::empty().boxed());
}
- // Determine which columns each file provides and resolve conflicts by
max_sequence_number.
- // column_name -> (file_index, max_sequence_number)
- let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+ // Build owned data for the stream closure.
+ let file_io = file_io.clone();
+ let split = split.clone();
+ let data_files: Vec<DataFileMeta> = data_files.to_vec();
+ let read_type = read_type.to_vec();
+ let table_fields = table_fields.to_vec();
+ let output_batch_size = batch_size.unwrap_or(1024);
+ let target_schema = build_target_arrow_schema(&read_type)?;
- for (file_idx, file_meta) in data_files.iter().enumerate() {
- let file_columns: Vec<String> = if let Some(ref wc) =
file_meta.write_cols {
- wc.clone()
- } else {
- table_field_names.to_vec()
- };
+ Ok(try_stream! {
+ // Pre-load schemas and collect field IDs + data_fields per file.
+ // file_idx -> (field_ids, Option<Vec<DataField>>)
+ let mut file_info: HashMap<usize, (Vec<i32>, Option<Vec<DataField>>)>
= HashMap::new();
+
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let (field_ids, data_fields) = if file_meta.schema_id !=
table_schema_id {
+ let file_schema =
schema_manager.schema(file_meta.schema_id).await?;
+ let file_fields = file_schema.fields();
+
+ let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols
{
+ // write_cols names are from the file's schema at write
time.
+ wc.iter()
+ .filter_map(|name| file_fields.iter().find(|f|
f.name() == name).map(|f| f.id()))
+ .collect()
+ } else {
+ file_fields.iter().map(|f| f.id()).collect()
+ };
- for col in &file_columns {
- let entry = column_source
- .entry(col.clone())
- .or_insert((file_idx, i64::MIN));
- if file_meta.max_sequence_number > entry.1 {
- *entry = (file_idx, file_meta.max_sequence_number);
- }
+ (ids, Some(file_fields.to_vec()))
+ } else {
+ let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols
{
+ // write_cols names are from the current table schema.
+ wc.iter()
+ .filter_map(|name| table_fields.iter().find(|f|
f.name() == name).map(|f| f.id()))
+ .collect()
+ } else {
+ table_fields.iter().map(|f| f.id()).collect()
+ };
+
+ (ids, None)
+ };
+
+ file_info.insert(file_idx, (field_ids, data_fields));
}
- }
- // For each file, determine which projected columns to read from it.
- // file_index -> Vec<column_name>
- let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
- for col_name in projected_column_names {
- if let Some(&(file_idx, _)) = column_source.get(col_name) {
- file_read_columns
- .entry(file_idx)
- .or_default()
- .push(col_name.clone());
+ // Determine which file provides each field ID, resolving conflicts by
max_sequence_number.
+ // field_id -> (file_index, max_sequence_number)
+ let mut field_id_source: HashMap<i32, (usize, i64)> = HashMap::new();
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let (ref field_ids, _) = file_info[&file_idx];
+ for &fid in field_ids {
+ let entry = field_id_source
+ .entry(fid)
+ .or_insert((file_idx, i64::MIN));
+ if file_meta.max_sequence_number > entry.1 {
+ *entry = (file_idx, file_meta.max_sequence_number);
+ }
+ }
}
- }
- // For each projected column, record (file_index, column_name) for
assembly.
- let column_plan: Vec<(Option<usize>, String)> = projected_column_names
- .iter()
- .map(|col_name| {
- let file_idx = column_source.get(col_name).map(|&(idx, _)| idx);
- (file_idx, col_name.clone())
- })
- .collect();
+ // For each projected field, determine which file provides it (by
field ID).
+ // file_index -> Vec<column_name> (target column names)
+ let mut file_read_columns: HashMap<usize, Vec<String>> =
HashMap::new();
+ for field in &read_type {
+ if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) {
+ file_read_columns
+ .entry(file_idx)
+ .or_default()
+ .push(field.name().to_string());
+ }
+ }
- // Collect which file indices we need to open streams for.
- let active_file_indices: Vec<usize> =
file_read_columns.keys().copied().collect();
+ // For each projected field, record (file_index, target_column_name)
for assembly.
+ let column_plan: Vec<(Option<usize>, String)> = read_type
+ .iter()
+ .map(|field| {
+ let file_idx = field_id_source.get(&field.id()).map(|&(idx,
_)| idx);
+ (file_idx, field.name().to_string())
+ })
+ .collect();
- // Build owned data for the stream closure.
- let file_io = file_io.clone();
- let split = split.clone();
- let data_files: Vec<DataFileMeta> = data_files.to_vec();
- let projected_column_names = projected_column_names.to_vec();
- let output_batch_size = batch_size.unwrap_or(1024);
+ // Collect which file indices we need to open streams for.
+ let active_file_indices: Vec<usize> =
file_read_columns.keys().copied().collect();
+
+ // Edge case: if no file provides any projected column (e.g. SELECT on
a newly added
+ // column that no file contains yet), we still need to emit
NULL-filled rows to
+ // preserve the correct row count.
+ if active_file_indices.is_empty() {
+ // All files in a merge group cover the same rows; use the first
file's row_count.
+ let total_rows = data_files[0].row_count as usize;
+ let mut emitted = 0;
+ while emitted < total_rows {
+ let rows_to_emit = (total_rows -
emitted).min(output_batch_size);
+ let columns: Vec<Arc<dyn arrow_array::Array>> = target_schema
+ .fields()
+ .iter()
+ .map(|f| arrow_array::new_null_array(f.data_type(),
rows_to_emit))
+ .collect();
+ let batch = if columns.is_empty() {
+ RecordBatch::try_new_with_options(
+ target_schema.clone(),
+ columns,
+
&arrow_array::RecordBatchOptions::new().with_row_count(Some(rows_to_emit)),
+ )
+ } else {
+ RecordBatch::try_new(target_schema.clone(), columns)
+ }
+ .map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to build NULL-filled RecordBatch:
{e}"),
+ source: Some(Box::new(e)),
+ })?;
+ emitted += rows_to_emit;
+ yield batch;
+ }
+ } else {
- Ok(try_stream! {
// Open a stream for each active file.
+ // Build per-file read_type: only the DataFields this file is
responsible for.
let mut file_streams: HashMap<usize, ArrowRecordBatchStream> =
HashMap::new();
for &file_idx in &active_file_indices {
+ let file_cols =
file_read_columns.get(&file_idx).cloned().unwrap_or_default();
+ let file_read_type: Vec<DataField> = file_cols
+ .iter()
+ .filter_map(|col_name| read_type.iter().find(|f| f.name() ==
col_name).cloned())
+ .collect();
+
+ let (_, ref data_fields) = file_info[&file_idx];
+
let stream = read_single_file_stream(
file_io.clone(),
split.clone(),
data_files[file_idx].clone(),
- projected_column_names.clone(),
+ file_read_type,
+ data_fields.clone(),
batch_size,
None,
)?;
@@ -397,20 +601,25 @@ fn merge_files_by_columns(
let rows_to_emit = remaining.min(output_batch_size);
// Slice each file's current batch and assemble columns.
+ // Use the target schema so that missing columns are null-filled.
let mut columns: Vec<Arc<dyn arrow_array::Array>> =
Vec::with_capacity(column_plan.len());
- let mut schema_fields: Vec<ArrowField> =
Vec::with_capacity(column_plan.len());
-
- for (file_idx_opt, col_name) in &column_plan {
- if let Some(file_idx) = file_idx_opt {
- if let Some((batch, offset)) = file_cursors.get(file_idx) {
- if let Ok(col_idx) = batch.schema().index_of(col_name)
{
- let col = batch.column(col_idx).slice(*offset,
rows_to_emit);
- columns.push(col);
-
schema_fields.push(batch.schema().field(col_idx).clone());
- }
- }
- }
+
+ for (i, (file_idx_opt, col_name)) in
column_plan.iter().enumerate() {
+ let target_field = &target_schema.fields()[i];
+ let col = file_idx_opt
+ .and_then(|file_idx| file_cursors.get(&file_idx))
+ .and_then(|(batch, offset)| {
+ batch
+ .schema()
+ .index_of(col_name)
+ .ok()
+ .map(|col_idx|
batch.column(col_idx).slice(*offset, rows_to_emit))
+ });
+
+ columns.push(col.unwrap_or_else(|| {
+ arrow_array::new_null_array(target_field.data_type(),
rows_to_emit)
+ }));
}
// Advance all cursors.
@@ -420,15 +629,13 @@ fn merge_files_by_columns(
}
}
- if !columns.is_empty() {
- let schema = Arc::new(ArrowSchema::new(schema_fields));
- let merged = RecordBatch::try_new(schema, columns).map_err(|e|
Error::UnexpectedError {
- message: format!("Failed to build merged RecordBatch:
{e}"),
- source: Some(Box::new(e)),
- })?;
- yield merged;
- }
+ let merged = RecordBatch::try_new(target_schema.clone(),
columns).map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to build merged RecordBatch: {e}"),
+ source: Some(Box::new(e)),
+ })?;
+ yield merged;
}
+ } // end else (active_file_indices non-empty)
}
.boxed())
}
diff --git a/crates/paimon/src/arrow/schema_evolution.rs
b/crates/paimon/src/arrow/schema_evolution.rs
new file mode 100644
index 0000000..ea9450b
--- /dev/null
+++ b/crates/paimon/src/arrow/schema_evolution.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema evolution utilities for mapping between table schema and data file
schema.
+//!
+//! Reference:
[org.apache.paimon.schema.SchemaEvolutionUtil](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java)
+
+use crate::spec::DataField;
+use std::collections::HashMap;
+
+/// Sentinel value indicating a field does not exist in the data schema.
+pub const NULL_FIELD_INDEX: i32 = -1;
+
+/// Create index mapping from table fields to underlying data fields using
field IDs.
+///
+/// For example, the table and data fields are as follows:
+/// - table fields: `1->c, 6->b, 3->a`
+/// - data fields: `1->a, 3->c`
+///
+/// We get the index mapping `[0, -1, 1]`, where:
+/// - `0` is the index of table field `1->c` in data fields
+/// - `-1` means field `6->b` does not exist in data fields
+/// - `1` is the index of table field `3->a` in data fields
+///
+/// Returns `None` if the mapping is identity (no evolution needed).
+///
+/// Reference:
[SchemaEvolutionUtil.createIndexMapping](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java)
+pub fn create_index_mapping(
+ table_fields: &[DataField],
+ data_fields: &[DataField],
+) -> Option<Vec<i32>> {
+ let mut field_id_to_index: HashMap<i32, i32> =
HashMap::with_capacity(data_fields.len());
+ for (i, field) in data_fields.iter().enumerate() {
+ field_id_to_index.insert(field.id(), i as i32);
+ }
+
+ let mut index_mapping = Vec::with_capacity(table_fields.len());
+ for field in table_fields {
+ let data_index = field_id_to_index
+ .get(&field.id())
+ .copied()
+ .unwrap_or(NULL_FIELD_INDEX);
+ index_mapping.push(data_index);
+ }
+
+ // Check if mapping is identity (no evolution needed).
+ let is_identity = index_mapping.len() == data_fields.len()
+ && index_mapping
+ .iter()
+ .enumerate()
+ .all(|(i, &idx)| idx == i as i32);
+
+ if is_identity {
+ None
+ } else {
+ Some(index_mapping)
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spec::{DataType, IntType, VarCharType};

    /// Shorthand for an INT field with the given ID and name.
    fn int_field(id: i32, name: &str) -> DataField {
        DataField::new(id, name.to_string(), DataType::Int(IntType::new()))
    }

    #[test]
    fn test_identity_mapping() {
        let table = [int_field(0, "a"), int_field(1, "b"), int_field(2, "c")];
        let data = [int_field(0, "a"), int_field(1, "b"), int_field(2, "c")];
        assert_eq!(create_index_mapping(&table, &data), None);
    }

    #[test]
    fn test_added_column() {
        // The table gained a third column; old data files only carry two.
        let table = [int_field(0, "a"), int_field(1, "b"), int_field(2, "c")];
        let data = [int_field(0, "a"), int_field(1, "b")];
        assert_eq!(create_index_mapping(&table, &data), Some(vec![0, 1, -1]));
    }

    #[test]
    fn test_reordered_fields() {
        let table = [int_field(1, "c"), int_field(6, "b"), int_field(3, "a")];
        let data = [int_field(1, "a"), int_field(3, "c")];
        assert_eq!(create_index_mapping(&table, &data), Some(vec![0, -1, 1]));
    }

    #[test]
    fn test_renamed_column() {
        // A rename keeps the field ID, so positions still line up → identity.
        let table = [int_field(0, "id"), int_field(1, "new_name")];
        let data = [int_field(0, "id"), int_field(1, "old_name")];
        assert_eq!(create_index_mapping(&table, &data), None);
    }

    #[test]
    fn test_empty_data_fields() {
        let table = [int_field(0, "a"), int_field(1, "b")];
        let data: [DataField; 0] = [];
        assert_eq!(
            create_index_mapping(&table, &data),
            Some(vec![-1, -1])
        );
    }

    #[test]
    fn test_type_promotion_same_mapping() {
        // Index mapping only looks at field IDs; field types are irrelevant.
        let string_field = |id: i32, name: &str| {
            DataField::new(
                id,
                name.to_string(),
                DataType::VarChar(VarCharType::string_type()),
            )
        };
        let table = [int_field(0, "id"), string_field(1, "name")];
        let data = [int_field(0, "id"), string_field(1, "name")];
        assert_eq!(create_index_mapping(&table, &data), None);
    }
}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index c5961dd..082f8f9 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -25,7 +25,7 @@ pub use common::{CatalogOptions, Options};
pub mod api;
pub use api::rest_api::RESTApi;
-mod arrow;
+pub mod arrow;
pub mod catalog;
mod deletion_vector;
pub mod file_index;
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index e0ba04a..1bb4566 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -227,6 +227,10 @@ impl ArrayType {
pub fn family(&self) -> DataTypeFamily {
DataTypeFamily::CONSTRUCTED | DataTypeFamily::COLLECTION
}
+
+ pub fn element_type(&self) -> &DataType {
+ &self.element_type
+ }
}
/// BigIntType for paimon.
@@ -1348,6 +1352,14 @@ impl MapType {
pub fn family(&self) -> DataTypeFamily {
DataTypeFamily::CONSTRUCTED | DataTypeFamily::COLLECTION
}
+
+ pub fn key_type(&self) -> &DataType {
+ &self.key_type
+ }
+
+ pub fn value_type(&self) -> &DataType {
+ &self.value_type
+ }
}
/// MultisetType for paimon.
@@ -1381,6 +1393,10 @@ impl MultisetType {
pub fn family(&self) -> DataTypeFamily {
DataTypeFamily::CONSTRUCTED | DataTypeFamily::COLLECTION
}
+
+ pub fn element_type(&self) -> &DataType {
+ &self.element_type
+ }
}
/// RowType for paimon.
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 32f755a..bda5673 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -19,6 +19,7 @@
pub(crate) mod bin_pack;
mod read_builder;
+pub(crate) mod schema_manager;
mod snapshot_manager;
mod source;
mod table_scan;
@@ -28,6 +29,7 @@ use crate::Result;
use arrow_array::RecordBatch;
use futures::stream::BoxStream;
pub use read_builder::{ReadBuilder, TableRead};
+pub use schema_manager::SchemaManager;
pub use snapshot_manager::SnapshotManager;
pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket,
Plan};
pub use table_scan::TableScan;
@@ -45,6 +47,7 @@ pub struct Table {
identifier: Identifier,
location: String,
schema: TableSchema,
+ schema_manager: SchemaManager,
}
#[allow(dead_code)]
@@ -56,11 +59,13 @@ impl Table {
location: String,
schema: TableSchema,
) -> Self {
+ let schema_manager = SchemaManager::new(file_io.clone(),
location.clone());
Self {
file_io,
identifier,
location,
schema,
+ schema_manager,
}
}
@@ -84,6 +89,11 @@ impl Table {
&self.file_io
}
+ /// Get the SchemaManager for this table.
+ pub fn schema_manager(&self) -> &SchemaManager {
+ &self.schema_manager
+ }
+
/// Create a read builder for scan/read.
///
/// Reference: [pypaimon
FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/table/file_store_table.py).
@@ -98,6 +108,7 @@ impl Table {
identifier: self.identifier.clone(),
location: self.location.clone(),
schema: self.schema.copy_with_options(extra),
+ schema_manager: self.schema_manager.clone(),
}
}
}
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index 884d7e4..db5c42c 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -181,8 +181,12 @@ impl<'a> TableRead<'a> {
});
}
- let reader =
-
ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec());
+ let reader = ArrowReaderBuilder::new(
+ self.table.file_io.clone(),
+ self.table.schema_manager().clone(),
+ self.table.schema().id(),
+ )
+ .build(self.read_type().to_vec());
if data_evolution {
reader.read_data_evolution(data_splits, self.table.schema.fields())
diff --git a/crates/paimon/src/table/schema_manager.rs
b/crates/paimon/src/table/schema_manager.rs
new file mode 100644
index 0000000..057dc3f
--- /dev/null
+++ b/crates/paimon/src/table/schema_manager.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema manager for reading versioned table schemas.
+//!
+//! Reference:
[org.apache.paimon.schema.SchemaManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java)
+
+use crate::io::FileIO;
+use crate::spec::TableSchema;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+const SCHEMA_DIR: &str = "schema";
+const SCHEMA_PREFIX: &str = "schema-";
+
+/// Manager for versioned table schema files.
+///
+/// Each table stores schema versions as JSON files under
`{table_path}/schema/schema-{id}`.
+/// When a schema evolution occurs (e.g. ADD COLUMN, ALTER COLUMN TYPE), a new
schema file
+/// is written with an incremented ID. Data files record which schema they
were written with
+/// via `DataFileMeta.schema_id`.
+///
+/// The schema cache is shared across clones via `Arc`, so multiple readers
+/// (e.g. parallel split streams) benefit from a single cache.
+///
+/// Reference:
[org.apache.paimon.schema.SchemaManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java)
+#[derive(Debug, Clone)]
+pub struct SchemaManager {
+ file_io: FileIO,
+ table_path: String,
+ /// Shared cache of loaded schemas by ID.
+ cache: Arc<Mutex<HashMap<i64, Arc<TableSchema>>>>,
+}
+
+impl SchemaManager {
+ pub fn new(file_io: FileIO, table_path: String) -> Self {
+ Self {
+ file_io,
+ table_path,
+ cache: Arc::new(Mutex::new(HashMap::new())),
+ }
+ }
+
+ /// Path to the schema directory (e.g. `{table_path}/schema`).
+ fn schema_directory(&self) -> String {
+ format!("{}/{}", self.table_path.trim_end_matches('/'), SCHEMA_DIR)
+ }
+
+ /// Path to a specific schema file (e.g. `{table_path}/schema/schema-0`).
+ fn schema_path(&self, schema_id: i64) -> String {
+ format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id)
+ }
+
+ /// Load a schema by ID. Returns cached version if available.
+ ///
+ /// The cache is shared across all clones of this `SchemaManager`, so
loading
+ /// a schema in one stream makes it available to all other streams reading
+ /// from the same table.
+ ///
+ /// Reference:
[SchemaManager.schema(long)](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java)
+ pub async fn schema(&self, schema_id: i64) ->
crate::Result<Arc<TableSchema>> {
+ // Fast path: check cache under a short lock.
+ {
+ let cache = self.cache.lock().unwrap();
+ if let Some(schema) = cache.get(&schema_id) {
+ return Ok(schema.clone());
+ }
+ }
+
+ // Cache miss — load from file (no lock held during I/O).
+ let path = self.schema_path(schema_id);
+ let input = self.file_io.new_input(&path)?;
+ let bytes = input.read().await?;
+ let schema: TableSchema =
+ serde_json::from_slice(&bytes).map_err(|e|
crate::Error::DataInvalid {
+ message: format!("Failed to parse schema file: {path}"),
+ source: Some(Box::new(e)),
+ })?;
+ let schema = Arc::new(schema);
+
+ // Insert into shared cache (short lock).
+ {
+ let mut cache = self.cache.lock().unwrap();
+ cache.entry(schema_id).or_insert_with(|| schema.clone());
+ }
+
+ Ok(schema)
+ }
+}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 0a5e13c..7bca4d2 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -292,6 +292,229 @@ def main():
spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot1',
1)")
spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot2',
2)")
+ # ===== Schema Evolution: Add Column =====
+ # Old files have (id, name); after ALTER TABLE ADD COLUMNS, new files have
(id, name, age).
+ # Reader must fill nulls for 'age' when reading old files.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS schema_evolution_add_column (
+ id INT,
+ name STRING
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ "INSERT INTO schema_evolution_add_column VALUES (1, 'alice'), (2,
'bob')"
+ )
+ spark.sql("ALTER TABLE schema_evolution_add_column ADD COLUMNS (age INT)")
+ spark.sql(
+ "INSERT INTO schema_evolution_add_column VALUES (3, 'carol', 30), (4,
'dave', 40)"
+ )
+
+ # ===== Schema Evolution: Type Promotion (INT -> BIGINT) =====
+ # Old files have value as INT; after ALTER TABLE, new files have value as
BIGINT.
+ # Reader must cast INT to BIGINT when reading old files.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS schema_evolution_type_promotion (
+ id INT,
+ value INT
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ "INSERT INTO schema_evolution_type_promotion VALUES (1, 100), (2, 200)"
+ )
+ spark.sql(
+ "ALTER TABLE schema_evolution_type_promotion ALTER COLUMN value TYPE
BIGINT"
+ )
+ spark.sql(
+ "INSERT INTO schema_evolution_type_promotion VALUES (3, 3000000000)"
+ )
+
+ # ===== Data Evolution + Schema Evolution: Add Column =====
+ # Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD
COLUMNS.
+ # Old files lack the new column; MERGE INTO produces partial-column files.
+ # Reader must fill nulls for missing columns AND merge columns across
files.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_add_column (
+ id INT,
+ name STRING,
+ value INT
+ ) USING paimon
+ TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true'
+ )
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO data_evolution_add_column VALUES
+ (1, 'alice', 100),
+ (2, 'bob', 200)
+ """
+ )
+ spark.sql("ALTER TABLE data_evolution_add_column ADD COLUMNS (extra
STRING)")
+ spark.sql(
+ """
+ INSERT INTO data_evolution_add_column VALUES
+ (3, 'carol', 300, 'new'),
+ (4, 'dave', 400, 'new')
+ """
+ )
+ # MERGE INTO to trigger merge_files_by_columns with schema evolution.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_add_column_updates (
+ id INT,
+ name STRING
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ "INSERT INTO data_evolution_add_column_updates VALUES (1, 'alice-v2')"
+ )
+ spark.sql(
+ """
+ MERGE INTO data_evolution_add_column t
+ USING data_evolution_add_column_updates s
+ ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.name = s.name
+ """
+ )
+ spark.sql("DROP TABLE data_evolution_add_column_updates")
+
+ # ===== Data Evolution + Schema Evolution: Type Promotion =====
+ # Combines data-evolution with ALTER TABLE ALTER COLUMN TYPE (INT ->
BIGINT).
+ # Old files have INT; new files have BIGINT. MERGE INTO updates some rows.
+ # Reader must cast old INT columns to BIGINT AND merge columns across
files.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_type_promotion (
+ id INT,
+ value INT
+ ) USING paimon
+ TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true'
+ )
+ """
+ )
+ spark.sql(
+ "INSERT INTO data_evolution_type_promotion VALUES (1, 100), (2, 200)"
+ )
+ spark.sql(
+ "ALTER TABLE data_evolution_type_promotion ALTER COLUMN value TYPE
BIGINT"
+ )
+ spark.sql(
+ "INSERT INTO data_evolution_type_promotion VALUES (3, 3000000000)"
+ )
+ # MERGE INTO to trigger merge_files_by_columns with type promotion.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_type_promotion_updates (
+ id INT,
+ value BIGINT
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ "INSERT INTO data_evolution_type_promotion_updates VALUES (1, 999)"
+ )
+ spark.sql(
+ """
+ MERGE INTO data_evolution_type_promotion t
+ USING data_evolution_type_promotion_updates s
+ ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.value = s.value
+ """
+ )
+ spark.sql("DROP TABLE data_evolution_type_promotion_updates")
+
+ # ===== Data Evolution + Late-Added Column (table: data_evolution_drop_column) =====
+ # Tests NULL-fill when no file in a merge group provides the projected column.
+ # After MERGE INTO on old rows, the merge group files all predate ADD COLUMN.
+ # SELECT on the new column should return NULLs for old rows (not silently drop them).
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_drop_column (
+ id INT,
+ name STRING,
+ value INT
+ ) USING paimon
+ TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true'
+ )
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO data_evolution_drop_column VALUES
+ (1, 'alice', 100),
+ (2, 'bob', 200)
+ """
+ )
+ # MERGE INTO to create a partial-column file in the same row_id range.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS data_evolution_drop_column_updates (
+ id INT,
+ name STRING
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ "INSERT INTO data_evolution_drop_column_updates VALUES (1, 'alice-v2')"
+ )
+ spark.sql(
+ """
+ MERGE INTO data_evolution_drop_column t
+ USING data_evolution_drop_column_updates s
+ ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.name = s.name
+ """
+ )
+ spark.sql("DROP TABLE data_evolution_drop_column_updates")
+ # Add a new column that no existing file contains.
+ spark.sql("ALTER TABLE data_evolution_drop_column ADD COLUMNS (extra
STRING)")
+ # Insert new rows that DO have the extra column.
+ spark.sql(
+ """
+ INSERT INTO data_evolution_drop_column VALUES
+ (3, 'carol', 300, 'new')
+ """
+ )
+
+ # ===== Schema Evolution: Drop Column =====
+ # Old files have (id, name, score); after ALTER TABLE DROP COLUMN, table
has (id, name).
+ # Reader should ignore the dropped column when reading old files.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS schema_evolution_drop_column (
+ id INT,
+ name STRING,
+ score INT
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO schema_evolution_drop_column VALUES
+ (1, 'alice', 100),
+ (2, 'bob', 200)
+ """
+ )
+ spark.sql("ALTER TABLE schema_evolution_drop_column DROP COLUMN score")
+ spark.sql(
+ """
+ INSERT INTO schema_evolution_drop_column VALUES
+ (3, 'carol'),
+ (4, 'dave')
+ """
+ )
+
if __name__ == "__main__":
main()