This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 085023ad0d [fix][connectors-v2] repeated commit cause task exceptions 
(#9665)
085023ad0d is described below

commit 085023ad0db1a8729ce724a14855c0077f051dfe
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Aug 11 13:44:04 2025 +0800

    [fix][connectors-v2] repeated commit cause task exceptions (#9665)
---
 .../paimon/handler/PaimonSaveModeHandler.java      |  5 +-
 .../seatunnel/paimon/sink/PaimonSink.java          | 23 ++++++
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    | 64 ++++++++---------
 .../seatunnel/paimon/sink/SupportLoadTable.java    |  2 +
 .../sink/commit/PaimonAggregatedCommitInfo.java    |  2 +
 .../sink/commit/PaimonAggregatedCommitter.java     | 83 ++++++++++++++++------
 .../paimon/sink/commit/PaimonCommitInfo.java       |  2 +
 .../paimon/sink/state/PaimonSinkState.java         |  2 +-
 .../paimon/sink/writer/PaimonWriteTest.java        |  6 ++
 .../paimon/PaimonSinkWithSchemaEvolutionIT.java    | 54 ++++++++++----
 .../changelog_fake_cdc_sink_paimon_case1_ddl.conf  |  2 +-
 ...log_fake_cdc_sink_paimon_case1_insert_data.conf |  2 +-
 ...log_fake_cdc_sink_paimon_case1_update_data.conf |  2 +-
 .../changelog_fake_cdc_sink_paimon_case2.conf      |  2 +-
 .../test/resources/changelog_paimon_to_paimon.conf |  2 +-
 15 files changed, 176 insertions(+), 77 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
index b479ebf14b..7e93ee3512 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -53,6 +53,9 @@ public class PaimonSaveModeHandler extends 
DefaultSaveModeHandler {
         TablePath tablePath = catalogTable.getTablePath();
         Table paimonTable = ((PaimonCatalog) 
catalog).getPaimonTable(tablePath);
         // load paimon table and set it into paimon sink
-        this.supportLoadTable.setLoadTable(paimonTable);
+        Table loadTable = this.supportLoadTable.getLoadTable();
+        if (loadTable == null || this.schemaSaveMode == 
SchemaSaveMode.RECREATE_SCHEMA) {
+            this.supportLoadTable.setLoadTable(paimonTable);
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 29db1001ab..5b185004c6 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.schema.SchemaChangeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
@@ -48,6 +49,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
 
 public class PaimonSink
         implements SeaTunnelSink<
@@ -78,12 +80,26 @@ public class PaimonSink
 
     private final PaimonBucketAssignerFactory paimonBucketAssignerFactory;
 
+    private final String commitUser = UUID.randomUUID().toString();
+
     public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         this.readonlyConfig = readonlyConfig;
         this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig);
         this.catalogTable = catalogTable;
         this.paimonHadoopConfiguration = 
PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
         this.paimonBucketAssignerFactory = new PaimonBucketAssignerFactory();
+        try (PaimonCatalog paimonCatalog = 
PaimonCatalog.loadPaimonCatalog(readonlyConfig)) {
+            paimonCatalog.open();
+            boolean databaseExists =
+                    
paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace());
+            if (databaseExists) {
+                TablePath tablePath = catalogTable.getTablePath();
+                boolean tableExists = paimonCatalog.tableExists(tablePath);
+                if (tableExists) {
+                    this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
+                }
+            }
+        }
     }
 
     @Override
@@ -98,6 +114,7 @@ public class PaimonSink
                 readonlyConfig,
                 catalogTable,
                 paimonTable,
+                commitUser,
                 jobContext,
                 paimonSinkConfig,
                 paimonHadoopConfiguration,
@@ -118,6 +135,7 @@ public class PaimonSink
                 readonlyConfig,
                 catalogTable,
                 paimonTable,
+                commitUser,
                 states,
                 jobContext,
                 paimonSinkConfig,
@@ -158,6 +176,11 @@ public class PaimonSink
         this.paimonTable = table;
     }
 
