This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ee17285f03280850e1b3511ba4374e6fc1a0f488 Author: feat <featzh...@outlook.com> AuthorDate: Tue Jan 3 16:47:08 2023 +0800 [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101) --- inlong-dashboard/src/locales/cn.json | 4 +- inlong-dashboard/src/locales/en.json | 4 +- inlong-dashboard/src/metas/sinks/defaults/Hudi.ts | 56 +++++----------------- .../inlong/manager/pojo/sink/hudi/HudiSink.java | 2 +- .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 16 ++----- .../manager/pojo/sink/hudi/HudiSinkRequest.java | 2 +- .../manager/pojo/sink/hudi/HudiTableInfo.java | 2 + .../manager/pojo/sort/util/LoadNodeUtils.java | 11 +---- .../resource/sink/hudi/HudiCatalogClient.java | 1 - .../service/sink/hudi/HudiSinkOperator.java | 28 ++++++++++- .../sort/protocol/node/load/HudiLoadNode.java | 15 ++---- .../sort/protocol/node/load/HudiLoadNodeTest.java | 2 +- .../inlong/sort/parser/HudiNodeSqlParserTest.java | 5 +- 13 files changed, 59 insertions(+), 89 deletions(-) diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json index d17d4a1ef..b778fb42d 100644 --- a/inlong-dashboard/src/locales/cn.json +++ b/inlong-dashboard/src/locales/cn.json @@ -204,9 +204,9 @@ "meta.Sinks.Hudi.FieldType": "字段类型", "meta.Sinks.Hudi.FieldDescription": "字段描述", "meta.Sinks.Hudi.PrimaryKey": "主键", - "meta.Sinks.Hudi.PartitionFieldList": "分区字段", + "meta.Sinks.Hudi.PartitionKey": "分区字段", "meta.Sinks.Hudi.PrimaryKeyHelper": "主键字段,以逗号(,)分割", - "meta.Sinks.Hudi.PartitionFieldListHelp": "字段类型若为timestamp,则必须设置此字段值的格式,支持 MICROSECONDS,MILLISECONDS,SECONDS,SQL,ISO_8601,以及自定义,比如:yyyy-MM-dd HH:mm:ss 等", + "meta.Sinks.Hudi.PartitionKeyHelp": "分区字段列表,以英文逗号(,)分隔.", "meta.Sinks.Hudi.FieldFormat": "字段格式", "meta.Sinks.Hudi.ExtListHelper": "hudi表的DDL属性需带前缀'ddl.'", "meta.Sinks.Greenplum.TableName": "表名称", diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json index 8ba00e92b..b44a94ff5 100644 --- a/inlong-dashboard/src/locales/en.json +++ b/inlong-dashboard/src/locales/en.json @@ -205,8 +205,8 @@ "meta.Sinks.Hudi.FieldDescription": "FieldDescription", "meta.Sinks.Hudi.PrimaryKey": "PrimaryKey", "meta.Sinks.Hudi.PrimaryKeyHelper": "The Primary key fields, separated by commas (,)", - "meta.Sinks.Hudi.PartitionFieldList": "PartitionFieldList", - "meta.Sinks.Hudi.PartitionFieldListHelp": "If the field type is timestamp, you must set the format of the field value, support MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601, and custom, such as: yyyy-MM-dd HH:mm:ss, etc.", + "meta.Sinks.Hudi.PartitionKey": "PartitionKey", + "meta.Sinks.Hudi.PartitionKeyHelp": "A list of partition fields, separated by commas.", "meta.Sinks.Hudi.FieldFormat": "FieldFormat", "meta.Sinks.Hudi.ExtListHelper": "The DDL attribute of the hudi table needs to be prefixed with 'ddl.'", "meta.Sinks.Greenplum.TableName": "TableName", diff --git a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts index ad4648ac2..15f173a18 100644 --- a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts +++ b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts @@ -262,50 +262,6 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende }) sinkFieldList: Record<string, unknown>[]; - @FieldDecorator({ - type: EditableTable, - tooltip: i18n.t('meta.Sinks.Hudi.PartitionFieldListHelp'), - col: 24, - props: { - size: 'small', - required: false, - columns: [ - { - title: i18n.t('meta.Sinks.Hudi.FieldName'), - dataIndex: 'fieldName', - rules: [{ required: true }], - }, - { - title: i18n.t('meta.Sinks.Hudi.FieldType'), - dataIndex: 'fieldType', - type: 'select', - initialValue: 'string', - props: { - options: ['string', 'timestamp'].map(item => ({ - label: item, - value: item, - })), - }, - }, - { - title: i18n.t('meta.Sinks.Hudi.FieldFormat'), - dataIndex: 'fieldFormat', - type: 'autocomplete', - props: { - options: ['MICROSECONDS', 'MILLISECONDS', 'SECONDS', 'SQL', 'ISO_8601'].map(item => ({ - label: item, - value: item, - })), - }, - rules: [{ required: true }], - visible: (text, record) => record.fieldType === 'timestamp', - }, - ], - }, - }) - @I18n('meta.Sinks.Hudi.PartitionFieldList') - partitionFieldList: Record<string, unknown>[]; - @FieldDecorator({ type: 'input', tooltip: i18n.t('meta.Sinks.Hudi.PrimaryKeyHelper'), @@ -317,6 +273,18 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende @ColumnDecorator() @I18n('meta.Sinks.Hudi.PrimaryKey') primaryKey: string; + + @FieldDecorator({ + type: 'input', + tooltip: i18n.t('meta.Sinks.Hudi.PartitionKeyHelper'), + rules: [{ required: false }], + props: values => ({ + disabled: [110, 130].includes(values?.status), + }), + }) + @ColumnDecorator() + @I18n('meta.Sinks.Hudi.PartitionKey') + partitionKey: string; } const getFieldListColumns = sinkValues => { diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java index 54e332f88..b23c1fad5 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java @@ -77,7 +77,7 @@ public class HudiSink extends StreamSink { private List<HashMap<String, String>> extList; @ApiModelProperty("Partition field list") - private List<HudiPartitionField> partitionFieldList; + private String partitionKey; public HudiSink() { this.setSinkType(SinkType.HUDI); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java index 616fc56f7..32b214060 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java @@ -26,7 +26,6 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.JsonUtils; @@ -75,7 +74,7 @@ public class HudiSinkDTO { private List<HashMap<String, String>> extList; @ApiModelProperty("Partition field list") - private List<HudiPartitionField> partitionFieldList; + private String partitionKey; /** * Get the dto instance from the request @@ -87,7 +86,7 @@ public class HudiSinkDTO { .dbName(request.getDbName()) .tableName(request.getTableName()) .dataPath(request.getDataPath()) - .partitionFieldList(request.getPartitionFieldList()) + .partitionKey(request.getPartitionKey()) .fileFormat(request.getFileFormat()) .catalogType(request.getCatalogType()) .properties(request.getProperties()) @@ -112,16 +111,7 @@ public class HudiSinkDTO { tableInfo.setDbName(hudiInfo.getDbName()); tableInfo.setTableName(hudiInfo.getTableName()); - // Set partition fields - if (CollectionUtils.isNotEmpty(hudiInfo.getPartitionFieldList())) { - for (HudiPartitionField field : hudiInfo.getPartitionFieldList()) { - HudiColumnInfo columnInfo = new HudiColumnInfo(); - columnInfo.setName(field.getFieldName()); - columnInfo.setPartition(true); - columnInfo.setType("string"); - columnList.add(columnInfo); - } - } + tableInfo.setPartitionKey(hudiInfo.getPartitionKey()); tableInfo.setColumns(columnList); tableInfo.setPrimaryKey(hudiInfo.getPrimaryKey()); tableInfo.setFileFormat(hudiInfo.getFileFormat()); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java index a51d0a153..bd6c1833b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java @@ -63,7 +63,7 @@ public class HudiSinkRequest extends SinkRequest { private List<HashMap<String, String>> extList; @ApiModelProperty("Partition field list") - private List<HudiPartitionField> partitionFieldList; + private String partitionKey; @ApiModelProperty("Primary key") private String primaryKey; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java index 24e4d7260..324e14ac4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java @@ -35,4 +35,6 @@ public class HudiTableInfo { private List<HudiColumnInfo> columns; private String primaryKey; + + private String partitionKey; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java index 240f208ca..d8f47eda3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java @@ -409,14 +409,7 @@ public class LoadNodeUtils { public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) { HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType()); - List<FieldInfo> partitionFields = Lists.newArrayList(); - if (CollectionUtils.isNotEmpty(hudiSink.getPartitionFieldList())) { - partitionFields = hudiSink.getPartitionFieldList().stream() - .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hudiSink.getSinkName(), - FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(), - partitionField.getFieldFormat()))) - .collect(Collectors.toList()); - } + return new HudiLoadNode( hudiSink.getSinkName(), hudiSink.getSinkName(), @@ -433,7 +426,7 @@ public class LoadNodeUtils { hudiSink.getCatalogUri(), hudiSink.getWarehouse(), hudiSink.getExtList(), - partitionFields); + hudiSink.getPartitionKey()); } /** diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java index 8055114ad..cff7995f7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java @@ -141,7 +141,6 @@ public class HudiCatalogClient { // filter out the metadata columns .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName())) .collect(Collectors.toList()); - allCols.addAll(hiveTable.getPartitionKeys()); return allCols.stream() .map((FieldSchema s) -> { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java index 953137cc2..734697296 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java @@ -17,9 +17,12 @@ package org.apache.inlong.manager.service.sink.hudi; +import static com.google.common.base.Preconditions.checkState; + import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.HashMap; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; @@ -72,7 +75,28 @@ public class HudiSinkOperator extends AbstractSinkOperator { Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()), ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType()); HudiSinkRequest sinkRequest = (HudiSinkRequest) request; - List<HashMap<String, String>> extList = sinkRequest.getExtList(); + + String partitionKey = sinkRequest.getPartitionKey(); + String primaryKey = sinkRequest.getPrimaryKey(); + boolean primaryKeyExist = StringUtils.isNotEmpty(partitionKey); + boolean partitionKeyExist = StringUtils.isNotEmpty(primaryKey); + if (primaryKeyExist || partitionKeyExist) { + Set<String> fieldNames = sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName) + .collect(Collectors.toSet()); + if (primaryKeyExist) { + checkState( + fieldNames.contains(partitionKey), + "The partitionKey({}) must be included in the sinkFieldList({})", + partitionKey, fieldNames); + } + if (partitionKeyExist) { + checkState( + fieldNames.contains(partitionKey), + "The primaryKey({}) must be included in the sinkFieldList({})", + primaryKey, + fieldNames); + } + } try { HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest); diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java index b1448a82b..a150d0c40 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.Data; @@ -92,8 +91,8 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable @JsonProperty("extList") private List<HashMap<String, String>> extList; - @JsonProperty("partitionFields") - private List<FieldInfo> partitionFields; + @JsonProperty("partitionKey") + private String partitionKey; @JsonCreator public HudiLoadNode( @@ -112,7 +111,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable @JsonProperty("uri") String uri, @JsonProperty("warehouse") String warehouse, @JsonProperty("extList") List<HashMap<String, String>> extList, - @JsonProperty("partitionFields") List<FieldInfo> partitionFields) { + @JsonProperty("partitionKey") String partitionKey) { super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties); this.tableName = Preconditions.checkNotNull(tableName, "table name is null"); this.dbName = Preconditions.checkNotNull(dbName, "db name is null"); @@ -121,7 +120,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable this.uri = uri; this.warehouse = warehouse; this.extList = extList; - this.partitionFields = partitionFields; + this.partitionKey = partitionKey; } @Override @@ -137,11 +136,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable options.put(HUDI_OPTION_HIVE_SYNC_METASTORE_URIS, uri); // partition field - if (partitionFields != null && !partitionFields.isEmpty()) { - String partitionKey = - partitionFields.stream() - .map(FieldInfo::getName) - .collect(Collectors.joining(",")); + if (StringUtils.isNoneBlank(partitionKey)) { options.put(HUDI_OPTION_PARTITION_PATH_FIELD_NAME, partitionKey); } diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java index c3579177d..a8e64bcb3 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java @@ -51,6 +51,6 @@ public class HudiLoadNodeTest extends SerializeBaseTest<HudiLoadNode> { "thrift://localhost:9083", "hdfs://localhost:9000/user/hudi/warehouse", new ArrayList<>(), - new ArrayList<>()); + "f1"); } } diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java index 8ddd3fd9b..87c820ca6 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.commons.compress.utils.Lists; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -106,7 +105,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase { null, "hdfs://localhost:9000/hudi/warehouse", extList, - Lists.newArrayList()); + "f1"); } private HudiLoadNode buildHudiLoadNodeWithHiveCatalog() { @@ -145,7 +144,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase { "thrift://localhost:9083", "/hive/warehouse", extList, - Lists.newArrayList()); + "f1"); } /**