This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ff109f6d1 [INLONG-9333][Sort] Related problem with attribute 
exceptions when creating 'hudiSink' (#9334)
5ff109f6d1 is described below

commit 5ff109f6d15c80822879e4f032548d40c4f6077b
Author: LiJie20190102 <53458004+lijie20190...@users.noreply.github.com>
AuthorDate: Mon Nov 27 17:16:52 2023 +0800

    [INLONG-9333][Sort] Related problem with attribute exceptions when creating 
'hudiSink' (#9334)
    
    Co-authored-by: lijie0203 <li...@qishudi.com>
---
 .../inlong/manager/client/File2HudiExample.java     |  8 ++++++++
 .../sort/protocol/node/extract/HudiExtractNode.java | 21 ++++++++++++---------
 .../sort/protocol/node/load/HudiLoadNode.java       | 21 ++++++++++++---------
 3 files changed, 32 insertions(+), 18 deletions(-)

diff --git 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
index 6162d7fd53..023ea92433 100644
--- 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
+++ 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -157,6 +158,13 @@ public class File2HudiExample extends BaseExample {
         fields.add(field3);
         fields.add(field4);
         sink.setSinkFieldList(fields);
+
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        HashMap<String, String> map = new HashMap<>();
+        map.put("hoodie.datasource.hive_sync.partition_fields", "name");
+        extList.add(map);
+        sink.setExtList(extList);
+        sink.setPrimaryKey("name");
         return sink;
     }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
index cc945ac5e6..6b34a174c7 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
@@ -25,6 +25,7 @@ import 
org.apache.inlong.sort.protocol.transformation.WatermarkField;
 import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
@@ -142,15 +143,17 @@ public class HudiExtractNode extends ExtractNode 
implements Serializable {
 
         // If the extend attributes starts with .ddl,
         // it will be passed to the ddl statement of the table
-        extList.forEach(ext -> {
-            String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
-            if (StringUtils.isNoneBlank(keyName) &&
-                    keyName.startsWith(DDL_ATTR_PREFIX)) {
-                String ddlKeyName = 
keyName.substring(DDL_ATTR_PREFIX.length());
-                String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
-                options.put(ddlKeyName, ddlValue);
-            }
-        });
+        if (CollectionUtils.isNotEmpty(extList)) {
+            extList.forEach(ext -> {
+                String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+                if (StringUtils.isNoneBlank(keyName) &&
+                        keyName.startsWith(DDL_ATTR_PREFIX)) {
+                    String ddlKeyName = 
keyName.substring(DDL_ATTR_PREFIX.length());
+                    String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+                    options.put(ddlKeyName, ddlValue);
+                }
+            });
+        }
 
         String path = String.format("%s/%s.db/%s", warehouse, dbName, 
tableName);
         options.put(HUDI_OPTION_DEFAULT_PATH, path);
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index e411f89a8a..859c80cbda 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -145,15 +146,17 @@ public class HudiLoadNode extends LoadNode implements 
InlongMetric, Serializable
 
         // If the extend attributes starts with .ddl,
         // it will be passed to the ddl statement of the table
-        extList.forEach(ext -> {
-            String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
-            if (StringUtils.isNoneBlank(keyName) &&
-                    keyName.startsWith(DDL_ATTR_PREFIX)) {
-                String ddlKeyName = 
keyName.substring(DDL_ATTR_PREFIX.length());
-                String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
-                options.put(ddlKeyName, ddlValue);
-            }
-        });
+        if (CollectionUtils.isNotEmpty(extList)) {
+            extList.forEach(ext -> {
+                String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+                if (StringUtils.isNoneBlank(keyName) &&
+                        keyName.startsWith(DDL_ATTR_PREFIX)) {
+                    String ddlKeyName = 
keyName.substring(DDL_ATTR_PREFIX.length());
+                    String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+                    options.put(ddlKeyName, ddlValue);
+                }
+            });
+        }
 
         String path = String.format("%s/%s.db/%s", warehouse, dbName, 
tableName);
         options.put(HUDI_OPTION_DEFAULT_PATH, path);

Reply via email to