This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d116f1a685 [INLONG-9231][Sort] Find no audit time field when the field is in upper case (#9232)
d116f1a685 is described below

commit d116f1a6859316aaf22fcab604246d18971fb5c3
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Nov 8 11:30:51 2023 +0800

    [INLONG-9231][Sort] Find no audit time field when the field is in upper case (#9232)
---
 .../apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java   | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java
index cd303c0234..dfadc07334 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java
@@ -32,6 +32,7 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -49,13 +50,14 @@ public class SinkMetadataUtils implements Serializable {
     public SinkMetadataUtils(List<String> metadataKeys, DataType 
consumedDataType) {
         Set<String> metadataKeySet = ImmutableSet.copyOf(metadataKeys);
         List<RowType.RowField> metaFields = ((RowType) 
consumedDataType.getLogicalType()).getFields();
-
+        List<String> names = 
metaFields.stream().map(RowType.RowField::getName).collect(Collectors.toList());
+        log.info("start to config SinkMetadataUtils, metaKeys={}, consume 
fields={}", metadataKeys, names);
         // get related converters by real keys
         // the pos of physical column will be replaced by 
IcebergWritableMetadata.NULL
         this.converters = metaFields.stream()
                 .map(RowType.RowField::getName)
                 .map(key -> Stream.of(IcebergWritableMetadata.values())
-                        .filter(m -> m.getKey().equals(key))
+                        .filter(m -> m.getKey().equalsIgnoreCase(key))
                         .findFirst()
                         .orElse(IcebergWritableMetadata.NULL))
                 .map(IcebergWritableMetadata::getConverter)
@@ -65,8 +67,8 @@ public class SinkMetadataUtils implements Serializable {
         ImmutableBiMap.Builder<String, Integer> builder = 
ImmutableBiMap.builder();
         for (int i = 0; i < metaFields.size(); i++) {
             String name = metaFields.get(i).getName();
-            if (metadataKeySet.contains(name)) {
-                builder.put(name, i);
+            if (metadataKeySet.contains(name.toLowerCase())) {
+                builder.put(name.toLowerCase(), i);
             }
         }
         this.field2posMap = builder.build();
@@ -74,6 +76,7 @@ public class SinkMetadataUtils implements Serializable {
 
         // for audit time
         DATA_TIME_INDEX = 
field2posMap.getOrDefault(Constants.META_AUDIT_DATA_TIME, -1);
+        log.info("find data time index={}, filed2posMap={}", DATA_TIME_INDEX, 
field2posMap);
     }
 
     public Integer getMetadataPosByName(String name) {

Reply via email to