This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 085023ad0d [fix][connectors-v2] repeated commit cause task exceptions (#9665)
085023ad0d is described below
commit 085023ad0db1a8729ce724a14855c0077f051dfe
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Aug 11 13:44:04 2025 +0800
[fix][connectors-v2] repeated commit cause task exceptions (#9665)
---
.../paimon/handler/PaimonSaveModeHandler.java | 5 +-
.../seatunnel/paimon/sink/PaimonSink.java | 23 ++++++
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 64 ++++++++---------
.../seatunnel/paimon/sink/SupportLoadTable.java | 2 +
.../sink/commit/PaimonAggregatedCommitInfo.java | 2 +
.../sink/commit/PaimonAggregatedCommitter.java | 83 ++++++++++++++++------
.../paimon/sink/commit/PaimonCommitInfo.java | 2 +
.../paimon/sink/state/PaimonSinkState.java | 2 +-
.../paimon/sink/writer/PaimonWriteTest.java | 6 ++
.../paimon/PaimonSinkWithSchemaEvolutionIT.java | 54 ++++++++++----
.../changelog_fake_cdc_sink_paimon_case1_ddl.conf | 2 +-
...log_fake_cdc_sink_paimon_case1_insert_data.conf | 2 +-
...log_fake_cdc_sink_paimon_case1_update_data.conf | 2 +-
.../changelog_fake_cdc_sink_paimon_case2.conf | 2 +-
.../test/resources/changelog_paimon_to_paimon.conf | 2 +-
15 files changed, 176 insertions(+), 77 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
index b479ebf14b..7e93ee3512 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -53,6 +53,9 @@ public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
TablePath tablePath = catalogTable.getTablePath();
Table paimonTable = ((PaimonCatalog)
catalog).getPaimonTable(tablePath);
// load paimon table and set it into paimon sink
- this.supportLoadTable.setLoadTable(paimonTable);
+ Table loadTable = this.supportLoadTable.getLoadTable();
+ if (loadTable == null || this.schemaSaveMode ==
SchemaSaveMode.RECREATE_SCHEMA) {
+ this.supportLoadTable.setLoadTable(paimonTable);
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 29db1001ab..5b185004c6 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.schema.SchemaChangeType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
@@ -48,6 +49,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
public class PaimonSink
implements SeaTunnelSink<
@@ -78,12 +80,26 @@ public class PaimonSink
private final PaimonBucketAssignerFactory paimonBucketAssignerFactory;
+ private final String commitUser = UUID.randomUUID().toString();
+
public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
this.readonlyConfig = readonlyConfig;
this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig);
this.catalogTable = catalogTable;
this.paimonHadoopConfiguration =
PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
this.paimonBucketAssignerFactory = new PaimonBucketAssignerFactory();
+ try (PaimonCatalog paimonCatalog =
PaimonCatalog.loadPaimonCatalog(readonlyConfig)) {
+ paimonCatalog.open();
+ boolean databaseExists =
+
paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace());
+ if (databaseExists) {
+ TablePath tablePath = catalogTable.getTablePath();
+ boolean tableExists = paimonCatalog.tableExists(tablePath);
+ if (tableExists) {
+ this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
+ }
+ }
+ }
}
@Override
@@ -98,6 +114,7 @@ public class PaimonSink
readonlyConfig,
catalogTable,
paimonTable,
+ commitUser,
jobContext,
paimonSinkConfig,
paimonHadoopConfiguration,
@@ -118,6 +135,7 @@ public class PaimonSink
readonlyConfig,
catalogTable,
paimonTable,
+ commitUser,
states,
jobContext,
paimonSinkConfig,
@@ -158,6 +176,11 @@ public class PaimonSink
this.paimonTable = table;
}
+ @Override
+ public Table getLoadTable() {
+ return paimonTable;
+ }
+
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 36c9010588..64ced58c5d 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -54,11 +54,9 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWrite;
-import org.apache.paimon.table.sink.WriteBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -67,10 +65,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
@@ -81,11 +79,9 @@ public class PaimonSinkWriter
SupportMultiTableSinkWriter<Void>,
SupportSchemaEvolutionSinkWriter {
- private String commitUser = UUID.randomUUID().toString();
+ private final String commitUser;
- private FileStoreTable paimonFileStoretable;
-
- private WriteBuilder tableWriteBuilder;
+ private FileStoreTable paimonTable;
private TableWrite tableWrite;
@@ -124,7 +120,8 @@ public class PaimonSinkWriter
Context context,
ReadonlyConfig readonlyConfig,
CatalogTable catalogTable,
- Table paimonTable,
+ Table paimonFileStoretable,
+ String commitUser,
JobContext jobContext,
PaimonSinkConfig paimonSinkConfig,
PaimonHadoopConfiguration paimonHadoopConfiguration,
@@ -135,9 +132,10 @@ public class PaimonSinkWriter
this.paimonTablePath = catalogTable.getTablePath();
this.paimonCatalog = PaimonCatalog.loadPaimonCatalog(readonlyConfig);
this.paimonCatalog.open();
- this.paimonFileStoretable = (FileStoreTable) paimonTable;
+ this.paimonTable = (FileStoreTable) paimonFileStoretable;
+ this.commitUser = commitUser;
CoreOptions.ChangelogProducer changelogProducer =
- this.paimonFileStoretable.coreOptions().changelogProducer();
+ this.paimonTable.coreOptions().changelogProducer();
if (Objects.nonNull(paimonSinkConfig.getChangelogProducer())
&& changelogProducer !=
paimonSinkConfig.getChangelogProducer()) {
log.warn(
@@ -145,15 +143,15 @@ public class PaimonSinkWriter
}
this.rowAssignerChannelComputer =
new RowAssignerChannelComputer(
- paimonFileStoretable.schema(),
context.getNumberOfParallelSubtasks());
+ paimonTable.schema(),
context.getNumberOfParallelSubtasks());
rowAssignerChannelComputer.setup(context.getNumberOfParallelSubtasks());
this.paimonBucketAssignerFactory = paimonBucketAssignerFactory;
this.parallelism = context.getNumberOfParallelSubtasks();
this.taskIndex = context.getIndexOfSubtask();
this.paimonSinkConfig = paimonSinkConfig;
- this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
+ this.sinkPaimonTableSchema = this.paimonTable.schema();
this.newTableWrite();
- BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
+ BucketMode bucketMode = this.paimonTable.bucketMode();
//
https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket
// When you need cross partition upsert (primary keys not contain all
partition fields),
// Dynamic Bucket mode directly maintains the mapping of keys to
partition and bucket, uses
@@ -166,7 +164,7 @@ public class PaimonSinkWriter
"Cross Partitions Upsert Dynamic Bucket Mode is not
supported.");
}
this.dynamicBucket = BucketMode.HASH_DYNAMIC == bucketMode;
- int bucket = ((FileStoreTable) paimonTable).coreOptions().bucket();
+ int bucket = paimonTable.coreOptions().bucket();
if (bucket == -1 && BucketMode.BUCKET_UNAWARE == bucketMode) {
log.warn("Append only table currently do not support dynamic
bucket");
}
@@ -181,6 +179,7 @@ public class PaimonSinkWriter
ReadonlyConfig readonlyConfig,
CatalogTable catalogTable,
Table paimonFileStoretable,
+ String commitUser,
List<PaimonSinkState> states,
JobContext jobContext,
PaimonSinkConfig paimonSinkConfig,
@@ -191,6 +190,7 @@ public class PaimonSinkWriter
readonlyConfig,
catalogTable,
paimonFileStoretable,
+ commitUser,
jobContext,
paimonSinkConfig,
paimonHadoopConfiguration,
@@ -198,21 +198,20 @@ public class PaimonSinkWriter
if (Objects.isNull(states) || states.isEmpty()) {
return;
}
- this.commitUser = states.get(0).getCommitUser();
- long checkpointId = states.get(0).getCheckpointId();
- try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
- List<CommitMessage> commitables =
+ try (TableCommitImpl tableCommit =
paimonTable.newCommit(states.get(0).getCommitUser())) {
+ Map<Long, List<CommitMessage>> commitMessagesMap =
states.stream()
- .map(PaimonSinkState::getCommittables)
- .flatMap(List::stream)
- .collect(Collectors.toList());
+ .collect(
+ Collectors.toMap(
+ PaimonSinkState::getCheckpointId,
+ PaimonSinkState::getCommitTables));
// batch mode without checkpoint has no state to commit
- if (commitables.isEmpty()) {
+ if (commitMessagesMap.isEmpty()) {
return;
}
// streaming mode or batch mode with checkpoint need to recommit
by stream api
- log.info("Trying to recommit states {}", commitables);
- ((StreamTableCommit) tableCommit).commit(checkpointId,
commitables);
+ log.info("Trying to recommit states {}", commitMessagesMap);
+ tableCommit.filterAndCommit(commitMessagesMap);
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -268,21 +267,20 @@ public class PaimonSinkWriter
private void reOpenTableWrite() {
this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
- this.paimonFileStoretable = (FileStoreTable)
paimonCatalog.getPaimonTable(paimonTablePath);
- this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
+ this.paimonTable = (FileStoreTable)
paimonCatalog.getPaimonTable(paimonTablePath);
+ this.sinkPaimonTableSchema = this.paimonTable.schema();
this.newTableWrite();
}
private void newTableWrite() {
- this.tableWriteBuilder =
this.paimonFileStoretable.newStreamWriteBuilder();
TableWrite oldTableWrite = this.tableWrite;
+ tableWriteClose(oldTableWrite);
this.tableWrite =
- tableWriteBuilder
- .newWrite()
+ this.paimonTable
+ .newWrite(commitUser)
.withIOManager(
IOManager.create(
splitPaths(paimonSinkConfig.getChangelogTmpPath())));
- tableWriteClose(oldTableWrite);
}
@Override
@@ -301,7 +299,7 @@ public class PaimonSinkWriter
bucketAssigners.clear();
assigners.forEach(assigner ->
assigner.prepareCommit(checkpointId));
}
- return Optional.of(new PaimonCommitInfo(fileCommittables,
checkpointId));
+ return Optional.of(new PaimonCommitInfo(fileCommittables,
checkpointId, commitUser));
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_PRE_COMMIT_FAILED,
@@ -350,7 +348,7 @@ public class PaimonSinkWriter
if (JobMode.BATCH.equals(jobContext.getJobMode())) {
return true;
}
- CoreOptions coreOptions = this.paimonFileStoretable.coreOptions();
+ CoreOptions coreOptions = this.paimonTable.coreOptions();
if (coreOptions.writeOnly()) {
return false;
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
index 734762e23c..538fb3b6c4 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
@@ -19,4 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
public interface SupportLoadTable<T> {
void setLoadTable(T table);
+
+ T getLoadTable();
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
index 83ed71f615..ffa17ff2e9 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitInfo.java
@@ -35,4 +35,6 @@ public class PaimonAggregatedCommitInfo implements Serializable {
// key: checkpointId value: Paimon commit message List
private Map<Long, List<CommitMessage>> committablesMap;
+
+ private String commitUser;
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index e452b6cf03..31052713ef 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -24,11 +24,10 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.TableCommit;
-import org.apache.paimon.table.sink.WriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
import lombok.extern.slf4j.Slf4j;
@@ -38,6 +37,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
/** Paimon connector aggregated committer class */
@Slf4j
@@ -47,57 +47,94 @@ public class PaimonAggregatedCommitter
private static final long serialVersionUID = 1L;
- private final WriteBuilder tableWriteBuilder;
+ private final FileStoreTable table;
public PaimonAggregatedCommitter(
Table table, PaimonHadoopConfiguration paimonHadoopConfiguration) {
- this.tableWriteBuilder = table.newStreamWriteBuilder();
+ this.table = (FileStoreTable) table;
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
}
@Override
public List<PaimonAggregatedCommitInfo> commit(
List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws
IOException {
- try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
+ aggregatedCommitInfo.stream()
+
.collect(Collectors.groupingBy(PaimonAggregatedCommitInfo::getCommitUser))
+ .forEach(this::commit);
+ return Collections.emptyList();
+ }
+
+ private void commit(String commitUser, List<PaimonAggregatedCommitInfo>
aggregatedCommitInfo) {
+ try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
PaimonSecurityContext.runSecured(
() -> {
log.debug("Trying to commit states streaming mode");
- aggregatedCommitInfo.stream()
- .flatMap(
- paimonAggregatedCommitInfo ->
-
paimonAggregatedCommitInfo.getCommittablesMap()
- .entrySet().stream())
- .forEach(
- entry ->
- ((StreamTableCommit)
tableCommit)
-
.commit(entry.getKey(), entry.getValue()));
+ Map<Long, List<CommitMessage>> committablesMap =
+ aggregatedCommitInfo.stream()
+ .flatMap(
+ paimonAggregatedCommitInfo ->
+
paimonAggregatedCommitInfo
+
.getCommittablesMap().entrySet()
+ .stream())
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
Map.Entry::getValue));
+ if (!committablesMap.isEmpty()) {
+ tableCommit.filterAndCommit(committablesMap);
+ }
return null;
});
} catch (Exception e) {
throw new PaimonConnectorException(
- PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED,
- "Paimon table storage write-commit Failed.",
- e);
+ PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
}
- return Collections.emptyList();
}
@Override
public PaimonAggregatedCommitInfo combine(List<PaimonCommitInfo>
commitInfos) {
- Map<Long, List<CommitMessage>> committables = new HashMap<>();
+ String commitUser = commitInfos.get(0).getCommitUser();
+ Map<Long, List<CommitMessage>> commitTables = new HashMap<>();
commitInfos.forEach(
commitInfo ->
- committables
+ commitTables
.computeIfAbsent(
commitInfo.getCheckpointId(),
id -> new CopyOnWriteArrayList<>())
.addAll(commitInfo.getCommittables()));
- return new PaimonAggregatedCommitInfo(committables);
+ return new PaimonAggregatedCommitInfo(commitTables, commitUser);
}
@Override
public void abort(List<PaimonAggregatedCommitInfo> aggregatedCommitInfo)
throws Exception {
- // TODO find the right way to abort
+ aggregatedCommitInfo.stream()
+
.collect(Collectors.groupingBy(PaimonAggregatedCommitInfo::getCommitUser))
+ .forEach(this::abort);
+ }
+
+ private void abort(String commitUser, List<PaimonAggregatedCommitInfo>
aggregatedCommitInfo) {
+ try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
+ PaimonSecurityContext.runSecured(
+ () -> {
+ log.debug("Trying to commit states streaming mode");
+ Map<Long, List<CommitMessage>> committablesMap =
+ aggregatedCommitInfo.stream()
+ .flatMap(
+ paimonAggregatedCommitInfo ->
+
paimonAggregatedCommitInfo
+
.getCommittablesMap().entrySet()
+ .stream())
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
Map.Entry::getValue));
+ if (!committablesMap.isEmpty()) {
+
committablesMap.values().forEach(tableCommit::abort);
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
+ }
}
@Override
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
index 1d9844103f..1753e15c9e 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonCommitInfo.java
@@ -34,4 +34,6 @@ public class PaimonCommitInfo implements Serializable {
List<CommitMessage> committables;
Long checkpointId;
+
+ String commitUser;
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java
index e239f12f33..01b95eeeb0 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/state/PaimonSinkState.java
@@ -32,7 +32,7 @@ public class PaimonSinkState implements Serializable {
private static final long serialVersionUID = 1L;
- private List<CommitMessage> committables;
+ private List<CommitMessage> commitTables;
private String commitUser;
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
index 6c10c7f23d..b0d2f6d3b0 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/writer/PaimonWriteTest.java
@@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
public class PaimonWriteTest {
@@ -58,6 +59,7 @@ public class PaimonWriteTest {
private PaimonSinkWriter paimonSinkWriter;
private ReadonlyConfig readonlyConfig;
private SinkWriter.Context context;
+ private final String commitUser = UUID.randomUUID().toString();
@BeforeEach
public void before() {
@@ -237,6 +239,7 @@ public class PaimonWriteTest {
readonlyConfig,
paimonCatalog.getTable(tablePath),
paimonCatalog.getPaimonTable(tablePath),
+ commitUser,
jobContext,
new PaimonSinkConfig(readonlyConfig),
new PaimonHadoopConfiguration(),
@@ -250,6 +253,7 @@ public class PaimonWriteTest {
readonlyConfig,
paimonCatalog.getTable(tablePath),
paimonCatalog.getPaimonTable(tablePath),
+ commitUser,
jobContext,
new PaimonSinkConfig(readonlyConfig),
new PaimonHadoopConfiguration(),
@@ -271,6 +275,7 @@ public class PaimonWriteTest {
readonlyConfig,
paimonCatalog.getTable(tablePath),
paimonCatalog.getPaimonTable(tablePath),
+ commitUser,
jobContext,
new PaimonSinkConfig(readonlyConfig),
new PaimonHadoopConfiguration(),
@@ -285,6 +290,7 @@ public class PaimonWriteTest {
readonlyConfig,
paimonCatalog.getTable(tablePath),
paimonCatalog.getPaimonTable(tablePath),
+ commitUser,
jobContext,
new PaimonSinkConfig(readonlyConfig),
new PaimonHadoopConfiguration(),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index 73bf50df42..337828b951 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
@@ -153,20 +154,20 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
}
@TestTemplate
- public void testMysqlCdcSinkPaimonWithSchemaChange(TestContainer
container) throws Exception {
+ public void testMysqlCdcSinkPaimonWithSchemaChangeAndRestore(TestContainer
container)
+ throws Exception {
+ String jobId = String.valueOf(JobIdGenerator.newJobId());
String jobConfigFile = "/mysql_cdc_to_paimon_with_schema_change.conf";
CompletableFuture.runAsync(
() -> {
try {
- container.executeJob(jobConfigFile);
+ container.executeJob(jobConfigFile, jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
-
- // Waiting for auto create sink table
- Thread.sleep(15000);
+ verifyJobStatus(container, jobId);
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(
@@ -182,25 +183,50 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
// with default value at same time, the history data in paimon has no
value.
List<ImmutableTriple<String[], Integer, Integer>>
idRangesWithFiledProjection1 =
getIdRangesWithFiledProjectionImmutableTriplesCase1();
- vertifySchemaAndData(container, idRangesWithFiledProjection1);
+ verifySchemaAndData(container, idRangesWithFiledProjection1);
+
+ // savepoint job
+ Container.ExecResult execResult = container.savepointJob(jobId);
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ // restore job
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.restoreJob(jobConfigFile, jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ verifyJobStatus(container, jobId);
// Case 2: Drop columns with data at same time
shopDatabase.setTemplateName("drop_columns").createAndInitialize();
List<ImmutableTriple<String[], Integer, Integer>>
idRangesWithFiledProjection2 =
getIdRangesWithFiledProjectionImmutableTriplesCase2();
- vertifySchemaAndData(container, idRangesWithFiledProjection2);
+ verifySchemaAndData(container, idRangesWithFiledProjection2);
// Case 3: Change columns with data at same time
shopDatabase.setTemplateName("change_columns").createAndInitialize();
List<ImmutableTriple<String[], Integer, Integer>>
idRangesWithFiledProjection3 =
getIdRangesWithFiledProjectionImmutableTriplesCase3();
- vertifySchemaAndData(container, idRangesWithFiledProjection3);
+ verifySchemaAndData(container, idRangesWithFiledProjection3);
// Case 4: Modify columns with data at same time
shopDatabase.setTemplateName("modify_columns").createAndInitialize();
List<ImmutableTriple<String[], Integer, Integer>>
idRangesWithFiledProjection4 =
getIdRangesWithFiledProjectionImmutableTriplesCase4();
- vertifySchemaAndData(container, idRangesWithFiledProjection4);
+ verifySchemaAndData(container, idRangesWithFiledProjection4);
+ }
+
+ private void verifyJobStatus(TestContainer container, String jobId) {
+ await().pollDelay(30, TimeUnit.SECONDS)
+ .atMost(45, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ String jobStatus = container.getJobStatus(jobId);
+ Assertions.assertEquals("RUNNING", jobStatus);
+ });
}
private List<ImmutableTriple<String[], Integer, Integer>>
@@ -328,15 +354,15 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
};
}
- private void vertifySchemaAndData(
+ private void verifySchemaAndData(
TestContainer container,
List<ImmutableTriple<String[], Integer, Integer>>
idRangesWithFiledProjection) {
- await().pollDelay(3, TimeUnit.SECONDS)
- .atMost(30, TimeUnit.SECONDS)
+ await().pollDelay(5, TimeUnit.SECONDS)
+ .atMost(40, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// 1. Vertify the schema
- vertifySchema();
+ verifySchema();
// 2. Vertify the data
idRangesWithFiledProjection.forEach(
@@ -362,7 +388,7 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
});
}
- private void vertifySchema() {
+ private void verifySchema() {
try (MySqlCatalog mySqlCatalog =
new MySqlCatalog(
"mysql",
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
index 4d4b30d7ac..5856db9a02 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
@@ -48,7 +48,7 @@ sink {
paimon.table.write-props = {
changelog-producer = lookup
changelog-tmp-path = "/tmp/paimon/changelog"
- file-format = parquet
+ file.format = parquet
}
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
index 3ff070d2d5..6168f76729 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
@@ -62,7 +62,7 @@ sink {
paimon.table.write-props = {
changelog-producer = lookup
changelog-tmp-path = "/tmp/paimon/changelog"
- file-format = parquet
+ file.format = parquet
}
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
index 209b2a7e41..434662bee9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
@@ -66,7 +66,7 @@ sink {
paimon.table.write-props = {
changelog-producer = lookup
changelog-tmp-path = "/tmp/paimon/changelog"
- file-format = parquet
+ file.format = parquet
}
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
index e5660abc55..333f4c00fc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
@@ -78,7 +78,7 @@ sink {
paimon.table.write-props = {
changelog-producer = full-compaction
changelog-tmp-path = "/tmp/paimon/changelog"
- file-format = parquet
+ file.format = parquet
}
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
index d63810fa19..0f4b547b93 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -44,7 +44,7 @@ sink {
paimon.table.non-primary-key = true
paimon.table.write-props = {
write-only = true
- file-format = parquet
+ file.format = parquet
}
}
}