This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 5ff109f6d1 [INLONG-9333][Sort] Related problem with attribute exceptions when creating 'hudiSink' (#9334)
5ff109f6d1 is described below

commit 5ff109f6d15c80822879e4f032548d40c4f6077b
Author: LiJie20190102 <53458004+lijie20190...@users.noreply.github.com>
AuthorDate: Mon Nov 27 17:16:52 2023 +0800

    [INLONG-9333][Sort] Related problem with attribute exceptions when creating 'hudiSink' (#9334)

    Co-authored-by: lijie0203 <li...@qishudi.com>
---
 .../inlong/manager/client/File2HudiExample.java     |  8 ++++++++
 .../sort/protocol/node/extract/HudiExtractNode.java | 21 ++++++++++++---------
 .../sort/protocol/node/load/HudiLoadNode.java       | 21 ++++++++++++---------
 3 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
index 6162d7fd53..023ea92433 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -157,6 +158,13 @@ public class File2HudiExample extends BaseExample {
         fields.add(field3);
         fields.add(field4);
         sink.setSinkFieldList(fields);
+
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        HashMap<String, String> map = new HashMap<>();
+        map.put("hoodie.datasource.hive_sync.partition_fields", "name");
+        extList.add(map);
+        sink.setExtList(extList);
+
+        sink.setPrimaryKey("name");
         return sink;
     }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
index cc945ac5e6..6b34a174c7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
@@ -142,15 +143,17 @@ public class HudiExtractNode extends ExtractNode implements Serializable {
 
         // If the extend attributes starts with .ddl,
         // it will be passed to the ddl statement of the table
-        extList.forEach(ext -> {
-            String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
-            if (StringUtils.isNoneBlank(keyName) &&
-                    keyName.startsWith(DDL_ATTR_PREFIX)) {
-                String ddlKeyName = keyName.substring(DDL_ATTR_PREFIX.length());
-                String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
-                options.put(ddlKeyName, ddlValue);
-            }
-        });
+        if (CollectionUtils.isNotEmpty(extList)) {
+            extList.forEach(ext -> {
+                String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+                if (StringUtils.isNoneBlank(keyName) &&
+                        keyName.startsWith(DDL_ATTR_PREFIX)) {
+                    String ddlKeyName = keyName.substring(DDL_ATTR_PREFIX.length());
+                    String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+                    options.put(ddlKeyName, ddlValue);
+                }
+            });
+        }
 
         String path = String.format("%s/%s.db/%s", warehouse, dbName, tableName);
         options.put(HUDI_OPTION_DEFAULT_PATH, path);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index e411f89a8a..859c80cbda 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -145,15 +146,17 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
 
         // If the extend attributes starts with .ddl,
         // it will be passed to the ddl statement of the table
-        extList.forEach(ext -> {
-            String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
-            if (StringUtils.isNoneBlank(keyName) &&
-                    keyName.startsWith(DDL_ATTR_PREFIX)) {
-                String ddlKeyName = keyName.substring(DDL_ATTR_PREFIX.length());
-                String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
-                options.put(ddlKeyName, ddlValue);
-            }
-        });
+        if (CollectionUtils.isNotEmpty(extList)) {
+            extList.forEach(ext -> {
+                String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+                if (StringUtils.isNoneBlank(keyName) &&
+                        keyName.startsWith(DDL_ATTR_PREFIX)) {
+                    String ddlKeyName = keyName.substring(DDL_ATTR_PREFIX.length());
+                    String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+                    options.put(ddlKeyName, ddlValue);
+                }
+            });
+        }
 
         String path = String.format("%s/%s.db/%s", warehouse, dbName, tableName);
         options.put(HUDI_OPTION_DEFAULT_PATH, path);