+    @Override
+    public Table getLoadTable() {
+        return paimonTable;
+    }
+
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
         return Optional.ofNullable(catalogTable);
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 36c9010588..64ced58c5d 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -54,11 +54,9 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWrite;
-import org.apache.paimon.table.sink.WriteBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -67,10 +65,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
@@ -81,11 +79,9 @@ public class PaimonSinkWriter
                 SupportMultiTableSinkWriter<Void>,
                 SupportSchemaEvolutionSinkWriter {
 
-    private String commitUser = UUID.randomUUID().toString();
+    private final String commitUser;
 
-    private FileStoreTable paimonFileStoretable;
-
-    private WriteBuilder tableWriteBuilder;
+    private FileStoreTable paimonTable;
 
     private TableWrite tableWrite;
 
@@ -124,7 +120,8 @@ public class PaimonSinkWriter
             Context context,
             ReadonlyConfig readonlyConfig,
             CatalogTable catalogTable,
-            Table paimonTable,
+            Table paimonFileStoretable,
+            String commitUser,
             JobContext jobContext,
             PaimonSinkConfig paimonSinkConfig,
             PaimonHadoopConfiguration paimonHadoopConfiguration,
@@ -135,9 +132,10 @@ public class PaimonSinkWriter
         this.paimonTablePath = catalogTable.getTablePath();
         this.paimonCatalog = PaimonCatalog.loadPaimonCatalog(readonlyConfig);
         this.paimonCatalog.open();
-        this.paimonFileStoretable = (FileStoreTable) paimonTable;
+        this.paimonTable = (FileStoreTable) paimonFileStoretable;
+        this.commitUser = commitUser;
         CoreOptions.ChangelogProducer changelogProducer =
-                this.paimonFileStoretable.coreOptions().changelogProducer();
+                this.paimonTable.coreOptions().changelogProducer();
         if (Objects.nonNull(paimonSinkConfig.getChangelogProducer())
                 && changelogProducer != 
paimonSinkConfig.getChangelogProducer()) {
             log.warn(
@@ -145,15 +143,15 @@ public class PaimonSinkWriter
         }
         this.rowAssignerChannelComputer =
                 new RowAssignerChannelComputer(
-                        paimonFileStoretable.schema(), 
context.getNumberOfParallelSubtasks());
+                        paimonTable.schema(), 
context.getNumberOfParallelSubtasks());
         
rowAssignerChannelComputer.setup(context.getNumberOfParallelSubtasks());
         this.paimonBucketAssignerFactory = paimonBucketAssignerFactory;
         this.parallelism = context.getNumberOfParallelSubtasks();
         this.taskIndex = context.getIndexOfSubtask();
         this.paimonSinkConfig = paimonSinkConfig;
-        this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
+        this.sinkPaimonTableSchema = this.paimonTable.schema();
         this.newTableWrite();
-        BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
+        BucketMode bucketMode = this.paimonTable.bucketMode();
         // 
https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket
         // When you need cross partition upsert (primary keys not contain all 
partition fields),
         // Dynamic Bucket mode directly maintains the mapping of keys to 
partition and bucket, uses
@@ -166,7 +164,7 @@ public class PaimonSinkWriter
                     "Cross Partitions Upsert Dynamic Bucket Mode is not 
supported.");
         }
         this.dynamicBucket = BucketMode.HASH_DYNAMIC == bucketMode;
