This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c21437090 [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046) c21437090 is described below commit c2143709048cc2a7a6f5dc50ea92575042654182 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Nov 10 14:46:44 2022 +0800 [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046) * Distinguish config processes between the InlongGroup and InlongStream * Use common thread pool for UpdateGroupCompleteListener * Modify the source state according to lightweight * Modify the asynchronous process logic of inlong stream * Allow configuration without inlong stream * Determine whether to configure Sort according to isStream param Co-authored-by: healchow <healc...@gmail.com> --- .../manager/dao/entity/WorkflowProcessEntity.java | 1 + .../mappers/WorkflowProcessEntityMapper.xml | 27 +++++---- .../manager/pojo/workflow/ProcessRequest.java | 3 + .../manager/pojo/workflow/TaskLogRequest.java | 3 + .../form/process/StreamResourceProcessForm.java | 13 +++++ .../listener/group/InitGroupCompleteListener.java | 24 +++----- .../service/listener/group/InitGroupListener.java | 4 -- .../listener/queue/QueueResourceListener.java | 67 ++++++++++++++++++++++ .../queue/StreamQueueResourceListener.java | 3 - .../service/listener/sort/SortConfigListener.java | 5 ++ .../stream/InitStreamCompleteListener.java | 7 ++- .../queue/kafka/KafkaResourceOperators.java | 25 +------- .../queue/pulsar/PulsarResourceOperator.java | 16 +----- .../resource/sort/DefaultSortConfigOperator.java | 10 ++-- .../service/stream/InlongStreamProcessService.java | 20 +++---- .../service/workflow/WorkflowServiceImpl.java | 1 + .../group/CreateGroupWorkflowDefinition.java | 24 +------- .../group/DeleteGroupWorkflowDefinition.java | 8 ++- .../stream/CreateStreamWorkflowDefinition.java | 4 +- .../stream/DeleteStreamWorkflowDefinition.java | 4 +- .../group/CreateGroupWorkflowDefinitionTest.java | 4 +- .../main/resources/h2/apache_inlong_manager.sql | 3 +- .../manager-web/sql/apache_inlong_manager.sql | 3 +- .../workflow/processor/StartEventProcessor.java | 5 ++ 24 files changed, 162 insertions(+), 122 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java index 87e92bf4a..b295e1e40 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java @@ -34,6 +34,7 @@ public class WorkflowProcessEntity { private String title; private String inlongGroupId; + private String inlongStreamId; private String applicant; private String status; private String formData; diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml index 1969918b4..8b122ab12 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml @@ -27,6 +27,7 @@ <result column="type" jdbcType="VARCHAR" property="type"/> <result column="title" jdbcType="VARCHAR" property="title"/> <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/> + <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/> <result column="applicant" jdbcType="VARCHAR" property="applicant"/> <result column="status" jdbcType="VARCHAR" property="status"/> <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/> @@ -36,22 +37,21 @@ <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/> </resultMap> <sql id="Base_Column_List"> - id, name, display_name, type, title, inlong_group_id, applicant, - status, start_time, end_time, hidden, form_data, ext_params + id, name, display_name, type, title, inlong_group_id, inlong_stream_id, + applicant, status, start_time, end_time, hidden, form_data, ext_params </sql> <insert id="insert" useGeneratedKeys="true" keyProperty="id" keyColumn="id" parameterType="org.apache.inlong.manager.dao.entity.WorkflowProcessEntity"> - insert into workflow_process (name, display_name, - type, title, inlong_group_id, - applicant, status, - start_time, end_time, - form_data, ext_params, hidden) - values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR}, - #{type,jdbcType=VARCHAR}, #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, - #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR}, - #{startTime,jdbcType=TIMESTAMP}, #{endTime,jdbcType=TIMESTAMP}, - #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, #{hidden,jdbcType=TINYINT}) + insert into workflow_process (name, display_name, type, + title, inlong_group_id, inlong_stream_id, + applicant, status, start_time, + end_time, form_data, ext_params, hidden) + values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, + #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, + #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR}, #{startTime,jdbcType=TIMESTAMP}, + #{endTime,jdbcType=TIMESTAMP}, #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, + #{hidden,jdbcType=TINYINT}) </insert> <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap"> @@ -87,6 +87,9 @@ <if test="inlongGroupId != null and inlongGroupId !=''"> and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} </if> + <if test="inlongStreamId != null and inlongStreamId !=''"> + and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR} + </if> <if test="applicant != null and applicant !=''"> and applicant = #{applicant,jdbcType=VARCHAR} </if> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java index a9b5138e8..585de747a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java @@ -63,6 +63,9 @@ public class ProcessRequest extends PageRequest { @ApiModelProperty("Inlong group id") private String inlongGroupId; + @ApiModelProperty("Inlong stream id") + private String inlongStreamId; + @ApiModelProperty("Start time-lower limit") @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date startTimeBegin; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java index 5f5011918..f3cc5a9b2 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java @@ -36,6 +36,9 @@ public class TaskLogRequest extends PageRequest { @ApiModelProperty("Inlong group id") private String inlongGroupId; + @ApiModelProperty("Inlong stream id") + private String inlongStreamId; + @ApiModelProperty("Process name list") private List<String> processNames; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java index 4c50e58e4..b175bc09d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java @@ -39,6 +39,18 @@ public class StreamResourceProcessForm extends BaseProcessForm { private GroupOperateType groupOperateType = GroupOperateType.INIT; + /** + * Get stream resource process form info. + */ + public static StreamResourceProcessForm getProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, + GroupOperateType operateType) { + StreamResourceProcessForm processForm = new StreamResourceProcessForm(); + processForm.setGroupInfo(groupInfo); + processForm.setStreamInfo(streamInfo); + processForm.setGroupOperateType(operateType); + return processForm; + } + @Override public void validate() throws FormValidateException { @@ -53,4 +65,5 @@ public class StreamResourceProcessForm extends BaseProcessForm { public String getInlongGroupId() { return groupInfo.getInlongGroupId(); } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java index 944c740c7..ca5a375fb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java @@ -18,27 +18,26 @@ package org.apache.inlong.manager.service.listener.group; import lombok.extern.slf4j.Slf4j; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; -import org.apache.inlong.manager.common.enums.SourceStatus; -import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.InlongGroupRequest; import org.apache.inlong.manager.pojo.group.InlongGroupUtils; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.service.group.InlongGroupService; -import org.apache.inlong.manager.service.source.StreamSourceService; -import org.apache.inlong.manager.service.stream.InlongStreamService; +import org.apache.inlong.manager.service.stream.InlongStreamProcessService; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.process.ProcessEventListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.List; + /** * The listener of InlongGroup when created resources successfully. */ @@ -49,11 +48,9 @@ public class InitGroupCompleteListener implements ProcessEventListener { @Autowired private InlongGroupService groupService; @Autowired - private InlongStreamService streamService; - @Autowired - private StreamSourceService sourceService; - @Autowired private InlongGroupEntityMapper groupMapper; + @Autowired + private InlongStreamProcessService streamProcessService; @Override public ProcessEvent event() { @@ -84,12 +81,9 @@ public class InitGroupCompleteListener implements ProcessEventListener { updateGroupRequest.setVersion(existGroup.getVersion()); groupService.update(updateGroupRequest, operator); - // update status of other related configs - streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); - if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { - sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); - } else { - sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); + List<InlongStreamInfo> streamList = form.getStreamInfos(); + for (InlongStreamInfo streamInfo : streamList) { + streamProcessService.startProcess(groupId, streamInfo.getInlongStreamId(), operator, false); } log.info("success to execute InitGroupCompleteListener for groupId={}", groupId); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java index d5c238fa8..fbef1c864 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java @@ -18,7 +18,6 @@ package org.apache.inlong.manager.service.listener.group; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -59,9 +58,6 @@ public class InitGroupListener implements ProcessEventListener { if (groupInfo == null) { throw new WorkflowListenerException("inlong group info cannot be null for init group process"); } - if (CollectionUtils.isEmpty(form.getStreamInfos())) { - throw new WorkflowListenerException("inlong stream info list cannot be null for init group process"); - } groupService.updateStatus(groupId, GroupStatus.CONFIG_ING.getCode(), context.getOperator()); log.info("success to execute InitGroupListener for groupId={}", groupId); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java index edba69c55..390c11b48 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java @@ -17,22 +17,40 @@ package org.apache.inlong.manager.service.listener.queue; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; +import org.apache.inlong.manager.common.enums.TaskStatus; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.workflow.TaskResponse; +import org.apache.inlong.manager.pojo.workflow.WorkflowResult; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; +import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.service.group.InlongGroupService; import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator; import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory; +import org.apache.inlong.manager.service.workflow.WorkflowService; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.task.QueueOperateListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT; +import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE; + /** * Create message queue resources, * such as Pulsar Topic and Subscription, TubeMQ Topic and ConsumerGroup, etc. @@ -41,10 +59,21 @@ import org.springframework.stereotype.Service; @Service public class QueueResourceListener implements QueueOperateListener { + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( + 20, + 40, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(), + new CallerRunsPolicy()); + @Autowired private InlongGroupService groupService; @Autowired private QueueResourceOperatorFactory queueOperatorFactory; + @Autowired + private WorkflowService workflowService; @Override public TaskEvent event() { @@ -82,7 +111,10 @@ public class QueueResourceListener implements QueueOperateListener { String operator = context.getOperator(); switch (operateType) { case INIT: + // create queue resource for inlong group queueOperator.createQueueForGroup(groupInfo, operator); + // create queue resource for all inlong streams under the inlong group + this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator); break; case DELETE: queueOperator.deleteQueueForGroup(groupInfo, operator); @@ -96,4 +128,39 @@ public class QueueResourceListener implements QueueOperateListener { return ListenerResult.success("success"); } + private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) { + String groupId = groupInfo.getInlongGroupId(); + log.info("success to start stream process for groupId={}", groupId); + + for (InlongStreamInfo stream : streamInfos) { + StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT); + String streamId = stream.getInlongStreamId(); + final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId; + + CompletableFuture<WorkflowResult> future = CompletableFuture + .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE) + .whenComplete((result, ex) -> { + if (ex != null) { + log.error(errMsg + ": " + ex.getMessage()); + throw new WorkflowListenerException(errMsg, ex); + } else { + List<TaskResponse> tasks = result.getNewTasks(); + if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) { + log.error(errMsg); + throw new WorkflowListenerException(errMsg); + } + } + }); + try { + future.get(180, TimeUnit.SECONDS); + } catch (Exception e) { + String msg = "failed to execute stream process in asynchronously "; + log.error(msg, e); + throw new WorkflowListenerException(msg + ": " + e.getMessage()); + } + } + + log.info("success to start stream process for groupId={}", groupId); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java index 21403ec21..eb3aaab8c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java @@ -29,7 +29,6 @@ import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces import org.apache.inlong.manager.service.group.InlongGroupService; import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator; import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory; -import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.task.QueueOperateListener; @@ -46,8 +45,6 @@ public class StreamQueueResourceListener implements QueueOperateListener { @Autowired private InlongGroupService groupService; @Autowired - private InlongStreamService streamService; - @Autowired private QueueResourceOperatorFactory queueOperatorFactory; @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index d04a20852..08a310ddb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.sort; +import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -81,6 +82,10 @@ public class SortConfigListener implements SortOperateListener { } InlongGroupInfo groupInfo = form.getGroupInfo(); List<InlongStreamInfo> streamInfos = form.getStreamInfos(); + if (CollectionUtils.isEmpty(streamInfos)) { + LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId); + return ListenerResult.success(); + } int sinkCount = streamInfos.stream() .map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size()) .reduce(0, Integer::sum); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java index be6e60fd6..141635c46 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.listener.stream; import lombok.extern.slf4j.Slf4j; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.common.enums.StreamStatus; @@ -63,7 +64,11 @@ public class InitStreamCompleteListener implements ProcessEventListener { // Update status of other related configs streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); streamService.update(streamInfo.genRequest(), operator); - sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); + if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) { + sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); + } else { + sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java index 4e2b7246a..2f65aa735 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java @@ -54,10 +54,10 @@ public class KafkaResourceOperators implements QueueResourceOperator { @Autowired private InlongClusterService clusterService; @Autowired - private InlongStreamService streamService; - @Autowired private KafkaOperator kafkaOperator; @Autowired + private InlongStreamService streamService; + @Autowired private InlongConsumeService consumeService; @Override @@ -67,26 +67,7 @@ public class KafkaResourceOperators implements QueueResourceOperator { @Override public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) { - String groupId = groupInfo.getInlongGroupId(); - log.info("begin to create kafka resource for groupId={}", groupId); - - InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo; - try { - // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic - List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); - if (streamInfoList == null || streamInfoList.isEmpty()) { - log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId); - return; - } - for (InlongStreamBriefInfo streamInfo : streamInfoList) { - this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource()); - } - } catch (Exception e) { - String msg = String.format("failed to create kafka resource for groupId=%s", groupId); - log.error(msg, e); - throw new WorkflowListenerException(msg + ": " + e.getMessage()); - } - log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo); + log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId()); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java index 23c13b740..2e379b285 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java @@ -107,20 +107,6 @@ public class PulsarResourceOperator implements QueueResourceOperator { log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}", groupId, namespace, clusterName); } - - // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic - List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); - if (streamInfoList == null || streamInfoList.isEmpty()) { - log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}", - groupId, clusterName); - return; - } - // create pulsar topic and subscription - for (InlongStreamBriefInfo stream : streamInfoList) { - this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource()); - this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), - stream.getInlongStreamId()); - } } catch (Exception e) { String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId, pulsarCluster.toString()); @@ -184,7 +170,7 @@ public class PulsarResourceOperator implements QueueResourceOperator { streamInfo.getMqResource(), streamId); } catch (Exception e) { String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s", - groupId, streamId,pulsarCluster.getName()); + groupId, streamId, pulsarCluster.getName()); log.error(msg, e); throw new WorkflowListenerException(msg + ": " + e.getMessage()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index a4c6b5d80..2449b835e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -77,6 +77,10 @@ public class DefaultSortConfigOperator implements SortConfigOperator { @Override public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream) throws Exception { + if (isStream) { + LOGGER.warn("stream workflow no need to build sort config for disable zk"); + return; + } if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) { LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk"); return; @@ -84,11 +88,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos); String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo); - if (isStream) { - this.addToStreamExt(streamInfos, dataflow); - } else { - this.addToGroupExt(groupInfo, dataflow); - } + this.addToGroupExt(groupInfo, dataflow); if (LOGGER.isDebugEnabled()) { LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java index 06046eceb..eaa05ba41 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java @@ -92,7 +92,8 @@ public class InlongStreamProcessService { throw new BusinessException(errMsg); } - StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT); + StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, + streamInfo, GroupOperateType.INIT); ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE; if (sync) { WorkflowResult workflowResult = workflowService.start(processName, operator, processForm); @@ -134,7 +135,8 @@ public class InlongStreamProcessService { String.format("stream status=%s not support suspend stream for groupId=%s streamId=%s", status, groupId, streamId)); } - StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.SUSPEND); + StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo, + GroupOperateType.SUSPEND); ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE; if (sync) { WorkflowResult workflowResult = workflowService.start(processName, operator, processForm); @@ -175,7 +177,8 @@ public class InlongStreamProcessService { String.format("stream status=%s not support restart stream for groupId=%s streamId=%s", status, groupId, streamId)); } - StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.RESTART); + StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo, + GroupOperateType.RESTART); ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE; if (sync) { WorkflowResult workflowResult = workflowService.start(processName, operator, processForm); @@ -220,7 +223,8 @@ public class InlongStreamProcessService { String.format("stream status=%s not support delete stream for groupId=%s streamId=%s", status, groupId, streamId)); } - StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.DELETE); + StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo, + GroupOperateType.DELETE); ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE; if (sync) { WorkflowResult workflowResult = workflowService.start(processName, operator, processForm); @@ -242,12 +246,4 @@ public class InlongStreamProcessService { } } - private StreamResourceProcessForm genStreamProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - GroupOperateType operateType) { - StreamResourceProcessForm processForm = new StreamResourceProcessForm(); - processForm.setGroupInfo(groupInfo); - processForm.setStreamInfo(streamInfo); - processForm.setGroupOperateType(operateType); - return processForm; - } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java index d3d6bac9a..b59842f41 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java @@ -189,6 +189,7 @@ public class WorkflowServiceImpl implements WorkflowService { ProcessRequest processRequest = new ProcessRequest(); processRequest.setInlongGroupId(groupId); + processRequest.setInlongStreamId(query.getInlongStreamId()); processRequest.setNameList(processNameList); processRequest.setHidden(1); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java index 01d500bf0..94eb0c3fb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java @@ -77,14 +77,6 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition { initMQTask.setListenerFactory(groupTaskListenerFactory); process.addTask(initMQTask); - // Init Sink - ServiceTask initSinkTask = new ServiceTask(); - initSinkTask.setName("InitSink"); - initSinkTask.setDisplayName("Group-InitSink"); - initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK); - initSinkTask.setListenerFactory(groupTaskListenerFactory); - process.addTask(initSinkTask); - // Init Sort ServiceTask initSortTask = new ServiceTask(); initSortTask.setName("InitSort"); @@ -93,25 +85,15 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition { initSortTask.setListenerFactory(groupTaskListenerFactory); process.addTask(initSortTask); - // Init Source - ServiceTask initSourceTask = new ServiceTask(); - initSourceTask.setName("InitSource"); - initSourceTask.setDisplayName("Group-InitSource"); - initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE); - initSourceTask.setListenerFactory(groupTaskListenerFactory); - process.addTask(initSourceTask); - // End node EndEvent endEvent = new EndEvent(); process.setEndEvent(endEvent); - // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source + // Task dependency order: 1.MQ -> 2.Sink-in-Stream -> 3.Sort -> 4.Source-in-Stream // To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort startEvent.addNext(initMQTask); - initMQTask.addNext(initSinkTask); - initSinkTask.addNext(initSortTask); - initSortTask.addNext(initSourceTask); - initSourceTask.addNext(endEvent); + initMQTask.addNext(initSortTask); + initSortTask.addNext(endEvent); return process; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java index 395a3dec9..5d3eb090e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java @@ -18,13 +18,13 @@ package org.apache.inlong.manager.service.workflow.group; import lombok.extern.slf4j.Slf4j; -import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.common.enums.ProcessName; -import org.apache.inlong.manager.service.workflow.WorkflowDefinition; +import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; +import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory; import org.apache.inlong.manager.service.listener.group.UpdateGroupCompleteListener; import org.apache.inlong.manager.service.listener.group.UpdateGroupFailedListener; import org.apache.inlong.manager.service.listener.group.UpdateGroupListener; -import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory; +import org.apache.inlong.manager.service.workflow.WorkflowDefinition; import org.apache.inlong.manager.workflow.definition.EndEvent; import org.apache.inlong.manager.workflow.definition.ServiceTask; import org.apache.inlong.manager.workflow.definition.ServiceTaskType; @@ -93,6 +93,8 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition { deleteSortTask.setListenerFactory(groupTaskListenerFactory); process.addTask(deleteSortTask); + // No need to delete the sink because we should not affect the existing data in the sink + // End node EndEvent endEvent = new EndEvent(); process.setEndEvent(endEvent); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java index 79f154785..6ce7822b5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java @@ -18,13 +18,13 @@ package org.apache.inlong.manager.service.workflow.stream; import lombok.extern.slf4j.Slf4j; -import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.common.enums.ProcessName; -import org.apache.inlong.manager.service.workflow.WorkflowDefinition; +import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory; import org.apache.inlong.manager.service.listener.stream.InitStreamCompleteListener; import org.apache.inlong.manager.service.listener.stream.InitStreamFailedListener; import org.apache.inlong.manager.service.listener.stream.InitStreamListener; +import org.apache.inlong.manager.service.workflow.WorkflowDefinition; import org.apache.inlong.manager.workflow.definition.EndEvent; import org.apache.inlong.manager.workflow.definition.ServiceTask; import org.apache.inlong.manager.workflow.definition.ServiceTaskType; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java index 7dd11ca4f..415d78f3f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java @@ -18,13 +18,13 @@ package org.apache.inlong.manager.service.workflow.stream; import lombok.extern.slf4j.Slf4j; -import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.common.enums.ProcessName; -import org.apache.inlong.manager.service.workflow.WorkflowDefinition; +import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory; import org.apache.inlong.manager.service.listener.stream.UpdateStreamCompleteListener; import org.apache.inlong.manager.service.listener.stream.UpdateStreamFailedListener; import org.apache.inlong.manager.service.listener.stream.UpdateStreamListener; +import org.apache.inlong.manager.service.workflow.WorkflowDefinition; import org.apache.inlong.manager.workflow.definition.EndEvent; import org.apache.inlong.manager.workflow.definition.ServiceTask; import org.apache.inlong.manager.workflow.definition.ServiceTaskType; diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java index 31c74a2cb..217d23841 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java @@ -37,11 +37,9 @@ public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest { WorkflowProcess cloneProcess1 = process.clone(); WorkflowProcess cloneProcess2 = cloneProcess1.clone(); Assertions.assertNotSame(cloneProcess2, cloneProcess1); - Assertions.assertNotNull(process.getTaskByName("InitSource")); Assertions.assertNotNull(process.getTaskByName("InitMQ")); Assertions.assertNotNull(process.getTaskByName("InitSort")); - Assertions.assertNotNull(process.getTaskByName("InitSink")); - Assertions.assertEquals(4, process.getNameToTaskMap().size()); + Assertions.assertEquals(2, process.getNameToTaskMap().size()); } } diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 951d854c9..53ece0c11 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -606,7 +606,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process` `display_name` varchar(256) NOT NULL COMMENT 'Process display name', `type` varchar(256) DEFAULT NULL COMMENT 'Process classification', `title` varchar(256) DEFAULT NULL COMMENT 'Process title', - `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group', + `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id to which this process belongs', + `inlong_stream_id`varchar(256) DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs', `applicant` varchar(256) NOT NULL COMMENT 'Applicant', `status` varchar(64) NOT NULL COMMENT 'Status', `form_data` mediumtext COMMENT 'Form information', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index fbd73961f..6f6a45fab 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -642,7 +642,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process` `display_name` varchar(256) NOT NULL COMMENT 'Process display name', `type` varchar(256) DEFAULT NULL COMMENT 'Process classification', `title` varchar(256) DEFAULT NULL COMMENT 'Process title', - `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group', + `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id to which this process belongs', + `inlong_stream_id`varchar(256) DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs', `applicant` varchar(256) NOT NULL COMMENT 'Applicant', `status` varchar(64) NOT NULL COMMENT 'Status', `form_data` mediumtext COMMENT 'Form information', diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java index 9d989875e..d3b26786e 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java @@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity; import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; +import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.workflow.WorkflowAction; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.definition.StartEvent; @@ -91,6 +92,10 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE processEntity.setType(process.getType()); processEntity.setTitle(form.getTitle()); processEntity.setInlongGroupId(form.getInlongGroupId()); + if (form instanceof StreamResourceProcessForm) { + StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form; + processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId()); + } processEntity.setApplicant(applicant); processEntity.setStatus(ProcessStatus.PROCESSING.name()); try {