This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 5d7fc81e67  [INLONG-8643][Sort] Support Iceberg source (#8818)
5d7fc81e67 is described below

commit 5d7fc81e6761bba8289b8f862af92eacc487771a
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Sep 1 14:31:40 2023 +0800

    [INLONG-8643][Sort] Support Iceberg source (#8818)
---
 .../resource/sort/DefaultSortConfigOperator.java   |   1 -
 .../sort/protocol/constant/IcebergConstant.java    |  13 +
 .../inlong/sort/protocol/node/ExtractNode.java     |   2 +
 .../org/apache/inlong/sort/protocol/node/Node.java |   2 +
 .../IcebergExtracNode.java}                        | 107 +--
 .../sort/protocol/node/load/IcebergLoadNode.java   |  12 +-
 .../parser/Iceberg2StarRocksSqlParserTest.java     | 133 ++++
 inlong-sort/sort-flink/sort-flink-v1.13/pom.xml    |   3 +-
 inlong-sort/sort-flink/sort-flink-v1.15/pom.xml    |   5 -
 .../sort-connectors/iceberg/pom.xml                | 139 ++++
 .../apache/inlong/sort/iceberg/FlinkCatalog.java   | 812 +++++++++++++++++++++
 .../inlong/sort/iceberg/FlinkCatalogFactory.java   | 216 ++++++
 .../sort/iceberg/FlinkDynamicTableFactory.java     | 205 ++++++
 .../sort/iceberg/FlinkEnvironmentContext.java      |  35 +
 .../org.apache.flink.table.factories.Factory       |  18 +
 .../org.apache.flink.table.factories.TableFactory  |  16 +
 .../sort-flink-v1.15/sort-connectors/pom.xml       |   1 +
 licenses/inlong-sort-connectors/LICENSE            |   8 +
 pom.xml                                            |   4 +-
 19 files changed, 1666 insertions(+), 66 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 97683f1ac3..37429cf2a5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -253,5 +253,4 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         groupInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
         groupInfo.getExtList().add(extInfo);
     }
-
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 70ea0d5158..676f7f4435 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -22,6 +22,19 @@ package org.apache.inlong.sort.protocol.constant;
  */
 public class IcebergConstant {
 
+    public static final String DEFAULT_CATALOG_NAME = "ICEBERG_HIVE";
+    public static final String CONNECTOR_KEY = "connector";
+    public static final String CONNECTOR = "iceberg-inlong";
+    public static final String DATABASE_KEY = "catalog-database";
+    public static final String DEFAULT_DATABASE_KEY = "default-database";
+    public static final String TABLE_KEY = "catalog-table";
+    public static final String CATALOG_TYPE_KEY = "catalog-type";
+    public static final String CATALOG_NAME_KEY = "catalog-name";
+    public static final String URI_KEY = "uri";
+    public static final String WAREHOUSE_KEY = "warehouse";
+    public static final String START_SNAPSHOT_ID = "start-snapshot-id";
+    public static final String STREAMING = "streaming";
+
     /**
      * Iceberg supported catalog type
      */
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index 3c27b6a407..543f8cf3be 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode; import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode; import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode; +import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode; import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode; import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; @@ -64,6 +65,7 @@ import java.util.Map; @JsonSubTypes.Type(value = RedisExtractNode.class, name = "redisExtract"), @JsonSubTypes.Type(value = DorisExtractNode.class, name = "dorisExtract"), @JsonSubTypes.Type(value = HudiExtractNode.class, name = "hudiExtract"), + @JsonSubTypes.Type(value = IcebergExtracNode.class, name = "icebergExtract"), }) @Data @NoArgsConstructor diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java index ecd704815b..f755f439d7 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java @@ -21,6 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode; import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode; import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode; +import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode; import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode; import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; @@ -78,6 +79,7 @@ import java.util.TreeMap; @JsonSubTypes.Type(value = RedisExtractNode.class, name = "redisExtract"), @JsonSubTypes.Type(value = DorisExtractNode.class, name = "dorisExtract"), @JsonSubTypes.Type(value = HudiExtractNode.class, name = "hudiExtract"), + @JsonSubTypes.Type(value = IcebergExtracNode.class, name = "icebergExtract"), @JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"), @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"), @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"), diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java similarity index 53% copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java index 257290119a..b876d96aff 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java +++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java @@ -15,22 +15,16 @@ * limitations under the License. */ -package org.apache.inlong.sort.protocol.node.load; +package org.apache.inlong.sort.protocol.node.extract; import org.apache.inlong.sort.protocol.FieldInfo; -import org.apache.inlong.sort.protocol.InlongMetric; import org.apache.inlong.sort.protocol.constant.IcebergConstant; -import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType; -import org.apache.inlong.sort.protocol.enums.FilterStrategy; -import org.apache.inlong.sort.protocol.node.LoadNode; -import org.apache.inlong.sort.protocol.transformation.FieldRelation; -import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.protocol.node.ExtractNode; +import org.apache.inlong.sort.protocol.transformation.WatermarkField; -import com.google.common.base.Preconditions; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; @@ -41,13 +35,14 @@ import java.io.Serializable; import java.util.List; import java.util.Map; -@JsonTypeName("icebergLoad") -@Data -@NoArgsConstructor +/** + * Iceberg extract node for extract data from iceberg + */ @EqualsAndHashCode(callSuper = true) -public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializable { - - private static final long serialVersionUID = -1L; +@JsonTypeName("icebergExtract") +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class IcebergExtracNode extends ExtractNode implements Serializable { @JsonProperty("tableName") @Nonnull @@ -57,67 +52,77 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializa @Nonnull private String dbName; - @JsonProperty("primaryKey") - private String primaryKey; - @JsonProperty("catalogType") private IcebergConstant.CatalogType catalogType; + @Nullable @JsonProperty("uri") private String uri; @JsonProperty("warehouse") private String warehouse; - @JsonCreator - public IcebergLoadNode(@JsonProperty("id") String id, - @JsonProperty("name") String name, - @JsonProperty("fields") List<FieldInfo> fields, - @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations, - @JsonProperty("filters") List<FilterFunction> filters, - @JsonProperty("filterStrategy") FilterStrategy filterStrategy, - @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism, - @JsonProperty("properties") Map<String, String> properties, + @JsonProperty("catalogName") + private String catalogName; + + @JsonProperty("primaryKey") + private String primaryKey; + + @JsonProperty("startSnapShotId") + @Nullable + private Long startSnapShotId; + + public IcebergExtracNode( + @Nonnull @JsonProperty("id") String id, + @Nonnull @JsonProperty("name") String name, + @Nonnull @JsonProperty("fields") List<FieldInfo> fields, + @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField, + @Nullable @JsonProperty("uri") String uri, + @Nullable @JsonProperty("warehouse") String warehouse, @Nonnull @JsonProperty("dbName") String dbName, @Nonnull @JsonProperty("tableName") String tableName, - @JsonProperty("primaryKey") String primaryKey, 
@JsonProperty("catalogType") IcebergConstant.CatalogType catalogType, - @JsonProperty("uri") String uri, - @JsonProperty("warehouse") String warehouse) { - super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties); - this.tableName = Preconditions.checkNotNull(tableName, "table name is null"); - this.dbName = Preconditions.checkNotNull(dbName, "db name is null"); - this.primaryKey = primaryKey; - this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType; + @Nullable @JsonProperty("catalogName") String catalogName, + @JsonProperty("primaryKey") String primaryKey, + @Nullable @JsonProperty("startSnapShotId") Long startSnapShotId, + @Nullable @JsonProperty("properties") Map<String, String> properties) { + super(id, name, fields, watermarkField, properties); this.uri = uri; this.warehouse = warehouse; + this.dbName = dbName; + this.tableName = tableName; + this.catalogName = catalogName == null ? IcebergConstant.DEFAULT_CATALOG_NAME : catalogName; + this.primaryKey = primaryKey; + this.startSnapShotId = startSnapShotId; + this.catalogType = catalogType == null ? IcebergConstant.CatalogType.HIVE : catalogType; + } + + @Override + public String genTableName() { + return String.format("iceberg_table_%s", getId()); } @Override public Map<String, String> tableOptions() { Map<String, String> options = super.tableOptions(); - options.put("connector", "iceberg-inlong"); - // for test sink.ignore.changelog - // options.put("sink.ignore.changelog", "true"); - options.put("catalog-database", dbName); - options.put("catalog-table", tableName); - options.put("default-database", dbName); - options.put("catalog-type", catalogType.name()); - options.put("catalog-name", catalogType.name()); + options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR); + options.put(IcebergConstant.DATABASE_KEY, dbName); + options.put(IcebergConstant.TABLE_KEY, tableName); + options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name()); + options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName); + options.put(IcebergConstant.STREAMING, "true"); if (null != uri) { - options.put("uri", uri); + options.put(IcebergConstant.URI_KEY, uri); } if (null != warehouse) { - options.put("warehouse", warehouse); + options.put(IcebergConstant.WAREHOUSE_KEY, warehouse); + } + if (null != startSnapShotId) { + options.put(IcebergConstant.START_SNAPSHOT_ID, startSnapShotId.toString()); } return options; } - @Override - public String genTableName() { - return tableName; - } - @Override public String getPrimaryKey() { return primaryKey; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java index 257290119a..4d90d99c5a 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java @@ -96,14 +96,14 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializa @Override public Map<String, String> tableOptions() { Map<String, String> options = super.tableOptions(); - options.put("connector", "iceberg-inlong"); + options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR); // for test sink.ignore.changelog // options.put("sink.ignore.changelog", "true"); - options.put("catalog-database", dbName); - options.put("catalog-table", tableName); - 
options.put("default-database", dbName); - options.put("catalog-type", catalogType.name()); - options.put("catalog-name", catalogType.name()); + options.put(IcebergConstant.DATABASE_KEY, dbName); + options.put(IcebergConstant.TABLE_KEY, tableName); + options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName); + options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name()); + options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name()); if (null != uri) { options.put("uri", uri); } diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java new file mode 100644 index 0000000000..5ec917debe --- /dev/null +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.parser; + +import org.apache.inlong.sort.formats.common.LongFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.parser.impl.FlinkSqlParser; +import org.apache.inlong.sort.parser.result.FlinkSqlParseResult; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.GroupInfo; +import org.apache.inlong.sort.protocol.StreamInfo; +import org.apache.inlong.sort.protocol.constant.IcebergConstant; +import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode; +import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode; +import org.apache.inlong.sort.protocol.transformation.FieldRelation; +import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class Iceberg2StarRocksSqlParserTest extends AbstractTestBase { + + private String groupId = "b_test_wk_0801"; + private String streamId = "b_test_wkstream_0801"; + + // iceberg + private String uri = ""; + private String icDatabase = ""; + private String icTable = ""; + private String catalogName = "HIVE"; + private String warehouse = ""; + + // starrocks + private String user = ""; + private String 
password = ""; + private String jdbc = ""; + private String srDatabase = ""; + private String srTable = ""; + private String primaryKey = "id"; + private String loadUrl = ""; + + private List<FieldInfo> fields() { + return Arrays.asList( + new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("name", new StringFormatInfo()), + new FieldInfo("source", new StringFormatInfo()), + new FieldInfo("count", new LongFormatInfo()), + new FieldInfo("remark", new StringFormatInfo()), + new FieldInfo("send_time", new StringFormatInfo())); + } + + private List<FieldRelation> relations() { + return fields().stream() + .map(info -> new FieldRelation(info, info)) + .collect(Collectors.toList()); + } + + private IcebergExtracNode buildIcebergExtracNode(String id) { + + return new IcebergExtracNode(id, "iceberg-source", fields(), null, uri, + warehouse, icDatabase, icTable, IcebergConstant.CatalogType.HIVE, catalogName, + null, null, null); + + } + + private StarRocksLoadNode buildStarRocksLoadNode(String id) { + + Map<String, String> properties = new HashMap<>(); + properties.put("sink.properties.format", "json"); + properties.put("sink.properties.strip_outer_array", "true"); + return new StarRocksLoadNode(id, "sink", fields(), relations(), null, null, + 1, properties, jdbc, loadUrl, user, password, srDatabase, + srTable, primaryKey, null, null, null, null); + } + + private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) { + List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList()); + List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList()); + return new NodeRelation(inputIds, outputIds); + } + + @Test + public void testIceberg() throws Exception { + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .inStreamingMode() + .build(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(10000); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); + Node inputNode = buildIcebergExtracNode("1"); + Node outputNode = buildStarRocksLoadNode("2"); + StreamInfo streamInfo = new StreamInfo(streamId, Arrays.asList(inputNode, outputNode), + Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode), + Collections.singletonList(outputNode)))); + GroupInfo groupInfo = new GroupInfo(groupId, Collections.singletonList(streamInfo)); + + ObjectMapper objectMapper = new ObjectMapper(); + System.out.println(objectMapper.writeValueAsString(groupInfo)); + FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo); + FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse(); + Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty()); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml index adbaa7ae0b..83784f0290 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.13/pom.xml @@ -57,6 +57,7 @@ <hudi.version>0.12.3</hudi.version> <sqlserver.jdbc.version>7.2.2.jre8</sqlserver.jdbc.version> <thrift.version>0.9.3</thrift.version> + <flink.iceberg.version>1.1.0</flink.iceberg.version> </properties> <dependencyManagement> @@ -148,7 +149,7 @@ <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-flink-runtime-1.14</artifactId> - <version>${iceberg.version}</version> + <version>${flink.iceberg.version}</version> </dependency> 
<!-- flink --> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml index 0bbbcefe0d..2e039fedcd 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/pom.xml @@ -143,11 +143,6 @@ <artifactId>mssql-jdbc</artifactId> <version>${sqlserver.jdbc.version}</version> </dependency> - <dependency> - <groupId>org.apache.iceberg</groupId> - <artifactId>iceberg-flink-runtime-1.14</artifactId> - <version>${iceberg.version}</version> - </dependency> <!-- flink --> <dependency> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml new file mode 100644 index 0000000000..c481dd8de6 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml @@ -0,0 +1,139 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connectors-v1.15</artifactId> + <version>1.9.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-connector-iceberg-v1.15</artifactId> + <packaging>jar</packaging> + <name>Apache InLong - Sort-connector-iceberg</name> + + <properties> + <iceberg-connector.version>1.3.1</iceberg-connector.version> + <flink.iceberg.version>1.15</flink.iceberg.version> + <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-common</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.iceberg</groupId> + <artifactId>iceberg-flink-runtime-${flink.iceberg.version}</artifactId> + <version>${iceberg-connector.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.iceberg</groupId> + <artifactId>iceberg-hive-runtime</artifactId> + <version>${iceberg-connector.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive3x.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + 
<scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.iceberg</groupId> + <artifactId>iceberg-hive-metastore</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${plugin.shade.version}</version> + + <executions> + <execution> + <id>shade-flink</id> + <goals> + <goal>shade</goal> + </goals> + <phase>package</phase> + + <configuration> + + <artifactSet> + <includes> + <include>org.apache.inlong:*</include> + <include>com.google.protobuf:protobuf-java</include> + <include>org.apache.kafka:*</include> + <include>com.fasterxml.*:*</include> + <include>org.apache.iceberg:*</include> + <include>org.apache.hive:*</include> + <!-- Include fixed version 18.0-13.0 of flink shaded guava --> + <include>org.apache.flink:flink-shaded-guava</include> + <include>com.google.protobuf:*</include> + <include>org.apache.thrift:*</include> + <include>com.facebook.*:*</include> + </includes> + </artifactSet> + + <filters> + <filter> + <artifact>org.apache.inlong:sort-connector-*</artifact> + <includes> + <include>org/apache/inlong/**</include> + <include>META-INF/services/org.apache.flink.table.factories.Factory</include> + <include>META-INF/services/org.apache.flink.table.factories.TableFactory</include> + </includes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>org.apache.inlong.sort.base</pattern> + <shadedPattern>org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.base</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java new file mode 100644 index 0000000000..ba4298d9a1 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java @@ -0,0 +1,812 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.util.StringUtils; +import org.apache.iceberg.CachingCatalog; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}. 
+ * + * <p>The mapping between Flink database and Iceberg namespace: Supplying a base namespace for a + * given catalog, so if you have a catalog that supports a 2-level namespace, you would supply the + * first level in the catalog configuration and the second level would be exposed as Flink + * databases. + * + * <p>The Iceberg table manages its partitions by itself. The partition of the Iceberg table is + * independent of the partition of Flink. + * + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +public class FlinkCatalog extends AbstractCatalog { + + private final CatalogLoader catalogLoader; + private final Catalog icebergCatalog; + private final Namespace baseNamespace; + private final SupportsNamespaces asNamespaceCatalog; + private final Closeable closeable; + private final boolean cacheEnabled; + + public FlinkCatalog( + String catalogName, + String defaultDatabase, + Namespace baseNamespace, + CatalogLoader catalogLoader, + boolean cacheEnabled, + long cacheExpirationIntervalMs) { + super(catalogName, defaultDatabase); + this.catalogLoader = catalogLoader; + this.baseNamespace = baseNamespace; + this.cacheEnabled = cacheEnabled; + + Catalog originalCatalog = catalogLoader.loadCatalog(); + icebergCatalog = + cacheEnabled + ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs) + : originalCatalog; + asNamespaceCatalog = + originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null; + closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null; + + FlinkEnvironmentContext.init(); + } + + @Override + public void open() throws CatalogException { + } + + @Override + public void close() throws CatalogException { + if (closeable != null) { + try { + closeable.close(); + } catch (IOException e) { + throw new CatalogException(e); + } + } + } + + public Catalog catalog() { + return icebergCatalog; + } + + /** Append a new level to the base namespace */ + private static Namespace appendLevel(Namespace baseNamespace, String newLevel) { + String[] namespace = new String[baseNamespace.levels().length + 1]; + System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length); + namespace[baseNamespace.levels().length] = newLevel; + return Namespace.of(namespace); + } + + TableIdentifier toIdentifier(ObjectPath path) { + String objectName = path.getObjectName(); + List<String> tableName = Splitter.on('$').splitToList(objectName); + + if (tableName.size() == 1) { + return TableIdentifier.of( + appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName()); + } else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) { + return TableIdentifier.of( + appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)), + tableName.get(1)); + } else { + throw new IllegalArgumentException("Illegal table name:" + objectName); + } + } + + @Override + public List<String> listDatabases() throws CatalogException { + if (asNamespaceCatalog == null) { + return Collections.singletonList(getDefaultDatabase()); + } + + return asNamespaceCatalog.listNamespaces(baseNamespace).stream() + .map(n -> n.level(n.levels().length - 1)) + .collect(Collectors.toList()); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + if (asNamespaceCatalog == null) { + if (!getDefaultDatabase().equals(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } else { + return new 
CatalogDatabaseImpl(Maps.newHashMap(), ""); + } + } else { + try { + Map<String, String> metadata = + Maps.newHashMap( + asNamespaceCatalog.loadNamespaceMetadata(appendLevel(baseNamespace, databaseName))); + String comment = metadata.remove("comment"); + return new CatalogDatabaseImpl(metadata, comment); + } catch (NoSuchNamespaceException e) { + throw new DatabaseNotExistException(getName(), databaseName, e); + } + } + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + try { + getDatabase(databaseName); + return true; + } catch (DatabaseNotExistException ignore) { + return false; + } + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + createDatabase( + name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists); + } + + private void createDatabase( + String databaseName, Map<String, String> metadata, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + if (asNamespaceCatalog != null) { + try { + asNamespaceCatalog.createNamespace(appendLevel(baseNamespace, databaseName), metadata); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), databaseName, e); + } + } + } else { + throw new UnsupportedOperationException( + "Namespaces are not supported by catalog: " + getName()); + } + } + + private Map<String, String> mergeComment(Map<String, String> metadata, String comment) { + Map<String, String> ret = Maps.newHashMap(metadata); + if (metadata.containsKey("comment")) { + throw new CatalogException("Database properties should not contain key: 'comment'."); + } + + if (!StringUtils.isNullOrWhitespaceOnly(comment)) { + ret.put("comment", comment); + } + return ret; + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + if (asNamespaceCatalog != null) { + try { + boolean success = asNamespaceCatalog.dropNamespace(appendLevel(baseNamespace, name)); + if (!success && !ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } catch (NoSuchNamespaceException e) { + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name, e); + } + } catch (NamespaceNotEmptyException e) { + throw new DatabaseNotEmptyException(getName(), name, e); + } + } else { + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + if (asNamespaceCatalog != null) { + Namespace namespace = appendLevel(baseNamespace, name); + Map<String, String> updates = Maps.newHashMap(); + Set<String> removals = Sets.newHashSet(); + + try { + Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace); + Map<String, String> newProperties = + mergeComment(newDatabase.getProperties(), newDatabase.getComment()); + + for (String key : oldProperties.keySet()) { + if (!newProperties.containsKey(key)) { + removals.add(key); + } + } + + for (Map.Entry<String, String> entry : newProperties.entrySet()) { + if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) { + updates.put(entry.getKey(), entry.getValue()); + } + } + + if (!updates.isEmpty()) { + 
asNamespaceCatalog.setProperties(namespace, updates); + } + + if (!removals.isEmpty()) { + asNamespaceCatalog.removeProperties(namespace, removals); + } + + } catch (NoSuchNamespaceException e) { + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name, e); + } + } + } else { + if (getDefaultDatabase().equals(name)) { + throw new CatalogException( + "Can not alter the default database when the iceberg catalog doesn't support namespaces."); + } + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + } + + @Override + public List<String> listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + try { + return icebergCatalog.listTables(appendLevel(baseNamespace, databaseName)).stream() + .map(TableIdentifier::name) + .collect(Collectors.toList()); + } catch (NoSuchNamespaceException e) { + throw new DatabaseNotExistException(getName(), databaseName, e); + } + } + + @Override + public CatalogTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + Table table = loadIcebergTable(tablePath); + return toCatalogTable(table); + } + + private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException { + try { + Table table = icebergCatalog.loadTable(toIdentifier(tablePath)); + if (cacheEnabled) { + table.refresh(); + } + + return table; + } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { + throw new TableNotExistException(getName(), tablePath, e); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + return icebergCatalog.tableExists(toIdentifier(tablePath)); + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + icebergCatalog.dropTable(toIdentifier(tablePath)); + } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(getName(), tablePath, e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + icebergCatalog.renameTable( + toIdentifier(tablePath), + toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName))); + } catch (NoSuchTableException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(getName(), tablePath, e); + } + } catch (AlreadyExistsException e) { + throw new TableAlreadyExistException(getName(), tablePath, e); + } + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws CatalogException, TableAlreadyExistException { + if (Objects.equals( + table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) { + throw new IllegalArgumentException( + "Cannot create the table with 'connector'='iceberg' table property in " + + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or " + + "create table without 'connector'='iceberg' related properties in an iceberg table."); + } + + createIcebergTable(tablePath, table, ignoreIfExists); + } + + void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws CatalogException, TableAlreadyExistException { + validateFlinkTable(table); + + Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema()); + PartitionSpec 
spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema); + + ImmutableMap.Builder<String, String> properties = ImmutableMap.builder(); + String location = null; + for (Map.Entry<String, String> entry : table.getOptions().entrySet()) { + if ("location".equalsIgnoreCase(entry.getKey())) { + location = entry.getValue(); + } else { + properties.put(entry.getKey(), entry.getValue()); + } + } + + try { + icebergCatalog.createTable( + toIdentifier(tablePath), icebergSchema, spec, location, properties.build()); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(getName(), tablePath, e); + } + } + } + + private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) { + TableSchema ts1 = ct1.getSchema(); + TableSchema ts2 = ct2.getSchema(); + boolean equalsPrimary = false; + + if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) { + equalsPrimary = + Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType()) + && Objects.equals( + ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns()); + } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) { + equalsPrimary = true; + } + + if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns()) + && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs()) + && equalsPrimary)) { + throw new UnsupportedOperationException("Altering schema is not supported yet."); + } + + if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) { + throw new UnsupportedOperationException("Altering partition keys is not supported yet."); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws CatalogException, TableNotExistException { + validateFlinkTable(newTable); + + Table icebergTable; + try { + icebergTable = loadIcebergTable(tablePath); + } catch (TableNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } else { + return; + } + } + + CatalogTable table = toCatalogTable(icebergTable); + + // Currently, Flink SQL only support altering table properties. + + // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by + // comparing + // CatalogTable instances, unless the Flink schema contains Iceberg column IDs. 
+ validateTableSchemaAndPartition(table, (CatalogTable) newTable); + + Map<String, String> oldProperties = table.getOptions(); + Map<String, String> setProperties = Maps.newHashMap(); + + String setLocation = null; + String setSnapshotId = null; + String pickSnapshotId = null; + + for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + if (Objects.equals(value, oldProperties.get(key))) { + continue; + } + + if ("location".equalsIgnoreCase(key)) { + setLocation = value; + } else if ("current-snapshot-id".equalsIgnoreCase(key)) { + setSnapshotId = value; + } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) { + pickSnapshotId = value; + } else { + setProperties.put(key, value); + } + } + + oldProperties + .keySet() + .forEach( + k -> { + if (!newTable.getOptions().containsKey(k)) { + setProperties.put(k, null); + } + }); + + commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); + } + + private static void validateFlinkTable(CatalogBaseTable table) { + Preconditions.checkArgument( + table instanceof CatalogTable, "The Table should be a CatalogTable."); + + TableSchema schema = table.getSchema(); + schema + .getTableColumns() + .forEach( + column -> { + if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) { + throw new UnsupportedOperationException( + "Creating table with computed columns is not supported yet."); + } + }); + + if (!schema.getWatermarkSpecs().isEmpty()) { + throw new UnsupportedOperationException( + "Creating table with watermark specs is not supported yet."); + } + } + + private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) { + PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema); + partitionKeys.forEach(builder::identity); + return builder.build(); + } + + private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) { + ImmutableList.Builder<String> partitionKeysBuilder = ImmutableList.builder(); + for (PartitionField field : spec.fields()) { + if (field.transform().isIdentity()) { + partitionKeysBuilder.add(icebergSchema.findColumnName(field.sourceId())); + } else { + // Not created by Flink SQL. + // For compatibility with iceberg tables, return empty. + // TODO modify this after Flink support partition transform. 
+ return Collections.emptyList(); + } + } + return partitionKeysBuilder.build(); + } + + private static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + Map<String, String> setProperties) { + // don't allow setting the snapshot and picking a commit at the same time because order is + // ambiguous and choosing + // one order leads to different results + Preconditions.checkArgument( + setSnapshotId == null || pickSnapshotId == null, + "Cannot set the current snapshot ID and cherry-pick snapshot changes"); + + if (setSnapshotId != null) { + long newSnapshotId = Long.parseLong(setSnapshotId); + table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit(); + } + + // if updating the table snapshot, perform that update first in case it fails + if (pickSnapshotId != null) { + long newSnapshotId = Long.parseLong(pickSnapshotId); + table.manageSnapshots().cherrypick(newSnapshotId).commit(); + } + + Transaction transaction = table.newTransaction(); + + if (setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!setProperties.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + setProperties.forEach( + (k, v) -> { + if (v == null) { + updateProperties.remove(k); + } else { + updateProperties.set(k, v); + } + }); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + static CatalogTable toCatalogTable(Table table) { + TableSchema schema = FlinkSchemaUtil.toSchema(table.schema()); + List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema()); + + // NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer + // may use + // CatalogTableImpl to copy a new catalog table. + // Let's re-loading table from Iceberg catalog when creating source/sink operators. + // Iceberg does not have Table comment, so pass a null (Default comment value in Flink). 
+ return new CatalogTableImpl(schema, partitionKeys, table.properties(), null); + } + + @Override + public Optional<Factory> getFactory() { + return Optional.of(new FlinkDynamicTableFactory(this)); + } + + CatalogLoader getCatalogLoader() { + return catalogLoader; + } + + // ------------------------------ Unsupported methods + // --------------------------------------------- + + @Override + public List<String> listViews(String databaseName) throws CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> listFunctions(String dbName) throws CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + throw new FunctionNotExistException(getName(), functionPath); + } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + return false; + } + + @Override + public void createFunction( + ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws CatalogException { + throw new 
UnsupportedOperationException(); + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + Table table = loadIcebergTable(tablePath); + + if (table.spec().isUnpartitioned()) { + throw new TableNotPartitionedException(icebergCatalog.name(), tablePath); + } + + Set<CatalogPartitionSpec> set = Sets.newHashSet(); + try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) { + for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) { + Map<String, String> map = Maps.newHashMap(); + StructLike structLike = dataFile.partition(); + PartitionSpec spec = table.specs().get(dataFile.specId()); + for (int i = 0; i < structLike.size(); i++) { + map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class))); + } + set.add(new CatalogPartitionSpec(map)); + } + } catch (IOException e) { + throw new CatalogException( + String.format("Failed to list partitions of table %s", tablePath), e); + } + + return Lists.newArrayList(set); + } + + @Override + public List<CatalogPartitionSpec> listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<CatalogPartitionSpec> listPartitionsByFilter( + ObjectPath tablePath, List<Expression> filters) throws CatalogException { + throw new UnsupportedOperationException(); + } + + // After partition pruning and filter push down, the statistics have become very inaccurate, so + // the statistics from + // here are of little significance. + // Flink will support something like SupportsReportStatistics in future. + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java new file mode 100644 index 0000000000..4adf4a3ed8 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.iceberg; + +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * A Flink Catalog factory implementation that creates {@link org.apache.iceberg.flink.FlinkCatalog}. + * + * <p>This supports the following catalog configuration options: + * + * <ul> + * <li><code>type</code> - Flink catalog factory key, should be "iceberg" + * <li><code>catalog-type</code> - iceberg catalog type, "hive", "hadoop" or "rest" + * <li><code>uri</code> - the Hive Metastore URI (Hive catalog only) + * <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only) + * <li><code>warehouse</code> - the warehouse path (Hadoop catalog only) + * <li><code>default-database</code> - a database name to use as the default + * <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop + * catalog only) + * <li><code>cache-enabled</code> - whether to enable catalog cache + * </ul> + * + * <p>To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override + * {@link #createCatalogLoader(String, Map, Configuration)}. + * + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +public class FlinkCatalogFactory implements CatalogFactory { + + // Can not just use "type", it conflicts with CATALOG_TYPE. + public static final String ICEBERG_CATALOG_TYPE = "catalog-type"; + public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop"; + public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive"; + public static final String ICEBERG_CATALOG_TYPE_REST = "rest"; + + public static final String HIVE_CONF_DIR = "hive-conf-dir"; + public static final String HADOOP_CONF_DIR = "hadoop-conf-dir"; + public static final String DEFAULT_DATABASE = "default-database"; + public static final String DEFAULT_DATABASE_NAME = "default"; + public static final String BASE_NAMESPACE = "base-namespace"; + + public static final String TYPE = "type"; + public static final String PROPERTY_VERSION = "property-version"; + + /** + * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink + * catalog adapter. 
+ * + * @param name Flink's catalog name + * @param properties Flink's catalog properties + * @param hadoopConf Hadoop configuration for catalog + * @return an Iceberg catalog loader + */ + static CatalogLoader createCatalogLoader( + String name, Map<String, String> properties, Configuration hadoopConf) { + String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL); + if (catalogImpl != null) { + String catalogType = properties.get(ICEBERG_CATALOG_TYPE); + Preconditions.checkArgument( + catalogType == null, + "Cannot create catalog %s, both catalog-type and catalog-impl are set: catalog-type=%s, catalog-impl=%s", + name, + catalogType, + catalogImpl); + return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl); + } + + String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE); + switch (catalogType.toLowerCase(Locale.ENGLISH)) { + case ICEBERG_CATALOG_TYPE_HIVE: + // The properties 'uri', 'warehouse' and 'hive-conf-dir' are allowed to be null; in + // that case their values + // fall back to the Hadoop configuration loaded from the classpath. + String hiveConfDir = properties.get(HIVE_CONF_DIR); + String hadoopConfDir = properties.get(HADOOP_CONF_DIR); + Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir); + return CatalogLoader.hive(name, newHadoopConf, properties); + + case ICEBERG_CATALOG_TYPE_HADOOP: + return CatalogLoader.hadoop(name, hadoopConf, properties); + + case ICEBERG_CATALOG_TYPE_REST: + return CatalogLoader.rest(name, hadoopConf, properties); + + default: + throw new UnsupportedOperationException( + "Unknown catalog-type: " + catalogType + " (Must be 'hive', 'hadoop' or 'rest')"); + } + } + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = Maps.newHashMap(); + context.put(TYPE, "iceberg"); + context.put(PROPERTY_VERSION, "1"); + return context; + } + + @Override + public List<String> supportedProperties() { + return ImmutableList.of("*"); + } + + @Override + public Catalog createCatalog(String name, Map<String, String> properties) { + return createCatalog(name, properties, clusterHadoopConf()); + } + + protected Catalog createCatalog( + String name, Map<String, String> properties, Configuration hadoopConf) { + CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf); + String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME); + + Namespace baseNamespace = Namespace.empty(); + if (properties.containsKey(BASE_NAMESPACE)) { + baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\.")); + } + + boolean cacheEnabled = + PropertyUtil.propertyAsBoolean( + properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT); + + long cacheExpirationIntervalMs = + PropertyUtil.propertyAsLong( + properties, + CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF); + Preconditions.checkArgument( + cacheExpirationIntervalMs != 0, + "%s is not allowed to be 0.", + CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS); + + return new FlinkCatalog( + name, + defaultDatabase, + baseNamespace, + catalogLoader, + cacheEnabled, + cacheExpirationIntervalMs); + } + + private static Configuration mergeHiveConf( + Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) { + Configuration newConf = new Configuration(hadoopConf); + if (!Strings.isNullOrEmpty(hiveConfDir)) { + Preconditions.checkState( +
Files.exists(Paths.get(hiveConfDir, "hive-site.xml")), + "There should be a hive-site.xml file under the directory %s", + hiveConfDir); + newConf.addResource(new Path(hiveConfDir, "hive-site.xml")); + } else { + // If the hive-site.xml path is not provided explicitly, try to load the resource from the + // classpath. If the + // configuration file still cannot be loaded, HiveCatalog will throw an exception. + URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml"); + if (configFile != null) { + newConf.addResource(configFile); + } + } + + if (!Strings.isNullOrEmpty(hadoopConfDir)) { + Preconditions.checkState( + Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")), + "Failed to load Hadoop configuration: missing %s", + Paths.get(hadoopConfDir, "hdfs-site.xml")); + newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml")); + Preconditions.checkState( + Files.exists(Paths.get(hadoopConfDir, "core-site.xml")), + "Failed to load Hadoop configuration: missing %s", + Paths.get(hadoopConfDir, "core-site.xml")); + newConf.addResource(new Path(hadoopConfDir, "core-site.xml")); + } + + return newConf; + } + + public static Configuration clusterHadoopConf() { + return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java new file mode 100644 index 0000000000..1edf546c23 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.inlong.sort.iceberg; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.flink.IcebergTableSink; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.source.IcebergTableSource; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +import java.util.Map; +import java.util.Set; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory { + + static final String FACTORY_IDENTIFIER = "iceberg-inlong"; + + private static final ConfigOption<String> CATALOG_NAME = + ConfigOptions.key("catalog-name") + .stringType() + .noDefaultValue() + .withDescription("Catalog name"); + + private static final ConfigOption<String> CATALOG_TYPE = + ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE) + .stringType() + .noDefaultValue() + .withDescription("Catalog type, the optional types are: custom, hadoop, hive."); + + private static final ConfigOption<String> CATALOG_DATABASE = + ConfigOptions.key("catalog-database") + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME) + .withDescription("Database name managed in the iceberg catalog."); + + private static final ConfigOption<String> CATALOG_TABLE = + ConfigOptions.key("catalog-table") + .stringType() + .noDefaultValue() + .withDescription("Table name managed in the underlying iceberg catalog and database."); + + private final FlinkCatalog catalog; + + public FlinkDynamicTableFactory() { + this.catalog = null; + } + + public FlinkDynamicTableFactory(FlinkCatalog catalog) { + this.catalog = catalog; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + + ObjectIdentifier objectIdentifier = context.getObjectIdentifier(); + CatalogTable catalogTable = context.getCatalogTable(); + Map<String, String> tableProps = catalogTable.getOptions(); + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()); + + TableLoader tableLoader = + createTableLoader( + catalogTable, + tableProps, + objectIdentifier.getDatabaseName(), + objectIdentifier.getObjectName()); + return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration()); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + ObjectPath 
objectPath = context.getObjectIdentifier().toObjectPath(); + CatalogTable catalogTable = context.getCatalogTable(); + Map<String, String> writeProps = catalogTable.getOptions(); + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()); + + TableLoader tableLoader; + if (catalog != null) { + tableLoader = createTableLoader(catalog, objectPath); + } else { + tableLoader = + createTableLoader( + catalogTable, writeProps, objectPath.getDatabaseName(), objectPath.getObjectName()); + } + return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps); + } + + private static TableLoader createTableLoader( + CatalogBaseTable catalogBaseTable, + Map<String, String> tableProps, + String databaseName, + String tableName) { + Configuration flinkConf = new Configuration(); + tableProps.forEach(flinkConf::setString); + + String catalogName = flinkConf.getString(CATALOG_NAME); + Preconditions.checkNotNull( + catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key()); + + String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName); + Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null"); + + String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName); + Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null"); + + org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf(); + FlinkCatalogFactory factory = new FlinkCatalogFactory(); + FlinkCatalog flinkCatalog = + (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf); + ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable); + + // Create database if not exists in the external catalog. + if (!flinkCatalog.databaseExists(catalogDatabase)) { + try { + flinkCatalog.createDatabase( + catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true); + } catch (DatabaseAlreadyExistException e) { + throw new AlreadyExistsException( + e, + "Database %s already exists in the iceberg catalog %s.", + catalogDatabase, + catalogName); + } + } + + // Create table if not exists in the external catalog.
+ if (!flinkCatalog.tableExists(objectPath)) { + try { + flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true); + } catch (TableAlreadyExistException e) { + throw new AlreadyExistsException( + e, + "Table %s already exists in the database %s and catalog %s", + catalogTable, + catalogDatabase, + catalogName); + } + } + + return TableLoader.fromCatalog( + flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable)); + } + + @Override + public String factoryIdentifier() { + return FACTORY_IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = Sets.newHashSet(); + options.add(CATALOG_TYPE); + options.add(CATALOG_NAME); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> options = Sets.newHashSet(); + options.add(CATALOG_DATABASE); + options.add(CATALOG_TABLE); + return options; + } + + private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) { + Preconditions.checkNotNull(catalog, "Flink catalog cannot be null"); + return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath)); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java new file mode 100644 index 0000000000..2a04b4ee7e --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg; + +import org.apache.iceberg.EnvironmentContext; +import org.apache.iceberg.flink.util.FlinkPackage; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class FlinkEnvironmentContext { + + private FlinkEnvironmentContext() { + } + + public static void init() { + EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink"); + EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, FlinkPackage.version()); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..7d1e60eab4 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory new file mode 100644 index 0000000000..254f72875d --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.inlong.sort.iceberg.FlinkCatalogFactory diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml index e81c324c04..9f360dd169 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml @@ -34,6 +34,7 @@ <modules> <module>postgres-cdc</module> <module>starrocks</module> + <module>iceberg</module> </modules> <properties> diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index cc6d1b407c..9a82ec18d8 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -738,6 +738,14 @@ Source : com.starrocks:flink-connector-starrocks:1.2.7_flink-1.15 (Please note that the software have been modified.) License : https://www.apache.org/licenses/LICENSE-2.0.txt + 1.3.17 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java + + Source : iceberg-flink:iceberg-flink-1.15:1.3.1 (Please note that the software have been modified.) + License : https://github.com/apache/iceberg/LICENSE + ======================================================================= Apache InLong Subcomponents: diff --git a/pom.xml b/pom.xml index 5e7c0d328f..ca8a551c21 100644 --- a/pom.xml +++ b/pom.xml @@ -62,7 +62,7 @@ <plugin.assembly.version>3.2.0</plugin.assembly.version> <plugin.surefire.version>3.0.0-M7</plugin.surefire.version> <plugin.failsafe.version>3.0.0-M7</plugin.failsafe.version> - <plugin.shade.version>3.2.4</plugin.shade.version> + <plugin.shade.version>3.4.0</plugin.shade.version> <plugin.maven.source>3.0.1</plugin.maven.source> <plugin.maven.jar.version>3.2.0</plugin.maven.jar.version> <exec.maven.version>1.6.0</exec.maven.version> @@ -153,7 +153,7 @@ <zookeeper.version>3.6.3</zookeeper.version> <pulsar.version>2.8.4</pulsar.version> <kafka.version>2.4.1</kafka.version> - <iceberg.version>1.1.0</iceberg.version> + <iceberg.version>1.3.1</iceberg.version> <flink.version.v1.13>1.13.5</flink.version.v1.13> <flink.version.v1.15>1.15.4</flink.version.v1.15> <flink.minor.version>1.13</flink.minor.version>
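For reference, below is a minimal usage sketch of the connector added by this commit; it is not part of the patch itself. It registers an Iceberg table through the 'iceberg-inlong' identifier declared in FlinkDynamicTableFactory and reads it with Flink SQL. The catalog name, database, table, metastore URI, and warehouse path are placeholder values; 'catalog-name' and 'catalog-type' are the factory's required options, while 'catalog-database' and 'catalog-table' are optional overrides.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IcebergInlongSourceSketch {

    public static void main(String[] args) {
        // Standard Flink streaming + Table API setup.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 'connector' must match FACTORY_IDENTIFIER ("iceberg-inlong");
        // 'catalog-name' and 'catalog-type' are the factory's required options,
        // 'catalog-database' and 'catalog-table' are optional overrides.
        // All endpoint values below are placeholders for illustration only.
        tableEnv.executeSql(
                "CREATE TABLE iceberg_source (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'iceberg-inlong',\n"
                        + "  'catalog-name' = 'hive_prod',\n"
                        + "  'catalog-type' = 'hive',\n"
                        + "  'catalog-database' = 'test_db',\n"
                        + "  'catalog-table' = 'test_table',\n"
                        + "  'uri' = 'thrift://localhost:9083',\n"
                        + "  'warehouse' = 'hdfs://localhost:8020/user/hive/warehouse'\n"
                        + ")");

        // Batch-style read of the Iceberg table. For incremental reads, the Iceberg
        // Flink runtime also accepts read options such as 'streaming' = 'true' in the
        // WITH clause (assumed here; not shown in this patch).
        tableEnv.executeSql("SELECT * FROM iceberg_source").print();
    }
}

With a Hive catalog, the 'uri' and 'warehouse' table properties are forwarded to FlinkCatalogFactory.createCatalogLoader, which falls back to the Hadoop configuration on the classpath when they are absent.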