This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 248d65d2de [INLONG-9098][Manager] Support to save additional info for the Iceberg field (#9099) 248d65d2de is described below commit 248d65d2de71bbca2dd13ee1db924c998fdad77c Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Oct 25 11:42:53 2023 +0800 [INLONG-9098][Manager] Support to save additional info for the Iceberg field (#9099) --- .../pojo/sink/iceberg/IcebergColumnInfo.java | 20 ++++--- .../resource/sink/iceberg/IcebergCatalogUtils.java | 49 ++++++++-------- .../sink/iceberg/IcebergResourceOperator.java | 6 +- .../service/sink/iceberg/IcebergSinkOperator.java | 67 +++++++++++++++++++++- .../source/iceberg/IcebergSourceOperator.java | 6 +- 5 files changed, 110 insertions(+), 38 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java index 10f3e6684b..b9ac57c07c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java @@ -17,13 +17,16 @@ package org.apache.inlong.manager.pojo.sink.iceberg; +import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.SinkField; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; @@ -32,10 +35,10 @@ import org.apache.commons.lang3.StringUtils; * Iceberg column info */ @Data -@Builder @NoArgsConstructor @AllArgsConstructor -public class IcebergColumnInfo { +@JsonTypeDefine(value = SinkType.ICEBERG) +public class IcebergColumnInfo extends SinkField { @ApiModelProperty("Length of fixed type") private Integer length; @@ -55,12 +58,15 @@ public class IcebergColumnInfo { @ApiModelProperty("Width param of truncate partition") private Integer width; - // The following are passed from base field and need not be part of API for extra param - private String name; - private String type; - private String desc; private boolean required; + /** + * Get the dto instance from the request + */ + public static IcebergColumnInfo getFromRequest(SinkField sinkField) { + return CommonBeanUtils.copyProperties(sinkField, IcebergColumnInfo::new, true); + } + /** * Get the extra param from the Json */ diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java index 3a6847fed0..1cd10e12c0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java @@ -95,10 +95,10 @@ public class IcebergCatalogUtils { int id = 1; for (IcebergColumnInfo column : tableInfo.getColumns()) { if (column.isRequired()) { - nestedFields.add(Types.NestedField.required(id, column.getName(), + nestedFields.add(Types.NestedField.required(id, column.getFieldName(), Types.fromPrimitiveString(icebergTypeDesc(column)))); } else { - nestedFields.add(Types.NestedField.optional(id, column.getName(), + nestedFields.add(Types.NestedField.optional(id, column.getFieldName(), Types.fromPrimitiveString(icebergTypeDesc(column)))); } id += 1; @@ -118,14 +118,14 @@ public class IcebergCatalogUtils { * Transform to iceberg recognizable type description */ private static String icebergTypeDesc(IcebergColumnInfo column) { - switch (IcebergType.forType(column.getType())) { + switch (IcebergType.forType(column.getFieldType())) { case DECIMAL: // note: the space is needed or iceberg won't recognize return String.format("decimal(%d, %d)", column.getPrecision(), column.getScale()); case FIXED: return String.format("fixed(%d)", column.getLength()); default: - return column.getType(); + return column.getFieldType(); } } @@ -147,9 +147,9 @@ public class IcebergCatalogUtils { Schema schema = table.schema(); for (NestedField column : schema.columns()) { IcebergColumnInfo info = new IcebergColumnInfo(); - info.setName(column.name()); + info.setFieldName(column.name()); info.setRequired(column.isRequired()); - info.setType(column.type().toString()); + info.setFieldType(column.type().toString()); columnList.add(info); } return columnList; @@ -167,11 +167,12 @@ public class IcebergCatalogUtils { UpdateSchema updateSchema = table.updateSchema(); for (IcebergColumnInfo column : columns) { if (column.isRequired()) { - updateSchema.addRequiredColumn(column.getName(), Types.fromPrimitiveString(icebergTypeDesc(column)), - column.getDesc()); + updateSchema.addRequiredColumn(column.getFieldName(), + Types.fromPrimitiveString(icebergTypeDesc(column)), + column.getFieldComment()); } else { - updateSchema.addColumn(column.getName(), Types.fromPrimitiveString(icebergTypeDesc(column)), - column.getDesc()); + updateSchema.addColumn(column.getFieldName(), Types.fromPrimitiveString(icebergTypeDesc(column)), + column.getFieldComment()); } } @@ -202,25 +203,25 @@ public class IcebergCatalogUtils { } switch (IcebergPartition.forName(column.getPartitionStrategy())) { case IDENTITY: - builder.identity(column.getName()); + builder.identity(column.getFieldName()); break; case BUCKET: - builder.bucket(column.getName(), column.getBucketNum()); + builder.bucket(column.getFieldName(), column.getBucketNum()); break; case TRUNCATE: - builder.truncate(column.getName(), column.getWidth()); + builder.truncate(column.getFieldName(), column.getWidth()); break; case YEAR: - builder.year(column.getName()); + builder.year(column.getFieldName()); break; case MONTH: - builder.month(column.getName()); + builder.month(column.getFieldName()); break; case DAY: - builder.day(column.getName()); + builder.day(column.getFieldName()); break; case HOUR: - builder.hour(column.getName()); + builder.hour(column.getFieldName()); break; case NONE: break; @@ -241,25 +242,25 @@ public class IcebergCatalogUtils { } switch (IcebergPartition.forName(column.getPartitionStrategy())) { case IDENTITY: - builder.addField(column.getName()); + builder.addField(column.getFieldName()); break; case BUCKET: - builder.addField(Expressions.bucket(column.getName(), column.getBucketNum())); + builder.addField(Expressions.bucket(column.getFieldName(), column.getBucketNum())); break; case TRUNCATE: - builder.addField(Expressions.truncate(column.getName(), column.getWidth())); + builder.addField(Expressions.truncate(column.getFieldName(), column.getWidth())); break; case YEAR: - builder.addField(Expressions.year(column.getName())); + builder.addField(Expressions.year(column.getFieldName())); break; case MONTH: - builder.addField(Expressions.month(column.getName())); + builder.addField(Expressions.month(column.getFieldName())); break; case DAY: - builder.addField(Expressions.day(column.getName())); + builder.addField(Expressions.day(column.getFieldName())); break; case HOUR: - builder.addField(Expressions.hour(column.getName())); + builder.addField(Expressions.hour(column.getFieldName())); break; case NONE: break; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java index e28d1204a2..68c16813ad 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java @@ -160,9 +160,9 @@ public class IcebergResourceOperator implements SinkResourceOperator { List<IcebergColumnInfo> columnList = new ArrayList<>(); for (StreamSinkFieldEntity field : fieldList) { IcebergColumnInfo column = IcebergColumnInfo.getFromJson(field.getExtParams()); - column.setName(field.getFieldName()); - column.setType(field.getFieldType()); - column.setDesc(field.getFieldComment()); + column.setFieldName(field.getFieldName()); + column.setFieldType(field.getFieldType()); + column.setFieldComment(field.getFieldComment()); column.setRequired(field.getIsRequired() != null && field.getIsRequired() > 0); columnList.add(column); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java index eb01dbb981..4001b79981 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java @@ -17,12 +17,14 @@ package org.apache.inlong.manager.service.sink.iceberg; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.FieldType; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; @@ -34,12 +36,14 @@ import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkRequest; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; /** @@ -102,7 +106,7 @@ public class IcebergSinkOperator extends AbstractSinkOperator { CommonBeanUtils.copyProperties(entity, sink, true); CommonBeanUtils.copyProperties(dto, sink, true); - List<SinkField> sinkFields = super.getSinkFields(entity.getId()); + List<SinkField> sinkFields = getSinkFields(entity.getId()); sink.setSinkFieldList(sinkFields); return sink; } @@ -127,4 +131,65 @@ public class IcebergSinkOperator extends AbstractSinkOperator { } } + @Override + public void saveFieldOpt(SinkRequest request) { + List<SinkField> fieldList = request.getSinkFieldList(); + LOGGER.info("begin to save es sink fields={}", fieldList); + if (CollectionUtils.isEmpty(fieldList)) { + return; + } + + int size = fieldList.size(); + List<StreamSinkFieldEntity> entityList = new ArrayList<>(size); + String groupId = request.getInlongGroupId(); + String streamId = request.getInlongStreamId(); + String sinkType = request.getSinkType(); + Integer sinkId = request.getId(); + for (SinkField fieldInfo : fieldList) { + this.checkFieldInfo(fieldInfo); + StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); + if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { + fieldEntity.setFieldComment(fieldEntity.getFieldName()); + } + try { + IcebergColumnInfo dto = IcebergColumnInfo.getFromRequest(fieldInfo); + fieldEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + LOGGER.error("parsing json string to sink field info failed", e); + throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage()); + } + fieldEntity.setInlongGroupId(groupId); + fieldEntity.setInlongStreamId(streamId); + fieldEntity.setSinkType(sinkType); + fieldEntity.setSinkId(sinkId); + fieldEntity.setIsDeleted(InlongConstants.UN_DELETED); + entityList.add(fieldEntity); + } + + sinkFieldMapper.insertAll(entityList); + LOGGER.info("success to save es sink fields"); + } + + @Override + public List<SinkField> getSinkFields(Integer sinkId) { + List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId); + List<SinkField> fieldList = new ArrayList<>(); + if (CollectionUtils.isEmpty(sinkFieldEntities)) { + return fieldList; + } + sinkFieldEntities.forEach(field -> { + SinkField sinkField = new SinkField(); + if (StringUtils.isNotBlank(field.getExtParams())) { + IcebergColumnInfo icebergColumnInfo = IcebergColumnInfo.getFromJson( + field.getExtParams()); + CommonBeanUtils.copyProperties(field, icebergColumnInfo, true); + fieldList.add(icebergColumnInfo); + } else { + CommonBeanUtils.copyProperties(field, sinkField, true); + fieldList.add(sinkField); + } + + }); + return fieldList; + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java index cb29ffd6ef..d594ab26c0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java @@ -112,9 +112,9 @@ public class IcebergSourceOperator extends AbstractSourceOperator { List<IcebergColumnInfo> existColumns = IcebergCatalogUtils.getColumns(metastoreUri, dbName, tableName); for (IcebergColumnInfo columnInfo : existColumns) { StreamField streamField = new StreamField(); - streamField.setFieldName(columnInfo.getName()); - streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getType())); - streamField.setFieldComment(columnInfo.getDesc()); + streamField.setFieldName(columnInfo.getFieldName()); + streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getFieldType())); + streamField.setFieldComment(columnInfo.getFieldComment()); streamFields.add(streamField); } updateField(sourceRequest.getInlongGroupId(), sourceRequest.getInlongStreamId(), streamFields);