This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 4c6a61b45 [INLONG-6784][Sort] Support apache hudi LoadNode (#6789)
4c6a61b45 is described below

commit 4c6a61b45b29aa49658a86d8799600538f86156e
Author: averyzhang <dearzhangzuof...@gmail.com>
AuthorDate: Fri Dec 9 19:52:54 2022 +0800

    [INLONG-6784][Sort] Support apache hudi LoadNode (#6789)
---
 .../apache/inlong/sort/protocol/node/LoadNode.java |   5 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   3 +
 .../sort/protocol/node/load/HudiLoadNode.java      | 174 +++++++++++++++++++
 .../sort/protocol/node/load/HudiLoadNodeTest.java  |  57 +++++++
 .../inlong/sort/parser/HudiNodeSqlParserTest.java  | 186 +++++++++++++++++++++
 5 files changed, 424 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 72cc42931..da8859e8c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -71,7 +72,9 @@ import java.util.Map;
         @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
         @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad"),
         @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
-        @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad")
+        @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
+        @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+        @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
 })
 @NoArgsConstructor
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 7f37075e0..087a5a9a7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -41,6 +41,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -90,6 +91,8 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
         @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad"),
         @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+        @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
+        @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
         @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
 })
 public interface Node {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
new file mode 100644
index 000000000..86306dea0
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+@JsonTypeName("hudiLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    private static final String HUDI_OPTION_HIVE_SYNC_ENABLED = "hive_sync.enabled";
+    private static final String HUDI_OPTION_HIVE_SYNC_DB = "hive_sync.db";
+    private static final String HUDI_OPTION_HIVE_SYNC_TABLE = "hive_sync.table";
+    private static final String HUDI_OPTION_HIVE_SYNC_FILE_FORMAT = "hive_sync.file_format";
+    private static final String HUDI_OPTION_HIVE_SYNC_MODE = "hive_sync.mode";
+    private static final String HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE = "hms";
+    private static final String HUDI_OPTION_HIVE_SYNC_METASTORE_URIS = "hive_sync.metastore.uris";
+    private static final String HUDI_OPTION_DEFAULT_PATH = "path";
+    private static final String HUDI_OPTION_DATABASE_NAME = "hoodie.database.name";
+    private static final String HUDI_OPTION_TABLE_NAME = "hoodie.table.name";
+    private static final String HUDI_OPTION_RECORDKEY_FIELD_NAME = "hoodie.datasource.write.recordkey.field";
+    private static final String HUDI_OPTION_PARTITIONPATH_FIELD_NAME = "hoodie.datasource.write.partitionpath.field";
+    private static final String DDL_ATTRIBUTE_HUDI = "hudi.";
+
+    @JsonProperty("tableName")
+    @Nonnull
+    private String tableName;
+
+    @JsonProperty("dbName")
+    @Nonnull
+    private String dbName;
+
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonProperty("catalogType")
+    private CatalogType catalogType;
+
+    @JsonProperty("uri")
+    private String uri;
+
+    @JsonProperty("warehouse")
+    private String warehouse;
+
+    @JsonProperty("extList")
+    private List<HashMap<String, String>> extList;
+
+    @JsonProperty("partitionFields")
+    private List<FieldInfo> partitionFields;
+
+    @JsonCreator
+    public HudiLoadNode(
+            @JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("dbName") String dbName,
+            @Nonnull @JsonProperty("tableName") String tableName,
+            @JsonProperty("primaryKey") String primaryKey,
+            @JsonProperty("catalogType") CatalogType catalogType,
+            @JsonProperty("uri") String uri,
+            @JsonProperty("warehouse") String warehouse,
+            @JsonProperty("extList") List<HashMap<String, String>> extList,
+            @JsonProperty("partitionFields") List<FieldInfo> partitionFields) {
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
+        this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
+        this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
+        this.primaryKey = primaryKey;
+        this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
+        this.uri = uri;
+        this.warehouse = warehouse;
+        this.extList = extList;
+        this.partitionFields = partitionFields;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+
+        options.put(HUDI_OPTION_HIVE_SYNC_ENABLED, "true");
+        options.put(HUDI_OPTION_HIVE_SYNC_MODE, HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE);
+        options.put(HUDI_OPTION_HIVE_SYNC_DB, dbName);
+        options.put(HUDI_OPTION_HIVE_SYNC_TABLE, tableName);
+        options.put(HUDI_OPTION_HIVE_SYNC_METASTORE_URIS, uri);
+
+        // partition field
+        if (partitionFields != null && !partitionFields.isEmpty()) {
+            String partitionKey =
+                    partitionFields.stream()
+                            .map(FieldInfo::getName)
+                            .collect(Collectors.joining(","));
+            options.put(HUDI_OPTION_PARTITIONPATH_FIELD_NAME, partitionKey);
+        }
+
+        extList.forEach(ext -> {
+            String keyName = ext.get("keyName");
+            if (StringUtils.isNoneBlank(keyName) &&
+                    keyName.startsWith(DDL_ATTRIBUTE_HUDI)) {
+                options.put(keyName, ext.get("keyValue"));
+            }
+        });
+
+        String path = String.format("%s/%s.db/%s", warehouse, dbName, tableName);
+        options.put(HUDI_OPTION_DEFAULT_PATH, path);
+
+        options.put(HUDI_OPTION_DATABASE_NAME, dbName);
+        options.put(HUDI_OPTION_TABLE_NAME, tableName);
+        options.put(HUDI_OPTION_RECORDKEY_FIELD_NAME, primaryKey);
+        options.put("connector", "hudi-inlong");
+
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return tableName;
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return primaryKey;
+    }
+
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return super.getPartitionFields();
+    }
+
+}
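tableOptions() above is where a HudiLoadNode turns into Flink connector properties: the Hive-sync settings, the hoodie.* write options, a path derived from warehouse, database, and table, and the connector key. Continuing the HudiLoadNodeJsonDemo sketch earlier (values follow from that node's illustrative config; note the method requires a non-null extList):

    Map<String, String> options = node.tableOptions();
    options.get("connector");                               // "hudi-inlong"
    options.get("path");                                    // "hdfs://localhost:9000/warehouse/test_db.db/test_table"
    options.get("hoodie.database.name");                    // "test_db"
    options.get("hoodie.table.name");                       // "test_table"
    options.get("hoodie.datasource.write.recordkey.field"); // "id"
    options.get("hive_sync.enabled");                       // "true"
    options.get("hive_sync.mode");                          // "hms"
    options.get("hive_sync.metastore.uris");                // "thrift://localhost:9083"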
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
new file mode 100644
index 000000000..dabfd54ac
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+/**
+ * Test for {@link HudiLoadNode}
+ */
+public class HudiLoadNodeTest extends SerializeBaseTest<HudiLoadNode> {
+
+    @Override
+    public HudiLoadNode getTestObject() {
+        Map<String, Object> properties = Maps.newHashMap();
+        return new HudiLoadNode("1", "test_hudi",
+                Collections.singletonList(new FieldInfo("id", new StringFormatInfo())),
+                Collections.singletonList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
+                        new FieldInfo("id", new StringFormatInfo()))),
+                null,
+                null,
+                1,
+                null,
+                "test_db",
+                "test_table",
+                "id",
+                CatalogType.HIVE,
+                "thrift://localhost:9083",
+                "hdfs://localhost:9000/user/hudi/warehouse",
+                new ArrayList<>(),
+                new ArrayList<>());
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
new file mode 100644
index 000000000..0cb9802d1
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Hudi SQL parser.
+ */
+public class HudiNodeSqlParserTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMySQLExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()),
+                new FieldInfo("event_type", new StringFormatInfo()));
+        // to make the hive load run in append mode, add this config
+        Map<String, String> map = new HashMap<>();
+        map.put("append-mode", "true");
+        return new MySqlExtractNode(id, "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("work1"), "localhost", "root", "123456",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private HudiLoadNode buildHudiLoadNodeWithHadoopCatalog() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("salary", new StringFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo())));
+
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        HashMap<String, String> map = new HashMap<>();
+        map.put("table.type", "MERGE_ON_READ");
+        extList.add(map);
+
+        return new HudiLoadNode(
+                "hudi",
+                "hudi_table_name",
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                null,
+                "inlong",
+                "inlong_hudi",
+                null,
+                CatalogType.HADOOP,
+                null,
+                "hdfs://localhost:9000/hudi/warehouse",
+                extList,
+                Lists.newArrayList());
+    }
+
+    private HudiLoadNode buildHudiLoadNodeWithHiveCatalog() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo())));
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        HashMap<String, String> map = new HashMap<>();
+        map.put("table.type", "MERGE_ON_READ");
+        extList.add(map);
+
+        // set HIVE_CONF_DIR, or set uri and warehouse
+        return new HudiLoadNode(
+                "hudi",
+                "hudi_table_name",
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                null,
+                "inlong",
+                "inlong_hudi",
+                null,
+                CatalogType.HIVE,
+                "thrift://localhost:9083",
+                "/hive/warehouse",
+                extList,
+                Lists.newArrayList());
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    @Test
+    public void testHudi() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildHudiLoadNodeWithHiveCatalog();
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+        Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+    }
+}