This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch codex/paimon-jni-write
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6bff4f26319d7f7028b110f7196cc9143606c7bc
Author: suxiaogang <[email protected]>
AuthorDate: Fri Jul 3 16:52:18 2026 +0800

    [test](regression) Add Spark comparison for Paimon write regression
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: The Paimon write regression cases verified results through
    fixed Doris output checks, which made Spark-side consistency coverage hard
    to maintain. This change does three things: it switches the Paimon write
    regression case to the Spark/Doris result comparison helpers, enables
    routing of Paimon external table inserts through Nereids, and passes the
    full list of sink output column names to the BE Paimon JNI writer so that
    partial inserts write rows with default-filled columns correctly.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Regression test
        - `./run-regression-test.sh --run -d external_table_p0/paimon/write -s test_paimon_write_basic`
    - Behavior changed: No
    - Does this need documentation: No
---
 .../writer/paimon/vpaimon_jni_table_writer.cpp     |  29 ++++
 .../nereids/analyzer/UnboundTableSinkCreator.java  |   3 +
 .../trees/plans/commands/insert/InsertUtils.java   |   3 +
 .../org/apache/doris/planner/PaimonTableSink.java  |  16 +++
 .../paimon/write/test_paimon_write_basic.groovy    | 155 +++++++--------------
 5 files changed, 101 insertions(+), 105 deletions(-)

diff --git a/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp 
b/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp
index 76409281944..78fc5965421 100644
--- a/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp
+++ b/be/src/exec/sink/writer/paimon/vpaimon_jni_table_writer.cpp
@@ -23,6 +23,7 @@
 #include <arrow/record_batch.h>
 #include <arrow/type.h>
 
+#include "common/check.h"
 #include "core/block/block.h"
 #include "format/arrow/arrow_block_convertor.h"
 #include "format/arrow/arrow_row_batch.h"
@@ -36,6 +37,23 @@
 namespace doris {
 
 const std::string PAIMON_JNI_CLASS = "org/apache/doris/paimon/PaimonJniWriter";
+const std::string PAIMON_OUTPUT_COLUMN_NAMES = "doris.output_column_names";
+const char PAIMON_COLUMN_NAME_SEPARATOR = '\x01';
+
+std::vector<std::string> split_paimon_output_column_names(const std::string& 
column_names) {
+    std::vector<std::string> names;
+    size_t begin = 0;
+    while (begin <= column_names.size()) {
+        size_t end = column_names.find(PAIMON_COLUMN_NAME_SEPARATOR, begin);
+        if (end == std::string::npos) {
+            names.emplace_back(column_names.substr(begin));
+            break;
+        }
+        names.emplace_back(column_names.substr(begin, end - begin));
+        begin = end + 1;
+    }
+    return names;
+}
 
 VPaimonJniTableWriter::VPaimonJniTableWriter(const TDataSink& t_sink,
                                              const VExprContextSPtrs& 
output_exprs)
@@ -212,6 +230,17 @@ Status VPaimonJniTableWriter::write(RuntimeState* state, 
::doris::Block& block)
         SCOPED_TIMER(_project_timer);
         RETURN_IF_ERROR(_projection_block(block, &output_block));
     }
