This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit a657f46d517b51d8ae63606206bad8e445cbe8e0 Author: Zouxxyy <[email protected]> AuthorDate: Fri Mar 27 08:48:51 2026 +0800 [spark] Fix nested struct field mapping for V2 write merge schema (#7542) --- .../paimon/spark/SparkInternalRowWrapper.java | 18 +++- .../paimon/spark/sql/V2WriteMergeSchemaTest.scala | 108 +++++++++++++++++++++ 2 files changed, 123 insertions(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java index d2e70b9ed3..ffd077741c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java @@ -63,6 +63,7 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable { private final int length; @Nullable private final UriReaderFactory uriReaderFactory; @Nullable private final int[] fieldIndexMap; + @Nullable private final StructType dataSchema; private transient org.apache.spark.sql.catalyst.InternalRow internalRow; @@ -77,6 +78,7 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable { CatalogContext catalogContext) { this.tableSchema = tableSchema; this.length = length; + this.dataSchema = dataSchema; this.fieldIndexMap = dataSchema != null ? 
buildFieldIndexMap(tableSchema, dataSchema) : null; this.uriReaderFactory = new UriReaderFactory(catalogContext); @@ -240,7 +242,11 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable { @Override public Blob getBlob(int pos) { - byte[] bytes = internalRow.getBinary(pos); + int actualPos = getActualFieldPosition(pos); + if (actualPos == -1 || internalRow.isNullAt(actualPos)) { + return null; + } + byte[] bytes = internalRow.getBinary(actualPos); boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes); if (blobDes) { BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); @@ -284,8 +290,14 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable { if (actualPos == -1 || internalRow.isNullAt(actualPos)) { return null; } - return new SparkInternalRowWrapper( - (StructType) tableSchema.fields()[actualPos].dataType(), numFields) + StructType nestedTableSchema = (StructType) tableSchema.fields()[pos].dataType(); + if (dataSchema != null) { + StructType nestedDataSchema = (StructType) dataSchema.fields()[actualPos].dataType(); + int dataNumFields = nestedDataSchema.size(); + return new SparkInternalRowWrapper(nestedTableSchema, numFields, nestedDataSchema, null) + .replace(internalRow.getStruct(actualPos, dataNumFields)); + } + return new SparkInternalRowWrapper(nestedTableSchema, numFields) .replace(internalRow.getStruct(actualPos, numFields)); } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala index 0b6e589d9f..c73048f7d0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala @@ -316,4 +316,112 @@ class V2WriteMergeSchemaTest extends PaimonSparkTestBase { } } + test("Write merge schema: 
nested struct with new fields by sql") { + withTable("t") { + sql("CREATE TABLE t (id INT, info STRUCT<key1 STRUCT<key2 STRING, key3 STRING>>)") + sql("INSERT INTO t VALUES (1, struct(struct('v2a', 'v3a')))") + sql("INSERT INTO t VALUES (2, struct(struct('v2b', 'v3b')))") + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq(Row(1, Row(Row("v2a", "v3a"))), Row(2, Row(Row("v2b", "v3b")))) + ) + + sql( + "INSERT INTO t BY NAME " + + "SELECT 3 AS id, " + + "named_struct('key1', named_struct('key2', 'v2c', 'key4', 'v4c', 'key3', 'v3c')) AS info") + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq( + Row(1, Row(Row("v2a", "v3a", null))), + Row(2, Row(Row("v2b", "v3b", null))), + Row(3, Row(Row("v2c", "v3c", "v4c")))) + ) + } + } + + test("Write merge schema: deeply nested struct with new fields") { + withTable("t") { + sql("CREATE TABLE t (id INT, data STRUCT<a STRUCT<b STRUCT<c1 STRING, c2 STRING>>>)") + sql("INSERT INTO t VALUES (1, struct(struct(struct('c1v', 'c2v'))))") + + sql( + "INSERT INTO t BY NAME " + + "SELECT 2 AS id, " + + "named_struct('a', named_struct('b', named_struct('c1', 'c1v2', 'c3', 'c3v2', 'c2', 'c2v2'))) AS data") + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq( + Row(1, Row(Row(Row("c1v", "c2v", null)))), + Row(2, Row(Row(Row("c1v2", "c2v2", "c3v2"))))) + ) + } + } + + test("Write merge schema: nested struct new fields and top-level new column together") { + withTable("t") { + sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING>)") + sql("INSERT INTO t VALUES (1, struct('a', 'b'))") + + sql( + "INSERT INTO t BY NAME " + + "SELECT 2 AS id, " + + "named_struct('f1', 'c', 'f3', 'd', 'f2', 'e') AS info, " + + "'top' AS extra") + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq(Row(1, Row("a", "b", null), null), Row(2, Row("c", "e", "d"), "top")) + ) + } + } + + test("Write merge schema: nested struct with missing fields") { + withTable("t") { + sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING, 
f3 STRING>)") + sql("INSERT INTO t VALUES (1, struct('a', 'b', 'c'))") + + sql( + "INSERT INTO t BY NAME " + + "SELECT 2 AS id, " + + "named_struct('f2', 'y', 'f3', 'z') AS info") + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq(Row(1, Row("a", "b", "c")), Row(2, Row(null, "y", "z"))) + ) + + sql( + "INSERT INTO t BY NAME " + + "SELECT 3 AS id, " + + "named_struct('f1', 'x', 'f4', 'w', 'f3', 'z') AS info") + + sql( + "INSERT INTO t BY NAME " + + "SELECT 4 AS id, " + + "named_struct('f2', 'p', 'f3', 'q', 'f4', 'r') AS info") + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq( + Row(1, Row("a", "b", "c", null)), + Row(2, Row(null, "y", "z", null)), + Row(3, Row("x", null, "z", "w")), + Row(4, Row(null, "p", "q", "r"))) + ) + } + } + + test("Write merge schema: nested struct with type evolution") { + withTable("t") { + sql("CREATE TABLE t (id INT, info STRUCT<f1 INT, f2 STRING>)") + sql("INSERT INTO t VALUES (1, struct(10, 'a'))") + + sql( + "INSERT INTO t BY NAME " + + "SELECT 2 AS id, " + + "named_struct('f1', cast(20 as bigint), 'f3', 'new', 'f2', 'b') AS info") + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq(Row(1, Row(10L, "a", null)), Row(2, Row(20L, "b", "new"))) + ) + } + } }
