This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f1395eb0d8 [INLONG-9297][Manager] Support configuring multiple sink types of tasks under a single stream (#9298) f1395eb0d8 is described below commit f1395eb0d8bd9f6b4d5f4fd3fa322422f2c27aaa Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Nov 28 20:11:20 2023 +0800 [INLONG-9297][Manager] Support configuring multiple sink types of tasks under a single stream (#9298) --- .../inlong/manager/common/consts/SinkType.java | 61 +++++++- .../consts/{StreamType.java => SortType.java} | 13 +- .../inlong/manager/common/consts/StreamType.java | 13 ++ .../{StreamType.java => SupportSortType.java} | 23 +-- .../inlong/manager/common/enums/GroupStatus.java | 3 +- .../plugin/listener/DeleteSortListener.java | 90 ++++++----- .../plugin/listener/RestartSortListener.java | 123 ++++++++------- .../plugin/listener/StartupSortListener.java | 125 ++++++++------- .../plugin/listener/StartupStreamListener.java | 123 ++++++++++++++- .../plugin/listener/SuspendSortListener.java | 93 ++++++----- .../manager/plugin/poller/SortStatusPoller.java | 31 ++-- .../inlong/manager/pojo/sort/SortStatusInfo.java | 3 + .../manager/service/core/impl/SortServiceImpl.java | 12 +- .../service/group/InlongGroupServiceImpl.java | 12 +- .../service/listener/sort/SortConfigListener.java | 17 +- .../listener/sort/StreamSortConfigListener.java | 31 ++-- .../resource/sort/DefaultSortConfigOperator.java | 173 +++++++++++---------- .../service/resource/sort/SortConfigOperator.java | 8 +- .../resource/sort/SortConfigOperatorFactory.java | 13 +- .../resources/application-unit-test.properties | 2 + .../manager/workflow/plugin/sort/SortPoller.java | 6 +- 21 files changed, 637 insertions(+), 338 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index f0b215ac30..1d53c71fc2 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -17,26 +17,85 @@ package org.apache.inlong.manager.common.consts; +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + /** * Constants of sink type. */ public class SinkType extends StreamType { + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String HIVE = "HIVE"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String CLICKHOUSE = "CLICKHOUSE"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String HBASE = "HBASE"; - public static final String ELASTICSEARCH = "ES"; + + @SupportSortType(sortType = SortType.SORT_STANDALONE) + public static final String ELASTICSEARCH = "ELASTICSEARCH"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String HDFS = "HDFS"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String GREENPLUM = "GREENPLUM"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String MYSQL = "MYSQL"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String DORIS = "DORIS"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String STARROCKS = "STARROCKS"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String KUDU = "KUDU"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String REDIS = "REDIS"; + + @SupportSortType(sortType = SortType.SORT_FLINK) + public static final String TUBEMQ = "TUBEMQ"; + /** * Tencent cloud log service * Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a> */ + @SupportSortType(sortType = SortType.SORT_STANDALONE) public static final String CLS = "CLS"; + + public static final Set<String> SORT_FLINK_SINK = new HashSet<>(); + + public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>(); + + static { + SinkType obj = new SinkType(); + Class<? extends SinkType> clazz = obj.getClass(); + Field[] fields = clazz.getFields(); + for (Field field : fields) { + if (field.isAnnotationPresent(SupportSortType.class)) { + SupportSortType annotation = field.getAnnotation(SupportSortType.class); + if (Objects.equals(annotation.sortType(), SortType.SORT_STANDALONE)) { + SORT_STANDALONE_SINK.add(field.getName()); + } else { + SORT_FLINK_SINK.add(field.getName()); + } + } + } + } + + public static boolean containSortFlinkSink(List<String> sinkTypes) { + return sinkTypes.stream().anyMatch(SORT_STANDALONE_SINK::contains); + } + } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java similarity index 63% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java index afbd57bc50..e40c2922e8 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java @@ -18,16 +18,11 @@ package org.apache.inlong.manager.common.consts; /** - * Constant for stream types, indicating that both StreamSource and StreamSink support these types. + * Sort task type, including sort flink and sort standalone */ -public class StreamType { +public enum SortType { - public static final String KAFKA = "KAFKA"; - public static final String HUDI = "HUDI"; - public static final String POSTGRESQL = "POSTGRESQL"; - public static final String SQLSERVER = "SQLSERVER"; - public static final String ORACLE = "ORACLE"; - public static final String PULSAR = "PULSAR"; - public static final String ICEBERG = "ICEBERG"; + SORT_FLINK, + SORT_STANDALONE } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java index afbd57bc50..f8f70dfe19 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java @@ -22,12 +22,25 @@ package org.apache.inlong.manager.common.consts; */ public class StreamType { + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String KAFKA = "KAFKA"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String HUDI = "HUDI"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String POSTGRESQL = "POSTGRESQL"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String SQLSERVER = "SQLSERVER"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String ORACLE = "ORACLE"; + + @SupportSortType(sortType = SortType.SORT_STANDALONE) public static final String PULSAR = "PULSAR"; + + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String ICEBERG = "ICEBERG"; } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java similarity index 58% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java index afbd57bc50..00c9048aac 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java @@ -17,17 +17,22 @@ package org.apache.inlong.manager.common.consts; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + /** - * Constant for stream types, indicating that both StreamSource and StreamSink support these types. + * This annotation is used to indicate the type of inbound task used for inbound operations, including sort flink and + * sort standalone. On the user's SinkType class field, this annotation is used to identify which type of sort task each + * SinkType uses. */ -public class StreamType { +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Target({ElementType.FIELD}) +public @interface SupportSortType { - public static final String KAFKA = "KAFKA"; - public static final String HUDI = "HUDI"; - public static final String POSTGRESQL = "POSTGRESQL"; - public static final String SQLSERVER = "SQLSERVER"; - public static final String ORACLE = "ORACLE"; - public static final String PULSAR = "PULSAR"; - public static final String ICEBERG = "ICEBERG"; + SortType sortType(); } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java index d0bb62f7bf..79e89ad53b 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java @@ -75,7 +75,8 @@ public enum GroupStatus { GROUP_STATE_AUTOMATON.put(CONFIGURATION_OFFLINE, Sets.newHashSet(CONFIGURATION_OFFLINE, CONFIG_ONLINE_ING, CONFIG_DELETING)); - GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING, Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED)); + GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING, + Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED, CONFIG_SUCCESSFUL)); GROUP_STATE_AUTOMATON.put(CONFIG_DELETING, Sets.newHashSet(CONFIG_DELETING, CONFIG_DELETED, CONFIG_FAILED)); GROUP_STATE_AUTOMATON.put(CONFIG_DELETED, Sets.newHashSet(CONFIG_DELETED)); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java index d7f33fd77d..a7918c640a 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java @@ -24,8 +24,9 @@ import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkOperation; import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; -import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; import org.apache.inlong.manager.workflow.WorkflowContext; @@ -34,6 +35,7 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobStatus; @@ -84,46 +86,54 @@ public class DeleteSortListener implements SortOperateListener { } GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm; - InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo(); - List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList(); - log.info("inlong group ext info: {}", extList); - - Map<String, String> kvConf = new HashMap<>(); - extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); - String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isNotEmpty(sortExt)) { - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); - } - - String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); - if (StringUtils.isBlank(jobId)) { - String message = String.format("sort job id is empty for groupId=%s", groupId); - return ListenerResult.fail(message); - } - - FlinkInfo flinkInfo = new FlinkInfo(); - flinkInfo.setJobId(jobId); - String sortUrl = kvConf.get(InlongConstants.SORT_URL); - flinkInfo.setEndpoint(sortUrl); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); - try { - flinkOperation.delete(flinkInfo); - log.info("job delete success for jobId={}", jobId); - } catch (Exception e) { - flinkInfo.setException(true); - flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + List<InlongStreamInfo> streamInfos = groupResourceProcessForm.getStreamInfos(); + for (InlongStreamInfo streamInfo : streamInfos) { + List<StreamSink> sinkList = streamInfo.getSinkList(); + if (CollectionUtils.isEmpty(sinkList)) { + continue; + } + List<InlongStreamExtInfo> extList = streamInfo.getExtList(); + log.info("stream sink ext info: {}", extList); + + Map<String, String> kvConf = new HashMap<>(); + extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue())); + String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); + } + + String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); + if (StringUtils.isBlank(jobId)) { + String message = String.format("sort job id is empty for groupId=%s, streamId=%s", groupId, + streamInfo.getInlongStreamId()); + return ListenerResult.fail(message); + } + + FlinkInfo flinkInfo = new FlinkInfo(); + flinkInfo.setJobId(jobId); + String sortUrl = kvConf.get(InlongConstants.SORT_URL); + flinkInfo.setEndpoint(sortUrl); + + FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); + FlinkOperation flinkOperation = new FlinkOperation(flinkService); + try { + flinkOperation.delete(flinkInfo); + log.info("job delete success for jobId={}", jobId); + } catch (Exception e) { + flinkInfo.setException(true); + flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED); + + String message = String.format("delete sort failed for groupId=%s, streamId=%s", groupId, + streamInfo.getInlongStreamId()); + log.error(message, e); + return ListenerResult.fail(message + ": " + e.getMessage()); + } flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED); - - String message = String.format("delete sort failed for groupId=%s", groupId); - log.error(message, e); - return ListenerResult.fail(message + ": " + e.getMessage()); } - flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED); return ListenerResult.success(); } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java index c8a223e0af..66bf88b149 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java @@ -25,8 +25,9 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation; import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.enums.Constants; -import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; import org.apache.inlong.manager.workflow.WorkflowContext; @@ -35,6 +36,7 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobStatus; @@ -85,66 +87,79 @@ public class RestartSortListener implements SortOperateListener { } GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm; - InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo(); - List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList(); - log.info("inlong group ext info: {}", extList); - - Map<String, String> kvConf = new HashMap<>(); - extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); - String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isNotEmpty(sortExt)) { - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); - } - - String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); - if (StringUtils.isBlank(jobId)) { - String message = String.format("sort job id is empty for groupId [%s]", groupId); - return ListenerResult.fail(message); - } - String dataflow = kvConf.get(InlongConstants.DATAFLOW); - if (StringUtils.isEmpty(dataflow)) { - String message = String.format("dataflow is empty for groupId [%s]", groupId); - log.error(message); - return ListenerResult.fail(message); - } - FlinkInfo flinkInfo = new FlinkInfo(); - String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm); - flinkInfo.setJobName(jobName); - String sortUrl = kvConf.get(InlongConstants.SORT_URL); - flinkInfo.setEndpoint(sortUrl); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); - try { - flinkOperation.genPath(flinkInfo, dataflow); - // todo Currently, savepoint is not being used to restart, but will be improved in the future - flinkOperation.start(flinkInfo); - log.info("job restart success for [{}]", jobId); - } catch (Exception e) { - flinkInfo.setException(true); - flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + List<InlongStreamInfo> streamInfos = groupResourceProcessForm.getStreamInfos(); + for (InlongStreamInfo streamInfo : streamInfos) { + List<StreamSink> sinkList = streamInfo.getSinkList(); + if (CollectionUtils.isEmpty(sinkList)) { + continue; + } + List<InlongStreamExtInfo> extList = streamInfo.getExtList(); + log.info("stream ext info: {}", extList); + + Map<String, String> kvConf = new HashMap<>(); + extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue())); + String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); + } + + String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); + if (StringUtils.isBlank(jobId)) { + String message = String.format("sort job id is empty for groupId [%s], streamId [%s]", groupId, + streamInfo.getInlongStreamId()); + return ListenerResult.fail(message); + } + String dataflow = kvConf.get(InlongConstants.DATAFLOW); + if (StringUtils.isEmpty(dataflow)) { + String message = String.format("dataflow is empty for groupId [%s], streamId [%s]", groupId, + streamInfo.getInlongStreamId()); + log.error(message); + return ListenerResult.fail(message); + } + + FlinkInfo flinkInfo = new FlinkInfo(); + String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId(); + flinkInfo.setJobName(jobName); + String sortUrl = kvConf.get(InlongConstants.SORT_URL); + flinkInfo.setEndpoint(sortUrl); + + FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); + FlinkOperation flinkOperation = new FlinkOperation(flinkService); + try { + flinkOperation.genPath(flinkInfo, dataflow); + // todo Currently, savepoint is not being used to restart, but will be improved in the future + flinkOperation.start(flinkInfo); + log.info("job restart success for groupId = {}, streamId = {} jobId = {}", groupId, + streamInfo.getInlongStreamId(), jobId); + } catch (Exception e) { + flinkInfo.setException(true); + flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); + + String message = String.format("restart sort failed for groupId [%s], streamId [%s] ", groupId, + streamInfo.getInlongStreamId()); + log.error(message, e); + return ListenerResult.fail(message + e.getMessage()); + } + extList.forEach(groupExtInfo -> kvConf.remove(InlongConstants.SORT_JOB_ID)); + saveInfo(streamInfo, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList); flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); - - String message = String.format("restart sort failed for groupId [%s] ", groupId); - log.error(message, e); - return ListenerResult.fail(message + e.getMessage()); } - extList.forEach(groupExtInfo -> kvConf.remove(InlongConstants.SORT_JOB_ID)); - saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList); - flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); return ListenerResult.success(); } /** - * Save ext info into list. + * Save stream ext info into list. */ - private void saveInfo(String inlongGroupId, String keyName, String keyValue, List<InlongGroupExtInfo> extInfoList) { - InlongGroupExtInfo extInfo = new InlongGroupExtInfo(); - extInfo.setInlongGroupId(inlongGroupId); + private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue, + List<InlongStreamExtInfo> extInfoList) { + InlongStreamExtInfo extInfo = new InlongStreamExtInfo(); + extInfo.setInlongGroupId(streamInfo.getInlongGroupId()); + extInfo.setInlongStreamId(streamInfo.getInlongStreamId()); extInfo.setKeyName(keyName); extInfo.setKeyValue(keyValue); extInfoList.add(extInfo); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index b65fb40ea6..0dc3cf08f3 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.plugin.listener; import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.util.JsonUtils; @@ -25,8 +26,8 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation; import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.enums.Constants; -import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; @@ -36,9 +37,11 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobStatus; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -71,7 +74,7 @@ public class StartupSortListener implements SortOperateListener { } log.info("add startup group listener for groupId [{}]", groupId); - return true; + return InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); } @Override @@ -94,65 +97,77 @@ public class StartupSortListener implements SortOperateListener { return ListenerResult.success(); } - InlongGroupInfo inlongGroupInfo = groupResourceForm.getGroupInfo(); - List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList(); - log.info("inlong group ext info: {}", extList); - - Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName()) - && StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap( - InlongGroupExtInfo::getKeyName, - InlongGroupExtInfo::getKeyValue)); - String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isNotEmpty(sortExt)) { - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); - } - - String dataflow = kvConf.get(InlongConstants.DATAFLOW); - if (StringUtils.isEmpty(dataflow)) { - String message = String.format("dataflow is empty for groupId [%s]", groupId); - log.error(message); - return ListenerResult.fail(message); - } - - FlinkInfo flinkInfo = new FlinkInfo(); - - String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm); - flinkInfo.setJobName(jobName); - String sortUrl = kvConf.get(InlongConstants.SORT_URL); - flinkInfo.setEndpoint(sortUrl); - flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos()); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); - - try { - flinkOperation.genPath(flinkInfo, dataflow); - flinkOperation.start(flinkInfo); - log.info("job submit success, jobId is [{}]", flinkInfo.getJobId()); - } catch (Exception e) { - flinkInfo.setException(true); - flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + for (InlongStreamInfo streamInfo : streamInfos) { + List<StreamSink> sinkList = streamInfo.getSinkList(); + List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) { + return ListenerResult.success(); + } + + List<InlongStreamExtInfo> extList = streamInfo.getExtList(); + log.info("stream ext info: {}", extList); + Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName()) + && StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap( + InlongStreamExtInfo::getKeyName, + InlongStreamExtInfo::getKeyValue)); + + String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); + } + + String dataflow = kvConf.get(InlongConstants.DATAFLOW); + if (StringUtils.isEmpty(dataflow)) { + String message = String.format("dataflow is empty for groupId [%s], streamId [%s]", groupId, + streamInfo.getInlongStreamId()); + log.error(message); + return ListenerResult.fail(message); + } + + FlinkInfo flinkInfo = new FlinkInfo(); + + String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId(); + flinkInfo.setJobName(jobName); + String sortUrl = kvConf.get(InlongConstants.SORT_URL); + flinkInfo.setEndpoint(sortUrl); + flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo)); + + FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); + FlinkOperation flinkOperation = new FlinkOperation(flinkService); + + try { + flinkOperation.genPath(flinkInfo, dataflow); + flinkOperation.start(flinkInfo); + log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, + streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + } catch (Exception e) { + flinkInfo.setException(true); + flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); + + String message = String.format("startup sort failed for groupId [%s], streamId [%s]", groupId, + streamInfo.getInlongStreamId()); + log.error(message, e); + return ListenerResult.fail(message + e.getMessage()); + } + + saveInfo(streamInfo, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList); flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); - - String message = String.format("startup sort failed for groupId [%s] ", groupId); - log.error(message, e); - return ListenerResult.fail(message + e.getMessage()); } - - saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList); - flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); return ListenerResult.success(); } /** - * Save ext info into list. + * Save stream ext info into list. */ - private void saveInfo(String inlongGroupId, String keyName, String keyValue, List<InlongGroupExtInfo> extInfoList) { - InlongGroupExtInfo extInfo = new InlongGroupExtInfo(); - extInfo.setInlongGroupId(inlongGroupId); + private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue, + List<InlongStreamExtInfo> extInfoList) { + InlongStreamExtInfo extInfo = new InlongStreamExtInfo(); + extInfo.setInlongGroupId(streamInfo.getInlongGroupId()); + extInfo.setInlongStreamId(streamInfo.getInlongStreamId()); extInfo.setKeyName(keyName); extInfo.setKeyValue(keyValue); extInfoList.add(extInfo); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java index 1f2a8fabef..b3a341c37d 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java @@ -17,12 +17,36 @@ package org.apache.inlong.manager.plugin.listener; +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.plugin.flink.FlinkOperation; +import org.apache.inlong.manager.plugin.flink.FlinkService; +import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; +import org.apache.inlong.manager.plugin.flink.enums.Constants; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; +import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.task.SortOperateListener; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.JobStatus; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg; /** * Listener for startup the Sort task for InlongStream @@ -41,13 +65,108 @@ public class StartupStreamListener implements SortOperateListener { */ @Override public boolean accept(WorkflowContext workflowContext) { - log.info("not need to start the sort task for InlongStream"); - return false; + ProcessForm processForm = workflowContext.getProcessForm(); + String groupId = processForm.getInlongGroupId(); + if (!(processForm instanceof StreamResourceProcessForm)) { + log.info("not add startup stream listener, not StreamResourceProcessForm for groupId [{}]", groupId); + return false; + } + + StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm; + String streamId = streamProcessForm.getStreamInfo().getInlongStreamId(); + if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) { + log.info("not add startup stream listener, as the operate was not INIT for groupId [{}] streamId [{}]", + groupId, streamId); + return false; + } + + log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId); + return InlongConstants.STANDARD_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode()); } @Override public ListenerResult listen(WorkflowContext context) throws Exception { + ProcessForm processForm = context.getProcessForm(); + StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm; + InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo(); + List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList(); + log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList); + final String groupId = streamInfo.getInlongGroupId(); + final String streamId = streamInfo.getInlongStreamId(); + + List<StreamSink> sinkList = streamInfo.getSinkList(); + List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) { + log.warn("not any sink configured for group {} and stream {}, skip launching sort job", groupId, streamId); + return ListenerResult.success(); + } + + List<InlongStreamExtInfo> extList = streamInfo.getExtList(); + log.info("stream ext info: {}", extList); + Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName()) + && StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap( + InlongStreamExtInfo::getKeyName, + InlongStreamExtInfo::getKeyValue)); + + String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); + } + + String dataflow = kvConf.get(InlongConstants.DATAFLOW); + if (StringUtils.isEmpty(dataflow)) { + String message = String.format("dataflow is empty for groupId [%s], streamId [%s]", groupId, + streamInfo.getInlongStreamId()); + log.error(message); + return ListenerResult.fail(message); + } + + FlinkInfo flinkInfo = new FlinkInfo(); + + String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId(); + flinkInfo.setJobName(jobName); + String sortUrl = kvConf.get(InlongConstants.SORT_URL); + flinkInfo.setEndpoint(sortUrl); + flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo)); + + FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); + FlinkOperation flinkOperation = new FlinkOperation(flinkService); + + try { + flinkOperation.genPath(flinkInfo, dataflow); + flinkOperation.start(flinkInfo); + log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, + streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + } catch (Exception e) { + flinkInfo.setException(true); + flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); + + String message = String.format("startup sort failed for groupId [%s], streamId [%s]", groupId, + streamInfo.getInlongStreamId()); + log.error(message, e); + return ListenerResult.fail(message + e.getMessage()); + } + + saveInfo(streamInfo, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList); + flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING); return ListenerResult.success(); } + /** + * Save stream ext info into list. + */ + private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue, + List<InlongStreamExtInfo> extInfoList) { + InlongStreamExtInfo extInfo = new InlongStreamExtInfo(); + extInfo.setInlongGroupId(streamInfo.getInlongGroupId()); + extInfo.setInlongStreamId(streamInfo.getInlongStreamId()); + extInfo.setKeyName(keyName); + extInfo.setKeyValue(keyValue); + extInfoList.add(extInfo); + } + } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java index affe9bbd22..06a76e1bf7 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java @@ -24,8 +24,9 @@ import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkOperation; import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; -import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; import org.apache.inlong.manager.workflow.WorkflowContext; @@ -34,6 +35,7 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobStatus; @@ -84,47 +86,56 @@ public class SuspendSortListener implements SortOperateListener { } GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm; - InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo(); - List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList(); - log.info("inlong group ext info: {}", extList); - - Map<String, String> kvConf = new HashMap<>(); - extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); - String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isNotEmpty(sortExt)) { - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); - } - - String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); - if (StringUtils.isBlank(jobId)) { - String message = String.format("sort job id is empty for groupId [%s]", groupId); - return ListenerResult.fail(message); - } - - FlinkInfo flinkInfo = new FlinkInfo(); - flinkInfo.setJobId(jobId); - String sortUrl = kvConf.get(InlongConstants.SORT_URL); - flinkInfo.setEndpoint(sortUrl); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); - try { - // todo Currently, savepoint is not being used to stop, but will be improved in the future - flinkOperation.delete(flinkInfo); - log.info("job suspend success for [{}]", jobId); - } catch (Exception e) { - flinkInfo.setException(true); - flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + List<InlongStreamInfo> streamInfos = groupResourceProcessForm.getStreamInfos(); + for (InlongStreamInfo streamInfo : streamInfos) { + List<StreamSink> sinkList = streamInfo.getSinkList(); + if (CollectionUtils.isEmpty(sinkList)) { + continue; + } + List<InlongStreamExtInfo> extList = streamInfo.getExtList(); + log.info("stream ext info: {}", extList); + + Map<String, String> kvConf = new HashMap<>(); + extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue())); + String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); + } + + String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); + if (StringUtils.isBlank(jobId)) { + String message = String.format("sort job id is empty for groupId [%s]", groupId, + streamInfo.getInlongStreamId()); + return ListenerResult.fail(message); + } + + FlinkInfo flinkInfo = new FlinkInfo(); + flinkInfo.setJobId(jobId); + String sortUrl = kvConf.get(InlongConstants.SORT_URL); + flinkInfo.setEndpoint(sortUrl); + + FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); + FlinkOperation flinkOperation = new FlinkOperation(flinkService); + try { + // todo Currently, savepoint is not being used to stop, but will be improved in the future + flinkOperation.delete(flinkInfo); + log.info("job suspend success for groupId = {}, streamId ={}, jobId = {}", groupId, + streamInfo.getInlongStreamId(), jobId); + } catch (Exception e) { + flinkInfo.setException(true); + flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); + flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED); + + String message = String.format("suspend sort failed for groupId [%s], streamId [%s]", groupId, + streamInfo.getInlongStreamId()); + log.error(message, e); + return ListenerResult.fail(message + e.getMessage()); + } flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED); - - String message = String.format("suspend sort failed for groupId [%s] ", groupId); - log.error(message, e); - return ListenerResult.fail(message + e.getMessage()); } - flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED); return ListenerResult.success(); } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java index 0f689c0e3a..84509b8a0e 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java @@ -21,9 +21,9 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.SortStatus; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkService; -import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sort.SortStatusInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.workflow.plugin.sort.SortPoller; import com.fasterxml.jackson.core.type.TypeReference; @@ -66,22 +66,21 @@ public class SortStatusPoller implements SortPoller { } @Override - public List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos, String credentials) { - log.debug("begin to poll sort status for inlong groups"); - if (CollectionUtils.isEmpty(groupInfos)) { - log.debug("end to poll sort status, as the inlong groups is empty"); + public List<SortStatusInfo> pollSortStatus(List<InlongStreamInfo> streamInfos, String credentials) { + log.debug("begin to poll sort status for stream"); + if (CollectionUtils.isEmpty(streamInfos)) { + log.debug("end to poll sort status, as the stream list is empty"); return Collections.emptyList(); } - List<SortStatusInfo> statusInfos = new ArrayList<>(groupInfos.size()); - for (InlongGroupInfo groupInfo : groupInfos) { - String groupId = groupInfo.getInlongGroupId(); + List<SortStatusInfo> statusInfos = new ArrayList<>(streamInfos.size()); + for (InlongStreamInfo streamInfo : streamInfos) { try { - List<InlongGroupExtInfo> extList = groupInfo.getExtList(); - log.debug("inlong group {} ext info: {}", groupId, extList); + List<InlongStreamExtInfo> extList = streamInfo.getExtList(); + log.debug("stream {} ext info: {}", streamInfo.getInlongStreamId(), extList); Map<String, String> kvConf = new HashMap<>(); - extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); + extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue())); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); if (StringUtils.isNotEmpty(sortExt)) { Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( @@ -91,7 +90,10 @@ public class SortStatusPoller implements SortPoller { } String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); - SortStatusInfo statusInfo = SortStatusInfo.builder().inlongGroupId(groupId).build(); + SortStatusInfo statusInfo = SortStatusInfo.builder() + .inlongGroupId(streamInfo.getInlongGroupId()) + .inlongStreamId(streamInfo.getInlongStreamId()) + .build(); if (StringUtils.isBlank(jobId)) { statusInfo.setSortStatus(SortStatus.NOT_EXISTS); statusInfos.add(statusInfo); @@ -104,7 +106,8 @@ public class SortStatusPoller implements SortPoller { JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId), SortStatus.UNKNOWN)); statusInfos.add(statusInfo); } catch (Exception e) { - log.error("polling sort status failed for groupId=" + groupId, e); + log.error("polling sort status failed for groupId=" + streamInfo.getInlongGroupId() + " streamId=" + + streamInfo.getInlongStreamId(), e); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java index 6fd04ba855..7950c9cc1c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java @@ -41,6 +41,9 @@ public class SortStatusInfo { @ApiModelProperty(value = "Inlong group id") private String inlongGroupId; + @ApiModelProperty(value = "Inlong stream id") + private String inlongStreamId; + @ApiModelProperty(value = "Sort status info") private SortStatus sortStatus; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java index 437e80c035..0072a877f1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java @@ -26,10 +26,12 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sort.SortStatusInfo; import org.apache.inlong.manager.pojo.sort.SortStatusRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.core.SortClusterService; import org.apache.inlong.manager.service.core.SortService; import org.apache.inlong.manager.service.core.SortSourceService; import org.apache.inlong.manager.service.group.InlongGroupService; +import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin; import org.apache.inlong.manager.workflow.plugin.sort.SortPoller; @@ -38,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -58,6 +61,8 @@ public class SortServiceImpl implements SortService, PluginBinder { private SortClusterService sortClusterService; @Autowired private InlongGroupService groupService; + @Autowired + private InlongStreamService streamService; /** * The plugin poller will be initialed after the application starts. @@ -92,8 +97,11 @@ public class SortServiceImpl implements SortService, PluginBinder { }) .filter(Objects::nonNull) .collect(Collectors.toList()); - - List<SortStatusInfo> statusInfos = sortPoller.pollSortStatus(groupInfoList, request.getCredentials()); + List<InlongStreamInfo> streamInfos = new ArrayList<>(); + groupInfoList.forEach(groupInfo -> { + streamInfos.addAll(streamService.list(groupInfo.getInlongGroupId())); + }); + List<SortStatusInfo> statusInfos = sortPoller.pollSortStatus(streamInfos, request.getCredentials()); log.debug("success to list sort status for request={}, result={}", request, statusInfos); return statusInfos; } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 4e994f8be1..62f9ee7da3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -29,9 +29,11 @@ import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; +import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper; +import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; @@ -107,6 +109,8 @@ public class InlongGroupServiceImpl implements InlongGroupService { @Autowired private StreamSourceEntityMapper streamSourceMapper; @Autowired + private InlongStreamExtEntityMapper streamExtMapper; + @Autowired private InlongClusterService clusterService; @Autowired @@ -211,7 +215,8 @@ public class InlongGroupServiceImpl implements InlongGroupService { List<InlongGroupExtEntity> extEntityList = groupExtMapper.selectByGroupId(groupId); List<InlongGroupExtInfo> extList = CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new); groupInfo.setExtList(extList); - BaseSortConf sortConf = buildSortConfig(extList); + List<InlongStreamExtEntity> streamExtEntities = streamExtMapper.selectByRelatedId(groupId, null); + BaseSortConf sortConf = buildSortConfig(streamExtEntities); groupInfo.setSortConf(sortConf); LOGGER.debug("success to get inlong group for groupId={}", groupId); @@ -232,7 +237,8 @@ public class InlongGroupServiceImpl implements InlongGroupService { List<InlongGroupExtEntity> extEntityList = groupExtMapper.selectByGroupId(groupId); List<InlongGroupExtInfo> extList = CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new); groupInfo.setExtList(extList); - BaseSortConf sortConf = buildSortConfig(extList); + List<InlongStreamExtEntity> streamExtEntities = streamExtMapper.selectByRelatedId(groupId, null); + BaseSortConf sortConf = buildSortConfig(streamExtEntities); groupInfo.setSortConf(sortConf); return groupInfo; } @@ -595,7 +601,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { return true; } - private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) { + private BaseSortConf buildSortConfig(List<InlongStreamExtEntity> extInfos) { Map<String, String> extMap = new HashMap<>(); extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue())); String type = extMap.get(InlongConstants.SORT_TYPE); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index b8d776c6e7..ebed9e72e3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -17,11 +17,13 @@ package org.apache.inlong.manager.service.listener.sort; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; @@ -41,6 +43,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; +import java.util.stream.Collectors; /** * Event listener of build the Sort config, @@ -121,8 +124,18 @@ public class SortConfigListener implements SortOperateListener { } try { - SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getEnableZookeeper()); - operator.buildConfig(groupInfo, streamInfos, false); + for (InlongStreamInfo streamInfo : streamInfos) { + List<StreamSink> sinkList = streamInfo.getSinkList(); + if (CollectionUtils.isEmpty(sinkList)) { + continue; + } + List<String> sinkTypeList = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); + List<SortConfigOperator> operatorList = operatorFactory.getInstance(sinkTypeList); + for (SortConfigOperator operator : operatorList) { + operator.buildConfig(groupInfo, streamInfo, + InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())); + } + } } catch (Exception e) { String msg = String.format("failed to build sort config for groupId=%s, ", groupId); LOGGER.error(msg + "streamInfos=" + streamInfos, e); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java index 39ac951535..070f71c5a6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.sort; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.TaskEvent; @@ -41,8 +42,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * Event listener of build the Sort config for one inlong stream, @@ -85,6 +86,17 @@ public class StreamSortConfigListener implements SortOperateListener { InlongStreamInfo streamInfo = form.getStreamInfo(); final String groupId = streamInfo.getInlongGroupId(); final String streamId = streamInfo.getInlongStreamId(); + // Read the current information + InlongGroupInfo groupInfo = groupService.get(groupId); + if (groupInfo == null) { + String msg = "inlong group not found with groupId=" + groupId; + LOGGER.error(msg); + throw new WorkflowListenerException(msg); + } + form.setGroupInfo(groupInfo); + form.setStreamInfo(streamService.get(groupId, streamId)); + groupInfo = form.getGroupInfo(); + streamInfo = form.getStreamInfo(); LOGGER.info("begin to build sort config for groupId={}, streamId={}", groupId, streamId); GroupOperateType operateType = form.getGroupOperateType(); @@ -94,7 +106,6 @@ public class StreamSortConfigListener implements SortOperateListener { return ListenerResult.success(); } - InlongGroupInfo groupInfo = groupService.get(groupId); GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus()); Preconditions.expectTrue(GroupStatus.CONFIG_FAILED != groupStatus, String.format("group status=%s not support start stream for groupId=%s", groupStatus, groupId)); @@ -103,17 +114,19 @@ public class StreamSortConfigListener implements SortOperateListener { LOGGER.warn("not build sort config for groupId={}, streamId={}, as not found any sinks", groupId, streamId); return ListenerResult.success(); } - // Read the current information - form.setGroupInfo(groupInfo); - form.setStreamInfo(streamService.get(groupId, streamId)); - List<InlongStreamInfo> streamInfos = Collections.singletonList(streamInfo); try { - SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getEnableZookeeper()); - operator.buildConfig(groupInfo, streamInfos, true); + + List<String> sinkTypeList = streamSinks.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); + List<SortConfigOperator> operatorList = operatorFactory.getInstance(sinkTypeList); + for (SortConfigOperator operator : operatorList) { + operator.buildConfig(groupInfo, streamInfo, + InlongConstants.SYNC_SEND.equals(groupInfo.getInlongGroupMode())); + } + } catch (Exception e) { String msg = String.format("failed to build sort config for groupId=%s, streamId=%s, ", groupId, streamId); - LOGGER.error(msg + "streamInfos=" + streamInfos, e); + LOGGER.error(msg + "streamInfo=" + streamInfo, e); throw new WorkflowListenerException(msg + e.getMessage()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 11c4e3d6f6..08bc53cbf0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -18,13 +18,15 @@ package org.apache.inlong.manager.service.resource.sort; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.sort.node.NodeFactory; import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils; import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils; import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.transform.TransformResponse; @@ -48,6 +50,7 @@ import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -78,111 +81,120 @@ public class DefaultSortConfigOperator implements SortConfigOperator { private AuditService auditService; @Override - public Boolean accept(Integer enableZk) { - return InlongConstants.DISABLE_ZK.equals(enableZk); + public Boolean accept(List<String> sinkTypeList) { + for (String sinkType : sinkTypeList) { + if (SinkType.SORT_FLINK_SINK.contains(sinkType)) { + return true; + } + } + return false; } @Override - public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream) + public void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, boolean isStream) throws Exception { if (isStream) { LOGGER.warn("no need to build sort config for stream process when disable zk"); return; } - if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) { - LOGGER.warn("no need to build sort config as the group is null or streams is empty when disable zk"); + if (groupInfo == null || streamInfo == null) { + LOGGER.warn("no need to build sort config as the group is null or stream is empty when disable zk"); return; } - - GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos); + List<StreamSink> sinkList = new ArrayList<>(); + for (StreamSink sink : streamInfo.getSinkList()) { + if (SinkType.SORT_FLINK_SINK.contains(sink.getSinkType())) { + sinkList.add(sink); + } + } + if (CollectionUtils.isEmpty(sinkList)) { + return; + } + GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfo, sinkList); String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo); - this.addToGroupExt(groupInfo, dataflow); + this.addToStreamExt(streamInfo, dataflow); if (LOGGER.isDebugEnabled()) { LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow); } } - private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) { + private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlongStreamInfo, + List<StreamSink> sinkInfos) { + String streamId = inlongStreamInfo.getInlongStreamId(); // get source info - Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList); - // get sink info - Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList); - List<TransformResponse> transformList = transformService.listTransform(groupInfo.getInlongGroupId(), null); + Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, + Collections.singletonList(inlongStreamInfo)); + List<TransformResponse> transformList = transformService.listTransform(groupInfo.getInlongGroupId(), streamId); Map<String, List<TransformResponse>> transformMap = transformList.stream() .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new))); List<StreamInfo> sortStreamInfos = new ArrayList<>(); - for (InlongStreamInfo inlongStream : streamInfoList) { - String streamId = inlongStream.getInlongStreamId(); - Map<String, StreamField> fieldMap = new HashMap<>(); - inlongStream.getSourceList().forEach( - source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap)); + Map<String, StreamField> fieldMap = new HashMap<>(); + inlongStreamInfo.getSourceList().forEach( + source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap)); + + List<TransformResponse> transformResponses = transformMap.get(streamId); + if (CollectionUtils.isNotEmpty(transformResponses)) { + transformResponses.forEach( + trans -> parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap)); + } - List<TransformResponse> transformResponses = transformMap.get(streamId); - if (CollectionUtils.isNotEmpty(transformResponses)) { - transformResponses.forEach( - trans -> parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap)); - } + // build a stream info from the nodes and relations + List<StreamSource> sources = sourceMap.get(streamId); + for (StreamSink sinkInfo : sinkInfos) { + CommonBeanUtils.copyProperties(inlongStreamInfo, sinkInfo, true); + addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), true); + } - // build a stream info from the nodes and relations - List<StreamSource> sources = sourceMap.get(streamId); - List<StreamSink> sinks = sinkMap.get(streamId); + for (StreamSource source : sources) { + source.setFieldList(inlongStreamInfo.getFieldList()); + } + List<NodeRelation> relations; - for (StreamSink sink : sinks) { - addAuditId(sink.getProperties(), sink.getSinkType(), true); - } - for (StreamSource source : sources) { - source.setFieldList(inlongStream.getFieldList()); + if (InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) { + if (CollectionUtils.isNotEmpty(transformResponses)) { + relations = NodeRelationUtils.createNodeRelations(inlongStreamInfo); + // in standard mode(include Data Ingestion and Synchronization), replace upstream source node and + // transform input fields node to MQ node (which is InLong stream id) + String mqNodeName = sources.get(0).getSourceName(); + Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses); + adjustTransformField(transformResponses, nodeNameSet, mqNodeName); + adjustNodeRelations(relations, nodeNameSet, mqNodeName); + } else { + relations = NodeRelationUtils.createNodeRelations(sources, sinkInfos); } - List<NodeRelation> relations; - - if (InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) { - if (CollectionUtils.isNotEmpty(transformResponses)) { - relations = NodeRelationUtils.createNodeRelations(inlongStream); - // in standard mode(include Data Ingestion and Synchronization), replace upstream source node and - // transform input fields node to MQ node (which is InLong stream id) - String mqNodeName = sources.get(0).getSourceName(); - Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses); - adjustTransformField(transformResponses, nodeNameSet, mqNodeName); - adjustNodeRelations(relations, nodeNameSet, mqNodeName); - } else { - relations = NodeRelationUtils.createNodeRelations(sources, sinks); - } - if (sources.size() == sinks.size()) { - for (int i = 0; i < sinks.size(); i++) { - addAuditId(sources.get(i).getProperties(), sinks.get(i).getSinkType(), false); - } - } + for (int i = 0; i < sources.size(); i++) { + addAuditId(sources.get(i).getProperties(), sinkInfos.get(0).getSinkType(), false); + } + } else { + if (CollectionUtils.isNotEmpty(transformResponses)) { + List<String> sourcesNames = sources.stream().map(StreamSource::getSourceName) + .collect(Collectors.toList()); + List<String> transFormNames = transformResponses.stream().map(TransformResponse::getTransformName) + .collect(Collectors.toList()); + relations = Arrays.asList(NodeRelationUtils.createNodeRelation(sourcesNames, transFormNames), + NodeRelationUtils.createNodeRelation(transFormNames, + sinkInfos.stream().map(StreamSink::getSinkName).collect(Collectors.toList()))); } else { - if (CollectionUtils.isNotEmpty(transformResponses)) { - List<String> sourcesNames = sources.stream().map(StreamSource::getSourceName) - .collect(Collectors.toList()); - List<String> transFormNames = transformResponses.stream().map(TransformResponse::getTransformName) - .collect(Collectors.toList()); - List<String> sinkNames = sinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList()); - relations = Arrays.asList(NodeRelationUtils.createNodeRelation(sourcesNames, transFormNames), - NodeRelationUtils.createNodeRelation(transFormNames, sinkNames)); - } else { - relations = NodeRelationUtils.createNodeRelations(sources, sinks); - } + relations = NodeRelationUtils.createNodeRelations(sources, sinkInfos); + } - for (StreamSource source : sources) { - addAuditId(source.getProperties(), source.getSourceType(), false); - } + for (StreamSource source : sources) { + addAuditId(source.getProperties(), source.getSourceType(), false); } + } - // create extract-transform-load nodes - List<Node> nodes = this.createNodes(sources, transformResponses, sinks, fieldMap); + // create extract-transform-load nodes + List<Node> nodes = this.createNodes(sources, transformResponses, sinkInfos, fieldMap); - StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations); - sortStreamInfos.add(streamInfo); + StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations); + sortStreamInfos.add(streamInfo); - // rebuild joinerNode relation - NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponses); - } + // rebuild joinerNode relation + NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponses); return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } @@ -265,20 +277,21 @@ public class DefaultSortConfigOperator implements SortConfigOperator { } /** - * Add config into inlong group ext info + * Add config into inlong stream ext info */ - private void addToGroupExt(InlongGroupInfo groupInfo, String value) { - if (groupInfo.getExtList() == null) { - groupInfo.setExtList(new ArrayList<>()); + private void addToStreamExt(InlongStreamInfo streamInfo, String value) { + if (streamInfo.getExtList() == null) { + streamInfo.setExtList(new ArrayList<>()); } - InlongGroupExtInfo extInfo = new InlongGroupExtInfo(); - extInfo.setInlongGroupId(groupInfo.getInlongGroupId()); + InlongStreamExtInfo extInfo = new InlongStreamExtInfo(); + extInfo.setInlongGroupId(streamInfo.getInlongGroupId()); + extInfo.setInlongStreamId(streamInfo.getInlongStreamId()); extInfo.setKeyName(InlongConstants.DATAFLOW); extInfo.setKeyValue(value); - groupInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName())); - groupInfo.getExtList().add(extInfo); + streamInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName())); + streamInfo.getExtList().add(extInfo); } private void addAuditId(Map<String, Object> properties, String type, boolean isSent) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java index ce0e31f589..bd4f3ee712 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java @@ -30,17 +30,17 @@ public interface SortConfigOperator { /** * Determines whether the current instance matches the specified type. * - * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: disable + * @param sinkTypeList sink type list */ - Boolean accept(Integer enableZk); + Boolean accept(List<String> sinkTypeList); /** * Build Sort config. * * @param groupInfo inlong group info - * @param streamInfos inlong stream info list + * @param streamInfo inlong stream info * @param isStream is the config built for inlong stream */ - void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream) throws Exception; + void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, boolean isStream) throws Exception; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java index 95a9c5c39f..baa9708f68 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java @@ -17,12 +17,11 @@ package org.apache.inlong.manager.service.resource.sort; -import org.apache.inlong.manager.common.exceptions.BusinessException; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.stream.Collectors; /** * Factory for {@link SortConfigOperator}. @@ -36,14 +35,10 @@ public class SortConfigOperatorFactory { /** * Get a Sort config operator instance. * - * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: disable + * @param sinkTypeList sink type */ - public SortConfigOperator getInstance(Integer enableZk) { - return operatorList.stream() - .filter(inst -> inst.accept(enableZk)) - .findFirst() - .orElseThrow(() -> new BusinessException("not found any instance of SortConfigOperator when enableZk=" - + enableZk)); + public List<SortConfigOperator> getInstance(List<String> sinkTypeList) { + return operatorList.stream().filter(inst -> inst.accept(sinkTypeList)).collect(Collectors.toList()); } } diff --git a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties index aa9155e735..17e8ccd2ae 100644 --- a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties +++ b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties @@ -70,3 +70,5 @@ common.http-client.connectionRequestTimeout=3000 # tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 + + diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java index 68dd242850..a602e0e979 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java @@ -17,8 +17,8 @@ package org.apache.inlong.manager.workflow.plugin.sort; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sort.SortStatusInfo; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import java.util.List; @@ -30,11 +30,11 @@ public interface SortPoller { /** * Poll the Sort status infos by the given inlong groups * - * @param groupInfos inlong group infos + * @param streamInfos stream sink infos * @param credentials credential info * @return list of Sort status infos * @throws Exception any exception if occurred */ - List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos, String credentials) throws Exception; + List<SortStatusInfo> pollSortStatus(List<InlongStreamInfo> streamInfos, String credentials) throws Exception; }