This is an automated email from the ASF dual-hosted git repository.

dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45a7a715a2 [Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155)
45a7a715a2 is described below

commit 45a7a715a2ae4e6577b049bd9fb21ccbb3aab68f
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Dec 5 09:34:54 2024 +0800

    [Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155)
---
 .../seatunnel/iceberg/sink/IcebergSinkWriter.java          | 14 +++++++++-----
 .../iceberg/sink/commit/IcebergFilesCommitter.java         |  8 ++++----
 .../seatunnel/iceberg/sink/writer/IcebergRecordWriter.java |  7 ++++---
 .../seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java  |  5 +++++
 4 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
index 2175080ac7..1028ae21b4 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
@@ -61,7 +61,7 @@ public class IcebergSinkWriter
     private SeaTunnelRowType rowType;
     private final SinkConfig config;
     private final IcebergTableLoader icebergTableLoader;
-    private RecordWriter writer;
+    private volatile RecordWriter writer;
     private final IcebergFilesCommitter filesCommitter;
     private final List<WriteResult> results = Lists.newArrayList();
     private String commitUser = UUID.randomUUID().toString();
@@ -79,7 +79,6 @@ public class IcebergSinkWriter
         this.rowType = tableSchema.toPhysicalRowDataType();
         this.filesCommitter = IcebergFilesCommitter.of(config, icebergTableLoader);
         this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
-        tryCreateRecordWriter();
         if (Objects.nonNull(states) && !states.isEmpty()) {
             this.commitUser = states.get(0).getCommitUser();
             preCommit(states);
@@ -107,8 +106,7 @@ public class IcebergSinkWriter
 
     public static IcebergSinkWriter of(
             SinkConfig config, CatalogTable catalogTable, List<IcebergSinkState> states) {
-        IcebergTableLoader icebergTableLoader =
-                IcebergTableLoader.create(config, catalogTable).open();
+        IcebergTableLoader icebergTableLoader = IcebergTableLoader.create(config, catalogTable);
         return new IcebergSinkWriter(
                 icebergTableLoader, config, catalogTable.getTableSchema(), states);
     }
@@ -121,7 +119,12 @@ public class IcebergSinkWriter
 
     @Override
     public Optional<IcebergCommitInfo> prepareCommit() throws IOException {
-        List<WriteResult> writeResults = writer.complete();
+        List<WriteResult> writeResults;
+        if (writer != null) {
+            writeResults = writer.complete();
+        } else {
+            writeResults = Collections.emptyList();
+        }
         IcebergCommitInfo icebergCommitInfo = new IcebergCommitInfo(writeResults);
         this.results.addAll(writeResults);
         return Optional.of(icebergCommitInfo);
@@ -134,6 +137,7 @@ public class IcebergSinkWriter
             log.info("changed rowType before: {}", fieldsInfo(rowType));
             this.rowType = dataTypeChangeEventHandler.reset(rowType).apply(event);
             log.info("changed rowType after: {}", fieldsInfo(rowType));
+            tryCreateRecordWriter();
             writer.applySchemaChange(this.rowType, event);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
index 0b5e473440..5e44e1d875 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
@@ -54,12 +54,10 @@ public class IcebergFilesCommitter implements Serializable {
 
     public void doCommit(List<WriteResult> results) {
         TableIdentifier tableIdentifier = icebergTableLoader.getTableIdentifier();
-        Table table = icebergTableLoader.loadTable();
-        log.info("do commit table : " + table.toString());
-        commit(tableIdentifier, table, results);
+        commit(tableIdentifier, results);
     }
 
-    private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResult> results) {
+    private void commit(TableIdentifier tableIdentifier, List<WriteResult> results) {
         List<DataFile> dataFiles =
                 results.stream()
                         .filter(payload -> payload.getDataFiles() != null)
@@ -77,6 +75,8 @@ public class IcebergFilesCommitter implements Serializable {
         if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
             log.info(String.format("Nothing to commit to table %s, skipping", tableIdentifier));
         } else {
+            Table table = icebergTableLoader.loadTable();
+            log.info("do commit table : {}", table.toString());
             if (deleteFiles.isEmpty()) {
                 AppendFiles append = table.newAppend();
                 if (branch != null) {
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 9c8949ba1d..22d0480aa4 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -52,7 +52,7 @@ public class IcebergRecordWriter implements RecordWriter {
     private final Table table;
     private final SinkConfig config;
     private final List<WriteResult> writerResults;
-    private TaskWriter<Record> writer;
+    private volatile TaskWriter<Record> writer;
     private RowConverter recordConverter;
     private final IcebergWriterFactory writerFactory;
 
@@ -62,7 +62,6 @@ public class IcebergRecordWriter implements RecordWriter {
         this.writerResults = Lists.newArrayList();
         this.recordConverter = new RowConverter(table, config);
         this.writerFactory = writerFactory;
-        this.writer = createTaskWriter();
     }
 
     private TaskWriter<Record> createTaskWriter() {
@@ -71,6 +70,9 @@ public class IcebergRecordWriter implements RecordWriter {
 
     @Override
     public void write(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) {
+        if (writer == null) {
+            resetWriter();
+        }
         SchemaChangeWrapper updates = new SchemaChangeWrapper();
         Record record = recordConverter.convert(seaTunnelRow, rowType, updates);
         if (!updates.empty()) {
@@ -139,7 +141,6 @@ public class IcebergRecordWriter implements RecordWriter {
         flush();
         List<WriteResult> result = Lists.newArrayList(writerResults);
         writerResults.clear();
-        resetWriter();
         return result;
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index a7cbba8b89..fa271eb8f6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -209,6 +209,11 @@ public class IcebergSinkCDCIT extends TestSuiteBase implements TestResource {
     }
 
     @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.")
     public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container)
             throws IOException, InterruptedException {
         // Clear related content to ensure that multiple operations are not affected

Reply via email to