This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0b3fdfc051 [INLONG-8964][Manager] Support Sortstandalone cluster management (#8978) 0b3fdfc051 is described below commit 0b3fdfc051c44e3bef7122f16482f1d85807bd1f Author: vernedeng <verned...@apache.org> AuthorDate: Mon Sep 25 19:25:20 2023 +0800 [INLONG-8964][Manager] Support Sortstandalone cluster management (#8978) * [INLONG-8964][Manager] Support SortStandalone cluster management --- .../inlong/manager/common/enums/ClusterType.java | 2 + .../sortstandalone/SortStandaloneClusterDTO.java | 65 +++++++++++++++++ .../sortstandalone/SortStandaloneClusterInfo.java | 52 ++++++++++++++ .../SortStandaloneClusterRequest.java | 46 ++++++++++++ .../cluster/SortStandaloneClusterOperator.java | 84 ++++++++++++++++++++++ .../service/cluster/InlongClusterServiceTest.java | 37 ++++++++++ 6 files changed, 286 insertions(+) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java index 2c057641b2..f6a33ff71e 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java @@ -33,6 +33,7 @@ public class ClusterType { public static final String DATAPROXY = "DATAPROXY"; public static final String KAFKA = "KAFKA"; public static final String ELASTICSEARCH = "ELASTICSEARCH"; + public static final String SORTSTANDALONE = "SORTSTANDALONE"; private static final Set<String> TYPE_SET = new HashSet<String>() { @@ -43,6 +44,7 @@ public class ClusterType { add(ClusterType.DATAPROXY); add(ClusterType.KAFKA); add(ClusterType.ELASTICSEARCH); + add(ClusterType.SORTSTANDALONE); } }; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java new file mode 100644 index 0000000000..0eec2ded29 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sortstandalone; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +import java.util.Set; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("SortStandalone cluster info") +public class SortStandaloneClusterDTO { + + @ApiModelProperty(value = "Supported sink types") + private Set<String> supportedSinkTypes; + + public static SortStandaloneClusterDTO getFromRequest(SortStandaloneClusterRequest request, String extParams) { + SortStandaloneClusterDTO dto = StringUtils.isNotBlank(extParams) + ? SortStandaloneClusterDTO.getFromJson(extParams) + : new SortStandaloneClusterDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static SortStandaloneClusterDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, SortStandaloneClusterDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, + String.format("parse extParams of SortStandalone Cluster failure: %s", e.getMessage())); + } + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java new file mode 100644 index 0000000000..ea14d2a56b --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sortstandalone; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Set; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) +@ApiModel("Inlong cluster info for SortStandalone") +public class SortStandaloneClusterInfo extends ClusterInfo { + + @ApiModelProperty(value = "Supported sink types") + private Set<String> supportedSinkTypes; + + public SortStandaloneClusterInfo() { + this.setType(ClusterType.SORTSTANDALONE); + } + + @Override + public ClusterRequest genRequest() { + return CommonBeanUtils.copyProperties(this, SortStandaloneClusterRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java new file mode 100644 index 0000000000..fd8c10e0d0 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sortstandalone; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Set; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORTSTANDALONE) +@ApiModel("Inlong cluster request for SortStandalone") +public class SortStandaloneClusterRequest extends ClusterRequest { + + @ApiModelProperty(value = "Supported sink types") + private Set<String> supportedSinkTypes; + + public SortStandaloneClusterRequest() { + this.setType(ClusterType.SORTSTANDALONE); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java new file mode 100644 index 0000000000..9e04f33c0d --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.cluster; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterDTO; +import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class SortStandaloneClusterOperator extends AbstractClusterOperator { + + @Autowired + private ObjectMapper objectMapper; + + @Override + protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { + SortStandaloneClusterRequest standaloneRequest = (SortStandaloneClusterRequest) request; + CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true); + try { + SortStandaloneClusterDTO dto = SortStandaloneClusterDTO.getFromRequest(standaloneRequest, + targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + log.debug("success to set entity for SortStandalone cluster"); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, + String.format("serialize extParams of SortStandalone Cluster failure: %s", e.getMessage())); + } + } + + @Override + public Boolean accept(String clusterType) { + return getClusterType().equals(clusterType); + } + + @Override + public String getClusterType() { + return ClusterType.SORTSTANDALONE; + } + + @Override + public ClusterInfo getFromEntity(InlongClusterEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); + } + + SortStandaloneClusterInfo clusterInfo = new SortStandaloneClusterInfo(); + CommonBeanUtils.copyProperties(entity, clusterInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + SortStandaloneClusterDTO dto = SortStandaloneClusterDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, clusterInfo); + } + return clusterInfo; + } + +} diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index 006429e37c..39106fccda 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -32,6 +32,8 @@ import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest; +import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -43,7 +45,9 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * Inlong cluster service test for {@link InlongClusterService} @@ -55,6 +59,15 @@ public class InlongClusterServiceTest extends ServiceBaseTest { @Autowired private HeartbeatManager heartbeatManager; + public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set<String> supportedSinkTypes) { + SortStandaloneClusterRequest request = new SortStandaloneClusterRequest(); + request.setClusterTags(clusterTag); + request.setName(clusterName); + request.setSupportedSinkTypes(supportedSinkTypes); + request.setInCharges(GLOBAL_OPERATOR); + return clusterService.save(request, GLOBAL_OPERATOR); + } + /** * Save data proxy cluster */ @@ -312,6 +325,30 @@ public class InlongClusterServiceTest extends ServiceBaseTest { } + @Test + public void testStandaloneCluster() { + String clusterTag = "standalone_cluster"; + String clusterName = "test_standalone"; + String type1 = "type1"; + String type2 = "type2"; + String type3 = "type3"; + Set<String> supportedType = new HashSet<>(); + supportedType.add(type1); + supportedType.add(type2); + supportedType.add(type3); + + Integer id = this.saveStandaloneCluster(clusterTag, clusterName, supportedType); + Assertions.assertNotNull(id); + + ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR); + Assertions.assertInstanceOf(SortStandaloneClusterInfo.class, info); + + Set<String> types = ((SortStandaloneClusterInfo) info).getSupportedSinkTypes(); + Assertions.assertTrue(types.contains(type1)); + Assertions.assertTrue(types.contains(type2)); + Assertions.assertTrue(types.contains(type3)); + } + @Test public void testDataProxyCluster() { String clusterTag = "default_cluster";