This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new cc63c57e3 [INLONG-5775][Manager] Add unit tests for the Inlong Consume (#5777) cc63c57e3 is described below commit cc63c57e396fe4d09a46d17338e44977ea3d580f Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com> AuthorDate: Thu Sep 15 11:13:51 2022 +0800 [INLONG-5775][Manager] Add unit tests for the Inlong Consume (#5777) Co-authored-by: healchow <healc...@gmail.com> --- .../dao/mapper/InlongConsumeEntityMapper.java | 4 +- .../resources/mappers/ConsumptionEntityMapper.xml | 2 +- .../mappers/InlongConsumeEntityMapper.xml | 8 +- .../mappers/WorkflowProcessEntityMapper.xml | 2 +- .../resources/mappers/WorkflowTaskEntityMapper.xml | 2 +- .../pojo/consume/InlongConsumeCountInfo.java | 8 +- .../service/consume/InlongConsumeServiceImpl.java | 26 ++-- .../service/consume/InlongConsumeServiceTest.java | 165 +++++++++++++++++++++ .../service/core/impl/InlongStreamServiceTest.java | 21 +-- .../service/group/InlongGroupServiceTest.java | 21 ++- 10 files changed, 207 insertions(+), 52 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java index feaf28a50..ab9615e70 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java @@ -19,11 +19,11 @@ package org.apache.inlong.manager.dao.mapper; import org.apache.ibatis.annotations.Param; import org.apache.inlong.manager.dao.entity.InlongConsumeEntity; +import org.apache.inlong.manager.pojo.common.CountInfo; import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; import org.springframework.stereotype.Repository; import java.util.List; -import java.util.Map; @Repository public interface InlongConsumeEntityMapper { @@ -32,7 +32,7 @@ public interface InlongConsumeEntityMapper { InlongConsumeEntity selectById(Integer id); - List<Map<String, Object>> countByUser(@Param(value = "username") String username); + List<CountInfo> countByUser(@Param(value = "username") String username); List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request); diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml index 885cc22e6..9865ca0a1 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml @@ -264,7 +264,7 @@ <select id="countByQuery" parameterType="org.apache.inlong.manager.pojo.consumption.ConsumptionQuery" resultType="org.apache.inlong.manager.pojo.common.CountInfo"> - select status as `key`, count(1) as value + select status as `key`, count(1) as `value` from consumption where is_deleted=0 <if test="consumerGroup != null and consumerGroup != ''"> diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml index 435d6ca3c..0eaee7686 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml @@ -51,12 +51,12 @@ mq_type, topic, inlong_group_id, filter_enabled, inlong_stream_id, ext_params, in_charges, status, - is_deleted, creator, modifier) + creator, modifier) values (#{id, jdbcType=INTEGER}, #{consumerGroup, jdbcType=VARCHAR}, #{description, jdbcType=VARCHAR}, #{mqType, jdbcType=VARCHAR}, #{topic, jdbcType=VARCHAR}, #{inlongGroupId, jdbcType=VARCHAR}, #{filterEnabled, jdbcType=INTEGER}, #{inlongStreamId, jdbcType=VARCHAR}, #{extParams, jdbcType=LONGVARCHAR}, #{inCharges, jdbcType=VARCHAR}, #{status, jdbcType=INTEGER}, - #{isDeleted, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR}) + #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR}) </insert> <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap"> @@ -65,8 +65,8 @@ from inlong_consume where id = #{id, jdbcType=INTEGER} </select> - <select id="countByUser" resultType="java.util.Map"> - select status, count(1) as total + <select id="countByUser" resultType="org.apache.inlong.manager.pojo.common.CountInfo"> + select status as `key`, count(1) as `value` from inlong_consume where is_deleted = 0 and (creator = #{username, jdbcType=VARCHAR} or FIND_IN_SET(#{username, jdbcType=VARCHAR}, in_charges)) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml index a557fe8e4..3a46eef18 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml @@ -114,7 +114,7 @@ <select id="countByQuery" parameterType="org.apache.inlong.manager.pojo.workflow.ProcessCountRequest" resultType="org.apache.inlong.manager.pojo.common.CountInfo"> - select status as `key`, count(1) as value + select status as `key`, count(1) as `value` from workflow_process <where> <if test="name != null and name !=''"> diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml index ab7bb9d3a..8746164fd 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml @@ -136,7 +136,7 @@ <select id="countByQuery" parameterType="org.apache.inlong.manager.pojo.workflow.TaskCountRequest" resultType="org.apache.inlong.manager.pojo.common.CountInfo"> - select status as `key`, count(1) as value + select status as `key`, count(1) as `value` from workflow_task <where> <if test="processId != null"> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java index e48d97e0d..f15440be8 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java @@ -29,15 +29,15 @@ import lombok.Data; public class InlongConsumeCountInfo { @ApiModelProperty(value = "Total consume number") - private long totalCount; + private Integer totalCount = 0; @ApiModelProperty(value = "Total number of to be allocated (the number of configuring consumes)") - private long waitAssignCount; + private Integer waitAssignCount = 0; @ApiModelProperty(value = "Total number of to be approved") - private long waitApproveCount; + private Integer waitApproveCount = 0; @ApiModelProperty(value = "Total number of rejections") - private long rejectCount; + private Integer rejectCount = 0; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java index f0c5cec8f..d1621cb02 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java @@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongConsumeEntity; import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper; +import org.apache.inlong.manager.pojo.common.CountInfo; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -46,7 +47,6 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -78,10 +78,10 @@ public class InlongConsumeServiceImpl implements InlongConsumeService { } InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType()); - consumeOperator.saveOpt(request, operator); + Integer id = consumeOperator.saveOpt(request, operator); LOGGER.info("success to save inlong consume for consumer group={} by user={}", consumerGroup, operator); - return request.getId(); + return id; } @Override @@ -147,23 +147,23 @@ public class InlongConsumeServiceImpl implements InlongConsumeService { @Override public InlongConsumeCountInfo countStatus(String username) { - List<Map<String, Object>> statusCount = consumeMapper.countByUser(username); - InlongConsumeCountInfo countInfo = new InlongConsumeCountInfo(); - for (Map<String, Object> map : statusCount) { - int status = (Integer) map.get("status"); - long count = (Long) map.get("count"); - countInfo.setTotalCount(countInfo.getTotalCount() + count); + List<CountInfo> countInfoList = consumeMapper.countByUser(username); + InlongConsumeCountInfo result = new InlongConsumeCountInfo(); + for (CountInfo countInfo : countInfoList) { + int status = Integer.parseInt(countInfo.getKey()); + int count = countInfo.getValue(); + result.setTotalCount(result.getTotalCount() + count); if (status == ConsumeStatus.WAIT_ASSIGN.getCode()) { - countInfo.setWaitAssignCount(countInfo.getWaitAssignCount() + count); + result.setWaitAssignCount(result.getWaitAssignCount() + count); } else if (status == ConsumeStatus.WAIT_APPROVE.getCode()) { - countInfo.setWaitApproveCount(countInfo.getWaitApproveCount() + count); + result.setWaitApproveCount(result.getWaitApproveCount() + count); } else if (status == ConsumeStatus.REJECTED.getCode()) { - countInfo.setRejectCount(countInfo.getRejectCount() + count); + result.setRejectCount(result.getRejectCount() + count); } } LOGGER.debug("success to count inlong consume for user={}", username); - return countInfo; + return result; } @Override diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java new file mode 100644 index 000000000..ad58b9ca6 --- /dev/null +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.consume; + +import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest; +import org.apache.inlong.manager.pojo.common.OrderFieldEnum; +import org.apache.inlong.manager.pojo.common.OrderTypeEnum; +import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; +import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest; +import org.apache.inlong.manager.service.ServiceBaseTest; +import org.apache.inlong.manager.service.cluster.InlongClusterService; +import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest; +import org.apache.inlong.manager.service.group.InlongGroupServiceTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Test for {@link InlongConsumeServiceImpl} + */ +public class InlongConsumeServiceTest extends ServiceBaseTest { + + private final String groupId = "consume_group_id"; + private final String streamId = "consume_stream_id"; + private final String consumerGroup = "test_consumer_group"; + private final String deadLetterTopic = "test_dlp"; + private Integer clusterId; + + @Autowired + private InlongConsumeService consumeService; + @Autowired + private InlongGroupServiceTest groupServiceTest; + @Autowired + private InlongStreamServiceTest streamServiceTest; + @Autowired + private InlongClusterService clusterService; + + @BeforeEach + public void before() { + groupServiceTest.saveGroup(groupId, GLOBAL_OPERATOR); + streamServiceTest.saveInlongStream(groupId, streamId, GLOBAL_OPERATOR); + // before saving inlong consume, the related MQ cluster must exist + this.saveCluster(); + } + + @Test + public void testAll() { + // test save operation + Integer consumeId = this.testSave(); + Assertions.assertNotNull(consumeId); + + // test get operation + InlongConsumeInfo consumeInfo = this.testGet(consumeId); + Assertions.assertEquals(consumeInfo.getId(), consumeId); + + // test list operation + Assertions.assertTrue(this.testList().getPageSize() > 0); + + // test count status operation + InlongConsumeCountInfo countInfo = testCountStatus(); + Assertions.assertNotNull(countInfo); + + // test update operation + Assertions.assertTrue(this.testUpdate(consumeInfo)); + + // test delete operation + Assertions.assertTrue(this.testDelete(consumeId)); + } + + @AfterEach + public void after() { + streamServiceTest.deleteStream(groupId, streamId, GLOBAL_OPERATOR); + // Current status=to_be_submit was not allowed to delete + // groupServiceTest.deleteGroup(groupId, GLOBAL_OPERATOR); + // before saving inlong consume, the related MQ cluster must exist + clusterService.delete(clusterId, GLOBAL_OPERATOR); + } + + private Integer testSave() { + ConsumePulsarRequest request = new ConsumePulsarRequest(); + request.setInlongGroupId(groupId); + request.setInlongStreamId(streamId); + request.setMqType(MQType.PULSAR); + request.setTopic(streamId); + request.setConsumerGroup(consumerGroup); + request.setInCharges(GLOBAL_OPERATOR); + request.setIsDlq(1); + request.setDeadLetterTopic(deadLetterTopic); + request.setIsRlq(0); + return consumeService.save(request, GLOBAL_OPERATOR); + } + + private InlongConsumeInfo testGet(Integer id) { + return consumeService.get(id); + } + + private PageResult<InlongConsumeBriefInfo> testList() { + InlongConsumePageRequest request = new InlongConsumePageRequest(); + request.setPageNum(1); + request.setPageSize(10); + request.setOrderField(OrderFieldEnum.CREATE_TIME.name()); + request.setOrderType(OrderTypeEnum.DESC.name()); + request.setConsumerGroup(consumerGroup); + return consumeService.list(request); + } + + private Boolean testUpdate(InlongConsumeInfo consumeInfo) { + ConsumePulsarRequest request = new ConsumePulsarRequest(); + request.setId(consumeInfo.getId()); + request.setMqType(MQType.PULSAR); + request.setInlongGroupId(groupId); + request.setIsDlq(1); + request.setDeadLetterTopic(deadLetterTopic); + request.setIsRlq(0); + request.setVersion(consumeInfo.getVersion()); + return consumeService.update(request, GLOBAL_OPERATOR); + } + + private InlongConsumeCountInfo testCountStatus() { + return consumeService.countStatus(GLOBAL_OPERATOR); + } + + private Boolean testDelete(Integer id) { + return consumeService.delete(id, GLOBAL_OPERATOR); + } + + private void saveCluster() { + PulsarClusterRequest request = new PulsarClusterRequest(); + String clusterTag = "consume_cluster_tag"; + request.setClusterTags(clusterTag); + String clusterName = "consume_pulsar_cluster"; + request.setName(clusterName); + request.setType(ClusterType.PULSAR); + String adminUrl = "http://127.0.0.1:8080"; + request.setAdminUrl(adminUrl); + String tenant = "public"; + request.setTenant(tenant); + request.setInCharges(GLOBAL_OPERATOR); + clusterId = clusterService.save(request, GLOBAL_OPERATOR); + } + +} diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java index 203005aa1..74772fe84 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java @@ -19,10 +19,9 @@ package org.apache.inlong.manager.service.core.impl; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamRequest; +import org.apache.inlong.manager.service.group.InlongGroupServiceTest; import org.apache.inlong.manager.service.stream.InlongStreamService; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.apache.inlong.manager.service.group.InlongGroupServiceTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,8 +35,6 @@ public class InlongStreamServiceTest { private static final Logger LOGGER = LoggerFactory.getLogger(InlongStreamServiceTest.class); - private final String globalOperator = "admin"; - @Autowired private InlongStreamService streamService; @Autowired @@ -66,20 +63,16 @@ public class InlongStreamServiceTest { return streamService.save(request, operator); } - // @Test - public void testSaveAndDelete() { - String groupId = "stream_service_test_group"; - String streamId = "stream_service_test_stream"; - Integer id = this.saveInlongStream(groupId, streamId, globalOperator); - Assertions.assertNotNull(id); - - boolean result = streamService.delete(groupId, streamId, globalOperator); - Assertions.assertTrue(result); + /** + * Delete one inlong stream + */ + public Boolean deleteStream(String groupId, String streamId, String operator) { + return streamService.delete(groupId, streamId, operator); } @Test public void test() { - LOGGER.info("If you don't add test, UnusedImports: Unused import: org.junit.Test."); + LOGGER.info("Blank test for inlong stream service"); } } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupServiceTest.java index 535c290d8..e175e9ce9 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupServiceTest.java @@ -17,13 +17,13 @@ package org.apache.inlong.manager.service.group; -import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; +import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper; import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; -import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; -import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -50,7 +50,7 @@ public class InlongGroupServiceTest { InlongGroupExtEntityMapper groupExtMapper; /** - * Test to save group + * Save one inlong group with Pulsar info */ public String saveGroup(String inlongGroupId, String operator) { InlongGroupInfo groupInfo; @@ -77,14 +77,11 @@ public class InlongGroupServiceTest { return groupService.save(pulsarInfo.genRequest(), operator); } - // @TestComponent runs as a whole without injecting objects - // @Test - public void testSaveAndDelete() { - String groupId = this.saveGroup(globalGroupId, globalOperator); - Assertions.assertNotNull(groupId); - - boolean result = groupService.delete(groupId, globalOperator); - Assertions.assertTrue(result); + /** + * Delete one inlong group + */ + public Boolean deleteGroup(String groupId, String operator) { + return groupService.delete(groupId, operator); } // @TestComponent runs as a whole without injecting objects