This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7c54ff11 [spark] Fix nested struct field mapping for V2 write merge schema (#7542)
9d7c54ff11 is described below
commit 9d7c54ff11bf3db449c57d3b68e46da84b407972
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Mar 27 08:48:51 2026 +0800
[spark] Fix nested struct field mapping for V2 write merge schema (#7542)
---
.../paimon/spark/SparkInternalRowWrapper.java | 18 +++-
.../paimon/spark/sql/V2WriteMergeSchemaTest.scala | 108 +++++++++++++++++++++
2 files changed, 123 insertions(+), 3 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index d2e70b9ed3..ffd077741c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -63,6 +63,7 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
private final int length;
@Nullable private final UriReaderFactory uriReaderFactory;
@Nullable private final int[] fieldIndexMap;
+ @Nullable private final StructType dataSchema;
private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
@@ -77,6 +78,7 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
CatalogContext catalogContext) {
this.tableSchema = tableSchema;
this.length = length;
+ this.dataSchema = dataSchema;
this.fieldIndexMap =
                dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
this.uriReaderFactory = new UriReaderFactory(catalogContext);
@@ -240,7 +242,11 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
@Override
public Blob getBlob(int pos) {
- byte[] bytes = internalRow.getBinary(pos);
+ int actualPos = getActualFieldPosition(pos);
+ if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
+ return null;
+ }
+ byte[] bytes = internalRow.getBinary(actualPos);
boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
if (blobDes) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
@@ -284,8 +290,14 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
return null;
}
-        return new SparkInternalRowWrapper(
-                        (StructType) tableSchema.fields()[actualPos].dataType(), numFields)
+        StructType nestedTableSchema = (StructType) tableSchema.fields()[pos].dataType();
+        if (dataSchema != null) {
+            StructType nestedDataSchema = (StructType) dataSchema.fields()[actualPos].dataType();
+            int dataNumFields = nestedDataSchema.size();
+            return new SparkInternalRowWrapper(nestedTableSchema, numFields, nestedDataSchema, null)
+                    .replace(internalRow.getStruct(actualPos, dataNumFields));
+        }
+        return new SparkInternalRowWrapper(nestedTableSchema, numFields)
                 .replace(internalRow.getStruct(actualPos, numFields));
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
index 0b6e589d9f..c73048f7d0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
@@ -316,4 +316,112 @@ class V2WriteMergeSchemaTest extends PaimonSparkTestBase {
}
}
+ test("Write merge schema: nested struct with new fields by sql") {
+ withTable("t") {
+      sql("CREATE TABLE t (id INT, info STRUCT<key1 STRUCT<key2 STRING, key3 STRING>>)")
+ sql("INSERT INTO t VALUES (1, struct(struct('v2a', 'v3a')))")
+ sql("INSERT INTO t VALUES (2, struct(struct('v2b', 'v3b')))")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(Row(1, Row(Row("v2a", "v3a"))), Row(2, Row(Row("v2b", "v3b"))))
+ )
+
+ sql(
+ "INSERT INTO t BY NAME " +
+ "SELECT 3 AS id, " +
+          "named_struct('key1', named_struct('key2', 'v2c', 'key4', 'v4c', 'key3', 'v3c')) AS info")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(
+ Row(1, Row(Row("v2a", "v3a", null))),
+ Row(2, Row(Row("v2b", "v3b", null))),
+ Row(3, Row(Row("v2c", "v3c", "v4c"))))
+ )
+ }
+ }
+
+ test("Write merge schema: deeply nested struct with new fields") {
+ withTable("t") {
+      sql("CREATE TABLE t (id INT, data STRUCT<a STRUCT<b STRUCT<c1 STRING, c2 STRING>>>)")
+ sql("INSERT INTO t VALUES (1, struct(struct(struct('c1v', 'c2v'))))")
+
+ sql(
+ "INSERT INTO t BY NAME " +
+ "SELECT 2 AS id, " +
+          "named_struct('a', named_struct('b', named_struct('c1', 'c1v2', 'c3', 'c3v2', 'c2', 'c2v2'))) AS data")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(
+ Row(1, Row(Row(Row("c1v", "c2v", null)))),
+ Row(2, Row(Row(Row("c1v2", "c2v2", "c3v2")))))
+ )
+ }
+ }
+
+  test("Write merge schema: nested struct new fields and top-level new column together") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING>)")
+ sql("INSERT INTO t VALUES (1, struct('a', 'b'))")
+
+ sql(
+ "INSERT INTO t BY NAME " +
+ "SELECT 2 AS id, " +
+ "named_struct('f1', 'c', 'f3', 'd', 'f2', 'e') AS info, " +
+ "'top' AS extra")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, Row("a", "b", null), null), Row(2, Row("c", "e", "d"), "top"))
+ )
+ }
+ }
+
+ test("Write merge schema: nested struct with missing fields") {
+ withTable("t") {
+      sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING, f3 STRING>)")
+ sql("INSERT INTO t VALUES (1, struct('a', 'b', 'c'))")
+
+ sql(
+ "INSERT INTO t BY NAME " +
+ "SELECT 2 AS id, " +
+ "named_struct('f2', 'y', 'f3', 'z') AS info")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(Row(1, Row("a", "b", "c")), Row(2, Row(null, "y", "z")))
+ )
+
+ sql(
+ "INSERT INTO t BY NAME " +
+ "SELECT 3 AS id, " +
+ "named_struct('f1', 'x', 'f4', 'w', 'f3', 'z') AS info")
+
+ sql(
+ "INSERT INTO t BY NAME " +
+ "SELECT 4 AS id, " +
+ "named_struct('f2', 'p', 'f3', 'q', 'f4', 'r') AS info")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(
+ Row(1, Row("a", "b", "c", null)),
+ Row(2, Row(null, "y", "z", null)),
+ Row(3, Row("x", null, "z", "w")),
+ Row(4, Row(null, "p", "q", "r")))
+ )
+ }
+ }
+
+ test("Write merge schema: nested struct with type evolution") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, info STRUCT<f1 INT, f2 STRING>)")
+ sql("INSERT INTO t VALUES (1, struct(10, 'a'))")
+
+ sql(
+ "INSERT INTO t BY NAME " +
+ "SELECT 2 AS id, " +
+          "named_struct('f1', cast(20 as bigint), 'f3', 'new', 'f2', 'b') AS info")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(Row(1, Row(10L, "a", null)), Row(2, Row(20L, "b", "new")))
+ )
+ }
+ }
}