This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b3fdfc051 [INLONG-8964][Manager] Support Sortstandalone cluster 
management (#8978)
0b3fdfc051 is described below

commit 0b3fdfc051c44e3bef7122f16482f1d85807bd1f
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Sep 25 19:25:20 2023 +0800

    [INLONG-8964][Manager] Support Sortstandalone cluster management (#8978)
    
    * [INLONG-8964][Manager] Support SortStandalone cluster management
---
 .../inlong/manager/common/enums/ClusterType.java   |  2 +
 .../sortstandalone/SortStandaloneClusterDTO.java   | 65 +++++++++++++++++
 .../sortstandalone/SortStandaloneClusterInfo.java  | 52 ++++++++++++++
 .../SortStandaloneClusterRequest.java              | 46 ++++++++++++
 .../cluster/SortStandaloneClusterOperator.java     | 84 ++++++++++++++++++++++
 .../service/cluster/InlongClusterServiceTest.java  | 37 ++++++++++
 6 files changed, 286 insertions(+)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
index 2c057641b2..f6a33ff71e 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
@@ -33,6 +33,7 @@ public class ClusterType {
     public static final String DATAPROXY = "DATAPROXY";
     public static final String KAFKA = "KAFKA";
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
+    public static final String SORTSTANDALONE = "SORTSTANDALONE";
 
     private static final Set<String> TYPE_SET = new HashSet<String>() {
 
@@ -43,6 +44,7 @@ public class ClusterType {
             add(ClusterType.DATAPROXY);
             add(ClusterType.KAFKA);
             add(ClusterType.ELASTICSEARCH);
+            add(ClusterType.SORTSTANDALONE);
         }
     };
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
new file mode 100644
index 0000000000..0eec2ded29
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.Set;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("SortStandalone cluster info")
+public class SortStandaloneClusterDTO {
+
+    @ApiModelProperty(value = "Supported sink types")
+    private Set<String> supportedSinkTypes;
+
+    public static SortStandaloneClusterDTO 
getFromRequest(SortStandaloneClusterRequest request, String extParams) {
+        SortStandaloneClusterDTO dto = StringUtils.isNotBlank(extParams)
+                ? SortStandaloneClusterDTO.getFromJson(extParams)
+                : new SortStandaloneClusterDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static SortStandaloneClusterDTO getFromJson(@NotNull String 
extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, 
SortStandaloneClusterDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+                    String.format("parse extParams of SortStandalone Cluster 
failure: %s", e.getMessage()));
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
new file mode 100644
index 0000000000..ea14d2a56b
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Set;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
+@ApiModel("Inlong cluster info for SortStandalone")
+public class SortStandaloneClusterInfo extends ClusterInfo {
+
+    @ApiModelProperty(value = "Supported sink types")
+    private Set<String> supportedSinkTypes;
+
+    public SortStandaloneClusterInfo() {
+        this.setType(ClusterType.SORTSTANDALONE);
+    }
+
+    @Override
+    public ClusterRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, 
SortStandaloneClusterRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
new file mode 100644
index 0000000000..fd8c10e0d0
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Set;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
+@ApiModel("Inlong cluster request for SortStandalone")
+public class SortStandaloneClusterRequest extends ClusterRequest {
+
+    @ApiModelProperty(value = "Supported sink types")
+    private Set<String> supportedSinkTypes;
+
+    public SortStandaloneClusterRequest() {
+        this.setType(ClusterType.SORTSTANDALONE);
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
new file mode 100644
index 0000000000..9e04f33c0d
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterDTO;
+import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
+import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class SortStandaloneClusterOperator extends AbstractClusterOperator {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    protected void setTargetEntity(ClusterRequest request, InlongClusterEntity 
targetEntity) {
+        SortStandaloneClusterRequest standaloneRequest = 
(SortStandaloneClusterRequest) request;
+        CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true);
+        try {
+            SortStandaloneClusterDTO dto = 
SortStandaloneClusterDTO.getFromRequest(standaloneRequest,
+                    targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            log.debug("success to set entity for SortStandalone cluster");
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+                    String.format("serialize extParams of SortStandalone 
Cluster failure: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public Boolean accept(String clusterType) {
+        return getClusterType().equals(clusterType);
+    }
+
+    @Override
+    public String getClusterType() {
+        return ClusterType.SORTSTANDALONE;
+    }
+
+    @Override
+    public ClusterInfo getFromEntity(InlongClusterEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+        }
+
+        SortStandaloneClusterInfo clusterInfo = new 
SortStandaloneClusterInfo();
+        CommonBeanUtils.copyProperties(entity, clusterInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            SortStandaloneClusterDTO dto = 
SortStandaloneClusterDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, clusterInfo);
+        }
+        return clusterInfo;
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 006429e37c..39106fccda 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -32,6 +32,8 @@ import 
org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
 import 
org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
+import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
+import 
org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -43,7 +45,9 @@ import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Inlong cluster service test for {@link InlongClusterService}
@@ -55,6 +59,15 @@ public class InlongClusterServiceTest extends 
ServiceBaseTest {
     @Autowired
     private HeartbeatManager heartbeatManager;
 
+    public Integer saveStandaloneCluster(String clusterTag, String 
clusterName, Set<String> supportedSinkTypes) {
+        SortStandaloneClusterRequest request = new 
SortStandaloneClusterRequest();
+        request.setClusterTags(clusterTag);
+        request.setName(clusterName);
+        request.setSupportedSinkTypes(supportedSinkTypes);
+        request.setInCharges(GLOBAL_OPERATOR);
+        return clusterService.save(request, GLOBAL_OPERATOR);
+    }
+
     /**
      * Save data proxy cluster
      */
@@ -312,6 +325,30 @@ public class InlongClusterServiceTest extends 
ServiceBaseTest {
 
     }
 
+    @Test
+    public void testStandaloneCluster() {
+        String clusterTag = "standalone_cluster";
+        String clusterName = "test_standalone";
+        String type1 = "type1";
+        String type2 = "type2";
+        String type3 = "type3";
+        Set<String> supportedType = new HashSet<>();
+        supportedType.add(type1);
+        supportedType.add(type2);
+        supportedType.add(type3);
+
+        Integer id = this.saveStandaloneCluster(clusterTag, clusterName, 
supportedType);
+        Assertions.assertNotNull(id);
+
+        ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR);
+        Assertions.assertInstanceOf(SortStandaloneClusterInfo.class, info);
+
+        Set<String> types = ((SortStandaloneClusterInfo) 
info).getSupportedSinkTypes();
+        Assertions.assertTrue(types.contains(type1));
+        Assertions.assertTrue(types.contains(type2));
+        Assertions.assertTrue(types.contains(type3));
+    }
+
     @Test
     public void testDataProxyCluster() {
         String clusterTag = "default_cluster";

Reply via email to