This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f49b263e65 [Improve][Connector-V2] Remove hard code iceberg table format version (#7500)
f49b263e65 is described below

commit f49b263e6517cee13a7c853a83472ce863f4b9d1
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Aug 29 22:41:19 2024 +0800

    [Improve][Connector-V2] Remove hard code iceberg table format version (#7500)
---
 docs/en/faq.md                                     | 21 -----------
 docs/zh/faq.md                                     | 21 -----------
 .../seatunnel/iceberg/IcebergCatalogLoader.java    | 11 ++----
 .../seatunnel/iceberg/catalog/IcebergCatalog.java  | 37 +++++++++----------
 .../seatunnel/iceberg/data/RowConverter.java       | 22 ++++-------
 .../seatunnel/iceberg/sink/IcebergSink.java        |  6 +--
 .../seatunnel/iceberg/sink/IcebergSinkWriter.java  | 10 ++---
 .../iceberg/sink/writer/IcebergRecordWriter.java   | 12 +-----
 .../iceberg/sink/writer/IcebergWriterFactory.java  |  4 --
 .../seatunnel/iceberg/source/IcebergSource.java    | 10 ++---
 .../seatunnel/iceberg/utils/SchemaUtils.java       | 43 +++++-----------------
 11 files changed, 51 insertions(+), 146 deletions(-)

diff --git a/docs/en/faq.md b/docs/en/faq.md
index 2e50c9d461..02c125ad4f 100644
--- a/docs/en/faq.md
+++ b/docs/en/faq.md
@@ -203,23 +203,6 @@ spark {
 }
 ```
 
-## How do I specify a different JDK version for SeaTunnel on YARN?
-
-For example, if you want to set the JDK version to JDK8, there are two cases:
-
-- The YARN cluster has deployed JDK8, but the default JDK is not JDK8. Add two configurations to the SeaTunnel config file:
-
-  ```
-    env {
-   ...
-   spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
-   spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
-   ...
-  }
-  ```
-- YARN cluster does not deploy JDK8. At this time, start SeaTunnel attached with JDK8. For detailed operations, see:
-  https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
-
 ## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?
 
 If you run in local mode, you need to modify the `start-seatunnel.sh` startup script. After `spark-submit`, add a parameter `--driver-memory 4g` . Under normal circumstances, local mode is not used in the production environment. Therefore, this parameter generally does not need to be set during On YARN. See: [Application Properties](https://spark.apache.org/docs/latest/configuration.html#application-properties) for details.
@@ -334,10 +317,6 @@ spark-submit --verbose
     ...
 ```
 
-## How do I use SeaTunnel to synchronize data across HDFS clusters?
-
-Just configure hdfs-site.xml properly. Refer to: https://www.cnblogs.com/suanec/p/7828139.html.
-
 ## I want to learn the source code of SeaTunnel. Where should I start?
 
 SeaTunnel has a completely abstract and structured code implementation, and many people have chosen SeaTunnel As a way to learn Spark. You can learn the source code from the main program entry: SeaTunnel.java
diff --git a/docs/zh/faq.md b/docs/zh/faq.md
index 3be6ce38e5..4fc24e6a3a 100644
--- a/docs/zh/faq.md
+++ b/docs/zh/faq.md
@@ -204,23 +204,6 @@ spark {
 }
 ```
 
-## 如何为 YARN 上的 SeaTunnel 指定不同的 JDK 版本?
-
-例如要设置JDK版本为JDK8,有两种情况:
-
-- YARN集群已部署JDK8,但默认JDK不是JDK8。 在 SeaTunnel 配置文件中添加两个配置:
-
-  ```
-    env {
-   ...
-   spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
-   spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
-   ...
-  }
-  ```
-- YARN集群未部署JDK8。 此时,启动附带JDK8的SeaTunnel。 详细操作参见:
-  https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
-
 ## Spark local[*]模式运行SeaTunnel时总是出现OOM怎么办?
 
 如果以本地模式运行,则需要修改`start-seatunnel.sh`启动脚本。 在 `spark-submit` 之后添加参数 `--driver-memory 4g` 。 一般情况下,生产环境中不使用本地模式。 因此,On YARN时一般不需要设置该参数。 有关详细信息,请参阅:[应用程序属性](https://spark.apache.org/docs/latest/configuration.html#application-properties)。
@@ -335,10 +318,6 @@ spark-submit --verbose
     ...
 ```
 
-## 如何使用SeaTunnel跨HDFS集群同步数据?
-
-只需正确配置 hdfs-site.xml 即可。 参考:https://www.cnblogs.com/suanec/p/7828139.html。
-
 ## 我想学习SeaTunnel的源代码。 我应该从哪里开始?
 
 SeaTunnel 拥有完全抽象、结构化的代码实现,很多人都选择 SeaTunnel 作为学习 Spark 的方式。 您可以从主程序入口了解源代码:SeaTunnel.java
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
index 0f4610783a..bbb590502c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
@@ -50,25 +50,20 @@ public class IcebergCatalogLoader implements Serializable {
     private static final long serialVersionUID = -6003040601422350869L;
     private static final List<String> HADOOP_CONF_FILES =
             ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
-    private CommonConfig config;
+    private final CommonConfig config;
 
     public IcebergCatalogLoader(CommonConfig config) {
         this.config = config;
     }
 
     public Catalog loadCatalog() {
-        // When using the seatunel engine, set the current class loader to prevent loading failures
+        // When using the SeaTunnel engine, set the current class loader to prevent loading failures
         Thread.currentThread().setContextClassLoader(IcebergCatalogLoader.class.getClassLoader());
         return CatalogUtil.buildIcebergCatalog(
                 config.getCatalogName(), config.getCatalogProps(), loadHadoopConfig(config));
     }
 
-    /**
-     * Loading Hadoop configuration through reflection
-     *
-     * @param config
-     * @return
-     */
+    /** Loading Hadoop configuration through reflection */
     public Object loadHadoopConfig(CommonConfig config) {
         Class<?> configClass =
                 DynClasses.builder()
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index 520f9bdbac..fc28001b2c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -58,9 +58,9 @@ import static org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtil
 
 @Slf4j
 public class IcebergCatalog implements Catalog {
-    private String catalogName;
-    private ReadonlyConfig readonlyConfig;
-    private IcebergCatalogLoader icebergCatalogLoader;
+    private final String catalogName;
+    private final ReadonlyConfig readonlyConfig;
+    private final IcebergCatalogLoader icebergCatalogLoader;
     private org.apache.iceberg.catalog.Catalog catalog;
 
     public IcebergCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
@@ -224,22 +224,21 @@ public class IcebergCatalog implements Catalog {
     public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) {
         List<Types.NestedField> columns = icebergTable.schema().columns();
         TableSchema.Builder builder = TableSchema.builder();
-        columns.stream()
-                .forEach(
-                        nestedField -> {
-                            String name = nestedField.name();
-                            SeaTunnelDataType<?> seaTunnelType =
-                                    SchemaUtils.toSeaTunnelType(name, nestedField.type());
-                            PhysicalColumn physicalColumn =
-                                    PhysicalColumn.of(
-                                            name,
-                                            seaTunnelType,
-                                            (Long) null,
-                                            true,
-                                            null,
-                                            nestedField.doc());
-                            builder.column(physicalColumn);
-                        });
+        columns.forEach(
+                nestedField -> {
+                    String name = nestedField.name();
+                    SeaTunnelDataType<?> seaTunnelType =
+                            SchemaUtils.toSeaTunnelType(name, nestedField.type());
+                    PhysicalColumn physicalColumn =
+                            PhysicalColumn.of(
+                                    name,
+                                    seaTunnelType,
+                                    (Long) null,
+                                    true,
+                                    null,
+                                    nestedField.doc());
+                    builder.column(physicalColumn);
+                });
 
         List<String> partitionKeys =
                 icebergTable.spec().fields().stream()
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
index 8c699b3440..f46928456f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
@@ -92,17 +92,17 @@ public class RowConverter {
         return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
     }
 
-    public Record convert(Object row, SeaTunnelDataType rowType) {
+    public Record convert(Object row, SeaTunnelDataType<?> rowType) {
         return convertStructValue(row, rowType, tableSchema.asStruct(), -1, null);
     }
 
-    public Record convert(Object row, SeaTunnelDataType rowType, SchemaChangeWrapper wrapper) {
+    public Record convert(Object row, SeaTunnelDataType<?> rowType, SchemaChangeWrapper wrapper) {
         return convertStructValue(row, rowType, tableSchema.asStruct(), -1, wrapper);
     }
 
     protected GenericRecord convertStructValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Types.StructType schema,
             int parentFieldId,
             SchemaChangeWrapper wrapper) {
@@ -120,15 +120,7 @@ public class RowConverter {
         }
     }
 
-    /**
-     * Convert RowType
-     *
-     * @param row
-     * @param fromType
-     * @param schema
-     * @param structFieldId
-     * @return
-     */
+    /** Convert RowType */
     private GenericRecord convertToStruct(
             SeaTunnelRow row,
             SeaTunnelRowType fromType,
@@ -179,7 +171,7 @@ public class RowConverter {
 
     public Object convertValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Type type,
             int fieldId,
             SchemaChangeWrapper wrapper) {
@@ -252,7 +244,7 @@ public class RowConverter {
 
     protected List<Object> convertListValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Types.ListType type,
             SchemaChangeWrapper wrapper) {
         Preconditions.checkArgument(value.getClass().isArray());
@@ -269,7 +261,7 @@ public class RowConverter {
 
     protected Map<Object, Object> convertMapValue(
             Object value,
-            SeaTunnelDataType fromType,
+            SeaTunnelDataType<?> fromType,
             Types.MapType type,
             SchemaChangeWrapper wrapper) {
         Preconditions.checkArgument(value instanceof Map);
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index 65bccbdb89..a1d43d6acf 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -61,9 +61,9 @@ public class IcebergSink
                 SupportSaveMode,
                 SupportMultiTableSink {
     private static String PLUGIN_NAME = "Iceberg";
-    private SinkConfig config;
-    private ReadonlyConfig readonlyConfig;
-    private CatalogTable catalogTable;
+    private final SinkConfig config;
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
 
     public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.readonlyConfig = pluginConfig;
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
index aed6522ca8..3a5e22b93b 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
@@ -54,13 +54,12 @@ public class IcebergSinkWriter
         implements SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState>,
                 SupportMultiTableSinkWriter<Void> {
     private SeaTunnelRowType rowType;
-    private SinkConfig config;
-    private IcebergTableLoader icebergTableLoader;
+    private final SinkConfig config;
+    private final IcebergTableLoader icebergTableLoader;
     private RecordWriter writer;
-    private IcebergFilesCommitter filesCommitter;
-    private List<WriteResult> results = Lists.newArrayList();
+    private final IcebergFilesCommitter filesCommitter;
+    private final List<WriteResult> results = Lists.newArrayList();
     private String commitUser = UUID.randomUUID().toString();
-    private long checkpointId;
 
     private final DataTypeChangeEventHandler dataTypeChangeEventHandler;
 
@@ -77,7 +76,6 @@ public class IcebergSinkWriter
         tryCreateRecordWriter();
         if (Objects.nonNull(states) && !states.isEmpty()) {
             this.commitUser = states.get(0).getCommitUser();
-            this.checkpointId = states.get(0).getCheckpointId();
             preCommit(states);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 2be206ebb6..06b48591df 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -54,7 +54,7 @@ public class IcebergRecordWriter implements RecordWriter {
     private final List<WriteResult> writerResults;
     private TaskWriter<Record> writer;
     private RowConverter recordConverter;
-    private IcebergWriterFactory writerFactory;
+    private final IcebergWriterFactory writerFactory;
 
     public IcebergRecordWriter(Table table, IcebergWriterFactory writerFactory, SinkConfig config) {
         this.config = config;
@@ -122,12 +122,7 @@ public class IcebergRecordWriter implements RecordWriter {
         }
     }
 
-    /**
-     * apply schema update
-     *
-     * @param updates
-     * @return
-     */
+    /** apply schema update */
     private void applySchemaUpdate(SchemaChangeWrapper updates) {
         // complete the current file
         flush();
@@ -169,7 +164,4 @@ public class IcebergRecordWriter implements RecordWriter {
                         table.spec().partitionType()));
         writer = null;
     }
-
-    @Override
-    public void close() {}
 }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
index 67809088ef..2ee7c3d6d7 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
@@ -40,9 +40,6 @@ import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
@@ -58,7 +55,6 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
 
 @Slf4j
 public class IcebergWriterFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class);
     private final IcebergTableLoader tableLoader;
     private final SinkConfig config;
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
index 7a2fdf9d4f..c56f3f2f00 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
@@ -65,12 +65,12 @@ public class IcebergSource
 
     private static final long serialVersionUID = 4343414808223919870L;
 
-    private SourceConfig sourceConfig;
-    private Schema tableSchema;
-    private Schema projectedSchema;
-    private SeaTunnelRowType seaTunnelRowType;
+    private final SourceConfig sourceConfig;
+    private final Schema tableSchema;
+    private final Schema projectedSchema;
+    private final SeaTunnelRowType seaTunnelRowType;
     private JobContext jobContext;
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;
 
     public IcebergSource(ReadonlyConfig config, CatalogTable catalogTable) {
         this.sourceConfig = SourceConfig.loadConfig(config);
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 6c99eb409c..01343a119f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -40,7 +40,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -106,21 +105,11 @@ public class SchemaUtils {
         SinkConfig config = new SinkConfig(readonlyConfig);
         // build auto create table
         Map<String, String> options = new HashMap<>(table.getOptions());
-        options.put(TableProperties.FORMAT_VERSION, "2");
         // override
         options.putAll(config.getAutoCreateProps());
         return createTable(catalog, toIcebergTableIdentifier(tablePath), config, schema, options);
     }
 
-    /**
-     * For local test
-     *
-     * @param catalog
-     * @param tableIdentifier
-     * @param config
-     * @param rowType
-     * @return
-     */
     public static Table autoCreateTable(
             Catalog catalog,
             TableIdentifier tableIdentifier,
@@ -180,7 +169,7 @@ public class SchemaUtils {
                     Optional<Integer> pkId =
                             structType.fields().stream()
                                     .filter(nestedField -> nestedField.name().equals(pk))
-                                    .map(nestedField -> nestedField.fieldId())
+                                    .map(Types.NestedField::fieldId)
                                     .findFirst();
                     if (!pkId.isPresent()) {
                         throw new IllegalArgumentException(
@@ -196,23 +185,14 @@ public class SchemaUtils {
         structType
                 .fields()
                 .forEach(
-                        field -> {
-                            fields.add(
-                                    identifierFieldIds.contains(field.fieldId())
-                                            ? field.asRequired()
-                                            : field.asOptional());
-                        });
+                        field ->
+                                fields.add(
+                                        identifierFieldIds.contains(field.fieldId())
+                                                ? field.asRequired()
+                                                : field.asOptional()));
         return new Schema(fields, identifierFieldIds);
     }
 
-    public static TableIdentifier toIcebergTableIdentifierFromCatalogTable(
-            CatalogTable catalogTable) {
-        org.apache.seatunnel.api.table.catalog.TableIdentifier tableIdentifier =
-                catalogTable.getTableId();
-        return TableIdentifier.of(
-                tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
-    }
-
     public static TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
         return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
     }
@@ -221,12 +201,7 @@ public class SchemaUtils {
         return TablePath.of(tableIdentifier.namespace().toString(), tableIdentifier.name());
     }
 
-    /**
-     * Commit table schema updates
-     *
-     * @param table
-     * @param wrapper
-     */
+    /** Commit table schema updates */
     private static void commitSchemaUpdates(Table table, SchemaChangeWrapper wrapper) {
         // get the latest schema in case another process updated it
         table.refresh();
@@ -249,7 +224,7 @@ public class SchemaUtils {
                         .collect(toList());
 
         // Rename column name
-        List<SchemaChangeColumn> changeColumns = wrapper.changeColumns().stream().collect(toList());
+        List<SchemaChangeColumn> changeColumns = new ArrayList<>(wrapper.changeColumns());
 
         if (addColumns.isEmpty()
                 && modifyColumns.isEmpty()
@@ -294,7 +269,7 @@ public class SchemaUtils {
         return IcebergTypeMapper.mapping(fieldName, type);
     }
 
-    public static Type toIcebergType(SeaTunnelDataType rowType) {
+    public static Type toIcebergType(SeaTunnelDataType<?> rowType) {
         return IcebergTypeMapper.toIcebergType(rowType);
     }
 

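Note on the functional change: most of this commit is cleanup (final fields, trimmed Javadoc, generic wildcards), but the SchemaUtils hunk also stops pre-setting the Iceberg format version to "2" when a table is auto-created. The sketch below is only an illustration of that option merging, assuming plain maps stand in for table.getOptions() and config.getAutoCreateProps(). The class AutoCreateOptionsSketch and the method buildOptions are hypothetical names and do not exist in the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the option merging in SchemaUtils.autoCreateTable;
// not part of the SeaTunnel code base.
class AutoCreateOptionsSketch {

    // "format-version" is the property key behind Iceberg's TableProperties.FORMAT_VERSION.
    static final String FORMAT_VERSION = "format-version";

    // Mirrors the post-commit logic: copy the table options, then let the
    // user-supplied auto-create properties override them. No version is forced.
    static Map<String, String> buildOptions(
            Map<String, String> tableOptions, Map<String, String> autoCreateProps) {
        Map<String, String> options = new HashMap<>(tableOptions);
        options.putAll(autoCreateProps);
        return options;
    }

    public static void main(String[] args) {
        // Nothing configured: the key is simply absent.
        Map<String, String> noOverride = buildOptions(new HashMap<>(), new HashMap<>());
        System.out.println(noOverride.containsKey(FORMAT_VERSION)); // prints "false"

        // An explicit auto-create property is passed through unchanged.
        Map<String, String> props = new HashMap<>();
        props.put(FORMAT_VERSION, "1");
        System.out.println(buildOptions(new HashMap<>(), props).get(FORMAT_VERSION)); // prints "1"
    }
}
```

Before this commit the key was set to "2" first and could only be replaced by an auto-create property. Afterwards, when neither the table options nor the auto-create properties supply it, the key is left out and the choice falls to whatever default the Iceberg library in use applies.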