This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 473f67b24 [INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542) 473f67b24 is described below commit 473f67b248b9c8c309878e4e5200fa5c1a645ccc Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com> AuthorDate: Sat Sep 3 18:07:28 2022 +0800 [INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542) --- .../{ConsumptionStatus.java => ConsumeStatus.java} | 49 +++-- .../inlong/manager/common/enums/ErrorCodeEnum.java | 8 + .../manager/dao/entity/InlongConsumeEntity.java} | 34 +++- .../dao/mapper/InlongConsumeEntityMapper.java} | 35 ++-- .../mappers/InlongConsumeEntityMapper.xml | 190 +++++++++++++++++++ .../inlong/manager/pojo/common/PageRequest.java | 2 + .../pojo/consume/InlongConsumeBriefInfo.java | 73 ++++++++ .../InlongConsumeCountInfo.java} | 23 ++- .../manager/pojo/consume/InlongConsumeInfo.java | 89 +++++++++ .../pojo/consume/InlongConsumePageRequest.java | 69 +++++++ .../manager/pojo/consume/InlongConsumeRequest.java | 72 +++++++ .../pojo/consume/pulsar/ConsumePulsarDTO.java | 81 ++++++++ .../pojo/consume/pulsar/ConsumePulsarInfo.java | 57 ++++++ .../pojo/consume/pulsar/ConsumePulsarRequest.java | 51 +++++ .../pojo/consume/tubemq/ConsumeTubeMQDTO.java | 63 +++++++ .../tubemq/ConsumeTubeMQInfo.java} | 26 ++- .../tubemq/ConsumeTubeMQRequest.java} | 20 +- .../manager/pojo/group/InlongGroupRequest.java | 3 - .../manager/pojo/group/tubemq/InlongTubeMQDTO.java | 2 +- .../service/consume/AbstractConsumeOperator.java | 86 +++++++++ .../service/consume/ConsumePulsarOperator.java | 152 +++++++++++++++ .../service/consume/ConsumeTubeMQOperator.java | 91 +++++++++ .../service/consume/InlongConsumeOperator.java | 73 ++++++++ .../consume/InlongConsumeOperatorFactory.java | 46 +++++ .../InlongConsumeProcessService.java} | 25 ++- .../service/consume/InlongConsumeService.java | 96 ++++++++++ 
.../service/consume/InlongConsumeServiceImpl.java | 208 +++++++++++++++++++++ .../service/core/impl/ConsumptionServiceImpl.java | 20 +- .../service/group/InlongGroupProcessService.java | 2 +- .../service/group/InlongGroupServiceImpl.java | 7 +- .../ConsumptionCancelProcessListener.java | 4 +- .../ConsumptionCompleteProcessListener.java | 4 +- .../ConsumptionRejectProcessListener.java | 4 +- .../main/resources/h2/apache_inlong_manager.sql | 31 ++- .../manager-web/sql/apache_inlong_manager.sql | 31 ++- .../web/controller/ConsumptionController.java | 4 +- .../web/controller/InlongConsumeController.java | 116 ++++++++++++ 37 files changed, 1849 insertions(+), 98 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java similarity index 62% rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java index 05e45fe3b..0b9013df2 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java @@ -21,17 +21,18 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import org.apache.inlong.manager.common.util.InlongCollectionUtils; +import org.apache.inlong.manager.common.util.Preconditions; + import java.util.Map; import java.util.Set; import java.util.function.Function; -import org.apache.inlong.manager.common.util.InlongCollectionUtils; -import org.apache.inlong.manager.common.util.Preconditions; /** - * Data consumption status + * Inlong consume 
status */ -@ApiModel("Data consumption status") -public enum ConsumptionStatus { +@ApiModel("Inlong consume status") +public enum ConsumeStatus { @ApiModelProperty(value = "To be allocated: 10") WAIT_ASSIGN(10), @@ -46,33 +47,41 @@ public enum ConsumptionStatus { APPROVED(21), @ApiModelProperty(value = "Cancel application: 22") - CANCELED(22); + CANCELED(22), + + @ApiModelProperty(value = "Deleting: 41") + DELETING(41), + + @ApiModelProperty(value = "Deleted: 40") + DELETED(40), + + ; - public static final Set<ConsumptionStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet + public static final Set<ConsumeStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet .of(WAIT_ASSIGN, REJECTED, CANCELED); - public static final Set<ConsumptionStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN); + public static final Set<ConsumeStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN); - private static final Map<Integer, ConsumptionStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap( - Lists.newArrayList(ConsumptionStatus.values()), - ConsumptionStatus::getStatus, + private static final Map<Integer, ConsumeStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap( + Lists.newArrayList(ConsumeStatus.values()), + ConsumeStatus::getCode, Function.identity() ); - private final int status; + private final int code; - ConsumptionStatus(int status) { - this.status = status; + ConsumeStatus(int code) { + this.code = code; } - public static ConsumptionStatus fromStatus(int status) { - ConsumptionStatus consumptionStatus = STATUS_MAP.get(status); - Preconditions.checkNotNull(consumptionStatus, "status is unavailable :" + status); - return consumptionStatus; + public static ConsumeStatus fromStatus(int status) { + ConsumeStatus consumeStatus = STATUS_MAP.get(status); + Preconditions.checkNotNull(consumeStatus, "consume status is invalid for " + status); + return consumeStatus; } - public int getStatus() { - return status; + public int getCode() { + 
return code; } } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index d6cc20776..87a6e706c 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -125,6 +125,14 @@ public enum ErrorCodeEnum { PULSAR_DLQ_RLQ_ERROR(2606, "Wrong config for the RLQ and DLQ: RLQ was enabled, but the DLQ was disabled"), PULSAR_DLQ_DUPLICATED(2607, "DLQ topic already exists under the inlong group"), PULSAR_RLQ_DUPLICATED(2608, "RLQ topic already exists under the inlong group"), + CONSUMER_INFO_INCORRECT(2609, "Consumer info was incorrect"), + CONSUMER_NOR_FOUND(2609, "Consumer not found"), + + CONSUME_NOT_FOUND(3001, "Inlong consume does not exist/no operation authority"), + CONSUME_DUPLICATE(3002, "Inlong consume already exists"), + CONSUME_INFO_INCORRECT(3003, "Inlong consume info was incorrect"), + CONSUME_SAVE_FAILED(3004, "Failed to save/update inlong consume"), + CONSUME_PERMISSION_DENIED(3005, "No permission to access this inlong consume"), ; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java similarity index 52% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java index 4b7bdff63..55b1fc645 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java +++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java @@ -15,20 +15,38 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.group.tubemq; +package org.apache.inlong.manager.dao.entity; -import io.swagger.annotations.ApiModel; import lombok.Data; -import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.Date; /** - * Inlong group info for TubeMQ + * Inlong consume entity. */ @Data -@NoArgsConstructor -@ApiModel("Inlong group info for TubeMQ") -public class InlongTubeMQDTO { +public class InlongConsumeEntity implements Serializable { + + private static final long serialVersionUID = 1L; + private Integer id; + private String consumerGroup; + private String description; + private String mqType; + private String topic; + + private String inlongGroupId; + private Integer filterEnabled; + private String inlongStreamId; + private String extParams; - // no field + private String inCharges; + private Integer status; + private Integer isDeleted; + private String creator; + private String modifier; + private Date createTime; + private Date modifyTime; + private Integer version; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java similarity index 50% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java index 4b7bdff63..3b0205048 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java @@ -15,20 +15,31 @@ * limitations under the License. 
*/ -package org.apache.inlong.manager.pojo.group.tubemq; +package org.apache.inlong.manager.dao.mapper; -import io.swagger.annotations.ApiModel; -import lombok.Data; -import lombok.NoArgsConstructor; +import org.apache.ibatis.annotations.Param; +import org.apache.inlong.manager.dao.entity.InlongConsumeEntity; +import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; +import org.springframework.stereotype.Repository; -/** - * Inlong group info for TubeMQ - */ -@Data -@NoArgsConstructor -@ApiModel("Inlong group info for TubeMQ") -public class InlongTubeMQDTO { +import java.util.List; +import java.util.Map; + +@Repository +public interface InlongConsumeEntityMapper { + + int insert(InlongConsumeEntity record); + + InlongConsumeEntity selectById(Integer id); + + List<Map<String, Object>> countByUser(@Param(value = "username") String username); + + List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request); + + int updateById(InlongConsumeEntity record); + + int updateByIdSelective(InlongConsumeEntity record); - // no field + int deleteById(Integer id); } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml new file mode 100644 index 000000000..1db846b23 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml @@ -0,0 +1,190 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> +<mapper namespace="org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper"> + <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.InlongConsumeEntity"> + <id column="id" jdbcType="INTEGER" property="id"/> + <result column="consumer_group" jdbcType="VARCHAR" property="consumerGroup"/> + <result column="description" jdbcType="VARCHAR" property="description"/> + <result column="mq_type" jdbcType="VARCHAR" property="mqType"/> + <result column="topic" jdbcType="VARCHAR" property="topic"/> + <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/> + <result column="filter_enabled" jdbcType="INTEGER" property="filterEnabled"/> + <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/> + <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/> + <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/> + <result column="status" jdbcType="INTEGER" property="status"/> + <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/> + <result column="creator" jdbcType="VARCHAR" property="creator"/> + <result column="modifier" jdbcType="VARCHAR" property="modifier"/> + <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/> + <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/> + <result column="version" jdbcType="INTEGER" property="version"/> + </resultMap> + + <sql id="Base_Column_List"> + id, consumer_group, 
description, mq_type, topic, inlong_group_id, filter_enabled, inlong_stream_id, + ext_params, in_charges, status, is_deleted, creator, modifier, create_time, modify_time, version + </sql> + + <insert id="insert" useGeneratedKeys="true" keyProperty="id" + parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity"> + insert into inlong_consume (id, consumer_group, description, + mq_type, topic, inlong_group_id, + filter_enabled, inlong_stream_id, + ext_params, in_charges, status, + is_deleted, creator, modifier) + values (#{id, jdbcType=INTEGER}, #{consumerGroup, jdbcType=VARCHAR}, #{description, jdbcType=VARCHAR}, + #{mqType, jdbcType=VARCHAR}, #{topic, jdbcType=VARCHAR}, #{inlongGroupId, jdbcType=VARCHAR}, + #{filterEnabled, jdbcType=INTEGER}, #{inlongStreamId, jdbcType=VARCHAR}, + #{extParams, jdbcType=LONGVARCHAR}, #{inCharges, jdbcType=VARCHAR}, #{status, jdbcType=INTEGER}, + #{isDeleted, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR}) + </insert> + + <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List"/> + from inlong_consume + where id = #{id, jdbcType=INTEGER} + </select> + <select id="countByUser" resultType="java.util.Map"> + select status, count(1) as total + from inlong_consume + where is_deleted = 0 + and (creator = #{username, jdbcType=VARCHAR} or FIND_IN_SET(#{username, jdbcType=VARCHAR}, in_charges)) + group by status + </select> + <select id="selectByCondition" resultMap="BaseResultMap" + parameterType="org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest"> + select + <include refid="Base_Column_List"/> + from inlong_consume + <where> + is_deleted = 0 + <if test="isAdminRole == false"> + and ( + creator = #{currentUser, jdbcType=VARCHAR} or FIND_IN_SET(#{currentUser, jdbcType=VARCHAR}, in_charges) + ) + </if> + <if test="keyword != null and keyword !=''"> + and (consumer_group like CONCAT('%', #{keyword}, '%') or topic 
like CONCAT('%', #{keyword}, '%')) + </if> + <if test="consumerGroup != null and consumerGroup != ''"> + and consumer_group = #{consumerGroup, jdbcType=VARCHAR} + </if> + <if test="mqType != null and mqType != ''"> + and mq_type = #{mqType, jdbcType=VARCHAR} + </if> + <if test="topic != null and topic != ''"> + and topic = #{topic} + </if> + <if test="inlongGroupId != null and inlongGroupId != ''"> + and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR} + </if> + <if test="status != null"> + and status = #{status, jdbcType=INTEGER} + </if> + <if test="statusList != null and statusList.size() > 0"> + and status in + <foreach collection="statusList" item="status" index="index" open="(" close=")" separator=","> + #{status} + </foreach> + </if> + </where> + <choose> + <when test="orderField != null and orderField != '' and orderType != null and orderType != ''"> + order by ${orderField} ${orderType} + </when> + <otherwise> + order by create_time desc + </otherwise> + </choose> + </select> + + <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity"> + update inlong_consume + set consumer_group = #{consumerGroup, jdbcType=VARCHAR}, + in_charges = #{inCharges, jdbcType=VARCHAR}, + inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}, + mq_type = #{mqType, jdbcType=VARCHAR}, + topic = #{topic, jdbcType=VARCHAR}, + filter_enabled = #{filterEnabled, jdbcType=INTEGER}, + inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}, + ext_params = #{extParams, jdbcType = LONGVARCHAR}, + status = #{status, jdbcType=INTEGER}, + modifier = #{modifier, jdbcType=VARCHAR}, + is_deleted = #{isDeleted, jdbcType=INTEGER}, + version = #{version, jdbcType=INTEGER} + 1 + where id = #{id, jdbcType=INTEGER} + and is_deleted = 0 + and version = #{version, jdbcType=INTEGER} + </update> + + <update id="updateByIdSelective" + parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity"> + update inlong_consume + <set> + <if test="consumerGroup 
!= null"> + consumer_group = #{consumerGroup, jdbcType=VARCHAR}, + </if> + <if test="description != null"> + description = #{description,jdbcType=VARCHAR}, + </if> + <if test="mqType != null"> + mq_type = #{mqType, jdbcType=VARCHAR}, + </if> + <if test="topic != null"> + topic = #{topic, jdbcType=VARCHAR}, + </if> + <if test="inlongGroupId != null"> + inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}, + </if> + <if test="filterEnabled != null"> + filter_enabled = #{filterEnabled, jdbcType=INTEGER}, + </if> + <if test="inlongStreamId != null"> + inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}, + </if> + <if test="extParams != null"> + ext_params = #{extParams, jdbcType=LONGVARCHAR}, + </if> + <if test="inCharges != null"> + in_charges = #{inCharges, jdbcType=VARCHAR}, + </if> + <if test="status != null"> + status = #{status, jdbcType=INTEGER}, + </if> + <if test="modifier != null"> + modifier = #{modifier, jdbcType=VARCHAR}, + </if> + version = #{version, jdbcType=INTEGER} + 1 + </set> + where id = #{id, jdbcType=INTEGER} + and is_deleted = 0 + and version = #{version, jdbcType=INTEGER} + </update> + + <delete id="deleteById" parameterType="java.lang.Integer"> + delete + from inlong_consume + where id = #{id,jdbcType=INTEGER} + </delete> +</mapper> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java index 8c1a105c8..fb9af69f4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java @@ -26,6 +26,8 @@ import io.swagger.annotations.ApiModelProperty; @ApiModel(value = "Pagination request") public class PageRequest { + public static final Integer MAX_PAGE_SIZE = 100; + @ApiModelProperty(value = "Current page number, default is 1") private int pageNum = 
1; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java new file mode 100644 index 000000000..70ae5f26c --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.consume; + +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * Inlong consume brief info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Inlong consume brief info") +public class InlongConsumeBriefInfo { + + @ApiModelProperty(value = "Primary key") + private Integer id; + + @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]") + private String consumerGroup; + + @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR") + private String mqType; + + @ApiModelProperty(value = "The target topic of inlong consume") + private String topic; + + @ApiModelProperty(value = "The target inlongGroupId of inlong consume") + private String inlongGroupId; + + @ApiModelProperty(value = "Name of responsible person, separated by commas") + private String inCharges; + + @ApiModelProperty(value = "Consume status") + private Integer status; + + @ApiModelProperty(value = "Name of creator") + private String creator; + + @ApiModelProperty(value = "Name of modifier") + private String modifier; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date modifyTime; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java similarity index 58% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java index 
4b7bdff63..e48d97e0d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java @@ -15,20 +15,29 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.group.tubemq; +package org.apache.inlong.manager.pojo.consume; import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; import lombok.Data; -import lombok.NoArgsConstructor; /** - * Inlong group info for TubeMQ + * Count info of inlong consume status. */ @Data -@NoArgsConstructor -@ApiModel("Inlong group info for TubeMQ") -public class InlongTubeMQDTO { +@ApiModel("Count info of inlong consume status") +public class InlongConsumeCountInfo { - // no field + @ApiModelProperty(value = "Total consume number") + private long totalCount; + + @ApiModelProperty(value = "Total number of to be allocated (the number of configuring consumes)") + private long waitAssignCount; + + @ApiModelProperty(value = "Total number of to be approved") + private long waitApproveCount; + + @ApiModelProperty(value = "Total number of rejections") + private long rejectCount; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java new file mode 100644 index 000000000..a1ecf3620 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.consume; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.util.Date; + +/** + * Base inlong consume info + */ +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Base inlong consume info") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType") +public abstract class InlongConsumeInfo { + + @ApiModelProperty(value = "Primary key") + private Integer id; + + @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]") + private String consumerGroup; + + @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR") + private String mqType; + + @ApiModelProperty(value = "The target topic of this consume") + private String topic; + + @ApiModelProperty(value = "The target inlong group id of this consume") + private String inlongGroupId; + + @ApiModelProperty(value = "Whether to filter consumption, 0-not filter, 1-filter") + private Integer filterEnabled = 0; + + @ApiModelProperty(value = "The target inlong stream id of this consume, needed if the filterEnabled=1") + private 
String inlongStreamId; + + @ApiModelProperty(value = "Cluster URL of message queue") + private String clusterUrl; + + @ApiModelProperty(value = "Name of responsible person, separated by commas") + private String inCharges; + + @ApiModelProperty(value = "Consume status") + private Integer status; + + @ApiModelProperty(value = "Name of creator") + private String creator; + + @ApiModelProperty(value = "Name of modifier") + private String modifier; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date modifyTime; + + @ApiModelProperty(value = "Version number") + private Integer version; + + public abstract InlongConsumeRequest genRequest(); + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java new file mode 100644 index 000000000..a8f73552e --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.consume; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.pojo.common.PageRequest; + +import java.util.List; + +/** + * Inlong consume query request + */ +@Data +@EqualsAndHashCode(callSuper = false) +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Inlong consume query request") +public class InlongConsumePageRequest extends PageRequest { + + @ApiModelProperty(value = "Keyword, can be consumer group") + private String keyword; + + @ApiModelProperty(value = "Consumer group name") + private String consumerGroup; + + @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR") + private String mqType; + + @ApiModelProperty(value = "The target topic of inlong consume") + private String topic; + + @ApiModelProperty(value = "The target inlongGroupId of inlong consume") + private String inlongGroupId; + + @ApiModelProperty(value = "Consume status") + private Integer status; + + @ApiModelProperty(value = "Consume status list") + private List<Integer> statusList; + + @ApiModelProperty(value = "Current user", hidden = true) + private String currentUser; + + @ApiModelProperty(value = "Whether the current user is in the administrator role", hidden = true) + private Boolean isAdminRole; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java new file mode 100644 index 000000000..5dfa0bcc1 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base inlong consume request, holding the fields shared by all MQ-specific consume requests.
+ * <p/>
+ * The class is abstract: Jackson picks the concrete subtype by the logical type name found in the
+ * {@code mqType} property, and {@code visible = true} keeps that property available for the
+ * {@link #mqType} field after deserialization. How the subtype names are registered is not visible
+ * in this class.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Base inlong consume request")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
+public abstract class InlongConsumeRequest {
+
+    // only required when validating with the UpdateValidation group, so it may be null otherwise
+    @NotNull(groups = UpdateValidation.class)
+    @ApiModelProperty(value = "Primary key")
+    private Integer id;
+
+    @NotBlank(message = "consumerGroup cannot be null")
+    @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]")
+    private String consumerGroup;
+
+    // doubles as the Jackson type discriminator, see the class-level @JsonTypeInfo
+    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+    private String mqType;
+
+    @ApiModelProperty(value = "The target topic of this consume")
+    private String topic;
+
+    @NotBlank(message = "inlong group id cannot be null")
+    @ApiModelProperty(value = "The target inlong group id of this consume")
+    private String inlongGroupId;
+
+    // defaults to 0 (not filter) when the client does not send the field
+    @ApiModelProperty(value = "Whether to filter consumption, 0-not filter, 1-filter")
+    private Integer filterEnabled = 0;
+
+    @ApiModelProperty(value = "The target inlong stream id of this consume, needed if the filterEnabled=1")
+    private String inlongStreamId;
+
+    @NotBlank(message = "inCharges cannot be null")
+    @ApiModelProperty(value = "Name of responsible person, separated by commas")
+    private String inCharges;
+
+    // NOTE(review): presumably used for optimistic locking on update - confirm against the mapper
+    @ApiModelProperty(value = "Version number")
+    private Integer version;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
new file mode 100644
index 000000000..5e6388922
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong consume dto of Pulsar, carrying the Pulsar-specific settings (DLQ / RLQ) of an inlong consume.
+ * <p/>
+ * It is built from a {@link ConsumePulsarRequest} and parsed back from a JSON string.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong consume dto of Pulsar")
+public class ConsumePulsarDTO {
+
+    // unknown JSON properties are ignored, so a JSON with extra fields can still be parsed
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+    private Integer isDlq;
+
+    @ApiModelProperty("The name of the dead letter queue Topic")
+    private String deadLetterTopic;
+
+    @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+    private Integer isRlq;
+
+    @ApiModelProperty("The name of the retry letter queue topic")
+    private String retryLetterTopic;
+
+    /**
+     * Get the dto instance from the request.
+     *
+     * @param request Pulsar consume request whose DLQ / RLQ fields are copied one by one
+     * @return a new dto holding the four DLQ / RLQ values of the request
+     */
+    public static ConsumePulsarDTO getFromRequest(ConsumePulsarRequest request) {
+        return ConsumePulsarDTO.builder()
+                .isDlq(request.getIsDlq())
+                .deadLetterTopic(request.getDeadLetterTopic())
+                .isRlq(request.getIsRlq())
+                .retryLetterTopic(request.getRetryLetterTopic())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     *
+     * @param extParams JSON string to parse, must not be null
+     * @return the parsed dto
+     * @throws BusinessException if the string cannot be parsed into this dto
+     */
+    public static ConsumePulsarDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, ConsumePulsarDTO.class);
+        } catch (Exception e) {
+            // NOTE(review): only the message of the cause is kept here, the stack trace is lost;
+            // pass the cause along if BusinessException offers such a constructor - TODO confirm
+            throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java
new file mode 100644
index 000000000..9a1f07324
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+
+/**
+ * Inlong consume info of Pulsar: the common consume info plus the dead letter / retry letter queue settings.
+ * <p/>
+ * {@code callSuper = true} makes the inherited fields part of the generated {@code toString()},
+ * {@code equals()} and {@code hashCode()}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.PULSAR)
+@ApiModel("Inlong consume info of Pulsar")
+public class ConsumePulsarInfo extends InlongConsumeInfo {
+
+    @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+    private Integer isDlq;
+
+    @ApiModelProperty("The name of the dead letter queue Topic")
+    private String deadLetterTopic;
+
+    @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+    private Integer isRlq;
+
+    @ApiModelProperty("The name of the retry letter queue topic")
+    private String retryLetterTopic;
+
+    /**
+     * Generate the Pulsar consume request that corresponds to this info.
+     *
+     * @return a new {@link ConsumePulsarRequest} produced by {@code CommonBeanUtils.copyProperties}
+     *         (presumably a copy of the same-named properties - TODO confirm against CommonBeanUtils)
+     */
+    @Override
+    public ConsumePulsarRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, ConsumePulsarRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java
new file mode 100644
index 000000000..f125fd461
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+/**
+ * Inlong consume request of Pulsar: the base consume request plus the dead letter / retry letter
+ * queue settings.
+ * <p/>
+ * None of the four fields below has a default value or a validation constraint, so each of them
+ * is null when the client leaves it out.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Inlong consume request of Pulsar")
+@JsonTypeDefine(value = MQType.PULSAR)
+public class ConsumePulsarRequest extends InlongConsumeRequest {
+
+    @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+    private Integer isDlq;
+
+    @ApiModelProperty("The name of the dead letter queue Topic")
+    private String deadLetterTopic;
+
+    @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+    private Integer isRlq;
+
+    @ApiModelProperty("The name of the retry letter queue topic")
+    private String retryLetterTopic;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java new file mode 100644 index 000000000..6b0b313d5 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.inlong.manager.pojo.consume.tubemq;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong consume dto of TubeMQ.
+ * <p/>
+ * It has no fields so far and only mirrors the structure of the other MQ-specific consume dto.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@ApiModel("Inlong consume dto of TubeMQ")
+public class ConsumeTubeMQDTO {
+
+    // unknown JSON properties are ignored, so a JSON with extra fields can still be parsed
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    // no fields
+
+    /**
+     * Get the dto instance from the request.
+     *
+     * @param request TubeMQ consume request; not read, as the dto has nothing to copy
+     * @return a new, empty dto
+     */
+    public static ConsumeTubeMQDTO getFromRequest(ConsumeTubeMQRequest request) {
+        return ConsumeTubeMQDTO.builder().build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     *
+     * @param extParams JSON string to parse, must not be null
+     * @return the parsed dto
+     * @throws BusinessException if the string cannot be parsed into this dto
+     */
+    public static ConsumeTubeMQDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, ConsumeTubeMQDTO.class);
+        } catch (Exception e) {
+            // NOTE(review): only the message of the cause is kept here, the stack trace is lost;
+            // pass the cause along if BusinessException offers such a constructor - TODO confirm
+            throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
similarity index 53%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
index 4b7bdff63..3c2d4ec52 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
@@ -15,20 +15,32 @@
  * limitations under the License.
*/ -package org.apache.inlong.manager.pojo.group.tubemq; +package org.apache.inlong.manager.pojo.consume.tubemq; import io.swagger.annotations.ApiModel; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; /** - * Inlong group info for TubeMQ + * Inlong consume info of TubeMQ */ @Data -@NoArgsConstructor -@ApiModel("Inlong group info for TubeMQ") -public class InlongTubeMQDTO { +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = MQType.TUBEMQ) +@ApiModel("Inlong consume info of TubeMQ") +public class ConsumeTubeMQInfo extends InlongConsumeInfo { - // no field + // no fields + + @Override + public ConsumeTubeMQRequest genRequest() { + return CommonBeanUtils.copyProperties(this, ConsumeTubeMQRequest::new); + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java similarity index 60% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java index 4b7bdff63..5e051a593 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java @@ -15,20 +15,26 @@ * limitations under the License. 
*/ -package org.apache.inlong.manager.pojo.group.tubemq; +package org.apache.inlong.manager.pojo.consume.tubemq; import io.swagger.annotations.ApiModel; import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; /** - * Inlong group info for TubeMQ + * Inlong consume request of TubeMQ */ @Data -@NoArgsConstructor -@ApiModel("Inlong group info for TubeMQ") -public class InlongTubeMQDTO { +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = MQType.TUBEMQ) +@ApiModel("Inlong consume request of TubeMQ") +public class ConsumeTubeMQRequest extends InlongConsumeRequest { - // no field + // no fields } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java index 3c07a9c79..b01595500 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java @@ -98,9 +98,6 @@ public abstract class InlongGroupRequest { @ApiModelProperty(value = "Name of followers, separated by commas") private String followers; - @ApiModelProperty(value = "Name of creator") - private String creator; - @ApiModelProperty(value = "Inlong group Extension properties") private List<InlongGroupExtInfo> extList; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java index 4b7bdff63..01f9ad348 100644 --- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java @@ -29,6 +29,6 @@ import lombok.NoArgsConstructor; @ApiModel("Inlong group info for TubeMQ") public class InlongTubeMQDTO { - // no field + // no fields } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java new file mode 100644 index 000000000..e9f886909 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Skeleton implementation of {@link InlongConsumeOperator}.
+ * <p/>
+ * It persists the common part of an inlong consume and delegates the MQ-specific parts
+ * (topic check, ext params) to the subclasses.
+ */
+public abstract class AbstractConsumeOperator implements InlongConsumeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumeOperator.class);
+
+    @Autowired
+    private InlongConsumeEntityMapper consumeMapper;
+
+    /**
+     * Check the topic, build the entity from the request and insert it.
+     *
+     * @param request inlong consume request
+     * @param operator name of the operator, recorded as both creator and modifier
+     * @return id of the inserted entity
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Integer saveOpt(InlongConsumeRequest request, String operator) {
+        // the topic must be verified before anything is copied from the request
+        this.checkTopicInfo(request);
+
+        InlongConsumeEntity consumeEntity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
+        // MQ-specific ext params are filled in by the subclass
+        this.setTargetEntity(request, consumeEntity);
+        // a newly saved consume starts in the WAIT_ASSIGN status
+        consumeEntity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
+        consumeEntity.setCreator(operator);
+        consumeEntity.setModifier(operator);
+
+        consumeMapper.insert(consumeEntity);
+        return consumeEntity.getId();
+    }
+
+    /**
+     * Build the entity from the request and update the stored record by id.
+     *
+     * @param request inlong consume request
+     * @param operator name of the operator, recorded as modifier
+     * @throws BusinessException with {@code CONFIG_EXPIRED} if the update did not affect exactly one row
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    public void updateOpt(InlongConsumeRequest request, String operator) {
+        InlongConsumeEntity consumeEntity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
+        // MQ-specific ext params are filled in by the subclass
+        this.setTargetEntity(request, consumeEntity);
+        consumeEntity.setModifier(operator);
+
+        int affectedRows = consumeMapper.updateByIdSelective(consumeEntity);
+        if (affectedRows == InlongConstants.AFFECTED_ONE_ROW) {
+            return;
+        }
+
+        // anything but exactly one affected row is reported as an expired config
+        LOGGER.error("inlong consume has already updated with id={}, expire version={}",
+                request.getId(), request.getVersion());
+        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+    }
+
+    /**
+     * Set the parameters of the target entity.
+     *
+     * @param request inlong consume request
+     * @param targetEntity targetEntity which will set the new parameters
+     */
+    protected abstract void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
new file mode 100644
index 000000000..20d697fe0
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Inlong consume operator for Pulsar.
+ * <p/>
+ * Besides the plain Pulsar MQ type it also accepts {@code MQType.TDMQ_PULSAR}.
+ */
+@Service
+public class ConsumePulsarOperator extends AbstractConsumeOperator {
+
+    private static final int DLQ_RLQ_ENABLE = 1;
+    private static final int DLQ_RLQ_DISABLE = 0;
+    // Topic prefix for the dead letter queue
+    private static final String PREFIX_DLQ = "dlq";
+    // Topic prefix for the retry letter queue
+    private static final String PREFIX_RLQ = "rlq";
+
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private InlongClusterService clusterService;
+    @Autowired
+    private InlongStreamService streamService;
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String mqType) {
+        return getMQType().equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType);
+    }
+
+    @Override
+    public String getMQType() {
+        return MQType.PULSAR;
+    }
+
+    /**
+     * Check that the requested topic is an inlong stream of the inlong group, then rewrite the topic
+     * of the request into the full Pulsar topic built with {@code InlongConstants.PULSAR_TOPIC_FORMAT}.
+     *
+     * @param request inlong consume request, its topic is replaced by the formatted one
+     */
+    @Override
+    public void checkTopicInfo(InlongConsumeRequest request) {
+        // one inlong stream only has one Pulsar topic,
+        // one inlong group may have multiple Pulsar topics.
+        String groupId = request.getInlongGroupId();
+        String originTopic = request.getTopic();
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, originTopic);
+        Preconditions.checkNotNull(streamEntity, "not found any Pulsar topic for inlong group " + groupId);
+
+        // fail with a clear message instead of a NullPointerException if the group itself is missing
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        Preconditions.checkNotNull(groupEntity, "inlong group not exist: " + groupId);
+
+        // format the topic to 'tenant/namespace/topic'
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
+                groupEntity.getInlongClusterTag(), null, ClusterType.PULSAR);
+        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
+                ? InlongConstants.DEFAULT_PULSAR_TENANT
+                : pulsarCluster.getTenant();
+
+        request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
+                groupEntity.getMqResource(), originTopic));
+    }
+
+    /**
+     * Build the Pulsar consume info from the entity, including the DLQ / RLQ settings parsed from
+     * its ext params when they are not blank.
+     *
+     * @param entity inlong consume entity, must not be null
+     * @return Pulsar consume info
+     */
+    @Override
+    public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
+
+        ConsumePulsarInfo consumeInfo = new ConsumePulsarInfo();
+        CommonBeanUtils.copyProperties(entity, consumeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            ConsumePulsarDTO dto = ConsumePulsarDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, consumeInfo);
+        }
+
+        return consumeInfo;
+    }
+
+    /**
+     * Validate the DLQ / RLQ settings of the request and store them as the ext params of the entity.
+     * <p/>
+     * A disabled (or unset) DLQ / RLQ is normalized in the request itself: its flag is set to 0 and
+     * its topic name is cleared.
+     *
+     * @param request inlong consume request, must be a {@link ConsumePulsarRequest}
+     * @param targetEntity entity which receives the serialized ext params
+     */
+    @Override
+    protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
+        // prerequisite for RLQ to be turned on: DLQ must be turned on.
+        // it means, if DLQ is closed, RLQ cannot exist alone and must be closed.
+        ConsumePulsarRequest pulsarRequest = (ConsumePulsarRequest) request;
+        // isDlq / isRlq are nullable Integers: compare with equals() so that a missing flag counts as
+        // disabled, comparing with == would unbox the null and throw a NullPointerException
+        boolean dlqEnable = Integer.valueOf(DLQ_RLQ_ENABLE).equals(pulsarRequest.getIsDlq());
+        boolean rlqEnable = Integer.valueOf(DLQ_RLQ_ENABLE).equals(pulsarRequest.getIsRlq());
+        if (rlqEnable && !dlqEnable) {
+            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
+        }
+
+        // when saving, the DLQ / RLQ under the same groupId cannot be repeated.
+        // when updating, delete the related DLQ / RLQ info.
+        String groupId = targetEntity.getInlongGroupId();
+        if (dlqEnable) {
+            String dlqTopic = PREFIX_DLQ + "_" + pulsarRequest.getDeadLetterTopic();
+            Preconditions.checkTrue(!streamService.exist(groupId, dlqTopic),
+                    ErrorCodeEnum.PULSAR_DLQ_DUPLICATED.getMessage());
+        } else {
+            pulsarRequest.setIsDlq(DLQ_RLQ_DISABLE);
+            pulsarRequest.setDeadLetterTopic(null);
+        }
+        if (rlqEnable) {
+            String rlqTopic = PREFIX_RLQ + "_" + pulsarRequest.getRetryLetterTopic();
+            Preconditions.checkTrue(!streamService.exist(groupId, rlqTopic),
+                    ErrorCodeEnum.PULSAR_RLQ_DUPLICATED.getMessage());
+        } else {
+            pulsarRequest.setIsRlq(DLQ_RLQ_DISABLE);
+            pulsarRequest.setRetryLetterTopic(null);
+        }
+
+        try {
+            targetEntity.setExtParams(objectMapper.writeValueAsString(ConsumePulsarDTO.getFromRequest(pulsarRequest)));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CONSUME_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
new file mode 100644
index 000000000..d507c2d2c
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQDTO;
+import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Objects;
+
+/**
+ * Inlong consume operator for TubeMQ.
+ */
+@Service
+public class ConsumeTubeMQOperator extends AbstractConsumeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeTubeMQOperator.class);
+
+    @Autowired
+    private InlongGroupService groupService;
+
+    @Override
+    public Boolean accept(String mqType) {
+        return getMQType().equals(mqType);
+    }
+
+    @Override
+    public String getMQType() {
+        return MQType.TUBEMQ;
+    }
+
+    /**
+     * Check that the requested topic equals the MQ resource of the inlong group.
+     *
+     * @param request inlong consume request
+     */
+    @Override
+    public void checkTopicInfo(InlongConsumeRequest request) {
+        String groupId = request.getInlongGroupId();
+        InlongGroupTopicInfo topicInfo = groupService.getTopic(groupId);
+        Preconditions.checkNotNull(topicInfo, "inlong group not exist: " + groupId);
+
+        // one inlong group only has one TubeMQ topic
+        String mqResource = topicInfo.getMqResource();
+        Preconditions.checkTrue(Objects.equals(mqResource, request.getTopic()),
+                String.format("inlong consume topic %s not belongs to inlong group %s", request.getTopic(), groupId));
+    }
+
+    /**
+     * Build the TubeMQ consume info from the entity.
+     *
+     * @param entity inlong consume entity, must not be null
+     * @return TubeMQ consume info
+     */
+    @Override
+    public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
+
+        // must be the TubeMQ info: the Pulsar info is typed as PULSAR and generates a Pulsar request
+        ConsumeTubeMQInfo consumeInfo = new ConsumeTubeMQInfo();
+        CommonBeanUtils.copyProperties(entity, consumeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            ConsumeTubeMQDTO dto = ConsumeTubeMQDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, consumeInfo);
+        }
+
+        return consumeInfo;
+    }
+
+    /**
+     * TubeMQ has no MQ-specific ext params, so the target entity is left untouched.
+     */
+    @Override
+    protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
+        LOGGER.info("do nothing for inlong consume with TubeMQ");
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java
new file mode 100644
index 000000000..52b51c892
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+/**
+ * Interface of the inlong consume operator, one implementation per supported MQ type.
+ */
+public interface InlongConsumeOperator {
+
+    /**
+     * Determines whether the current instance matches the specified type.
+     *
+     * @param mqType MQ type to test
+     * @return true if this operator handles the given MQ type
+     */
+    Boolean accept(String mqType);
+
+    /**
+     * Get the MQ type.
+     *
+     * @return MQ type string
+     */
+    String getMQType();
+
+    /**
+     * Check whether the topic in inlong consume belongs to its inlong group id.
+     *
+     * @param request inlong consume request
+     */
+    void checkTopicInfo(InlongConsumeRequest request);
+
+    /**
+     * Save the inlong consume info.
+     *
+     * @param request request of the inlong consume
+     * @param operator name of the operator
+     * @return inlong consume id after saving
+     */
+    Integer saveOpt(InlongConsumeRequest request, String operator);
+
+    /**
+     * Get the inlong consume info from the given entity.
+     *
+     * @param entity get field value from the entity
+     * @return inlong consume info after encapsulating
+     */
+    InlongConsumeInfo getFromEntity(InlongConsumeEntity entity);
+
+    /**
+     * Update the inlong consume info.
+     *
+     * @param request request of update
+     * @param operator name of operator
+     */
+    void updateOpt(InlongConsumeRequest request, String operator);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
new file mode 100644
index 000000000..1a4a57482
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link InlongConsumeOperator}.
+ */
+@Service
+public class InlongConsumeOperatorFactory {
+
+    @Autowired
+    private List<InlongConsumeOperator> consumeOperatorList;
+
+    /**
+     * Look up the first registered operator that accepts the given MQ type.
+     *
+     * @param mqType MQ type of the inlong consume
+     * @return the first operator in the injected list whose {@code accept} returns true
+     * @throws BusinessException if no operator accepts the given MQ type
+     */
+    public InlongConsumeOperator getInstance(String mqType) {
+        for (InlongConsumeOperator candidate : consumeOperatorList) {
+            if (candidate.accept(mqType)) {
+                return candidate;
+            }
+        }
+        String notSupported = String.format(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage(), mqType);
+        throw new BusinessException(notSupported);
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
similarity index 82%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
index 0e413701f..d4be887e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
@@ -15,11 +15,11 @@
  * limitations under the License.
*/ -package org.apache.inlong.manager.service.core.impl; +package org.apache.inlong.manager.service.consume; import lombok.extern.slf4j.Slf4j; -import org.apache.inlong.manager.common.enums.ConsumptionStatus; import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.enums.ConsumeStatus; import org.apache.inlong.manager.common.enums.ProcessName; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; @@ -34,9 +34,12 @@ import org.apache.inlong.manager.service.workflow.WorkflowService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +/** + * Operation to the inlong consume process. + */ @Slf4j @Service -public class ConsumptionProcessService { +public class InlongConsumeProcessService { @Autowired private ConsumptionService consumptionService; @@ -45,13 +48,20 @@ public class ConsumptionProcessService { @Autowired private ConsumptionPulsarEntityMapper consumptionPulsarMapper; + /** + * Start the process for the specified ID. 
+ * + * @param id inlong consume id + * @param operator name of operator + * @return workflow result + */ public WorkflowResult startProcess(Integer id, String operator) { ConsumptionInfo consumptionInfo = consumptionService.get(id); - Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS.contains( - ConsumptionStatus.fromStatus(consumptionInfo.getStatus())), - "current status not allow start workflow"); + Preconditions.checkTrue(ConsumeStatus.ALLOW_START_WORKFLOW_STATUS.contains( + ConsumeStatus.fromStatus(consumptionInfo.getStatus())), + "current status not allowed to start workflow"); - consumptionInfo.setStatus(ConsumptionStatus.WAIT_APPROVE.getStatus()); + consumptionInfo.setStatus(ConsumeStatus.WAIT_APPROVE.getCode()); boolean rowCount = consumptionService.update(consumptionInfo, operator); Preconditions.checkTrue(rowCount, "update consumption failed"); @@ -72,4 +82,5 @@ public class ConsumptionProcessService { form.setConsumptionInfo(consumptionInfo); return form; } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java new file mode 100644 index 000000000..ce421e337 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong consume service layer interface
+ */
+public interface InlongConsumeService {
+
+    /**
+     * Save inlong consume info.
+     *
+     * @param request inlong consume request that needs to be saved
+     * @param operator name of operator
+     * @return inlong consume id after saving
+     */
+    Integer save(InlongConsumeRequest request, String operator);
+
+    /**
+     * Get inlong consume info based on ID.
+     *
+     * @param id inlong consume id
+     * @return detail of the inlong consume
+     */
+    InlongConsumeInfo get(Integer id);
+
+    /**
+     * Check whether the consumer group exists or not.
+     *
+     * @param consumerGroup consumer group
+     * @param excludeSelfId ID of the record to exclude from the check
+     * @return true if exists, false if not exists
+     */
+    boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId);
+
+    /**
+     * Paging query inlong consume info list.
+     *
+     * @param request pagination query request
+     * @return inlong consume list
+     */
+    PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request);
+
+    /**
+     * Query the inlong consume statistics info via the username.
+     *
+     * @param username username
+     * @return inlong consume status statistics
+     */
+    InlongConsumeCountInfo countStatus(String username);
+
+    /**
+     * Update the inlong consume.
+     *
+     * @param request inlong consume request that needs to be updated
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean update(@Valid @NotNull(message = "inlong consume request cannot be null") InlongConsumeRequest request,
+            String operator);
+
+    /**
+     * Delete the inlong consume by the id.
+     *
+     * @param id inlong consume id that needs to be deleted
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean delete(Integer id, String operator);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
new file mode 100644
index 000000000..c7977b500
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
+import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
+
+/**
+ * Inlong consume service layer implementation
+ */
+@Service
+public class InlongConsumeServiceImpl implements InlongConsumeService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
+
+    @Autowired
+    private InlongConsumeEntityMapper consumeMapper;
+    @Autowired
+    private InlongConsumeOperatorFactory consumeOperatorFactory;
+
+    /**
+     * Save the inlong consume after checking that the topic and the consumer group are set
+     * and that the consumer group is not used by another record.
+     *
+     * @param request inlong consume request to save
+     * @param operator name of operator
+     * @return inlong consume id returned by the MQ-specific operator
+     */
+    @Override
+    public Integer save(InlongConsumeRequest request, String operator) {
+        LOGGER.debug("begin to save inlong consume={} by user={}", request, operator);
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+        Preconditions.checkNotNull(request.getTopic(), "inlong consume topic cannot be null");
+        String consumerGroup = request.getConsumerGroup();
+        Preconditions.checkNotNull(consumerGroup, "inlong consume consumer group cannot be null");
+        if (consumerGroupExists(consumerGroup, request.getId())) {
+            throw new BusinessException(String.format("consumer group %s already exist", consumerGroup));
+        }
+
+        InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
+        // saveOpt is documented to return the id after saving, while the id in the request
+        // may still be unset for a new record, so return the value from the operator
+        Integer consumeId = consumeOperator.saveOpt(request, operator);
+
+        LOGGER.info("success to save inlong consume for consumer group={} by user={}", consumerGroup, operator);
+        return consumeId;
+    }
+
+    /**
+     * Check whether any record other than {@code excludeSelfId} uses the consumer group.
+     *
+     * @param consumerGroup consumer group
+     * @param excludeSelfId ID of the record to exclude, no record is excluded if it is null
+     * @return true if exists, false if not exists
+     */
+    @Override
+    public boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId) {
+        InlongConsumePageRequest request = InlongConsumePageRequest.builder()
+                .consumerGroup(consumerGroup)
+                .isAdminRole(true)
+                .build();
+        List<InlongConsumeEntity> result = consumeMapper.selectByCondition(request);
+        if (excludeSelfId != null) {
+            result = result.stream()
+                    .filter(consumer -> !excludeSelfId.equals(consumer.getId()))
+                    .collect(Collectors.toList());
+        }
+        return CollectionUtils.isNotEmpty(result);
+    }
+
+    /**
+     * Get the inlong consume by id, converted by the operator that matches the MQ type of the entity.
+     *
+     * @param id inlong consume id, must not be null
+     * @return inlong consume info
+     * @throws BusinessException with CONSUME_NOT_FOUND if no record has the given id
+     */
+    @Override
+    public InlongConsumeInfo get(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+        InlongConsumeEntity entity = consumeMapper.selectById(id);
+        if (entity == null) {
+            LOGGER.error("inlong consume not found with id={}", id);
+            throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
+        }
+
+        InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(entity.getMqType());
+        InlongConsumeInfo consumeInfo = consumeOperator.getFromEntity(entity);
+
+        LOGGER.debug("success to get inlong consume for id={}", id);
+        return consumeInfo;
+    }
+
+    /**
+     * Sum the per-status counts of the user into the total, wait-assign, wait-approve and reject counters.
+     *
+     * @param username username
+     * @return inlong consume status statistics
+     */
+    @Override
+    public InlongConsumeCountInfo countStatus(String username) {
+        List<Map<String, Object>> statusCount = consumeMapper.countByUser(username);
+        InlongConsumeCountInfo countInfo = new InlongConsumeCountInfo();
+        for (Map<String, Object> map : statusCount) {
+            // read through Number, so that the conversion does not depend on the exact numeric type in the map
+            int status = ((Number) map.get("status")).intValue();
+            long count = ((Number) map.get("count")).longValue();
+            countInfo.setTotalCount(countInfo.getTotalCount() + count);
+            if (status == ConsumeStatus.WAIT_ASSIGN.getCode()) {
+                countInfo.setWaitAssignCount(countInfo.getWaitAssignCount() + count);
+            } else if (status == ConsumeStatus.WAIT_APPROVE.getCode()) {
+                countInfo.setWaitApproveCount(countInfo.getWaitApproveCount() + count);
+            } else if (status == ConsumeStatus.REJECTED.getCode()) {
+                countInfo.setRejectCount(countInfo.getRejectCount() + count);
+            }
+        }
+
+        LOGGER.debug("success to count inlong consume for user={}", username);
+        return countInfo;
+    }
+
+    /**
+     * Paging query of the inlong consume list, the page size is capped at MAX_PAGE_SIZE.
+     *
+     * @param request pagination query request
+     * @return page of inlong consume brief info
+     */
+    @Override
+    public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request) {
+        if (request.getPageSize() > MAX_PAGE_SIZE) {
+            LOGGER.warn("list inlong consumes, change page size from {} to {}", request.getPageSize(), MAX_PAGE_SIZE);
+            request.setPageSize(MAX_PAGE_SIZE);
+        }
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        OrderFieldEnum.checkOrderField(request);
+        OrderTypeEnum.checkOrderType(request);
+        Page<InlongConsumeEntity> entityPage = (Page<InlongConsumeEntity>) consumeMapper.selectByCondition(request);
+        List<InlongConsumeBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(entityPage,
+                InlongConsumeBriefInfo::new);
+        PageResult<InlongConsumeBriefInfo> pageResult = new PageResult<>(briefInfos,
+                entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
+
+        LOGGER.debug("success to list inlong consume for {}", request);
+        return pageResult;
+    }
+
+    /**
+     * Update the inlong consume if the operator is one of its in-charges, the version in the request
+     * matches the stored version, and the current status allows updating.
+     *
+     * @param request inlong consume request that needs to be updated
+     * @param operator name of operator
+     * @return true after the MQ-specific operator has updated the record
+     * @throws BusinessException with CONFIG_EXPIRED if the version in the request is not the stored one
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+            propagation = Propagation.REQUIRES_NEW)
+    public Boolean update(InlongConsumeRequest request, String operator) {
+        LOGGER.debug("begin to update inlong consume={} by user={}", request, operator);
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+
+        // check if it can be modified
+        Integer consumeId = request.getId();
+        InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
+        Preconditions.checkNotNull(existEntity, "inlong consume not exist with id " + consumeId);
+        Preconditions.checkTrue(isInCharge(existEntity.getInCharges(), operator),
+                "operator " + operator + " has no privilege for the inlong consume");
+
+        if (!Objects.equals(existEntity.getVersion(), request.getVersion())) {
+            LOGGER.error("inlong consume has already updated, id={}, curVersion={}",
+                    existEntity.getId(), request.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+
+        ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(existEntity.getStatus());
+        Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+                "inlong consume not allowed update when status is " + consumeStatus.name());
+
+        InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
+        consumeOperator.updateOpt(request, operator);
+
+        LOGGER.info("success to update inlong consume={} by user={}", request, operator);
+        return true;
+    }
+
+    /**
+     * Logically delete the inlong consume: set is_deleted to the record id and the status to DELETED.
+     *
+     * @param id inlong consume id that needs to be deleted
+     * @param operator name of operator
+     * @return true after exactly one row has been updated
+     * @throws BusinessException with CONFIG_EXPIRED if the update did not affect exactly one row
+     */
+    @Override
+    public Boolean delete(Integer id, String operator) {
+        LOGGER.info("begin to delete inlong consume for id={} by user={}", id, operator);
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+        InlongConsumeEntity entity = consumeMapper.selectById(id);
+        Preconditions.checkNotNull(entity, "inlong consume not exist with id " + id);
+
+        // is_deleted takes the record id, any value greater than 0 means deleted
+        entity.setIsDeleted(id);
+        entity.setStatus(ConsumeStatus.DELETED.getCode());
+        entity.setModifier(operator);
+
+        int rowCount = consumeMapper.updateByIdSelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("inlong consume has already updated with id={}, curVersion={}", id, entity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+
+        LOGGER.info("success to delete inlong consume for id={} by user={}", id, operator);
+        return true;
+    }
+
+    /**
+     * Check whether the operator is exactly one of the comma-separated in-charges.
+     * A plain substring check would also accept an operator whose name is only a part of an in-charge name.
+     */
+    private static boolean isInCharge(String inCharges, String operator) {
+        if (inCharges == null || operator == null) {
+            return false;
+        }
+        return Arrays.stream(inCharges.split(","))
+                .map(String::trim)
+                .anyMatch(operator::equals);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 8a899a390..5641a293c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java @@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.MQType; import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.enums.ConsumptionStatus; +import org.apache.inlong.manager.common.enums.ConsumeStatus; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -104,9 +104,9 @@ public class ConsumptionServiceImpl implements ConsumptionService { return ConsumptionSummary.builder() .totalCount(countMap.values().stream().mapToInt(c -> c).sum()) - .waitingAssignCount(countMap.getOrDefault(ConsumptionStatus.WAIT_ASSIGN.getStatus() + "", 0)) - .waitingApproveCount(countMap.getOrDefault(ConsumptionStatus.WAIT_APPROVE.getStatus() + "", 0)) - .rejectedCount(countMap.getOrDefault(ConsumptionStatus.REJECTED.getStatus() + "", 0)).build(); + .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.WAIT_ASSIGN.getCode() + "", 0)) + .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.WAIT_APPROVE.getCode() + "", 0)) + .rejectedCount(countMap.getOrDefault(ConsumeStatus.REJECTED.getCode() + "", 0)).build(); } @Override @@ -166,9 +166,9 @@ public class ConsumptionServiceImpl implements ConsumptionService { if (info.getId() != null) { ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId()); Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId()); - ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(consumptionEntity.getStatus()); - Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumptionStatus), - "consumption not allow update when status is " + 
consumptionStatus.name()); + ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(consumptionEntity.getStatus()); + Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus), + "consumption not allow update when status is " + consumeStatus.name()); } setTopicInfo(info); @@ -284,7 +284,7 @@ public class ConsumptionServiceImpl implements ConsumptionService { // If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to // add/remove inlong streams in the inlong group - if (ConsumptionStatus.APPROVED.getStatus() == exists.getStatus()) { + if (ConsumeStatus.APPROVED.getCode() == exists.getStatus()) { String groupId = info.getInlongGroupId(); String dlqNameOld = pulsarEntity.getDeadLetterTopic(); String dlqNameNew = update.getDeadLetterTopic(); @@ -356,7 +356,7 @@ public class ConsumptionServiceImpl implements ConsumptionService { entity.setInCharges(groupInfo.getInCharges()); entity.setFilterEnabled(0); - entity.setStatus(ConsumptionStatus.APPROVED.getStatus()); + entity.setStatus(ConsumeStatus.APPROVED.getCode()); String operator = groupInfo.getCreator(); entity.setCreator(operator); entity.setModifier(operator); @@ -377,7 +377,7 @@ public class ConsumptionServiceImpl implements ConsumptionService { private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) { ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new); - entity.setStatus(ConsumptionStatus.WAIT_ASSIGN.getStatus()); + entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode()); entity.setCreator(operator); entity.setModifier(operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java index 0132b6259..8d950b706 100644 --- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java @@ -49,7 +49,7 @@ import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; /** - * Operation related to inlong group process + * Operation to the inlong group process */ @Service public class InlongGroupProcessService { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index d9c5f1c63..150c8e317 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -77,6 +77,8 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE; + /** * Inlong group service layer implementation */ @@ -85,7 +87,6 @@ import java.util.stream.Collectors; public class InlongGroupServiceImpl implements InlongGroupService { private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupServiceImpl.class); - private static final Integer MAX_PAGE_SIZE = 100; @Autowired private InlongGroupOperatorFactory groupOperatorFactory; @@ -184,7 +185,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { @Override public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request) { if (request.getPageSize() > MAX_PAGE_SIZE) { - LOGGER.warn("list group info, but page size is {}, change to {}", request.getPageSize(), MAX_PAGE_SIZE); + LOGGER.warn("list inlong groups, change page size from {} to {}", 
request.getPageSize(), MAX_PAGE_SIZE); request.setPageSize(MAX_PAGE_SIZE); } PageHelper.startPage(request.getPageNum(), request.getPageSize()); @@ -275,8 +276,8 @@ public class InlongGroupServiceImpl implements InlongGroupService { return true; } - @Transactional(rollbackFor = Throwable.class) @Override + @Transactional(rollbackFor = Throwable.class) public boolean delete(String groupId, String operator) { LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator); Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java index caded9717..c06c3d5a2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java @@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.listener.consumption; import lombok.extern.slf4j.Slf4j; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.common.enums.ConsumptionStatus; +import org.apache.inlong.manager.common.enums.ConsumeStatus; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -58,7 +58,7 @@ public class ConsumptionCancelProcessListener implements ProcessEventListener { ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm(); ConsumptionEntity update = 
consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId()); - update.setStatus(ConsumptionStatus.CANCELED.getStatus()); + update.setStatus(ConsumeStatus.CANCELED.getCode()); int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update); if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { log.error("consumption information has already updated, id={}, curVersion={}", diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java index c42930487..e9d0795b0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterType; -import org.apache.inlong.manager.common.enums.ConsumptionStatus; +import org.apache.inlong.manager.common.enums.ConsumeStatus; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.consts.MQType; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -104,7 +104,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener */ private void updateConsumerInfo(Integer consumptionId, String consumerGroup) { ConsumptionEntity update = consumptionMapper.selectByPrimaryKey(consumptionId); - update.setStatus(ConsumptionStatus.APPROVED.getStatus()); + update.setStatus(ConsumeStatus.APPROVED.getCode()); update.setConsumerGroup(consumerGroup); int rowCount = 
consumptionMapper.updateByPrimaryKeySelective(update); if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java index 72af56894..b1e5030e7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java @@ -18,7 +18,7 @@ package org.apache.inlong.manager.service.listener.consumption; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.common.enums.ConsumptionStatus; +import org.apache.inlong.manager.common.enums.ConsumeStatus; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -60,7 +60,7 @@ public class ConsumptionRejectProcessListener implements ProcessEventListener { ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm(); ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId()); - update.setStatus(ConsumptionStatus.REJECTED.getStatus()); + update.setStatus(ConsumeStatus.REJECTED.getCode()); int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update); if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index ea5fdca22..d844285b7 100644 --- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -140,6 +140,33 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`) ); +-- ---------------------------- +-- Table structure for inlong_consume +-- ---------------------------- +CREATE TABLE IF NOT EXISTS `inlong_consume` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group name, filled in by the user, undeleted ones cannot be repeated', + `description` varchar(256) DEFAULT '' COMMENT 'Inlong consume description', + `mq_type` varchar(10) DEFAULT 'TUBEMQ' COMMENT 'Message queue type, high throughput: TUBEMQ, high consistency: PULSAR', + `topic` varchar(256) NOT NULL COMMENT 'The target topic of this consume', + `inlong_group_id` varchar(256) NOT NULL COMMENT 'The target inlong group id of this consume', + `filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter consume, 0: not filter, 1: filter', + `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'The target inlong stream id of this consume, needed if the filter_enabled=1', + `ext_params` mediumtext DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string', + `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas', + `status` int(4) DEFAULT '100' COMMENT 'Inlong consume status', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', + `creator` varchar(64) NOT NULL COMMENT 'Creator name', + `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `version` int(11) NOT NULL DEFAULT '1' 
COMMENT 'Version number, which will be incremented by 1 after modification', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_inlong_consume` (`consumer_group`, `is_deleted`) +); + + -- ---------------------------- -- Table structure for data_node -- ---------------------------- @@ -166,7 +193,7 @@ CREATE TABLE IF NOT EXISTS `data_node` ); -- ---------------------------- --- Table structure for consumption +-- Deprecated: Table structure for consumption -- ---------------------------- CREATE TABLE IF NOT EXISTS `consumption` ( @@ -189,7 +216,7 @@ CREATE TABLE IF NOT EXISTS `consumption` ); -- ---------------------------- --- Table structure for consumption_pulsar +-- Deprecated: Table structure for consumption_pulsar -- ---------------------------- CREATE TABLE IF NOT EXISTS `consumption_pulsar` ( diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 057af951f..cfa64e471 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -151,6 +151,33 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table'; +-- ---------------------------- +-- Table structure for inlong_consume +-- ---------------------------- +CREATE TABLE IF NOT EXISTS `inlong_consume` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group name, filled in by the user, undeleted ones cannot be repeated', + `description` varchar(256) DEFAULT '' COMMENT 'Inlong consume description', + `mq_type` varchar(10) DEFAULT 'TUBEMQ' COMMENT 'Message queue type, high throughput: TUBEMQ, high consistency: PULSAR', + `topic` varchar(256) NOT NULL COMMENT 'The target topic of this consume', + `inlong_group_id` varchar(256) NOT NULL COMMENT 'The target inlong group id of this consume', + `filter_enabled` int(2) 
DEFAULT '0' COMMENT 'Whether to filter consume, 0: not filter, 1: filter', + `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'The target inlong stream id of this consume, needed if the filter_enabled=1', + `ext_params` mediumtext DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string', + `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas', + `status` int(4) DEFAULT '100' COMMENT 'Inlong consume status', + `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', + `creator` varchar(64) NOT NULL COMMENT 'Creator name', + `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', + `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_inlong_consume` (`consumer_group`, `is_deleted`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong consume table'; + -- ---------------------------- -- Table structure for data_node -- ---------------------------- @@ -178,7 +205,7 @@ CREATE TABLE IF NOT EXISTS `data_node` DEFAULT CHARSET = utf8mb4 COMMENT ='Data node table'; -- ---------------------------- --- Table structure for consumption +-- Deprecated: Table structure for consumption -- ---------------------------- CREATE TABLE IF NOT EXISTS `consumption` ( @@ -202,7 +229,7 @@ CREATE TABLE IF NOT EXISTS `consumption` DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table'; -- ---------------------------- --- Table structure for consumption_pulsar +-- Deprecated: Table structure for consumption_pulsar -- ---------------------------- CREATE TABLE IF NOT EXISTS `consumption_pulsar` ( diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
index 7207ce726..f1c77ad51 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.core.impl.ConsumptionProcessService;
+import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.user.LoginUserUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,7 +53,7 @@ public class ConsumptionController {
     @Autowired
     private ConsumptionService consumptionService;
     @Autowired
-    private ConsumptionProcessService processOperation;
+    private InlongConsumeProcessService processOperation;
 
     @GetMapping("/consumption/summary")
     @ApiOperation(value = "Get data consumption summary")
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
new file mode 100644
index 000000000..4697cbe57
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.enums.UserTypeEnum;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.user.LoginUserUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Inlong consume control layer.
+ *
+ * Exposes the REST endpoints under /api/consume and delegates the work to
+ * {@link InlongConsumeService} and {@link InlongConsumeProcessService}.
+ */
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Consume-API")
+public class InlongConsumeController {
+
+    @Autowired
+    private InlongConsumeService consumeService;
+    @Autowired
+    private InlongConsumeProcessService consumeProcessService;
+
+    /**
+     * Save an inlong consume on behalf of the current login user.
+     *
+     * @param request consume info to save; validated with the default group, because the
+     *         update group is meant for requests that modify an existing record
+     * @return response wrapping the value returned by the consume service
+     */
+    @RequestMapping(value = "/consume/save", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE)
+    @ApiOperation(value = "Save inlong consume")
+    public Response<Integer> save(@Validated @RequestBody InlongConsumeRequest request) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(consumeService.save(request, operator));
+    }
+
+    /**
+     * Get an inlong consume by its ID.
+     *
+     * @param id inlong consume ID
+     * @return response wrapping the consume info
+     */
+    @GetMapping("/consume/get/{id}")
+    @ApiOperation(value = "Get inlong consume")
+    @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+    public Response<InlongConsumeInfo> get(@PathVariable(name = "id") Integer id) {
+        return Response.success(consumeService.get(id));
+    }
+
+    /**
+     * Count the inlong consume status for the current login user.
+     *
+     * @return response wrapping the count info
+     */
+    @GetMapping(value = "/consume/countStatus")
+    @ApiOperation(value = "Count inlong consume status by current user")
+    public Response<InlongConsumeCountInfo> countStatusByUser() {
+        return Response.success(consumeService.countStatus(LoginUserUtils.getLoginUser().getName()));
+    }
+
+    /**
+     * List inlong consumes by pagination.
+     * The current user name and the admin flag are read from LoginUserUtils and
+     * written into the request before it is passed to the service.
+     *
+     * @param request paging conditions
+     * @return response wrapping one page of brief consume info
+     */
+    @GetMapping("/consume/list")
+    @ApiOperation(value = "List inlong consume by pagination")
+    public Response<PageResult<InlongConsumeBriefInfo>> list(InlongConsumePageRequest request) {
+        request.setCurrentUser(LoginUserUtils.getLoginUser().getName());
+        request.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserTypeEnum.ADMIN.name()));
+        return Response.success(consumeService.list(request));
+    }
+
+    /**
+     * Update an inlong consume on behalf of the current login user.
+     * NOTE(review): the "{id}" segment of the route is not bound to any parameter here;
+     * only the request body is passed to the service - confirm this is intended.
+     *
+     * @param request consume info to update, validated with the update group
+     * @return response wrapping the update result
+     */
+    @PostMapping("/consume/update/{id}")
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Update inlong consume")
+    public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody InlongConsumeRequest request) {
+        return Response.success(consumeService.update(request, LoginUserUtils.getLoginUser().getName()));
+    }
+
+    /**
+     * Delete an inlong consume by its ID on behalf of the current login user.
+     *
+     * @param id inlong consume ID
+     * @return success response without payload
+     */
+    @DeleteMapping("/consume/delete/{id}")
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiOperation(value = "Delete inlong consume by ID")
+    @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+    public Response<Object> delete(@PathVariable(name = "id") Integer id) {
+        consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
+        return Response.success();
+    }
+
+    /**
+     * Start the inlong consume process on behalf of the current login user.
+     *
+     * @param id inlong consume ID
+     * @return response wrapping the workflow result
+     */
+    @PostMapping("/consume/startProcess/{id}")
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Start inlong consume process")
+    @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+    public Response<WorkflowResult> startProcess(@PathVariable(name = "id") Integer id) {
+        String username = LoginUserUtils.getLoginUser().getName();
+        return Response.success(consumeProcessService.startProcess(id, username));
+    }
+
+}