This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 62de4b6e3 [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved (#6411) 62de4b6e3 is described below commit 62de4b6e35f3a0a3000c46dd9201eaa288c0bafe Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Sun Nov 6 19:03:51 2022 +0800 [INLONG-6410][Manager] Ensure that the additional parameters of group and stream can be saved (#6411) --- .../manager/common/util/CommonBeanUtils.java | 41 +++++++++++++++++----- .../BaseInlongConsume.java} | 14 ++++---- .../manager/pojo/consume/InlongConsumeInfo.java | 6 ++-- .../manager/pojo/consume/InlongConsumeRequest.java | 4 ++- .../pojo/consume/pulsar/ConsumePulsarDTO.java | 11 +++--- .../pojo/consume/tubemq/ConsumeTubeMQDTO.java | 6 ++-- .../manager/pojo/group/kafka/InlongKafkaDTO.java | 11 +++--- .../manager/pojo/group/pulsar/InlongPulsarDTO.java | 18 +++------- .../manager/pojo/group/tubemq/InlongTubeMQDTO.java | 28 +++++++++++++-- .../service/group/InlongGroupOperator4TubeMQ.java | 15 ++++++-- 10 files changed, 100 insertions(+), 54 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java index d86680602..f9fc3db80 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java @@ -36,9 +36,9 @@ public class CommonBeanUtils extends BeanUtils { * Usage scenario: Loop replication for each Java entity in the List * * @param sources Source entity list - * @param target target entity list - * @param <S> The type of the source entity list - * @param <T> The type of the target entity list + * @param target target entity list + * @param <S> The type of the source entity list + * @param <T> The type of the target entity list * @return target entity list */ public static <S, T> List<T> copyListProperties(List<S> sources, Supplier<T> target) { @@ -59,8 +59,8 @@ public class CommonBeanUtils extends BeanUtils { * * @param source source data content * @param target target type - * @param <S> source type - * @param <T> target type + * @param <S> source type + * @param <T> target type * @return the target type object after copying */ public static <S, T> T copyProperties(S source, Supplier<T> target) { @@ -75,11 +75,11 @@ public class CommonBeanUtils extends BeanUtils { /** * Copy the content of the source instance to the target instance * - * @param source source data content - * @param target target data + * @param source source data content + * @param target target data * @param ignoreNull Whether to ignore null values - * @param <S> source type - * @param <T> target type + * @param <S> source type + * @param <T> target type * @apiNote If ignoreNull = false, non-null attributes in the target instance may be overwritten */ public static <S, T> T copyProperties(S source, T target, boolean ignoreNull) { @@ -95,6 +95,29 @@ public class CommonBeanUtils extends BeanUtils { return target; } + /** + * Copy the content of the source instance to the target instance, and return the result + * + * @param source source data content + * @param target target data + * @param ignoreNull Whether to ignore null values + * @param <S> source type + * @param <T> target type + * @apiNote If ignoreNull = false, non-null attributes in the target instance may be overwritten + */ + public static <S, T> T copyProperties(S source, Supplier<T> target, boolean ignoreNull) { + T result = target.get(); + if (source == null) { + return result; + } + if (ignoreNull) { + copyProperties(source, result, getNullPropertyNames(source)); + } else { + copyProperties(source, result); + } + return result; + } + /** * Get an array of null field names for a given object * diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java similarity index 73% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java index 01f9ad348..087e24e1f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java @@ -15,20 +15,20 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.group.tubemq; +package org.apache.inlong.manager.pojo.consume; import io.swagger.annotations.ApiModel; +import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; /** - * Inlong group info for TubeMQ + * The base parameter class of InlongConsume, support user extend their own business params. */ @Data -@NoArgsConstructor -@ApiModel("Inlong group info for TubeMQ") -public class InlongTubeMQDTO { +@AllArgsConstructor +@ApiModel("Base info of inlong consume") +public class BaseInlongConsume { - // no fields + // you can add extend parameters in this class } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java index 1820366d3..48a48bed3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java @@ -23,8 +23,8 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; import java.util.Date; @@ -32,12 +32,12 @@ import java.util.Date; * Base inlong consume info */ @Data -@SuperBuilder +@EqualsAndHashCode(callSuper = true) @NoArgsConstructor @AllArgsConstructor @ApiModel("Base inlong consume info") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType") -public abstract class InlongConsumeInfo { +public abstract class InlongConsumeInfo extends BaseInlongConsume { @ApiModelProperty(value = "Primary key") private Integer id; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java index 5dfa0bcc1..110efc83d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java @@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import org.apache.inlong.manager.common.validation.UpdateValidation; @@ -32,11 +33,12 @@ import javax.validation.constraints.NotNull; * Base inlong consume request */ @Data +@EqualsAndHashCode(callSuper = true) @NoArgsConstructor @AllArgsConstructor @ApiModel("Base inlong consume request") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType") -public abstract class InlongConsumeRequest { +public abstract class InlongConsumeRequest extends BaseInlongConsume { @NotNull(groups = UpdateValidation.class) @ApiModelProperty(value = "Primary key") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java index f096e6f44..42d491f60 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java @@ -25,7 +25,9 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.consume.BaseInlongConsume; import javax.validation.constraints.NotNull; @@ -37,7 +39,7 @@ import javax.validation.constraints.NotNull; @NoArgsConstructor @AllArgsConstructor @ApiModel("Inlong group dto of Pulsar") -public class ConsumePulsarDTO { +public class ConsumePulsarDTO extends BaseInlongConsume { @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure") private Integer isDlq; @@ -55,12 +57,7 @@ public class ConsumePulsarDTO { * Get the dto instance from the request */ public static ConsumePulsarDTO getFromRequest(ConsumePulsarRequest request) { - return ConsumePulsarDTO.builder() - .isDlq(request.getIsDlq()) - .deadLetterTopic(request.getDeadLetterTopic()) - .isRlq(request.getIsRlq()) - .retryLetterTopic(request.getRetryLetterTopic()) - .build(); + return CommonBeanUtils.copyProperties(request, ConsumePulsarDTO::new, true); } /** diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java index c16c8021a..d6f23335d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java @@ -23,7 +23,9 @@ import lombok.Builder; import lombok.Data; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.consume.BaseInlongConsume; import javax.validation.constraints.NotNull; @@ -34,7 +36,7 @@ import javax.validation.constraints.NotNull; @Builder @AllArgsConstructor @ApiModel("Inlong group info of TubeMQ") -public class ConsumeTubeMQDTO { +public class ConsumeTubeMQDTO extends BaseInlongConsume { // no fields @@ -42,7 +44,7 @@ public class ConsumeTubeMQDTO { * Get the dto instance from the request */ public static ConsumeTubeMQDTO getFromRequest(ConsumeTubeMQRequest request) { - return ConsumeTubeMQDTO.builder().build(); + return CommonBeanUtils.copyProperties(request, ConsumeTubeMQDTO::new, true); } /** diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java index 62a94e722..093441768 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java @@ -24,7 +24,9 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.group.BaseInlongGroup; import javax.validation.constraints.NotNull; @@ -36,7 +38,7 @@ import javax.validation.constraints.NotNull; @NoArgsConstructor @AllArgsConstructor @ApiModel("Inlong group info for Kafka") -public class InlongKafkaDTO { +public class InlongKafkaDTO extends BaseInlongGroup { // partition number private Integer numPartitions; @@ -51,12 +53,7 @@ public class InlongKafkaDTO { * Get the dto instance from the request */ public static InlongKafkaDTO getFromRequest(InlongKafkaRequest request) { - return InlongKafkaDTO.builder() - .numPartitions(request.getNumPartitions()) - .replicationFactor(request.getReplicationFactor()) - .groupId(request.getGroupId()) - .autoCommit(request.getAutoCommit()) - .build(); + return CommonBeanUtils.copyProperties(request, InlongKafkaDTO::new, true); } /** diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java index 5f42fe0e5..3f0c61980 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java @@ -25,7 +25,9 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.group.BaseInlongGroup; import javax.validation.constraints.NotNull; @@ -37,7 +39,7 @@ import javax.validation.constraints.NotNull; @NoArgsConstructor @AllArgsConstructor @ApiModel("Inlong group info for Pulsar") -public class InlongPulsarDTO { +public class InlongPulsarDTO extends BaseInlongGroup { @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;" + "serial: single partition, low throughput, and orderly messages") @@ -77,19 +79,7 @@ public class InlongPulsarDTO { * Get the dto instance from the request */ public static InlongPulsarDTO getFromRequest(InlongPulsarRequest request) { - return InlongPulsarDTO.builder() - .queueModule(request.getQueueModule()) - .partitionNum(request.getPartitionNum()) - .ensemble(request.getEnsemble()) - .writeQuorum(request.getWriteQuorum()) - .ackQuorum(request.getAckQuorum()) - .retentionTime(request.getRetentionTime()) - .retentionTimeUnit(request.getRetentionTimeUnit()) - .retentionSize(request.getRetentionSize()) - .retentionSizeUnit(request.getRetentionSizeUnit()) - .ttl(request.getTtl()) - .ttlUnit(request.getTtlUnit()) - .build(); + return CommonBeanUtils.copyProperties(request, InlongPulsarDTO::new, true); } /** diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java index 01f9ad348..573d17c81 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java @@ -20,6 +20,14 @@ package org.apache.inlong.manager.pojo.group.tubemq; import io.swagger.annotations.ApiModel; import lombok.Data; import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.group.BaseInlongGroup; +import org.apache.inlong.manager.pojo.group.InlongGroupRequest; + +import javax.validation.constraints.NotNull; /** * Inlong group info for TubeMQ @@ -27,8 +35,24 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @ApiModel("Inlong group info for TubeMQ") -public class InlongTubeMQDTO { +public class InlongTubeMQDTO extends BaseInlongGroup { + + /** + * Get the dto instance from the request + */ + public static InlongTubeMQDTO getFromRequest(InlongGroupRequest request) { + return CommonBeanUtils.copyProperties(request, InlongTubeMQDTO::new, true); + } - // no fields + /** + * Get the dto instance from the JSON string. + */ + public static InlongTubeMQDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, InlongTubeMQDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java index b39347410..a10085a85 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java @@ -27,7 +27,9 @@ import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.InlongGroupRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; +import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQDTO; import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo; +import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQRequest; import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,10 @@ public class InlongGroupOperator4TubeMQ extends AbstractGroupOperator { InlongTubeMQInfo groupInfo = new InlongTubeMQInfo(); CommonBeanUtils.copyProperties(entity, groupInfo); - + if (StringUtils.isNotBlank(entity.getExtParams())) { + InlongTubeMQDTO dto = InlongTubeMQDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, groupInfo); + } // TODO get the cluster // groupInfo.setTubeMaster(); return groupInfo; @@ -69,7 +74,13 @@ public class InlongGroupOperator4TubeMQ extends AbstractGroupOperator { @Override protected void setTargetEntity(InlongGroupRequest request, InlongGroupEntity targetEntity) { - LOGGER.info("do nothing for inlong group with TubeMQ"); + InlongTubeMQRequest tubeMQRequest = (InlongTubeMQRequest) request; + try { + InlongTubeMQDTO dto = InlongTubeMQDTO.getFromRequest(tubeMQRequest); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } } @Override