+    const auto& paimon_sink = _t_sink.paimon_table_sink;
+    if (paimon_sink.__isset.paimon_options) {
+        auto column_names = 
paimon_sink.paimon_options.find(PAIMON_OUTPUT_COLUMN_NAMES);
+        if (column_names != paimon_sink.paimon_options.end()) {
+            auto output_column_names = 
split_paimon_output_column_names(column_names->second);
+            DORIS_CHECK(output_column_names.size() == output_block.columns());
+            for (size_t i = 0; i < output_column_names.size(); ++i) {
+                output_block.get_by_position(i).name = output_column_names[i];
+            }
+        }
+    }
 
     if (output_block.rows() >= _batch_max_rows || output_block.bytes() >= 
_batch_max_bytes) {
         RETURN_IF_ERROR(_flush_buffer());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
index 31e291be216..6809bbdcfce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
@@ -149,6 +149,9 @@ public class UnboundTableSinkCreator {
         } else if (curCatalog instanceof IcebergExternalCatalog && 
!isAutoDetectPartition) {
             return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
                     dmlCommandType, Optional.empty(), Optional.empty(), plan, 
staticPartitionKeyValues, false);
+        } else if (curCatalog instanceof PaimonExternalCatalog && 
!isAutoDetectPartition) {
+            return new UnboundPaimonTableSink<>(nameParts, colNames, hints, 
partitions,
+                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
         } else if (curCatalog instanceof MaxComputeExternalCatalog && 
!isAutoDetectPartition) {
             return new UnboundMaxComputeTableSink<>(nameParts, colNames, 
hints, partitions,
                     dmlCommandType, Optional.empty(), Optional.empty(), plan, 
staticPartitionKeyValues);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index fa5e34046d1..4400e0b00c9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -42,6 +42,7 @@ import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
 import org.apache.doris.nereids.analyzer.UnboundInlineTable;
 import org.apache.doris.nereids.analyzer.UnboundMaxComputeTableSink;
+import org.apache.doris.nereids.analyzer.UnboundPaimonTableSink;
 import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.analyzer.UnboundStar;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
@@ -600,6 +601,8 @@ public class InsertUtils {
             unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan;
         } else if (plan instanceof UnboundIcebergTableSink) {
             unboundTableSink = (UnboundIcebergTableSink<? extends Plan>) plan;
+        } else if (plan instanceof UnboundPaimonTableSink) {
+            unboundTableSink = (UnboundPaimonTableSink<? extends Plan>) plan;
         } else if (plan instanceof UnboundDictionarySink) {
             unboundTableSink = (UnboundDictionarySink<? extends Plan>) plan;
         } else if (plan instanceof UnboundBlackholeSink) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java
index 581813e7ed8..bec43606d18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java
@@ -56,6 +56,8 @@ import java.util.Set;
  */
 public class PaimonTableSink extends BaseExternalTableDataSink {
     private static final Logger LOG = 
LogManager.getLogger(PaimonTableSink.class);
+    private static final String OUTPUT_COLUMN_NAMES = 
"doris.output_column_names";
+    private static final String COLUMN_NAME_SEPARATOR = "\u0001";
     private final PaimonExternalTable targetTable;
     private static final Base64.Encoder BASE64_ENCODER = 
java.util.Base64.getUrlEncoder().withoutPadding();
     private List<Expr> outputExprs;
@@ -123,6 +125,7 @@ public class PaimonTableSink extends 
BaseExternalTableDataSink {
                 paimonOptions.put("doris.commit_user", ctx.getCommitUser());
             }
         }
+        paimonOptions.put(OUTPUT_COLUMN_NAMES, 
String.join(COLUMN_NAME_SEPARATOR, outputColumnNames()));
 
         if (ConnectContext.get() != null) {
             String hadoopUser = hadoopConfig.get("hadoop.username");
@@ -261,4 +264,17 @@ public class PaimonTableSink extends 
BaseExternalTableDataSink {
             options.put(key, value);
         }
     }
+
+    private List<String> outputColumnNames() throws AnalysisException {
+        List<Column> fullSchema = targetTable.getFullSchema();
+        if (fullSchema.size() != outputExprs.size()) {
+            throw new AnalysisException("Paimon sink output column size 
mismatch, schema size="
+                    + fullSchema.size() + ", output expr size=" + 
outputExprs.size());
+        }
+        ArrayList<String> outputColumnNames = new 
ArrayList<>(fullSchema.size());
+        for (Column column : fullSchema) {
+            outputColumnNames.add(column.getName());
+        }
+        return outputColumnNames;
+    }
 }
diff --git 
a/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy
 
b/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy
index 8726c0ddc02..abe21b04af4 100644
--- 
a/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy
+++ 
b/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_basic.groovy
@@ -26,9 +26,34 @@ suite("test_paimon_write_basic", "basic,external") {
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
     String catalogName = "test_paimon_write_basic"
     String dbName = "paimon_write_basic"
+    def tableNames = [
+            "append_basic",
+            "append_partitioned",
+            "append_partial",
+            "append_from_select",
+            "fixed_default_bucket",
+            "fixed_mod_bucket",
+            "pk_fixed_bucket",
+            "pk_partial_reject",
+            "dynamic_bucket_reject",
+            "insert_overwrite_reject"
+    ]
+    String dropPaimonTables = tableNames.collect {
+        "DROP TABLE IF EXISTS paimon.${dbName}.${it};"
+    }.join("\n")
 
-    def sparkSql = { String sqlText ->
-        spark_paimon sqlText, 180
+    def assertPaimonDorisQueryEquals = { String tableName, String selectList, 
String orderBy ->
+        def sparkRows = spark_paimon """
+            SELECT ${selectList}
+            FROM paimon.${dbName}.${tableName}
+            ORDER BY ${orderBy}
+        """
+        def dorisRows = sql """
+            SELECT ${selectList}
+            FROM ${tableName}
+            ORDER BY ${orderBy}
+        """
+        assertSparkDorisResultEquals(sparkRows, dorisRows)
     }
 
     sql """drop catalog if exists ${catalogName}"""
@@ -45,24 +70,12 @@ suite("test_paimon_write_basic", "basic,external") {
     """
 
     try {
-        sparkSql """CREATE DATABASE IF NOT EXISTS paimon.${dbName}"""
-        [
-                "append_basic",
-                "append_partitioned",
-                "append_partial",
-                "append_from_select",
-                "fixed_default_bucket",
-                "fixed_mod_bucket",
-                "pk_fixed_bucket",
-                "pk_partial_reject",
-                "dynamic_bucket_reject",
-                "insert_overwrite_reject"
-        ].each { tableName ->
-            sparkSql """DROP TABLE IF EXISTS paimon.${dbName}.${tableName}"""
-        }
+        spark_paimon_multi """
+            CREATE DATABASE IF NOT EXISTS paimon.${dbName};
+            ${dropPaimonTables}
+        """
 
-        // Case 1: append-only bucket-unaware table, covering full-row INSERT 
VALUES and basic scalar types.
-        sparkSql """
+        spark_paimon_multi """
             CREATE TABLE paimon.${dbName}.append_basic (
                 id INT,
                 name STRING,
@@ -74,11 +87,8 @@ suite("test_paimon_write_basic", "basic,external") {
             ) USING paimon
             TBLPROPERTIES (
                 'bucket' = '-1'
-            )
-        """
+            );
 
-        // Case 2: append-only partitioned table, covering dynamic partition 
routing during write.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.append_partitioned (
                 id INT,
                 name STRING,
@@ -88,11 +98,8 @@ suite("test_paimon_write_basic", "basic,external") {
             PARTITIONED BY (pt)
             TBLPROPERTIES (
                 'bucket' = '-1'
-            )
-        """
+            );
 
-        // Case 3: append-only table used to verify partial column INSERT.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.append_partial (
                 id INT,
                 name STRING,
@@ -101,11 +108,8 @@ suite("test_paimon_write_basic", "basic,external") {
             ) USING paimon
             TBLPROPERTIES (
                 'bucket' = '-1'
-            )
-        """
+            );
 
-        // Case 4: append-only table used to verify INSERT INTO SELECT, not 
only INSERT VALUES.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.append_from_select (
                 id INT,
                 name STRING,
@@ -114,11 +118,8 @@ suite("test_paimon_write_basic", "basic,external") {
             ) USING paimon
             TBLPROPERTIES (
                 'bucket' = '-1'
-            )
-        """
+            );
 
-        // Case 5: fixed bucket table using Paimon's default bucket function.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.fixed_default_bucket (
                 id INT,
                 name STRING,
@@ -129,11 +130,8 @@ suite("test_paimon_write_basic", "basic,external") {
             TBLPROPERTIES (
                 'bucket' = '4',
                 'bucket-key' = 'id'
-            )
-        """
+            );
 
-        // Case 6: fixed bucket table using Paimon's MOD bucket function.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.fixed_mod_bucket (
                 id BIGINT,
                 name STRING,
@@ -145,11 +143,8 @@ suite("test_paimon_write_basic", "basic,external") {
                 'bucket' = '4',
                 'bucket-key' = 'id',
                 'bucket-function.type' = 'MOD'
-            )
-        """
+            );
 
-        // Case 7: primary-key fixed bucket table, covering full-row upsert 
semantics.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.pk_fixed_bucket (
                 id INT,
                 name STRING,
@@ -161,11 +156,8 @@ suite("test_paimon_write_basic", "basic,external") {
                 'primary-key' = 'pt,id',
                 'bucket' = '4',
                 'bucket-key' = 'id'
-            )
-        """
+            );
 
-        // Case 8: primary-key table used to verify partial column writes are 
rejected.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.pk_partial_reject (
                 id INT,
                 name STRING,
@@ -175,11 +167,8 @@ suite("test_paimon_write_basic", "basic,external") {
                 'primary-key' = 'id',
                 'bucket' = '2',
                 'bucket-key' = 'id'
-            )
-        """
+            );
 
-        // Case 9: dynamic bucket primary-key table used to verify unsupported 
bucket modes are rejected.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.dynamic_bucket_reject (
                 id INT,
                 name STRING,
@@ -188,18 +177,15 @@ suite("test_paimon_write_basic", "basic,external") {
             TBLPROPERTIES (
                 'primary-key' = 'id',
                 'bucket' = '-1'
-            )
-        """
+            );
 
-        // Case 10: append-only table used to verify Paimon INSERT OVERWRITE 
is still unsupported.
-        sparkSql """
             CREATE TABLE paimon.${dbName}.insert_overwrite_reject (
                 id INT,
                 name STRING
             ) USING paimon
             TBLPROPERTIES (
                 'bucket' = '-1'
-            )
+            );
         """
 
         sql """refresh catalog ${catalogName}"""
@@ -212,11 +198,7 @@ suite("test_paimon_write_basic", "basic,external") {
                 (2, 'bob', 200, false, DATE '2026-01-02', TIMESTAMP 
'2026-01-02 11:30:00', 20.75),
                 (3, 'carol', NULL, true, NULL, NULL, NULL)
         """
-        order_qt_append_basic """
-            SELECT id, name, score, flag, dt, ts, amount
-            FROM append_basic
-            ORDER BY id
-        """
+        assertPaimonDorisQueryEquals("append_basic", "id, name, score, flag, 
dt, ts, amount", "id")
 
         sql """
             INSERT INTO append_partitioned VALUES
@@ -225,22 +207,14 @@ suite("test_paimon_write_basic", "basic,external") {
                 (3, 'c', 30, 'p2'),
                 (4, 'd', 40, 'p2')
         """
-        order_qt_append_partitioned """
-            SELECT pt, id, name, score
-            FROM append_partitioned
-            ORDER BY pt, id
-        """
+        assertPaimonDorisQueryEquals("append_partitioned", "pt, id, name, 
score", "pt, id")
 
         sql """
             INSERT INTO append_partial(id, name) VALUES
                 (1, 'partial-a'),
                 (2, 'partial-b')
         """
-        order_qt_append_partial """
-            SELECT id, name, score, note
-            FROM append_partial
-            ORDER BY id
-        """
+        assertPaimonDorisQueryEquals("append_partial", "id, name, score, 
note", "id")
 
         sql """
             INSERT INTO append_from_select
@@ -248,11 +222,7 @@ suite("test_paimon_write_basic", "basic,external") {
             FROM append_partitioned
             WHERE pt = 'p1'
         """
-        order_qt_append_from_select """
-            SELECT pt, id, name, score
-            FROM append_from_select
-            ORDER BY pt, id
-        """
+        assertPaimonDorisQueryEquals("append_from_select", "pt, id, name, 
score", "pt, id")
 
         sql """
             INSERT INTO fixed_default_bucket VALUES
@@ -262,11 +232,7 @@ suite("test_paimon_write_basic", "basic,external") {
                 (4, 'default-d', 40, 'p2'),
                 (-5, 'default-negative', 50, 'p2')
         """
-        order_qt_fixed_default_bucket """
-            SELECT pt, id, name, score
-            FROM fixed_default_bucket
-            ORDER BY pt, id
-        """
+        assertPaimonDorisQueryEquals("fixed_default_bucket", "pt, id, name, 
score", "pt, id")
 
         sql """
             INSERT INTO fixed_mod_bucket VALUES
@@ -275,11 +241,7 @@ suite("test_paimon_write_basic", "basic,external") {
                 (5, 'mod-b', 30, 'p2'),
                 (-6, 'mod-negative-b', 40, 'p2')
         """
-        order_qt_fixed_mod_bucket """
-            SELECT pt, id, name, score
-            FROM fixed_mod_bucket
-            ORDER BY pt, id
-        """
+        assertPaimonDorisQueryEquals("fixed_mod_bucket", "pt, id, name, 
score", "pt, id")
 
         sql """
             INSERT INTO pk_fixed_bucket VALUES
@@ -291,11 +253,7 @@ suite("test_paimon_write_basic", "basic,external") {
             INSERT INTO pk_fixed_bucket VALUES
                 (1, 'pk-a-updated', 30, 'p1')
         """
-        order_qt_pk_fixed_bucket """
-            SELECT pt, id, name, score
-            FROM pk_fixed_bucket
-            ORDER BY pt, id
-        """
+        assertPaimonDorisQueryEquals("pk_fixed_bucket", "pt, id, name, score", 
"pt, id")
 
         test {
             sql """
@@ -322,20 +280,7 @@ suite("test_paimon_write_basic", "basic,external") {
         }
     } finally {
         try {
-            [
-                    "append_basic",
-                    "append_partitioned",
-                    "append_partial",
-                    "append_from_select",
-                    "fixed_default_bucket",
-                    "fixed_mod_bucket",
-                    "pk_fixed_bucket",
-                    "pk_partial_reject",
-                    "dynamic_bucket_reject",
-                    "insert_overwrite_reject"
-            ].each { tableName ->
-                sparkSql """DROP TABLE IF EXISTS 
paimon.${dbName}.${tableName}"""
-            }
+            spark_paimon_multi dropPaimonTables
         } catch (Exception e) {
             logger.warn("Failed to drop Paimon write basic test tables: 
${e.message}".toString())
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to