This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 248d65d2de [INLONG-9098][Manager] Support to save additional info for 
the Iceberg field (#9099)
248d65d2de is described below

commit 248d65d2de71bbca2dd13ee1db924c998fdad77c
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Oct 25 11:42:53 2023 +0800

    [INLONG-9098][Manager] Support to save additional info for the Iceberg 
field (#9099)
---
 .../pojo/sink/iceberg/IcebergColumnInfo.java       | 20 ++++---
 .../resource/sink/iceberg/IcebergCatalogUtils.java | 49 ++++++++--------
 .../sink/iceberg/IcebergResourceOperator.java      |  6 +-
 .../service/sink/iceberg/IcebergSinkOperator.java  | 67 +++++++++++++++++++++-
 .../source/iceberg/IcebergSourceOperator.java      |  6 +-
 5 files changed, 110 insertions(+), 38 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
index 10f3e6684b..b9ac57c07c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
@@ -17,13 +17,16 @@
 
 package org.apache.inlong.manager.pojo.sink.iceberg;
 
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
@@ -32,10 +35,10 @@ import org.apache.commons.lang3.StringUtils;
  * Iceberg column info
  */
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class IcebergColumnInfo {
+@JsonTypeDefine(value = SinkType.ICEBERG)
+public class IcebergColumnInfo extends SinkField {
 
     @ApiModelProperty("Length of fixed type")
     private Integer length;
@@ -55,12 +58,15 @@ public class IcebergColumnInfo {
     @ApiModelProperty("Width param of truncate partition")
     private Integer width;
 
-    // The following are passed from base field and need not be part of API 
for extra param
-    private String name;
-    private String type;
-    private String desc;
     private boolean required;
 
+    /**
+     * Get the dto instance from the request.
+     * <p>
+     * Delegates to CommonBeanUtils.copyProperties, passing the given sink field as the
+     * source and the IcebergColumnInfo no-args constructor as the target supplier.
+     *
+     * @param sinkField sink field taken from the request; used as the copy source
+     * @return the IcebergColumnInfo produced by CommonBeanUtils.copyProperties
+     */
+    public static IcebergColumnInfo getFromRequest(SinkField sinkField) {
+        // NOTE(review): the trailing `true` presumably makes the copy skip null source values - confirm
+        return CommonBeanUtils.copyProperties(sinkField, 
IcebergColumnInfo::new, true);
+    }
+
     /**
      * Get the extra param from the Json
      */
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
index 3a6847fed0..1cd10e12c0 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
@@ -95,10 +95,10 @@ public class IcebergCatalogUtils {
         int id = 1;
         for (IcebergColumnInfo column : tableInfo.getColumns()) {
             if (column.isRequired()) {
-                nestedFields.add(Types.NestedField.required(id, 
column.getName(),
+                nestedFields.add(Types.NestedField.required(id, 
column.getFieldName(),
                         Types.fromPrimitiveString(icebergTypeDesc(column))));
             } else {
-                nestedFields.add(Types.NestedField.optional(id, 
column.getName(),
+                nestedFields.add(Types.NestedField.optional(id, 
column.getFieldName(),
                         Types.fromPrimitiveString(icebergTypeDesc(column))));
             }
             id += 1;
@@ -118,14 +118,14 @@ public class IcebergCatalogUtils {
      * Transform to iceberg recognizable type description
      */
     private static String icebergTypeDesc(IcebergColumnInfo column) {
-        switch (IcebergType.forType(column.getType())) {
+        switch (IcebergType.forType(column.getFieldType())) {
             case DECIMAL:
                 // note: the space is needed or iceberg won't recognize
                 return String.format("decimal(%d, %d)", column.getPrecision(), 
column.getScale());
             case FIXED:
                 return String.format("fixed(%d)", column.getLength());
             default:
-                return column.getType();
+                return column.getFieldType();
         }
     }
 
@@ -147,9 +147,9 @@ public class IcebergCatalogUtils {
         Schema schema = table.schema();
         for (NestedField column : schema.columns()) {
             IcebergColumnInfo info = new IcebergColumnInfo();
-            info.setName(column.name());
+            info.setFieldName(column.name());
             info.setRequired(column.isRequired());
-            info.setType(column.type().toString());
+            info.setFieldType(column.type().toString());
             columnList.add(info);
         }
         return columnList;
@@ -167,11 +167,12 @@ public class IcebergCatalogUtils {
         UpdateSchema updateSchema = table.updateSchema();
         for (IcebergColumnInfo column : columns) {
             if (column.isRequired()) {
-                updateSchema.addRequiredColumn(column.getName(), 
Types.fromPrimitiveString(icebergTypeDesc(column)),
-                        column.getDesc());
+                updateSchema.addRequiredColumn(column.getFieldName(),
+                        Types.fromPrimitiveString(icebergTypeDesc(column)),
+                        column.getFieldComment());
             } else {
-                updateSchema.addColumn(column.getName(), 
Types.fromPrimitiveString(icebergTypeDesc(column)),
-                        column.getDesc());
+                updateSchema.addColumn(column.getFieldName(), 
Types.fromPrimitiveString(icebergTypeDesc(column)),
+                        column.getFieldComment());
             }
         }
 
@@ -202,25 +203,25 @@ public class IcebergCatalogUtils {
         }
         switch (IcebergPartition.forName(column.getPartitionStrategy())) {
             case IDENTITY:
-                builder.identity(column.getName());
+                builder.identity(column.getFieldName());
                 break;
             case BUCKET:
-                builder.bucket(column.getName(), column.getBucketNum());
+                builder.bucket(column.getFieldName(), column.getBucketNum());
                 break;
             case TRUNCATE:
-                builder.truncate(column.getName(), column.getWidth());
+                builder.truncate(column.getFieldName(), column.getWidth());
                 break;
             case YEAR:
-                builder.year(column.getName());
+                builder.year(column.getFieldName());
                 break;
             case MONTH:
-                builder.month(column.getName());
+                builder.month(column.getFieldName());
                 break;
             case DAY:
-                builder.day(column.getName());
+                builder.day(column.getFieldName());
                 break;
             case HOUR:
-                builder.hour(column.getName());
+                builder.hour(column.getFieldName());
                 break;
             case NONE:
                 break;
@@ -241,25 +242,25 @@ public class IcebergCatalogUtils {
         }
         switch (IcebergPartition.forName(column.getPartitionStrategy())) {
             case IDENTITY:
-                builder.addField(column.getName());
+                builder.addField(column.getFieldName());
                 break;
             case BUCKET:
-                builder.addField(Expressions.bucket(column.getName(), 
column.getBucketNum()));
+                builder.addField(Expressions.bucket(column.getFieldName(), 
column.getBucketNum()));
                 break;
             case TRUNCATE:
-                builder.addField(Expressions.truncate(column.getName(), 
column.getWidth()));
+                builder.addField(Expressions.truncate(column.getFieldName(), 
column.getWidth()));
                 break;
             case YEAR:
-                builder.addField(Expressions.year(column.getName()));
+                builder.addField(Expressions.year(column.getFieldName()));
                 break;
             case MONTH:
-                builder.addField(Expressions.month(column.getName()));
+                builder.addField(Expressions.month(column.getFieldName()));
                 break;
             case DAY:
-                builder.addField(Expressions.day(column.getName()));
+                builder.addField(Expressions.day(column.getFieldName()));
                 break;
             case HOUR:
-                builder.addField(Expressions.hour(column.getName()));
+                builder.addField(Expressions.hour(column.getFieldName()));
                 break;
             case NONE:
                 break;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
index e28d1204a2..68c16813ad 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
@@ -160,9 +160,9 @@ public class IcebergResourceOperator implements 
SinkResourceOperator {
         List<IcebergColumnInfo> columnList = new ArrayList<>();
         for (StreamSinkFieldEntity field : fieldList) {
             IcebergColumnInfo column = 
IcebergColumnInfo.getFromJson(field.getExtParams());
-            column.setName(field.getFieldName());
-            column.setType(field.getFieldType());
-            column.setDesc(field.getFieldComment());
+            column.setFieldName(field.getFieldName());
+            column.setFieldType(field.getFieldType());
+            column.setFieldComment(field.getFieldComment());
             column.setRequired(field.getIsRequired() != null && 
field.getIsRequired() > 0);
             columnList.add(column);
         }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index eb01dbb981..4001b79981 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -17,12 +17,14 @@
 
 package org.apache.inlong.manager.service.sink.iceberg;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -34,12 +36,14 @@ import 
org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkRequest;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -102,7 +106,7 @@ public class IcebergSinkOperator extends 
AbstractSinkOperator {
 
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
-        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        List<SinkField> sinkFields = getSinkFields(entity.getId());
         sink.setSinkFieldList(sinkFields);
         return sink;
     }
@@ -127,4 +131,65 @@ public class IcebergSinkOperator extends 
AbstractSinkOperator {
         }
     }
 
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.info("begin to save es sink fields={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            this.checkFieldInfo(fieldInfo);
+            StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            try {
+                IcebergColumnInfo dto = 
IcebergColumnInfo.getFromRequest(fieldInfo);
+                fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            } catch (Exception e) {
+                LOGGER.error("parsing json string to sink field info failed", 
e);
+                throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save es sink fields");
+    }
+
+    /**
+     * Queries the fields stored for the given sink.
+     * <p>
+     * A stored field with non-blank ext params is returned as an IcebergColumnInfo built from
+     * that JSON; any other field is returned as a plain SinkField. In both cases the entity's
+     * properties are then copied onto the returned object.
+     *
+     * @param sinkId id of the sink whose fields are queried
+     * @return the fields of the sink, or an empty list when none are stored
+     */
+    @Override
+    public List<SinkField> getSinkFields(Integer sinkId) {
+        List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId);
+        if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+            return new ArrayList<>();
+        }
+
+        List<SinkField> fieldList = new ArrayList<>(sinkFieldEntities.size());
+        for (StreamSinkFieldEntity field : sinkFieldEntities) {
+            // Only allocate the plain SinkField when there are no ext params to restore from
+            SinkField sinkField = StringUtils.isNotBlank(field.getExtParams())
+                    ? IcebergColumnInfo.getFromJson(field.getExtParams())
+                    : new SinkField();
+            CommonBeanUtils.copyProperties(field, sinkField, true);
+            fieldList.add(sinkField);
+        }
+        return fieldList;
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
index cb29ffd6ef..d594ab26c0 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
@@ -112,9 +112,9 @@ public class IcebergSourceOperator extends 
AbstractSourceOperator {
             List<IcebergColumnInfo> existColumns = 
IcebergCatalogUtils.getColumns(metastoreUri, dbName, tableName);
             for (IcebergColumnInfo columnInfo : existColumns) {
                 StreamField streamField = new StreamField();
-                streamField.setFieldName(columnInfo.getName());
-                
streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getType()));
-                streamField.setFieldComment(columnInfo.getDesc());
+                streamField.setFieldName(columnInfo.getFieldName());
+                
streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getFieldType()));
+                streamField.setFieldComment(columnInfo.getFieldComment());
                 streamFields.add(streamField);
             }
             updateField(sourceRequest.getInlongGroupId(), 
sourceRequest.getInlongStreamId(), streamFields);

Reply via email to