-        int bucket = ((FileStoreTable) paimonTable).coreOptions().bucket();
+        int bucket = paimonTable.coreOptions().bucket();
         if (bucket == -1 && BucketMode.BUCKET_UNAWARE == bucketMode) {
             log.warn("Append only table currently do not support dynamic 
bucket");
         }
@@ -181,6 +179,7 @@ public class PaimonSinkWriter
             ReadonlyConfig readonlyConfig,
             CatalogTable catalogTable,
             Table paimonFileStoretable,
+            String commitUser,
             List<PaimonSinkState> states,
             JobContext jobContext,
             PaimonSinkConfig paimonSinkConfig,
@@ -191,6 +190,7 @@ public class PaimonSinkWriter
                 readonlyConfig,
                 catalogTable,
                 paimonFileStoretable,
+                commitUser,
                 jobContext,
                 paimonSinkConfig,
                 paimonHadoopConfiguration,
@@ -198,21 +198,20 @@ public class PaimonSinkWriter
         if (Objects.isNull(states) || states.isEmpty()) {
             return;
         }
-        this.commitUser = states.get(0).getCommitUser();
-        long checkpointId = states.get(0).getCheckpointId();
-        try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
-            List<CommitMessage> commitables =
+        try (TableCommitImpl tableCommit = 
paimonTable.newCommit(states.get(0).getCommitUser())) {
+            Map<Long, List<CommitMessage>> commitMessagesMap =
                     states.stream()
-                            .map(PaimonSinkState::getCommittables)
-                            .flatMap(List::stream)
-                            .collect(Collectors.toList());
+                            .collect(
+                                    Collectors.toMap(
+                                            PaimonSinkState::getCheckpointId,
+                                            PaimonSinkState::getCommitTables));
             // batch mode without checkpoint has no state to commit
-            if (commitables.isEmpty()) {
+            if (commitMessagesMap.isEmpty()) {
                 return;
             }
             // streaming mode or batch mode with checkpoint need to recommit 
by stream api
-            log.info("Trying to recommit states {}", commitables);
-            ((StreamTableCommit) tableCommit).commit(checkpointId, 
commitables);
+            log.info("Trying to recommit states {}", commitMessagesMap);
+            tableCommit.filterAndCommit(commitMessagesMap);
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -268,21 +267,20 @@ public class PaimonSinkWriter
 
     private void reOpenTableWrite() {
         this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
-        this.paimonFileStoretable = (FileStoreTable) 
paimonCatalog.getPaimonTable(paimonTablePath);
-        this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
+        this.paimonTable = (FileStoreTable) 
paimonCatalog.getPaimonTable(paimonTablePath);
+        this.sinkPaimonTableSchema = this.paimonTable.schema();
         this.newTableWrite();
     }
 
     private void newTableWrite() {
-        this.tableWriteBuilder = 
this.paimonFileStoretable.newStreamWriteBuilder();
         TableWrite oldTableWrite = this.tableWrite;
+        tableWriteClose(oldTableWrite);
         this.tableWrite =
-                tableWriteBuilder
-                        .newWrite()
+                this.paimonTable
+                        .newWrite(commitUser)
                         .withIOManager(
                                 IOManager.create(
                                         
splitPaths(paimonSinkConfig.getChangelogTmpPath())));
-        tableWriteClose(oldTableWrite);
     }
 
     @Override
@@ -301,7 +299,7 @@ public class PaimonSinkWriter
                 bucketAssigners.clear();
                 assigners.forEach(assigner -> 
assigner.prepareCommit(checkpointId));
             }
-            return Optional.of(new PaimonCommitInfo(fileCommittables, 
checkpointId));
+            return Optional.of(new PaimonCommitInfo(fileCommittables, 
checkpointId, commitUser));
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_PRE_COMMIT_FAILED,
@@ -350,7 +348,7 @@ public class PaimonSinkWriter
         if (JobMode.BATCH.equals(jobContext.getJobMode())) {
             return true;
         }
-        CoreOptions coreOptions = this.paimonFileStoretable.coreOptions();
+        CoreOptions coreOptions = this.paimonTable.coreOptions();
         if (coreOptions.writeOnly()) {
             return false;
         }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
index 734762e23c..538fb3b6c4 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
@@ -19,4 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
 public interface SupportLoadTable<T> {
     void setLoadTable(T table);
+
+    T getLoadTable();
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
index 83ed71f615..ffa17ff2e9 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
@@ -35,4 +35,6 @@ public class PaimonAggregatedCommitInfo implements 
Serializable {
 
     // key: checkpointId value: Paimon commit message List
     private Map<Long, List<CommitMessage>> committablesMap;
+
+    private String commitUser;
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index e452b6cf03..31052713ef 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -24,11 +24,10 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.TableCommit;
-import org.apache.paimon.table.sink.WriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,6 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
 
 /** Paimon connector aggregated committer class */
 @Slf4j
@@ -47,57 +47,94 @@ public class PaimonAggregatedCommitter
 
     private static final long serialVersionUID = 1L;
 
-    private final WriteBuilder tableWriteBuilder;
+    private final FileStoreTable table;
 
     public PaimonAggregatedCommitter(
             Table table, PaimonHadoopConfiguration paimonHadoopConfiguration) {
-        this.tableWriteBuilder = table.newStreamWriteBuilder();
+        this.table = (FileStoreTable) table;
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
 
     @Override
     public List<PaimonAggregatedCommitInfo> commit(
             List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws 
IOException {
-        try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
+        aggregatedCommitInfo.stream()
+                
.collect(Collectors.groupingBy(PaimonAggregatedCommitInfo::getCommitUser))
+                .forEach(this::commit);
+        return Collections.emptyList();
+    }
+
+    private void commit(String commitUser, List<PaimonAggregatedCommitInfo> 
aggregatedCommitInfo) {
+        try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
             PaimonSecurityContext.runSecured(
                     () -> {
                         log.debug("Trying to commit states streaming mode");
-                        aggregatedCommitInfo.stream()
-                                .flatMap(
-                                        paimonAggregatedCommitInfo ->
-                                                
paimonAggregatedCommitInfo.getCommittablesMap()
-                                                        .entrySet().stream())
-                                .forEach(
-                                        entry ->
-                                                ((StreamTableCommit) 
tableCommit)
-                                                        
.commit(entry.getKey(), entry.getValue()));
+                        Map<Long, List<CommitMessage>> committablesMap =
+                                aggregatedCommitInfo.stream()
+                                        .flatMap(
+                                                paimonAggregatedCommitInfo ->
+                                                        
paimonAggregatedCommitInfo
+                                                                
.getCommittablesMap().entrySet()
+                                                                .stream())
+                                        .collect(
+                                                Collectors.toMap(
+                                                        Map.Entry::getKey, 
Map.Entry::getValue));
+                        if (!committablesMap.isEmpty()) {
+                            tableCommit.filterAndCommit(committablesMap);
+                        }
                         return null;
                     });
         } catch (Exception e) {
             throw new PaimonConnectorException(
-                    PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED,
-                    "Paimon table storage write-commit Failed.",
-                    e);
+                    PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
         }
-        return Collections.emptyList();
     }
 
     @Override
     public PaimonAggregatedCommitInfo combine(List<PaimonCommitInfo> 
commitInfos) {
-        Map<Long, List<CommitMessage>> committables = new HashMap<>();
+        String commitUser = commitInfos.get(0).getCommitUser();
+        Map<Long, List<CommitMessage>> commitTables = new HashMap<>();
         commitInfos.forEach(
                 commitInfo ->
-                        committables
+                        commitTables
                                 .computeIfAbsent(
                                         commitInfo.getCheckpointId(),
                                         id -> new CopyOnWriteArrayList<>())
                                 .addAll(commitInfo.getCommittables()));
-        return new PaimonAggregatedCommitInfo(committables);
+        return new PaimonAggregatedCommitInfo(commitTables, commitUser);
     }
 
     @Override
     public void abort(List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) 
throws Exception {
-        // TODO find the right way to abort
+        aggregatedCommitInfo.stream()
+                
.collect(Collectors.groupingBy(PaimonAggregatedCommitInfo::getCommitUser))
+                .forEach(this::abort);
+    }
+
+    private void abort(String commitUser, List<PaimonAggregatedCommitInfo> 
aggregatedCommitInfo) {
+        try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
+            PaimonSecurityContext.runSecured(
+                    () -> {
+                        log.debug("Trying to commit states streaming mode");
+                        Map<Long, List<CommitMessage>> committablesMap =
+                                aggregatedCommitInfo.stream()
+                                        .flatMap(
+                                                paimonAggregatedCommitInfo ->
+                                                        
paimonAggregatedCommitInfo
+                                                                
.getCommittablesMap().entrySet()
+                                                                .stream())
+                                        .collect(
+                                                Collectors.toMap(
+                                                        Map.Entry::getKey, 
Map.Entry::getValue));
+                        if (!committablesMap.isEmpty()) {
+                            
committablesMap.values().forEach(tableCommit::abort);
+                        }
+                        return null;
+                    });
+        } catch (Exception e) {
+            throw new PaimonConnectorException(
+                    PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
index 1d9844103f..1753e15c9e 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
@@ -34,4 +34,6 @@ public class PaimonCommitInfo implements Serializable {
     List<CommitMessage> committables;
 
     Long checkpointId;
+
+    String commitUser;
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java
index e239f12f33..01b95eeeb0 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java
@@ -32,7 +32,7 @@ public class PaimonSinkState implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private List<CommitMessage> committables;
+    private List<CommitMessage> commitTables;
 
     private String commitUser;
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
index 6c10c7f23d..b0d2f6d3b0 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
@@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 public class PaimonWriteTest {
 
@@ -58,6 +59,7 @@ public class PaimonWriteTest {
     private PaimonSinkWriter paimonSinkWriter;
     private ReadonlyConfig readonlyConfig;
     private SinkWriter.Context context;
+    private final String commitUser = UUID.randomUUID().toString();
 
     @BeforeEach
     public void before() {
@@ -237,6 +239,7 @@ public class PaimonWriteTest {
                         readonlyConfig,
                         paimonCatalog.getTable(tablePath),
                         paimonCatalog.getPaimonTable(tablePath),
+                        commitUser,
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
                         new PaimonHadoopConfiguration(),
@@ -250,6 +253,7 @@ public class PaimonWriteTest {
                         readonlyConfig,
                         paimonCatalog.getTable(tablePath),
                         paimonCatalog.getPaimonTable(tablePath),
+                        commitUser,
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
                         new PaimonHadoopConfiguration(),
@@ -271,6 +275,7 @@ public class PaimonWriteTest {
                         readonlyConfig,
                         paimonCatalog.getTable(tablePath),
                         paimonCatalog.getPaimonTable(tablePath),
+                        commitUser,
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
                         new PaimonHadoopConfiguration(),
@@ -285,6 +290,7 @@ public class PaimonWriteTest {
                         readonlyConfig,
                         paimonCatalog.getTable(tablePath),
                         paimonCatalog.getPaimonTable(tablePath),
+                        commitUser,
                         jobContext,
                         new PaimonSinkConfig(readonlyConfig),
                         new PaimonHadoopConfiguration(),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index 73bf50df42..337828b951 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
@@ -153,20 +154,20 @@ public class PaimonSinkWithSchemaEvolutionIT extends 
AbstractPaimonIT implements
     }
 
     @TestTemplate
-    public void testMysqlCdcSinkPaimonWithSchemaChange(TestContainer 
container) throws Exception {
+    public void testMysqlCdcSinkPaimonWithSchemaChangeAndRestore(TestContainer 
container)
+            throws Exception {
+        String jobId = String.valueOf(JobIdGenerator.newJobId());
         String jobConfigFile = "/mysql_cdc_to_paimon_with_schema_change.conf";
         CompletableFuture.runAsync(
                 () -> {
                     try {
-                        container.executeJob(jobConfigFile);
+                        container.executeJob(jobConfigFile, jobId);
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
                     }
                 });
-
-        // Waiting for auto create sink table
-        Thread.sleep(15000);
+        verifyJobStatus(container, jobId);
 
         await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(
@@ -182,25 +183,50 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
         // with default value at same time, the history data in paimon has no value.
         List<ImmutableTriple<String[], Integer, Integer>> idRangesWithFiledProjection1 =
                 getIdRangesWithFiledProjectionImmutableTriplesCase1();
-        vertifySchemaAndData(container, idRangesWithFiledProjection1);
+        verifySchemaAndData(container, idRangesWithFiledProjection1);
+
+        // savepoint job
+        Container.ExecResult execResult = container.savepointJob(jobId);
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        // restore job
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.restoreJob(jobConfigFile, jobId);
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+        verifyJobStatus(container, jobId);
 
         // Case 2: Drop columns with data at same time
         shopDatabase.setTemplateName("drop_columns").createAndInitialize();
         List<ImmutableTriple<String[], Integer, Integer>> idRangesWithFiledProjection2 =
                 getIdRangesWithFiledProjectionImmutableTriplesCase2();
-        vertifySchemaAndData(container, idRangesWithFiledProjection2);
+        verifySchemaAndData(container, idRangesWithFiledProjection2);
 
         // Case 3: Change columns with data at same time
         shopDatabase.setTemplateName("change_columns").createAndInitialize();
         List<ImmutableTriple<String[], Integer, Integer>> idRangesWithFiledProjection3 =
                 getIdRangesWithFiledProjectionImmutableTriplesCase3();
-        vertifySchemaAndData(container, idRangesWithFiledProjection3);
+        verifySchemaAndData(container, idRangesWithFiledProjection3);
 
         // Case 4: Modify columns with data at same time
         shopDatabase.setTemplateName("modify_columns").createAndInitialize();
         List<ImmutableTriple<String[], Integer, Integer>> idRangesWithFiledProjection4 =
                 getIdRangesWithFiledProjectionImmutableTriplesCase4();
-        vertifySchemaAndData(container, idRangesWithFiledProjection4);
+        verifySchemaAndData(container, idRangesWithFiledProjection4);
+    }
+
+    private void verifyJobStatus(TestContainer container, String jobId) {
+        await().pollDelay(30, TimeUnit.SECONDS)
+                .atMost(45, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = container.getJobStatus(jobId);
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
     }
 
     private List<ImmutableTriple<String[], Integer, Integer>>
@@ -328,15 +354,15 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
         };
     }
 
-    private void vertifySchemaAndData(
+    private void verifySchemaAndData(
             TestContainer container,
             List<ImmutableTriple<String[], Integer, Integer>> idRangesWithFiledProjection) {
-        await().pollDelay(3, TimeUnit.SECONDS)
-                .atMost(30, TimeUnit.SECONDS)
+        await().pollDelay(5, TimeUnit.SECONDS)
+                .atMost(40, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
                             // 1. Vertify the schema
-                            vertifySchema();
+                            verifySchema();
 
                             // 2. Vertify the data
                             idRangesWithFiledProjection.forEach(
@@ -362,7 +388,7 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
                         });
     }
 
-    private void vertifySchema() {
+    private void verifySchema() {
         try (MySqlCatalog mySqlCatalog =
                 new MySqlCatalog(
                         "mysql",
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
index 4d4b30d7ac..5856db9a02 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
@@ -48,7 +48,7 @@ sink {
     paimon.table.write-props = {
       changelog-producer = lookup
       changelog-tmp-path = "/tmp/paimon/changelog"
-      file-format = parquet
+      file.format = parquet
     }
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
index 3ff070d2d5..6168f76729 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
@@ -62,7 +62,7 @@ sink {
     paimon.table.write-props = {
       changelog-producer = lookup
       changelog-tmp-path = "/tmp/paimon/changelog"
-      file-format = parquet
+      file.format = parquet
     }
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
index 209b2a7e41..434662bee9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
@@ -66,7 +66,7 @@ sink {
     paimon.table.write-props = {
       changelog-producer = lookup
       changelog-tmp-path = "/tmp/paimon/changelog"
-      file-format = parquet
+      file.format = parquet
     }
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
index e5660abc55..333f4c00fc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
@@ -78,7 +78,7 @@ sink {
     paimon.table.write-props = {
       changelog-producer = full-compaction
       changelog-tmp-path = "/tmp/paimon/changelog"
-      file-format = parquet
+      file.format = parquet
     }
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
index d63810fa19..0f4b547b93 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -44,7 +44,7 @@ sink {
     paimon.table.non-primary-key = true
     paimon.table.write-props = {
       write-only = true
-      file-format = parquet
+      file.format = parquet
     }
   }
 }


Reply via email to