This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6b9da03b43 [INLONG-9523][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure (#9553) 6b9da03b43 is described below commit 6b9da03b43f83db56cefe28b679aaa04d6a755cf Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Jan 9 14:06:43 2024 +0800 [INLONG-9523][Manager] Fix the problem of sink remains in configuration after standalone cluster allocation failure (#9553) --- .../main/resources/mappers/AuditEntityMapper.xml | 2 + .../AbstractStandaloneSinkResourceOperator.java | 46 +++++++++++++--------- .../service/sink/ck/ClickHouseSinkOperator.java | 1 + .../service/sink/es/ElasticsearchSinkOperator.java | 1 + .../service/sink/iceberg/IcebergSinkOperator.java | 1 + .../service/sink/kudu/KuduSinkOperator.java | 1 + .../sink/starrocks/StarRocksSinkOperator.java | 1 + 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml index 63d6af6c73..f5cce709e3 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml @@ -41,6 +41,7 @@ <result column="log_ts" property="logTs" jdbcType="VARCHAR"/> <result column="total" property="total" jdbcType="BIGINT"/> <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/> + <result column="total_size" property="totalSize" jdbcType="BIGINT"/> </resultMap> <resultMap id="SumGroupByIdResultMap" type="java.util.Map"> @@ -50,6 +51,7 @@ <result column="ip" property="ip" jdbcType="VARCHAR"/> <result column="total" property="total" jdbcType="BIGINT"/> <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/> + <result column="total_size" property="totalSize" jdbcType="BIGINT"/> </resultMap> <select id="sumByLogTs" resultMap="SumByLogTsResultMap"> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java index 79893ad9c0..75174f120a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.resource.sink; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.SinkStatus; +import org.apache.inlong.manager.common.exceptions.WorkflowException; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; @@ -55,26 +56,33 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso @VisibleForTesting protected void assignCluster(SinkInfo sinkInfo) { - if (StringUtils.isBlank(sinkInfo.getSinkType())) { - throw new IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage()); + try { + if (StringUtils.isBlank(sinkInfo.getSinkType())) { + throw new IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage()); + } + + if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) { + String info = "no need to auto-assign cluster since the cluster has already assigned"; + sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); + return; + } + + String targetCluster = assignOneCluster(sinkInfo); + Preconditions.expectNotBlank(targetCluster, + String.format("find no proper cluster assign to group=%s, stream=%s, sink type=%s, data node=%s ", + sinkInfo.getInlongGroupId(), sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(), + sinkInfo.getDataNodeName())); + + StreamSinkEntity sink = sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId()); + sink.setInlongClusterName(targetCluster); + sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode()); + sinkEntityMapper.updateByIdSelective(sink); + } catch (Throwable e) { + String errMsg = "assign standalone cluster failed: " + e.getMessage(); + log.error(errMsg, e); + sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); + throw new WorkflowException(errMsg); } - - if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) { - String info = "no need to auto-assign cluster since the cluster has already assigned"; - sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); - return; - } - - String targetCluster = assignOneCluster(sinkInfo); - Preconditions.expectNotBlank(targetCluster, - String.format("find no proper cluster assign to group=%s, stream=%s, sink type=%s, data node=%s ", - sinkInfo.getInlongGroupId(), sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(), - sinkInfo.getDataNodeName())); - - StreamSinkEntity sink = sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId()); - sink.setInlongClusterName(targetCluster); - sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode()); - sinkEntityMapper.updateByIdSelective(sink); } private String assignOneCluster(SinkInfo sinkInfo) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java index 52618005d7..fa8759d7b6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java @@ -126,6 +126,7 @@ public class ClickHouseSinkOperator extends AbstractSinkOperator { Integer sinkId = request.getId(); for (SinkField fieldInfo : fieldList) { this.checkFieldInfo(fieldInfo); + fieldInfo.setExtParams(null); StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { fieldEntity.setFieldComment(fieldEntity.getFieldName()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java index 7b2109c352..37b9202e8d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java @@ -139,6 +139,7 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator { Integer sinkId = request.getId(); for (SinkField fieldInfo : fieldList) { this.checkFieldInfo(fieldInfo); + fieldInfo.setExtParams(null); StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { fieldEntity.setFieldComment(fieldEntity.getFieldName()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java index 4001b79981..9ee5fb6d3a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java @@ -147,6 +147,7 @@ public class IcebergSinkOperator extends AbstractSinkOperator { Integer sinkId = request.getId(); for (SinkField fieldInfo : fieldList) { this.checkFieldInfo(fieldInfo); + fieldInfo.setExtParams(null); StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { fieldEntity.setFieldComment(fieldEntity.getFieldName()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java index c547930a6d..949f864a50 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java @@ -119,6 +119,7 @@ public class KuduSinkOperator extends AbstractSinkOperator { Integer sinkId = request.getId(); for (SinkField fieldInfo : fieldList) { this.checkFieldInfo(fieldInfo); + fieldInfo.setExtParams(null); StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { fieldEntity.setFieldComment(fieldEntity.getFieldName()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java index 29accea5e7..a3cc57a091 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java @@ -134,6 +134,7 @@ public class StarRocksSinkOperator extends AbstractSinkOperator { Integer sinkId = request.getId(); for (SinkField fieldInfo : fieldList) { this.checkFieldInfo(fieldInfo); + fieldInfo.setExtParams(null); StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { fieldEntity.setFieldComment(fieldEntity.getFieldName());