This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d7fc81e67 [INLONG-8643][Sort] Support Iceberg source (#8818)
5d7fc81e67 is described below

commit 5d7fc81e6761bba8289b8f862af92eacc487771a
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Sep 1 14:31:40 2023 +0800

    [INLONG-8643][Sort] Support Iceberg source (#8818)
---
 .../resource/sort/DefaultSortConfigOperator.java   |   1 -
 .../sort/protocol/constant/IcebergConstant.java    |  13 +
 .../inlong/sort/protocol/node/ExtractNode.java     |   2 +
 .../org/apache/inlong/sort/protocol/node/Node.java |   2 +
 .../IcebergExtracNode.java}                        | 107 +--
 .../sort/protocol/node/load/IcebergLoadNode.java   |  12 +-
 .../parser/Iceberg2StarRocksSqlParserTest.java     | 133 ++++
 inlong-sort/sort-flink/sort-flink-v1.13/pom.xml    |   3 +-
 inlong-sort/sort-flink/sort-flink-v1.15/pom.xml    |   5 -
 .../sort-connectors/iceberg/pom.xml                | 139 ++++
 .../apache/inlong/sort/iceberg/FlinkCatalog.java   | 812 +++++++++++++++++++++
 .../inlong/sort/iceberg/FlinkCatalogFactory.java   | 216 ++++++
 .../sort/iceberg/FlinkDynamicTableFactory.java     | 205 ++++++
 .../sort/iceberg/FlinkEnvironmentContext.java      |  35 +
 .../org.apache.flink.table.factories.Factory       |  18 +
 .../org.apache.flink.table.factories.TableFactory  |  16 +
 .../sort-flink-v1.15/sort-connectors/pom.xml       |   1 +
 licenses/inlong-sort-connectors/LICENSE            |   8 +
 pom.xml                                            |   4 +-
 19 files changed, 1666 insertions(+), 66 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 97683f1ac3..37429cf2a5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -253,5 +253,4 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         groupInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
         groupInfo.getExtList().add(extInfo);
     }
-
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 70ea0d5158..676f7f4435 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -22,6 +22,19 @@ package org.apache.inlong.sort.protocol.constant;
  */
 public class IcebergConstant {
 
+    public static final String DEFAULT_CATALOG_NAME = "ICEBERG_HIVE";
+    public static final String CONNECTOR_KEY = "connector";
+    public static final String CONNECTOR = "iceberg-inlong";
+    public static final String DATABASE_KEY = "catalog-database";
+    public static final String DEFAULT_DATABASE_KEY = "default-database";
+    public static final String TABLE_KEY = "catalog-table";
+    public static final String CATALOG_TYPE_KEY = "catalog-type";
+    public static final String CATALOG_NAME_KEY = "catalog-name";
+    public static final String URI_KEY = "uri";
+    public static final String WAREHOUSE_KEY = "warehouse";
+    public static final String START_SNAPSHOT_ID = "start-snapshot-id";
+    public static final String STREAMING = "streaming";
+
     /**
      * Iceberg supported catalog type
      */
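
For reference, a minimal sketch (not part of this patch) of the Flink table
options these new constants stand for; the database, table, metastore URI and
warehouse values below are placeholders, and the map mirrors what
IcebergExtracNode#tableOptions() (added later in this patch) produces for a
Hive catalog:

    import java.util.HashMap;
    import java.util.Map;

    public class IcebergSourceOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put("connector", "iceberg-inlong");              // CONNECTOR_KEY / CONNECTOR
            options.put("catalog-database", "test_db");              // DATABASE_KEY (placeholder database)
            options.put("catalog-table", "test_table");              // TABLE_KEY (placeholder table)
            options.put("catalog-type", "HIVE");                     // CATALOG_TYPE_KEY
            options.put("catalog-name", "ICEBERG_HIVE");             // CATALOG_NAME_KEY, DEFAULT_CATALOG_NAME
            options.put("streaming", "true");                        // STREAMING, set to "true" by the extract node
            options.put("uri", "thrift://localhost:9083");           // URI_KEY (placeholder metastore)
            options.put("warehouse", "hdfs:///user/hive/warehouse"); // WAREHOUSE_KEY (placeholder path)
            options.put("start-snapshot-id", "1");                   // START_SNAPSHOT_ID (optional)
            options.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
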
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 3c27b6a407..543f8cf3be 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -64,6 +65,7 @@ import java.util.Map;
         @JsonSubTypes.Type(value = RedisExtractNode.class, name = "redisExtract"),
         @JsonSubTypes.Type(value = DorisExtractNode.class, name = "dorisExtract"),
         @JsonSubTypes.Type(value = HudiExtractNode.class, name = "hudiExtract"),
+        @JsonSubTypes.Type(value = IcebergExtracNode.class, name = "icebergExtract"),
 })
 @Data
 @NoArgsConstructor
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index ecd704815b..f755f439d7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -78,6 +79,7 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = RedisExtractNode.class, name = "redisExtract"),
         @JsonSubTypes.Type(value = DorisExtractNode.class, name = "dorisExtract"),
         @JsonSubTypes.Type(value = HudiExtractNode.class, name = "hudiExtract"),
+        @JsonSubTypes.Type(value = IcebergExtracNode.class, name = "icebergExtract"),
         @JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"),
         @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
similarity index 53%
copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
index 257290119a..b876d96aff 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
@@ -15,22 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.node.load;
+package org.apache.inlong.sort.protocol.node.extract;
 
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.InlongMetric;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
-import org.apache.inlong.sort.protocol.enums.FilterStrategy;
-import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelation;
-import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
-import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 
@@ -41,13 +35,14 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-@JsonTypeName("icebergLoad")
-@Data
-@NoArgsConstructor
+/**
+ * Iceberg extract node for extracting data from Iceberg
+ */
 @EqualsAndHashCode(callSuper = true)
-public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializable {
-
-    private static final long serialVersionUID = -1L;
+@JsonTypeName("icebergExtract")
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Data
+public class IcebergExtracNode extends ExtractNode implements Serializable {
 
     @JsonProperty("tableName")
     @Nonnull
@@ -57,67 +52,77 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializa
     @Nonnull
     private String dbName;
 
-    @JsonProperty("primaryKey")
-    private String primaryKey;
-
     @JsonProperty("catalogType")
     private IcebergConstant.CatalogType catalogType;
 
+    @Nullable
     @JsonProperty("uri")
     private String uri;
 
     @JsonProperty("warehouse")
     private String warehouse;
 
-    @JsonCreator
-    public IcebergLoadNode(@JsonProperty("id") String id,
-            @JsonProperty("name") String name,
-            @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
-            @JsonProperty("filters") List<FilterFunction> filters,
-            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
-            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
-            @JsonProperty("properties") Map<String, String> properties,
+    @JsonProperty("catalogName")
+    private String catalogName;
+
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonProperty("startSnapShotId")
+    @Nullable
+    private Long startSnapShotId;
+
+    public IcebergExtracNode(
+            @Nonnull @JsonProperty("id") String id,
+            @Nonnull @JsonProperty("name") String name,
+            @Nonnull @JsonProperty("fields") List<FieldInfo> fields,
+            @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+            @Nullable @JsonProperty("uri") String uri,
+            @Nullable @JsonProperty("warehouse") String warehouse,
             @Nonnull @JsonProperty("dbName") String dbName,
             @Nonnull @JsonProperty("tableName") String tableName,
-            @JsonProperty("primaryKey") String primaryKey,
             @JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
-            @JsonProperty("uri") String uri,
-            @JsonProperty("warehouse") String warehouse) {
-        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
-        this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
-        this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
-        this.primaryKey = primaryKey;
-        this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
+            @Nullable @JsonProperty("catalogName") String catalogName,
+            @JsonProperty("primaryKey") String primaryKey,
+            @Nullable @JsonProperty("startSnapShotId") Long startSnapShotId,
+            @Nullable @JsonProperty("properties") Map<String, String> properties) {
+        super(id, name, fields, watermarkField, properties);
         this.uri = uri;
         this.warehouse = warehouse;
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.catalogName = catalogName == null ? IcebergConstant.DEFAULT_CATALOG_NAME : catalogName;
+        this.primaryKey = primaryKey;
+        this.startSnapShotId = startSnapShotId;
+        this.catalogType = catalogType == null ? IcebergConstant.CatalogType.HIVE : catalogType;
+    }
+
+    @Override
+    public String genTableName() {
+        return String.format("iceberg_table_%s", getId());
     }
 
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "iceberg-inlong");
-        // for test sink.ignore.changelog
-        // options.put("sink.ignore.changelog", "true");
-        options.put("catalog-database", dbName);
-        options.put("catalog-table", tableName);
-        options.put("default-database", dbName);
-        options.put("catalog-type", catalogType.name());
-        options.put("catalog-name", catalogType.name());
+        options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
+        options.put(IcebergConstant.DATABASE_KEY, dbName);
+        options.put(IcebergConstant.TABLE_KEY, tableName);
+        options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
+        options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName);
+        options.put(IcebergConstant.STREAMING, "true");
         if (null != uri) {
-            options.put("uri", uri);
+            options.put(IcebergConstant.URI_KEY, uri);
         }
         if (null != warehouse) {
-            options.put("warehouse", warehouse);
+            options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
+        }
+        if (null != startSnapShotId) {
+            options.put(IcebergConstant.START_SNAPSHOT_ID, startSnapShotId.toString());
         }
         return options;
     }
 
-    @Override
-    public String genTableName() {
-        return tableName;
-    }
-
     @Override
     public String getPrimaryKey() {
         return primaryKey;
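
As a usage sketch (not part of this patch), the new extract node can be wired
up as below; the metastore URI, warehouse path and database/table names are
placeholder values, and the field list is trimmed to two columns:

    import org.apache.inlong.sort.formats.common.LongFormatInfo;
    import org.apache.inlong.sort.formats.common.StringFormatInfo;
    import org.apache.inlong.sort.protocol.FieldInfo;
    import org.apache.inlong.sort.protocol.constant.IcebergConstant;
    import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;

    import java.util.Arrays;
    import java.util.Map;

    public class IcebergExtracNodeSketch {
        public static void main(String[] args) {
            IcebergExtracNode node = new IcebergExtracNode(
                    "1", "iceberg_source",
                    Arrays.asList(
                            new FieldInfo("id", new LongFormatInfo()),
                            new FieldInfo("name", new StringFormatInfo())),
                    null,                              // watermarkField
                    "thrift://localhost:9083",         // uri (placeholder metastore)
                    "hdfs:///user/hive/warehouse",     // warehouse (placeholder path)
                    "test_db",                         // dbName (placeholder)
                    "test_table",                      // tableName (placeholder)
                    IcebergConstant.CatalogType.HIVE,  // catalogType
                    null,                              // catalogName, defaults to ICEBERG_HIVE
                    "id",                              // primaryKey
                    null,                              // startSnapShotId (optional)
                    null);                             // extra properties
            // These options end up in the WITH (...) clause of the generated Flink DDL.
            Map<String, String> options = node.tableOptions();
            options.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
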
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 257290119a..4d90d99c5a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -96,14 +96,14 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializa
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "iceberg-inlong");
+        options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
         // for test sink.ignore.changelog
         // options.put("sink.ignore.changelog", "true");
-        options.put("catalog-database", dbName);
-        options.put("catalog-table", tableName);
-        options.put("default-database", dbName);
-        options.put("catalog-type", catalogType.name());
-        options.put("catalog-name", catalogType.name());
+        options.put(IcebergConstant.DATABASE_KEY, dbName);
+        options.put(IcebergConstant.TABLE_KEY, tableName);
+        options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
+        options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
+        options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
         if (null != uri) {
             options.put("uri", uri);
         }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
new file mode 100644
index 0000000000..5ec917debe
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class Iceberg2StarRocksSqlParserTest extends AbstractTestBase {
+
+    private String groupId = "b_test_wk_0801";
+    private String streamId = "b_test_wkstream_0801";
+
+    // iceberg
+    private String uri = "";
+    private String icDatabase = "";
+    private String icTable = "";
+    private String catalogName = "HIVE";
+    private String warehouse = "";
+
+    // starrocks
+    private String user = "";
+    private String password = "";
+    private String jdbc = "";
+    private String srDatabase = "";
+    private String srTable = "";
+    private String primaryKey = "id";
+    private String loadUrl = "";
+
+    private List<FieldInfo> fields() {
+        return Arrays.asList(
+                new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("source", new StringFormatInfo()),
+                new FieldInfo("count", new LongFormatInfo()),
+                new FieldInfo("remark", new StringFormatInfo()),
+                new FieldInfo("send_time", new StringFormatInfo()));
+    }
+
+    private List<FieldRelation> relations() {
+        return fields().stream()
+                .map(info -> new FieldRelation(info, info))
+                .collect(Collectors.toList());
+    }
+
+    private IcebergExtracNode buildIcebergExtracNode(String id) {
+
+        return new IcebergExtracNode(id, "iceberg-source", fields(), null, uri,
+                warehouse, icDatabase, icTable, IcebergConstant.CatalogType.HIVE, catalogName,
+                null, null, null);
+
+    }
+
+    private StarRocksLoadNode buildStarRocksLoadNode(String id) {
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("sink.properties.format", "json");
+        properties.put("sink.properties.strip_outer_array", "true");
+        return new StarRocksLoadNode(id, "sink", fields(), relations(), null, null,
+                1, properties, jdbc, loadUrl, user, password, srDatabase,
+                srTable, primaryKey, null, null, null, null);
+    }
+
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    @Test
+    public void testIceberg() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildIcebergExtracNode("1");
+        Node outputNode = buildStarRocksLoadNode("2");
+        StreamInfo streamInfo = new StreamInfo(streamId, Arrays.asList(inputNode, outputNode),
+                Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo(groupId, Collections.singletonList(streamInfo));
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        System.out.println(objectMapper.writeValueAsString(groupInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+        Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
index adbaa7ae0b..83784f0290 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml
@@ -57,6 +57,7 @@
         <hudi.version>0.12.3</hudi.version>
         <sqlserver.jdbc.version>7.2.2.jre8</sqlserver.jdbc.version>
         <thrift.version>0.9.3</thrift.version>
+        <flink.iceberg.version>1.1.0</flink.iceberg.version>
     </properties>
 
     <dependencyManagement>
@@ -148,7 +149,7 @@
             <dependency>
                 <groupId>org.apache.iceberg</groupId>
                 <artifactId>iceberg-flink-runtime-1.14</artifactId>
-                <version>${iceberg.version}</version>
+                <version>${flink.iceberg.version}</version>
             </dependency>
 
             <!-- flink -->
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
index 0bbbcefe0d..2e039fedcd 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml
@@ -143,11 +143,6 @@
                 <artifactId>mssql-jdbc</artifactId>
                 <version>${sqlserver.jdbc.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.apache.iceberg</groupId>
-                <artifactId>iceberg-flink-runtime-1.14</artifactId>
-                <version>${iceberg.version}</version>
-            </dependency>
 
             <!-- flink -->
             <dependency>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
new file mode 100644
index 0000000000..c481dd8de6
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.15</artifactId>
+        <version>1.9.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-iceberg-v1.15</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-iceberg</name>
+
+    <properties>
+        <iceberg-connector.version>1.3.1</iceberg-connector.version>
+        <flink.iceberg.version>1.15</flink.iceberg.version>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime-${flink.iceberg.version}</artifactId>
+            <version>${iceberg-connector.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-hive-runtime</artifactId>
+            <version>${iceberg-connector.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive3x.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>${parquet.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-hive-metastore</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
+
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+
+                        <configuration>
+
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>com.google.protobuf:protobuf-java</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>org.apache.iceberg:*</include>
+                                    <include>org.apache.hive:*</include>
+                                    <!--  Include fixed version 18.0-13.0 of flink shaded guava  -->
+                                    <include>org.apache.flink:flink-shaded-guava</include>
+                                    <include>com.google.protobuf:*</include>
+                                    <include>org.apache.thrift:*</include>
+                                    <include>com.facebook.*:*</include>
+                                </includes>
+                            </artifactSet>
+
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.base</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
new file mode 100644
index 0000000000..ba4298d9a1
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
+ *
+ * <p>The mapping between Flink database and Iceberg namespace: Supplying a base namespace for a
+ * given catalog, so if you have a catalog that supports a 2-level namespace, you would supply the
+ * first level in the catalog configuration and the second level would be exposed as Flink
+ * databases.
+ *
+ * <p>The Iceberg table manages its partitions by itself. The partition of the Iceberg table is
+ * independent of the partition of Flink.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class FlinkCatalog extends AbstractCatalog {
+
+    private final CatalogLoader catalogLoader;
+    private final Catalog icebergCatalog;
+    private final Namespace baseNamespace;
+    private final SupportsNamespaces asNamespaceCatalog;
+    private final Closeable closeable;
+    private final boolean cacheEnabled;
+
+    public FlinkCatalog(
+            String catalogName,
+            String defaultDatabase,
+            Namespace baseNamespace,
+            CatalogLoader catalogLoader,
+            boolean cacheEnabled,
+            long cacheExpirationIntervalMs) {
+        super(catalogName, defaultDatabase);
+        this.catalogLoader = catalogLoader;
+        this.baseNamespace = baseNamespace;
+        this.cacheEnabled = cacheEnabled;
+
+        Catalog originalCatalog = catalogLoader.loadCatalog();
+        icebergCatalog =
+                cacheEnabled
+                        ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+                        : originalCatalog;
+        asNamespaceCatalog =
+                originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
+        closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+        FlinkEnvironmentContext.init();
+    }
+
+    @Override
+    public void open() throws CatalogException {
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                throw new CatalogException(e);
+            }
+        }
+    }
+
+    public Catalog catalog() {
+        return icebergCatalog;
+    }
+
+    /** Append a new level to the base namespace */
+    private static Namespace appendLevel(Namespace baseNamespace, String newLevel) {
+        String[] namespace = new String[baseNamespace.levels().length + 1];
+        System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
+        namespace[baseNamespace.levels().length] = newLevel;
+        return Namespace.of(namespace);
+    }
+
+    TableIdentifier toIdentifier(ObjectPath path) {
+        String objectName = path.getObjectName();
+        List<String> tableName = Splitter.on('$').splitToList(objectName);
+
+        if (tableName.size() == 1) {
+            return TableIdentifier.of(
+                    appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName());
+        } else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) {
+            return TableIdentifier.of(
+                    appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)),
+                    tableName.get(1));
+        } else {
+            throw new IllegalArgumentException("Illegal table name:" + objectName);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        if (asNamespaceCatalog == null) {
+            return Collections.singletonList(getDefaultDatabase());
+        }
+
+        return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
+                .map(n -> n.level(n.levels().length - 1))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (asNamespaceCatalog == null) {
+            if (!getDefaultDatabase().equals(databaseName)) {
+                throw new DatabaseNotExistException(getName(), databaseName);
+            } else {
+                return new CatalogDatabaseImpl(Maps.newHashMap(), "");
+            }
+        } else {
+            try {
+                Map<String, String> metadata =
+                        Maps.newHashMap(
+                                asNamespaceCatalog.loadNamespaceMetadata(appendLevel(baseNamespace, databaseName)));
+                String comment = metadata.remove("comment");
+                return new CatalogDatabaseImpl(metadata, comment);
+            } catch (NoSuchNamespaceException e) {
+                throw new DatabaseNotExistException(getName(), databaseName, e);
+            }
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        try {
+            getDatabase(databaseName);
+            return true;
+        } catch (DatabaseNotExistException ignore) {
+            return false;
+        }
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        createDatabase(
+                name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists);
+    }
+
+    private void createDatabase(
+            String databaseName, Map<String, String> metadata, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            try {
+                asNamespaceCatalog.createNamespace(appendLevel(baseNamespace, databaseName), metadata);
+            } catch (AlreadyExistsException e) {
+                if (!ignoreIfExists) {
+                    throw new DatabaseAlreadyExistException(getName(), databaseName, e);
+                }
+            }
+        } else {
+            throw new UnsupportedOperationException(
+                    "Namespaces are not supported by catalog: " + getName());
+        }
+    }
+
+    private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
+        Map<String, String> ret = Maps.newHashMap(metadata);
+        if (metadata.containsKey("comment")) {
+            throw new CatalogException("Database properties should not contain key: 'comment'.");
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+            ret.put("comment", comment);
+        }
+        return ret;
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            try {
+                boolean success = asNamespaceCatalog.dropNamespace(appendLevel(baseNamespace, name));
+                if (!success && !ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name);
+                }
+            } catch (NoSuchNamespaceException e) {
+                if (!ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name, e);
+                }
+            } catch (NamespaceNotEmptyException e) {
+                throw new DatabaseNotEmptyException(getName(), name, e);
+            }
+        } else {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), name);
+            }
+        }
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            Namespace namespace = appendLevel(baseNamespace, name);
+            Map<String, String> updates = Maps.newHashMap();
+            Set<String> removals = Sets.newHashSet();
+
+            try {
+                Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace);
+                Map<String, String> newProperties =
+                        mergeComment(newDatabase.getProperties(), newDatabase.getComment());
+
+                for (String key : oldProperties.keySet()) {
+                    if (!newProperties.containsKey(key)) {
+                        removals.add(key);
+                    }
+                }
+
+                for (Map.Entry<String, String> entry : newProperties.entrySet()) {
+                    if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
+                        updates.put(entry.getKey(), entry.getValue());
+                    }
+                }
+
+                if (!updates.isEmpty()) {
+                    asNamespaceCatalog.setProperties(namespace, updates);
+                }
+
+                if (!removals.isEmpty()) {
+                    asNamespaceCatalog.removeProperties(namespace, removals);
+                }
+
+            } catch (NoSuchNamespaceException e) {
+                if (!ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name, e);
+                }
+            }
+        } else {
+            if (getDefaultDatabase().equals(name)) {
+                throw new CatalogException(
+                        "Can not alter the default database when the iceberg catalog doesn't support namespaces.");
+            }
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), name);
+            }
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        try {
+            return icebergCatalog.listTables(appendLevel(baseNamespace, databaseName)).stream()
+                    .map(TableIdentifier::name)
+                    .collect(Collectors.toList());
+        } catch (NoSuchNamespaceException e) {
+            throw new DatabaseNotExistException(getName(), databaseName, e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        Table table = loadIcebergTable(tablePath);
+        return toCatalogTable(table);
+    }
+
+    private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+        try {
+            Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+            if (cacheEnabled) {
+                table.refresh();
+            }
+
+            return table;
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            throw new TableNotExistException(getName(), tablePath, e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return icebergCatalog.tableExists(toIdentifier(tablePath));
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            icebergCatalog.dropTable(toIdentifier(tablePath));
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath, e);
+            }
+        }
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        try {
+            icebergCatalog.renameTable(
+                    toIdentifier(tablePath),
+                    toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
+        } catch (NoSuchTableException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath, e);
+            }
+        } catch (AlreadyExistsException e) {
+            throw new TableAlreadyExistException(getName(), tablePath, e);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws CatalogException, TableAlreadyExistException {
+        if (Objects.equals(
+                table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+            throw new IllegalArgumentException(
+                    "Cannot create the table with 'connector'='iceberg' table property in "
+                            + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+                            + "create table without 'connector'='iceberg' related properties in an iceberg table.");
+        }
+
+        createIcebergTable(tablePath, table, ignoreIfExists);
+    }
+
+    void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws CatalogException, TableAlreadyExistException {
+        validateFlinkTable(table);
+
+        Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+        PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+        String location = null;
+        for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+            if ("location".equalsIgnoreCase(entry.getKey())) {
+                location = entry.getValue();
+            } else {
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        try {
+            icebergCatalog.createTable(
+                    toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(getName(), tablePath, e);
+            }
+        }
+    }
+
+    private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) {
+        TableSchema ts1 = ct1.getSchema();
+        TableSchema ts2 = ct2.getSchema();
+        boolean equalsPrimary = false;
+
+        if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
+            equalsPrimary =
+                    Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType())
+                            && Objects.equals(
+                                    ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns());
+        } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
+            equalsPrimary = true;
+        }
+
+        if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
+                && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
+                && equalsPrimary)) {
+            throw new UnsupportedOperationException("Altering schema is not supported yet.");
+        }
+
+        if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
+            throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+        }
+    }
+
+    @Override
+    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws CatalogException, TableNotExistException {
+        validateFlinkTable(newTable);
+
+        Table icebergTable;
+        try {
+            icebergTable = loadIcebergTable(tablePath);
+        } catch (TableNotExistException e) {
+            if (!ignoreIfNotExists) {
+                throw e;
+            } else {
+                return;
+            }
+        }
+
+        CatalogTable table = toCatalogTable(icebergTable);
+
+        // Currently, Flink SQL only support altering table properties.
+
+        // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by
+        // comparing
+        // CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
+        validateTableSchemaAndPartition(table, (CatalogTable) newTable);
+
+        Map<String, String> oldProperties = table.getOptions();
+        Map<String, String> setProperties = Maps.newHashMap();
+
+        String setLocation = null;
+        String setSnapshotId = null;
+        String pickSnapshotId = null;
+
+        for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            if (Objects.equals(value, oldProperties.get(key))) {
+                continue;
+            }
+
+            if ("location".equalsIgnoreCase(key)) {
+                setLocation = value;
+            } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+                setSnapshotId = value;
+            } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+                pickSnapshotId = value;
+            } else {
+                setProperties.put(key, value);
+            }
+        }
+
+        oldProperties
+                .keySet()
+                .forEach(
+                        k -> {
+                            if (!newTable.getOptions().containsKey(k)) {
+                                setProperties.put(k, null);
+                            }
+                        });
+
+        commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+    }
+
+    private static void validateFlinkTable(CatalogBaseTable table) {
+        Preconditions.checkArgument(
+                table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+        TableSchema schema = table.getSchema();
+        schema
+                .getTableColumns()
+                .forEach(
+                        column -> {
+                            if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
+                                throw new UnsupportedOperationException(
+                                        "Creating table with computed columns is not supported yet.");
+                            }
+                        });
+
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "Creating table with watermark specs is not supported yet.");
+        }
+    }
+
+    private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+        partitionKeys.forEach(builder::identity);
+        return builder.build();
+    }
+
+    private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
+        ImmutableList.Builder<String> partitionKeysBuilder = ImmutableList.builder();
+        for (PartitionField field : spec.fields()) {
+            if (field.transform().isIdentity()) {
+                partitionKeysBuilder.add(icebergSchema.findColumnName(field.sourceId()));
+            } else {
+                // Not created by Flink SQL.
+                // For compatibility with iceberg tables, return empty.
+                // TODO modify this after Flink support partition transform.
+                return Collections.emptyList();
+            }
+        }
+        return partitionKeysBuilder.build();
+    }
+
+    private static void commitChanges(
+            Table table,
+            String setLocation,
+            String setSnapshotId,
+            String pickSnapshotId,
+            Map<String, String> setProperties) {
+        // don't allow setting the snapshot and picking a commit at the same time because order is
+        // ambiguous and choosing
+        // one order leads to different results
+        Preconditions.checkArgument(
+                setSnapshotId == null || pickSnapshotId == null,
+                "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+        if (setSnapshotId != null) {
+            long newSnapshotId = Long.parseLong(setSnapshotId);
+            table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+        }
+
+        // if updating the table snapshot, perform that update first in case it fails
+        if (pickSnapshotId != null) {
+            long newSnapshotId = Long.parseLong(pickSnapshotId);
+            table.manageSnapshots().cherrypick(newSnapshotId).commit();
+        }
+
+        Transaction transaction = table.newTransaction();
+
+        if (setLocation != null) {
+            transaction.updateLocation().setLocation(setLocation).commit();
+        }
+
+        if (!setProperties.isEmpty()) {
+            UpdateProperties updateProperties = transaction.updateProperties();
+            setProperties.forEach(
+                    (k, v) -> {
+                        if (v == null) {
+                            updateProperties.remove(k);
+                        } else {
+                            updateProperties.set(k, v);
+                        }
+                    });
+            updateProperties.commit();
+        }
+
+        transaction.commitTransaction();
+    }
+
+    static CatalogTable toCatalogTable(Table table) {
+        TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
+        List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+        // NOTE: We cannot create an IcebergCatalogTable that extends CatalogTable, because the
+        // Flink optimizer may use CatalogTableImpl to copy a new catalog table.
+        // Instead, the table is re-loaded from the Iceberg catalog when creating source/sink operators.
+        // Iceberg has no table comment, so pass null (the default comment value in Flink).
+        return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new FlinkDynamicTableFactory(this));
+    }
+
+    CatalogLoader getCatalogLoader() {
+        return catalogLoader;
+    }
+
+    // ------------------------------ Unsupported methods
+    // ---------------------------------------------
+
+    @Override
+    public List<String> listViews(String databaseName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableColumnStatistics(
+            ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        Table table = loadIcebergTable(tablePath);
+
+        if (table.spec().isUnpartitioned()) {
+            throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
+        }
+
+        Set<CatalogPartitionSpec> set = Sets.newHashSet();
+        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+            for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
+                Map<String, String> map = Maps.newHashMap();
+                StructLike structLike = dataFile.partition();
+                PartitionSpec spec = table.specs().get(dataFile.specId());
+                for (int i = 0; i < structLike.size(); i++) {
+                    map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
+                }
+                set.add(new CatalogPartitionSpec(map));
+            }
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Failed to list partitions of table %s", tablePath), e);
+        }
+
+        return Lists.newArrayList(set);
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // After partition pruning and filter pushdown, the statistics become very inaccurate,
+    // so the statistics reported here are of little significance.
+    // Flink will support something like SupportsReportStatistics in the future.
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+}
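
The property-update contract implemented by commitChanges above is easy to miss when skimming the diff: a null value in the properties map marks that key for removal, any other value overwrites it, and setting the current snapshot and cherry-picking a snapshot in the same call is rejected because the result would depend on ordering. The minimal Java sketch below restates that contract against the public Iceberg table API; the class and method names are illustrative only and are not part of this patch.

import java.util.Map;

import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;

/** Illustrative sketch of the null-means-remove property contract used by commitChanges. */
public final class PropertyChangeSketch {

    /** Applies the given property changes to an Iceberg table in a single transaction. */
    static void applyProperties(Table table, Map<String, String> changes) {
        Transaction transaction = table.newTransaction();
        UpdateProperties update = transaction.updateProperties();
        changes.forEach((key, value) -> {
            if (value == null) {
                update.remove(key);     // a null value marks the property for removal
            } else {
                update.set(key, value); // any other value sets or overwrites the property
            }
        });
        update.commit();
        transaction.commitTransaction();
    }
}
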
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
new file mode 100644
index 0000000000..4adf4a3ed8
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
+ *
+ * <p>This supports the following catalog configuration options:
+ *
+ * <ul>
+ *   <li><code>type</code> - Flink catalog factory key, should be "iceberg"
+ *   <li><code>catalog-type</code> - iceberg catalog type, "hive", "hadoop" or "rest"
+ *   <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)
+ *   <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)
+ *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)
+ *   <li><code>default-database</code> - a database name to use as the default
+ *   <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop
+ *       catalog only)
+ *   <li><code>cache-enabled</code> - whether to enable catalog cache
+ * </ul>
+ *
+ * <p>To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
+ * {@link #createCatalogLoader(String, Map, Configuration)}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class FlinkCatalogFactory implements CatalogFactory {
+
+    // Can not just use "type", it conflicts with CATALOG_TYPE.
+    public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+    public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+    public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+    public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
+
+    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
+    public static final String DEFAULT_DATABASE = "default-database";
+    public static final String DEFAULT_DATABASE_NAME = "default";
+    public static final String BASE_NAMESPACE = "base-namespace";
+
+    public static final String TYPE = "type";
+    public static final String PROPERTY_VERSION = "property-version";
+
+    /**
+     * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink
+     * catalog adapter.
+     *
+     * @param name Flink's catalog name
+     * @param properties Flink's catalog properties
+     * @param hadoopConf Hadoop configuration for catalog
+     * @return an Iceberg catalog loader
+     */
+    static CatalogLoader createCatalogLoader(
+            String name, Map<String, String> properties, Configuration hadoopConf) {
+        String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
+        if (catalogImpl != null) {
+            String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
+            Preconditions.checkArgument(
+                    catalogType == null,
+                    "Cannot create catalog %s, both catalog-type and catalog-impl are set: catalog-type=%s, catalog-impl=%s",
+                    name,
+                    catalogType,
+                    catalogImpl);
+            return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
+        }
+
+        String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+        switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+            case ICEBERG_CATALOG_TYPE_HIVE:
+                // The properties 'uri', 'warehouse' and 'hive-conf-dir' are allowed to be null; in that
+                // case the values are parsed from the Hadoop configuration loaded from the classpath.
+                String hiveConfDir = properties.get(HIVE_CONF_DIR);
+                String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
+                Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
+                return CatalogLoader.hive(name, newHadoopConf, properties);
+
+            case ICEBERG_CATALOG_TYPE_HADOOP:
+                return CatalogLoader.hadoop(name, hadoopConf, properties);
+
+            case ICEBERG_CATALOG_TYPE_REST:
+                return CatalogLoader.rest(name, hadoopConf, properties);
+
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown catalog-type: " + catalogType + " (Must be 'hive', 'hadoop' or 'rest')");
+        }
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> context = Maps.newHashMap();
+        context.put(TYPE, "iceberg");
+        context.put(PROPERTY_VERSION, "1");
+        return context;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        return ImmutableList.of("*");
+    }
+
+    @Override
+    public Catalog createCatalog(String name, Map<String, String> properties) {
+        return createCatalog(name, properties, clusterHadoopConf());
+    }
+
+    protected Catalog createCatalog(
+            String name, Map<String, String> properties, Configuration hadoopConf) {
+        CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
+        String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);
+
+        Namespace baseNamespace = Namespace.empty();
+        if (properties.containsKey(BASE_NAMESPACE)) {
+            baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
+        }
+
+        boolean cacheEnabled =
+                PropertyUtil.propertyAsBoolean(
+                        properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+        long cacheExpirationIntervalMs =
+                PropertyUtil.propertyAsLong(
+                        properties,
+                        CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+                        CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+        Preconditions.checkArgument(
+                cacheExpirationIntervalMs != 0,
+                "%s is not allowed to be 0.",
+                CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+        return new FlinkCatalog(
+                name,
+                defaultDatabase,
+                baseNamespace,
+                catalogLoader,
+                cacheEnabled,
+                cacheExpirationIntervalMs);
+    }
+
+    private static Configuration mergeHiveConf(
+            Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
+        Configuration newConf = new Configuration(hadoopConf);
+        if (!Strings.isNullOrEmpty(hiveConfDir)) {
+            Preconditions.checkState(
+                    Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
+                    "There should be a hive-site.xml file under the directory %s",
+                    hiveConfDir);
+            newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
+        } else {
+            // If the hive-site.xml path is not provided explicitly, try to load the resource from the
+            // classpath. If the configuration file still cannot be found, HiveCatalog will throw an exception.
+            URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
+            if (configFile != null) {
+                newConf.addResource(configFile);
+            }
+        }
+
+        if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+            Preconditions.checkState(
+                    Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+                    "Failed to load Hadoop configuration: missing %s",
+                    Paths.get(hadoopConfDir, "hdfs-site.xml"));
+            newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+            Preconditions.checkState(
+                    Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+                    "Failed to load Hadoop configuration: missing %s",
+                    Paths.get(hadoopConfDir, "core-site.xml"));
+            newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
+        }
+
+        return newConf;
+    }
+
+    public static Configuration clusterHadoopConf() {
+        return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+    }
+}
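
For readers who want to try the factory directly, the options listed in the class Javadoc map onto a CREATE CATALOG statement as in the minimal sketch below; the 'type' = 'iceberg' entry matches the requiredContext() shown above. The catalog name, metastore URI and warehouse path are placeholder values and are not part of this patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Illustrative sketch of registering the Iceberg catalog from the Flink Table API. */
public final class RegisterIcebergCatalogSketch {

    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'type' = 'iceberg' selects FlinkCatalogFactory; the remaining options mirror the Javadoc above.
        // The metastore URI, warehouse path and catalog name are placeholders.
        tableEnv.executeSql(
                "CREATE CATALOG iceberg_hive WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hive',\n"
                        + "  'uri' = 'thrift://metastore-host:9083',\n"
                        + "  'warehouse' = 'hdfs://namenode:8020/user/hive/warehouse',\n"
                        + "  'default-database' = 'default',\n"
                        + "  'cache-enabled' = 'true'\n"
                        + ")");

        tableEnv.executeSql("USE CATALOG iceberg_hive");
    }
}
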
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
new file mode 100644
index 0000000000..1edf546c23
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.IcebergTableSink;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.IcebergTableSource;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+
+    static final String FACTORY_IDENTIFIER = "iceberg-inlong";
+
+    private static final ConfigOption<String> CATALOG_NAME =
+            ConfigOptions.key("catalog-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Catalog name");
+
+    private static final ConfigOption<String> CATALOG_TYPE =
+            ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+    private static final ConfigOption<String> CATALOG_DATABASE =
+            ConfigOptions.key("catalog-database")
+                    .stringType()
+                    .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+                    .withDescription("Database name managed in the iceberg catalog.");
+
+    private static final ConfigOption<String> CATALOG_TABLE =
+            ConfigOptions.key("catalog-table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+    private final FlinkCatalog catalog;
+
+    public FlinkDynamicTableFactory() {
+        this.catalog = null;
+    }
+
+    public FlinkDynamicTableFactory(FlinkCatalog catalog) {
+        this.catalog = catalog;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+
+        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+        CatalogTable catalogTable = context.getCatalogTable();
+        Map<String, String> tableProps = catalogTable.getOptions();
+        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+        TableLoader tableLoader =
+                createTableLoader(
+                        catalogTable,
+                        tableProps,
+                        objectIdentifier.getDatabaseName(),
+                        objectIdentifier.getObjectName());
+        return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+        CatalogTable catalogTable = context.getCatalogTable();
+        Map<String, String> writeProps = catalogTable.getOptions();
+        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+        TableLoader tableLoader;
+        if (catalog != null) {
+            tableLoader = createTableLoader(catalog, objectPath);
+        } else {
+            tableLoader =
+                    createTableLoader(
+                            catalogTable, writeProps, objectPath.getDatabaseName(), objectPath.getObjectName());
+        }
+        return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps);
+    }
+
+    private static TableLoader createTableLoader(
+            CatalogBaseTable catalogBaseTable,
+            Map<String, String> tableProps,
+            String databaseName,
+            String tableName) {
+        Configuration flinkConf = new Configuration();
+        tableProps.forEach(flinkConf::setString);
+
+        String catalogName = flinkConf.getString(CATALOG_NAME);
+        Preconditions.checkNotNull(
+                catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+
+        String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
+        Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
+
+        String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+        Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
+
+        org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+        FlinkCatalogFactory factory = new FlinkCatalogFactory();
+        FlinkCatalog flinkCatalog =
+                (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
+        ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
+
+        // Create database if not exists in the external catalog.
+        if (!flinkCatalog.databaseExists(catalogDatabase)) {
+            try {
+                flinkCatalog.createDatabase(
+                        catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+            } catch (DatabaseAlreadyExistException e) {
+                throw new AlreadyExistsException(
+                        e,
+                        "Database %s already exists in the iceberg catalog %s.",
+                        catalogName,
+                        catalogDatabase);
+            }
+        }
+
+        // Create table if not exists in the external catalog.
+        if (!flinkCatalog.tableExists(objectPath)) {
+            try {
+                flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
+            } catch (TableAlreadyExistException e) {
+                throw new AlreadyExistsException(
+                        e,
+                        "Table %s already exists in the database %s and catalog %s",
+                        catalogTable,
+                        catalogDatabase,
+                        catalogName);
+            }
+        }
+
+        return TableLoader.fromCatalog(
+                flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FACTORY_IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = Sets.newHashSet();
+        options.add(CATALOG_TYPE);
+        options.add(CATALOG_NAME);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = Sets.newHashSet();
+        options.add(CATALOG_DATABASE);
+        options.add(CATALOG_TABLE);
+        return options;
+    }
+
+    private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
+        Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+        return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
+    }
+}
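
The connector-level options defined above ('connector' = 'iceberg-inlong', the required 'catalog-name' and 'catalog-type', and the optional 'catalog-database' and 'catalog-table') are meant to be used from a plain CREATE TABLE statement; createTableLoader then creates the Iceberg database and table on demand if they do not already exist, and 'catalog-database'/'catalog-table' fall back to the Flink database and table names when omitted. The sketch below shows one possible usage; the schema, names and metastore URI are placeholder values, not part of this patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Illustrative sketch of reading through the 'iceberg-inlong' dynamic table factory. */
public final class IcebergInlongTableSketch {

    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'catalog-name' and 'catalog-type' are required options of the factory above;
        // the remaining values here are placeholders for a reachable Hive metastore setup.
        tableEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id BIGINT,\n"
                        + "  amount DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'iceberg-inlong',\n"
                        + "  'catalog-name' = 'iceberg_hive',\n"
                        + "  'catalog-type' = 'hive',\n"
                        + "  'catalog-database' = 'inlong_db',\n"
                        + "  'catalog-table' = 'orders',\n"
                        + "  'uri' = 'thrift://metastore-host:9083'\n"
                        + ")");

        // The Iceberg source backing this table is built by createDynamicTableSource above.
        tableEnv.executeSql("SELECT * FROM orders").print();
    }
}
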
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
new file mode 100644
index 0000000000..2a04b4ee7e
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.iceberg.EnvironmentContext;
+import org.apache.iceberg.flink.util.FlinkPackage;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class FlinkEnvironmentContext {
+
+    private FlinkEnvironmentContext() {
+    }
+
+    public static void init() {
+        EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+        EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, FlinkPackage.version());
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..7d1e60eab4
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000..254f72875d
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.iceberg.FlinkCatalogFactory
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index e81c324c04..9f360dd169 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -34,6 +34,7 @@
     <modules>
         <module>postgres-cdc</module>
         <module>starrocks</module>
+        <module>iceberg</module>
     </modules>
 
     <properties>
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index cc6d1b407c..9a82ec18d8 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -738,6 +738,14 @@
  Source  : com.starrocks:flink-connector-starrocks:1.2.7_flink-1.15 (Please note that the software have been modified.)
   License : https://www.apache.org/licenses/LICENSE-2.0.txt
 
+ 1.3.17 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+         inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+         inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+         inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
+
+ Source  : iceberg-flink:iceberg-flink-1.15:1.3.1 (Please note that the software have been modified.)
+ License : https://github.com/apache/iceberg/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 
diff --git a/pom.xml b/pom.xml
index 5e7c0d328f..ca8a551c21 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,7 +62,7 @@
         <plugin.assembly.version>3.2.0</plugin.assembly.version>
         <plugin.surefire.version>3.0.0-M7</plugin.surefire.version>
         <plugin.failsafe.version>3.0.0-M7</plugin.failsafe.version>
-        <plugin.shade.version>3.2.4</plugin.shade.version>
+        <plugin.shade.version>3.4.0</plugin.shade.version>
         <plugin.maven.source>3.0.1</plugin.maven.source>
         <plugin.maven.jar.version>3.2.0</plugin.maven.jar.version>
         <exec.maven.version>1.6.0</exec.maven.version>
@@ -153,7 +153,7 @@
         <zookeeper.version>3.6.3</zookeeper.version>
         <pulsar.version>2.8.4</pulsar.version>
         <kafka.version>2.4.1</kafka.version>
-        <iceberg.version>1.1.0</iceberg.version>
+        <iceberg.version>1.3.1</iceberg.version>
         <flink.version.v1.13>1.13.5</flink.version.v1.13>
         <flink.version.v1.15>1.15.4</flink.version.v1.15>
         <flink.minor.version>1.13</flink.minor.version>

