This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push: new ebbbde7602 [INLONG-9862][Manager] Support submit flink job for offline sync (#9865) ebbbde7602 is described below commit ebbbde7602aed8638513c7b3fbdf22c20d1bd503 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Fri Mar 22 18:24:26 2024 +0800 [INLONG-9862][Manager] Support submit flink job for offline sync (#9865) --- .../manager/common/consts/InlongConstants.java | 4 + .../plugin/listener/StartupSortListener.java | 28 ++++--- .../manager/pojo/sort/util/StreamParseUtils.java | 21 +++++ .../listener/StreamTaskListenerFactory.java | 28 +++++++ .../schedule/StreamScheduleResourceListener.java | 95 ++++++++++++++++++++++ .../service/listener/sort/SortConfigListener.java | 8 ++ .../stream/CreateStreamWorkflowDefinition.java | 13 ++- .../workflow/definition/ServiceTaskType.java | 1 + .../task/ScheduleOperateListener.java} | 36 ++++---- .../manager/workflow/plugin/ProcessPlugin.java | 5 ++ 10 files changed, 204 insertions(+), 35 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index e84dcc6602..f0156cd0e9 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -141,6 +141,10 @@ public class InlongConstants { */ public static final String DATAFLOW = "dataflow"; + public static final String REGISTER_SCHEDULE_STATUS = "register.schedule.status"; + + public static final String REGISTERED = "registered"; + public static final String STREAMS = "streams"; public static final String RELATIONS = "relations"; diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index f6b3b6061a..fec87280d1 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -26,6 +26,7 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.enums.Constants; import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils; import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; @@ -98,6 +99,14 @@ public class StartupSortListener implements SortOperateListener { } for (InlongStreamInfo streamInfo : streamInfos) { + boolean isOfflineSync = InlongConstants.DATASYNC_OFFLINE_MODE + .equals(groupResourceForm.getGroupInfo().getInlongGroupMode()); + // do not submit flink job if the group mode is offline and the stream is not config successfully + if (isOfflineSync && !StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) { + log.info("no need to submit flink job for groupId={} streamId={} as the mode is offline " + + "and the stream is not config successfully yet", groupId, streamInfo.getInlongStreamId()); + continue; + } List<StreamSink> sinkList = streamInfo.getSinkList(); List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) { @@ -131,9 +140,6 @@ public class StartupSortListener implements SortOperateListener { return ListenerResult.fail(message); } - boolean isRealTimeSync = InlongConstants.DATASYNC_REALTIME_MODE - .equals(groupResourceForm.getGroupInfo().getInlongGroupMode()); - FlinkInfo flinkInfo = new FlinkInfo(); String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN @@ -141,20 +147,18 @@ public class StartupSortListener implements SortOperateListener { flinkInfo.setJobName(jobName); flinkInfo.setEndpoint(kvConf.get(InlongConstants.SORT_URL)); flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo)); - if (isRealTimeSync) { - flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING); - } else { + if (isOfflineSync) { flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH); + } else { + flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING); } FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); - // only start job for real-time mode - if (isRealTimeSync) { - flinkOperation.start(flinkInfo); - log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, - streamInfo.getInlongStreamId(), flinkInfo.getJobId()); - } + flinkOperation.start(flinkInfo); + log.info("job submit success for groupId = {}, mode = {}, streamId = {}, jobId = {}", + groupId, groupResourceForm.getGroupInfo().getInlongGroupMode(), + streamInfo.getInlongStreamId(), flinkInfo.getJobId()); } catch (Exception e) { flinkInfo.setException(true); flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java index 92470968ac..203bcc6778 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java @@ -17,11 +17,14 @@ package org.apache.inlong.manager.pojo.sort.util; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.TransformType; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamNode; import org.apache.inlong.manager.pojo.stream.StreamPipeline; import org.apache.inlong.manager.pojo.stream.StreamTransform; @@ -38,6 +41,8 @@ import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition; import com.google.gson.Gson; import com.google.gson.JsonObject; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; /** * Utils of stream parse. @@ -154,4 +159,20 @@ public class StreamParseUtils { return GSON.fromJson(tempView, StreamPipeline.class); } + public static String getStreamExtProperty(String key, InlongStreamInfo streamInfo) { + if (StringUtils.isNotBlank(key) && streamInfo != null && CollectionUtils.isNotEmpty(streamInfo.getExtList())) { + for (InlongStreamExtInfo ext : streamInfo.getExtList()) { + if (key.equalsIgnoreCase(ext.getKeyName())) { + return ext.getKeyValue(); + } + } + } + return null; + } + + public static boolean isRegisterScheduleSuccess(InlongStreamInfo streamInfo) { + return InlongConstants.REGISTERED + .equalsIgnoreCase(getStreamExtProperty(InlongConstants.REGISTER_SCHEDULE_STATUS, streamInfo)); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java index 43fdfe4666..87c3a867c2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java @@ -20,12 +20,14 @@ package org.apache.inlong.manager.service.listener; import org.apache.inlong.manager.common.plugin.Plugin; import org.apache.inlong.manager.common.plugin.PluginBinder; import org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener; +import org.apache.inlong.manager.service.listener.schedule.StreamScheduleResourceListener; import org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener; import org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.definition.ServiceTaskType; import org.apache.inlong.manager.workflow.definition.TaskListenerFactory; import org.apache.inlong.manager.workflow.event.task.QueueOperateListener; +import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener; import org.apache.inlong.manager.workflow.event.task.SinkOperateListener; import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import org.apache.inlong.manager.workflow.event.task.SourceOperateListener; @@ -53,6 +55,7 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact private List<QueueOperateListener> queueOperateListeners; private List<SortOperateListener> sortOperateListeners; private List<SinkOperateListener> sinkOperateListeners; + private List<ScheduleOperateListener> scheduleOperateListeners; @Autowired private StreamQueueResourceListener queueResourceListener; @@ -60,6 +63,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact private StreamSortConfigListener streamSortConfigListener; @Autowired private StreamSinkResourceListener sinkResourceListener; + @Autowired + private StreamScheduleResourceListener scheduleResourceListener; @PostConstruct public void init() { @@ -70,6 +75,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact sortOperateListeners.add(streamSortConfigListener); sinkOperateListeners = new LinkedList<>(); sinkOperateListeners.add(sinkResourceListener); + scheduleOperateListeners = new LinkedList<>(); + scheduleOperateListeners.add(scheduleResourceListener); } @Override @@ -94,6 +101,10 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact if (CollectionUtils.isNotEmpty(pluginSinkOperateListeners)) { sinkOperateListeners.addAll(pluginSinkOperateListeners); } + List<ScheduleOperateListener> pluginScheduleOperateListeners = processPlugin.createScheduleOperateListeners(); + if (CollectionUtils.isNotEmpty(pluginScheduleOperateListeners)) { + scheduleOperateListeners.addAll(pluginScheduleOperateListeners); + } } @Override @@ -118,6 +129,9 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact case INIT_SINK: List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext); return Lists.newArrayList(sinkOperateListeners); + case INIT_SCHEDULE: + List<ScheduleOperateListener> scheduleOperateListeners = getScheduleOperateListener(workflowContext); + return Lists.newArrayList(scheduleOperateListeners); default: throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType)); } @@ -131,6 +145,7 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact queueOperateListeners = new LinkedList<>(); sortOperateListeners = new LinkedList<>(); sinkOperateListeners = new LinkedList<>(); + scheduleOperateListeners = new LinkedList<>(); } /** @@ -185,4 +200,17 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact return listeners; } + /** + * Get schedule operate listener list. + */ + private List<ScheduleOperateListener> getScheduleOperateListener(WorkflowContext context) { + List<ScheduleOperateListener> listeners = new ArrayList<>(); + for (ScheduleOperateListener listener : scheduleOperateListeners) { + if (listener != null && listener.accept(context)) { + listeners.add(listener); + } + } + return listeners; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java new file mode 100644 index 0000000000..61179dc343 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.listener.schedule; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.enums.GroupOperateType; +import org.apache.inlong.manager.common.enums.TaskEvent; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; +import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; +import org.apache.inlong.manager.workflow.WorkflowContext; +import org.apache.inlong.manager.workflow.event.ListenerResult; +import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +@Slf4j +public class StreamScheduleResourceListener implements ScheduleOperateListener { + + @Override + public TaskEvent event() { + return TaskEvent.COMPLETE; + } + + @Override + public boolean accept(WorkflowContext context) { + ProcessForm processForm = context.getProcessForm(); + String groupId = processForm.getInlongGroupId(); + if (!(processForm instanceof StreamResourceProcessForm)) { + log.info("not add schedule stream listener, not StreamResourceProcessForm for groupId [{}]", groupId); + return false; + } + + StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm; + String streamId = streamProcessForm.getStreamInfo().getInlongStreamId(); + if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) { + log.info("not add schedule stream listener, as the operate was not INIT for groupId [{}] streamId [{}]", + groupId, streamId); + return false; + } + + log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId); + return InlongConstants.DATASYNC_OFFLINE_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode()); + } + + @Override + public ListenerResult listen(WorkflowContext context) throws Exception { + StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm(); + InlongStreamInfo streamInfo = form.getStreamInfo(); + final String groupId = streamInfo.getInlongGroupId(); + final String streamId = streamInfo.getInlongStreamId(); + log.info("begin to register schedule info for groupId={}, streamId={}", groupId, streamId); + + // todo: register schedule info to schedule service + + // after register schedule info successfully, add ext property to stream info + saveInfo(streamInfo, InlongConstants.REGISTER_SCHEDULE_STATUS, + InlongConstants.REGISTERED, streamInfo.getExtList()); + log.info("success to register schedule info for group [" + groupId + "] and stream [" + streamId + "]"); + return ListenerResult.success(); + } + + /** + * Save stream ext info into list. + */ + private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue, + List<InlongStreamExtInfo> extInfoList) { + InlongStreamExtInfo extInfo = new InlongStreamExtInfo(); + extInfo.setInlongGroupId(streamInfo.getInlongGroupId()); + extInfo.setInlongStreamId(streamInfo.getInlongStreamId()); + extInfo.setKeyName(keyName); + extInfo.setKeyValue(keyValue); + extInfoList.add(extInfo); + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index ebed9e72e3..f8981bf09a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; @@ -125,6 +126,13 @@ public class SortConfigListener implements SortOperateListener { try { for (InlongStreamInfo streamInfo : streamInfos) { + // do not build sort config if the group mode is offline and the stream is not config successfully + if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode()) + && !StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) { + LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline " + + "and the stream is not config successfully yet", groupId, streamInfo.getInlongStreamId()); + continue; + } List<StreamSink> sinkList = streamInfo.getSinkList(); if (CollectionUtils.isEmpty(sinkList)) { continue; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java index d3cf199a21..5505ed0694 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java @@ -102,17 +102,26 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition { initSourceTask.setListenerFactory(streamTaskListenerFactory); process.addTask(initSourceTask); + // Init Schedule info + ServiceTask initScheduleTask = new ServiceTask(); + initScheduleTask.setName("InitSchedule"); + initScheduleTask.setDisplayName("Stream-InitSchedule"); + initScheduleTask.setServiceTaskType(ServiceTaskType.INIT_SCHEDULE); + initScheduleTask.setListenerFactory(streamTaskListenerFactory); + process.addTask(initScheduleTask); + // End node EndEvent endEvent = new EndEvent(); process.setEndEvent(endEvent); - // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source + // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source -> 5.Schedule // To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort startEvent.addNext(initMQTask); initMQTask.addNext(initSinkTask); initSinkTask.addNext(initSortTask); initSortTask.addNext(initSourceTask); - initSourceTask.addNext(endEvent); + initSourceTask.addNext(initScheduleTask); + initScheduleTask.addNext(endEvent); return process; } diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java index 2686b4b60a..156863a676 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java @@ -25,6 +25,7 @@ public enum ServiceTaskType { INIT_MQ, INIT_SORT, INIT_SINK, + INIT_SCHEDULE, STOP_SOURCE, STOP_SORT, diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java similarity index 55% copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java copy to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java index 2686b4b60a..c3e162cd26 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java @@ -15,30 +15,24 @@ * limitations under the License. */ -package org.apache.inlong.manager.workflow.definition; +package org.apache.inlong.manager.workflow.event.task; -import java.util.Locale; +import org.apache.inlong.manager.common.enums.TaskEvent; +import org.apache.inlong.manager.workflow.WorkflowContext; +import org.apache.inlong.manager.workflow.event.ListenerResult; -public enum ServiceTaskType { +public interface ScheduleOperateListener extends TaskEventListener { - INIT_SOURCE, - INIT_MQ, - INIT_SORT, - INIT_SINK, + ScheduleOperateListener DEFAULT_SCHEDULE_OPERATE_LISTENER = new ScheduleOperateListener() { - STOP_SOURCE, - STOP_SORT, - - RESTART_SOURCE, - RESTART_SORT, - - DELETE_SOURCE, - DELETE_MQ, - DELETE_SORT; - - @Override - public String toString() { - return name().toLowerCase(Locale.ROOT); - } + @Override + public TaskEvent event() { + return TaskEvent.COMPLETE; + } + @Override + public ListenerResult listen(WorkflowContext context) { + return ListenerResult.success(); + } + }; } diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java index 7e15d52761..452fa2bfb8 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.workflow.plugin; import org.apache.inlong.manager.common.plugin.Plugin; import org.apache.inlong.manager.workflow.event.task.QueueOperateListener; +import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener; import org.apache.inlong.manager.workflow.event.task.SinkOperateListener; import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import org.apache.inlong.manager.workflow.event.task.SourceOperateListener; @@ -38,6 +39,10 @@ public interface ProcessPlugin extends Plugin { return null; } + default List<ScheduleOperateListener> createScheduleOperateListeners() { + return null; + } + default List<QueueOperateListener> createQueueOperateListeners() { return null; }