This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 62de4b6e3 [INLONG-6410][Manager] Ensure that the additional parameters 
of group and stream can be saved (#6411)
62de4b6e3 is described below

commit 62de4b6e35f3a0a3000c46dd9201eaa288c0bafe
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Sun Nov 6 19:03:51 2022 +0800

    [INLONG-6410][Manager] Ensure that the additional parameters of group and 
stream can be saved (#6411)
---
 .../manager/common/util/CommonBeanUtils.java       | 41 +++++++++++++++++-----
 .../BaseInlongConsume.java}                        | 14 ++++----
 .../manager/pojo/consume/InlongConsumeInfo.java    |  6 ++--
 .../manager/pojo/consume/InlongConsumeRequest.java |  4 ++-
 .../pojo/consume/pulsar/ConsumePulsarDTO.java      | 11 +++---
 .../pojo/consume/tubemq/ConsumeTubeMQDTO.java      |  6 ++--
 .../manager/pojo/group/kafka/InlongKafkaDTO.java   | 11 +++---
 .../manager/pojo/group/pulsar/InlongPulsarDTO.java | 18 +++-------
 .../manager/pojo/group/tubemq/InlongTubeMQDTO.java | 28 +++++++++++++--
 .../service/group/InlongGroupOperator4TubeMQ.java  | 15 ++++++--
 10 files changed, 100 insertions(+), 54 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java
index d86680602..f9fc3db80 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/CommonBeanUtils.java
@@ -36,9 +36,9 @@ public class CommonBeanUtils extends BeanUtils {
      * Usage scenario: Loop replication for each Java entity in the List
      *
      * @param sources Source entity list
-     * @param target  target entity list
-     * @param <S>     The type of the source entity list
-     * @param <T>     The type of the target entity list
+     * @param target target entity list
+     * @param <S> The type of the source entity list
+     * @param <T> The type of the target entity list
      * @return target entity list
      */
     public static <S, T> List<T> copyListProperties(List<S> sources, 
Supplier<T> target) {
@@ -59,8 +59,8 @@ public class CommonBeanUtils extends BeanUtils {
      *
      * @param source source data content
      * @param target target type
-     * @param <S>    source type
-     * @param <T>    target type
+     * @param <S> source type
+     * @param <T> target type
      * @return the target type object after copying
      */
     public static <S, T> T copyProperties(S source, Supplier<T> target) {
@@ -75,11 +75,11 @@ public class CommonBeanUtils extends BeanUtils {
     /**
      * Copy the content of the source instance to the target instance
      *
-     * @param source     source data content
-     * @param target     target data
+     * @param source source data content
+     * @param target target data
      * @param ignoreNull Whether to ignore null values
-     * @param <S>        source type
-     * @param <T>        target type
+     * @param <S> source type
+     * @param <T> target type
      * @apiNote If ignoreNull = false, non-null attributes in the target 
instance may be overwritten
      */
     public static <S, T> T copyProperties(S source, T target, boolean 
ignoreNull) {
@@ -95,6 +95,29 @@ public class CommonBeanUtils extends BeanUtils {
         return target;
     }
 
+    /**
+     * Copy the content of the source instance to the target instance, and 
return the result
+     *
+     * @param source source data content
+     * @param target target data
+     * @param ignoreNull Whether to ignore null values
+     * @param <S> source type
+     * @param <T> target type
+     * @apiNote If ignoreNull = false, non-null attributes in the target 
instance may be overwritten
+     */
+    public static <S, T> T copyProperties(S source, Supplier<T> target, 
boolean ignoreNull) {
+        T result = target.get();
+        if (source == null) {
+            return result;
+        }
+        if (ignoreNull) {
+            copyProperties(source, result, getNullPropertyNames(source));
+        } else {
+            copyProperties(source, result);
+        }
+        return result;
+    }
+
     /**
      * Get an array of null field names for a given object
      *
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java
similarity index 73%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java
index 01f9ad348..087e24e1f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BaseInlongConsume.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.pojo.consume;
 
 import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
 /**
- * Inlong group info for TubeMQ
+ * The base parameter class of InlongConsume, support user extend their own 
business params.
  */
 @Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+@AllArgsConstructor
+@ApiModel("Base info of inlong consume")
+public class BaseInlongConsume {
 
-    // no fields
+    // you can add extend parameters in this class
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
index 1820366d3..48a48bed3 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
@@ -23,8 +23,8 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
-import lombok.experimental.SuperBuilder;
 
 import java.util.Date;
 
@@ -32,12 +32,12 @@ import java.util.Date;
  * Base inlong consume info
  */
 @Data
-@SuperBuilder
+@EqualsAndHashCode(callSuper = true)
 @NoArgsConstructor
 @AllArgsConstructor
 @ApiModel("Base inlong consume info")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
-public abstract class InlongConsumeInfo {
+public abstract class InlongConsumeInfo extends BaseInlongConsume {
 
     @ApiModelProperty(value = "Primary key")
     private Integer id;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
index 5dfa0bcc1..110efc83d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 
@@ -32,11 +33,12 @@ import javax.validation.constraints.NotNull;
  * Base inlong consume request
  */
 @Data
+@EqualsAndHashCode(callSuper = true)
 @NoArgsConstructor
 @AllArgsConstructor
 @ApiModel("Base inlong consume request")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
-public abstract class InlongConsumeRequest {
+public abstract class InlongConsumeRequest extends BaseInlongConsume {
 
     @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
index f096e6f44..42d491f60 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
@@ -25,7 +25,9 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.consume.BaseInlongConsume;
 
 import javax.validation.constraints.NotNull;
 
@@ -37,7 +39,7 @@ import javax.validation.constraints.NotNull;
 @NoArgsConstructor
 @AllArgsConstructor
 @ApiModel("Inlong group dto of Pulsar")
-public class ConsumePulsarDTO {
+public class ConsumePulsarDTO extends BaseInlongConsume {
 
     @ApiModelProperty("Whether to configure the dead letter queue, 0: not 
configure, 1: configure")
     private Integer isDlq;
@@ -55,12 +57,7 @@ public class ConsumePulsarDTO {
      * Get the dto instance from the request
      */
     public static ConsumePulsarDTO getFromRequest(ConsumePulsarRequest 
request) {
-        return ConsumePulsarDTO.builder()
-                .isDlq(request.getIsDlq())
-                .deadLetterTopic(request.getDeadLetterTopic())
-                .isRlq(request.getIsRlq())
-                .retryLetterTopic(request.getRetryLetterTopic())
-                .build();
+        return CommonBeanUtils.copyProperties(request, ConsumePulsarDTO::new, 
true);
     }
 
     /**
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
index c16c8021a..d6f23335d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
@@ -23,7 +23,9 @@ import lombok.Builder;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.consume.BaseInlongConsume;
 
 import javax.validation.constraints.NotNull;
 
@@ -34,7 +36,7 @@ import javax.validation.constraints.NotNull;
 @Builder
 @AllArgsConstructor
 @ApiModel("Inlong group info of TubeMQ")
-public class ConsumeTubeMQDTO {
+public class ConsumeTubeMQDTO extends BaseInlongConsume {
 
     // no fields
 
@@ -42,7 +44,7 @@ public class ConsumeTubeMQDTO {
      * Get the dto instance from the request
      */
     public static ConsumeTubeMQDTO getFromRequest(ConsumeTubeMQRequest 
request) {
-        return ConsumeTubeMQDTO.builder().build();
+        return CommonBeanUtils.copyProperties(request, ConsumeTubeMQDTO::new, 
true);
     }
 
     /**
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
index 62a94e722..093441768 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
@@ -24,7 +24,9 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.group.BaseInlongGroup;
 
 import javax.validation.constraints.NotNull;
 
@@ -36,7 +38,7 @@ import javax.validation.constraints.NotNull;
 @NoArgsConstructor
 @AllArgsConstructor
 @ApiModel("Inlong group info for Kafka")
-public class InlongKafkaDTO {
+public class InlongKafkaDTO extends BaseInlongGroup {
 
     // partition number
     private Integer numPartitions;
@@ -51,12 +53,7 @@ public class InlongKafkaDTO {
      * Get the dto instance from the request
      */
     public static InlongKafkaDTO getFromRequest(InlongKafkaRequest request) {
-        return InlongKafkaDTO.builder()
-                .numPartitions(request.getNumPartitions())
-                .replicationFactor(request.getReplicationFactor())
-                .groupId(request.getGroupId())
-                .autoCommit(request.getAutoCommit())
-                .build();
+        return CommonBeanUtils.copyProperties(request, InlongKafkaDTO::new, 
true);
     }
 
     /**
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
index 5f42fe0e5..3f0c61980 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
@@ -25,7 +25,9 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.group.BaseInlongGroup;
 
 import javax.validation.constraints.NotNull;
 
@@ -37,7 +39,7 @@ import javax.validation.constraints.NotNull;
 @NoArgsConstructor
 @AllArgsConstructor
 @ApiModel("Inlong group info for Pulsar")
-public class InlongPulsarDTO {
+public class InlongPulsarDTO extends BaseInlongGroup {
 
     @ApiModelProperty(value = "Queue model, parallel: multiple partitions, 
high throughput, out-of-order messages;"
             + "serial: single partition, low throughput, and orderly messages")
@@ -77,19 +79,7 @@ public class InlongPulsarDTO {
      * Get the dto instance from the request
      */
     public static InlongPulsarDTO getFromRequest(InlongPulsarRequest request) {
-        return InlongPulsarDTO.builder()
-                .queueModule(request.getQueueModule())
-                .partitionNum(request.getPartitionNum())
-                .ensemble(request.getEnsemble())
-                .writeQuorum(request.getWriteQuorum())
-                .ackQuorum(request.getAckQuorum())
-                .retentionTime(request.getRetentionTime())
-                .retentionTimeUnit(request.getRetentionTimeUnit())
-                .retentionSize(request.getRetentionSize())
-                .retentionSizeUnit(request.getRetentionSizeUnit())
-                .ttl(request.getTtl())
-                .ttlUnit(request.getTtlUnit())
-                .build();
+        return CommonBeanUtils.copyProperties(request, InlongPulsarDTO::new, 
true);
     }
 
     /**
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
index 01f9ad348..573d17c81 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
@@ -20,6 +20,14 @@ package org.apache.inlong.manager.pojo.group.tubemq;
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.group.BaseInlongGroup;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+
+import javax.validation.constraints.NotNull;
 
 /**
  * Inlong group info for TubeMQ
@@ -27,8 +35,24 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+public class InlongTubeMQDTO extends BaseInlongGroup {
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static InlongTubeMQDTO getFromRequest(InlongGroupRequest request) {
+        return CommonBeanUtils.copyProperties(request, InlongTubeMQDTO::new, 
true);
+    }
 
-    // no fields
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static InlongTubeMQDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, InlongTubeMQDTO.class);
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
index b39347410..a10085a85 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
@@ -27,7 +27,9 @@ import 
org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQDTO;
 import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQRequest;
 import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +63,10 @@ public class InlongGroupOperator4TubeMQ extends 
AbstractGroupOperator {
 
         InlongTubeMQInfo groupInfo = new InlongTubeMQInfo();
         CommonBeanUtils.copyProperties(entity, groupInfo);
-
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            InlongTubeMQDTO dto = 
InlongTubeMQDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, groupInfo);
+        }
         // TODO get the cluster
         // groupInfo.setTubeMaster();
         return groupInfo;
@@ -69,7 +74,13 @@ public class InlongGroupOperator4TubeMQ extends 
AbstractGroupOperator {
 
     @Override
     protected void setTargetEntity(InlongGroupRequest request, 
InlongGroupEntity targetEntity) {
-        LOGGER.info("do nothing for inlong group with TubeMQ");
+        InlongTubeMQRequest tubeMQRequest = (InlongTubeMQRequest) request;
+        try {
+            InlongTubeMQDTO dto = 
InlongTubeMQDTO.getFromRequest(tubeMQRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
     }
 
     @Override

Reply via email to