This is an automated email from the ASF dual-hosted git repository.

dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45a7a715a2 [Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155)
45a7a715a2 is described below

commit 45a7a715a2ae4e6577b049bd9fb21ccbb3aab68f
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Dec 5 09:34:54 2024 +0800

    [Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155)
---
 .../seatunnel/iceberg/sink/IcebergSinkWriter.java | 14 +++++++++-----
 .../iceberg/sink/commit/IcebergFilesCommitter.java | 8 ++++----
 .../seatunnel/iceberg/sink/writer/IcebergRecordWriter.java | 7 ++++---
 .../seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java | 5 +++++
 4 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
index 2175080ac7..1028ae21b4 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
@@ -61,7 +61,7 @@ public class IcebergSinkWriter
     private SeaTunnelRowType rowType;
     private final SinkConfig config;
     private final IcebergTableLoader icebergTableLoader;
-    private RecordWriter writer;
+    private volatile RecordWriter writer;
     private final IcebergFilesCommitter filesCommitter;
     private final List<WriteResult> results = Lists.newArrayList();
     private String commitUser = UUID.randomUUID().toString();
@@ -79,7 +79,6 @@ public class IcebergSinkWriter
         this.rowType = tableSchema.toPhysicalRowDataType();
         this.filesCommitter = IcebergFilesCommitter.of(config, icebergTableLoader);
         this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
-        tryCreateRecordWriter();
         if (Objects.nonNull(states) && !states.isEmpty()) {
             this.commitUser = states.get(0).getCommitUser();
             preCommit(states);
@@ -107,8 +106,7 @@ public class IcebergSinkWriter
 
     public static IcebergSinkWriter of(
             SinkConfig config, CatalogTable catalogTable, List<IcebergSinkState> states) {
-        IcebergTableLoader icebergTableLoader =
-                IcebergTableLoader.create(config, catalogTable).open();
+        IcebergTableLoader icebergTableLoader = IcebergTableLoader.create(config, catalogTable);
         return new IcebergSinkWriter(
                 icebergTableLoader, config, catalogTable.getTableSchema(), states);
     }
@@ -121,7 +119,12 @@ public class IcebergSinkWriter
 
     @Override
     public Optional<IcebergCommitInfo> prepareCommit() throws IOException {
-        List<WriteResult> writeResults = writer.complete();
+        List<WriteResult> writeResults;
+        if (writer != null) {
+            writeResults = writer.complete();
+        } else {
+            writeResults = Collections.emptyList();
+        }
         IcebergCommitInfo icebergCommitInfo = new IcebergCommitInfo(writeResults);
         this.results.addAll(writeResults);
         return Optional.of(icebergCommitInfo);
@@ -134,6 +137,7 @@ public class IcebergSinkWriter
             log.info("changed rowType before: {}", fieldsInfo(rowType));
             this.rowType = dataTypeChangeEventHandler.reset(rowType).apply(event);
             log.info("changed rowType after: {}", fieldsInfo(rowType));
+            tryCreateRecordWriter();
             writer.applySchemaChange(this.rowType, event);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
index 0b5e473440..5e44e1d875 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
@@ -54,12 +54,10 @@ public class IcebergFilesCommitter implements Serializable {
 
     public void doCommit(List<WriteResult> results) {
         TableIdentifier tableIdentifier = icebergTableLoader.getTableIdentifier();
-        Table table = icebergTableLoader.loadTable();
-        log.info("do commit table : " + table.toString());
-        commit(tableIdentifier, table, results);
+        commit(tableIdentifier, results);
     }
 
-    private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResult> results) {
+    private void commit(TableIdentifier tableIdentifier, List<WriteResult> results) {
         List<DataFile> dataFiles =
                 results.stream()
                         .filter(payload -> payload.getDataFiles() != null)
@@ -77,6 +75,8 @@ public class IcebergFilesCommitter implements Serializable {
         if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
             log.info(String.format("Nothing to commit to table %s, skipping", tableIdentifier));
         } else {
+            Table table = icebergTableLoader.loadTable();
+            log.info("do commit table : {}", table.toString());
             if (deleteFiles.isEmpty()) {
                 AppendFiles append = table.newAppend();
                 if (branch != null) {
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 9c8949ba1d..22d0480aa4 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -52,7 +52,7 @@ public class IcebergRecordWriter implements RecordWriter {
     private final Table table;
     private final SinkConfig config;
     private final List<WriteResult> writerResults;
-    private TaskWriter<Record> writer;
+    private volatile TaskWriter<Record> writer;
     private RowConverter recordConverter;
     private final IcebergWriterFactory writerFactory;
 
@@ -62,7 +62,6 @@ public class IcebergRecordWriter implements RecordWriter {
         this.writerResults = Lists.newArrayList();
         this.recordConverter = new RowConverter(table, config);
         this.writerFactory = writerFactory;
-        this.writer = createTaskWriter();
     }
 
     private TaskWriter<Record> createTaskWriter() {
@@ -71,6 +70,9 @@ public class IcebergRecordWriter implements RecordWriter {
 
     @Override
     public void write(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) {
+        if (writer == null) {
+            resetWriter();
+        }
         SchemaChangeWrapper updates = new SchemaChangeWrapper();
         Record record = recordConverter.convert(seaTunnelRow, rowType, updates);
         if (!updates.empty()) {
@@ -139,7 +141,6 @@ public class IcebergRecordWriter implements RecordWriter {
         flush();
         List<WriteResult> result = Lists.newArrayList(writerResults);
         writerResults.clear();
-        resetWriter();
         return result;
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index a7cbba8b89..fa271eb8f6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -209,6 +209,11 @@ public class IcebergSinkCDCIT extends TestSuiteBase implements TestResource {
     }
 
     @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.")
     public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container)
             throws IOException, InterruptedException {
         // Clear related content to ensure that multiple operations are not affected