This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b9da03b43 [INLONG-9523][Manager] Fix the problem of sink remains in 
configuration after standalone cluster allocation failure (#9553)
6b9da03b43 is described below

commit 6b9da03b43f83db56cefe28b679aaa04d6a755cf
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Jan 9 14:06:43 2024 +0800

    [INLONG-9523][Manager] Fix the problem of sink remains in configuration 
after standalone cluster allocation failure (#9553)
---
 .../main/resources/mappers/AuditEntityMapper.xml   |  2 +
 .../AbstractStandaloneSinkResourceOperator.java    | 46 +++++++++++++---------
 .../service/sink/ck/ClickHouseSinkOperator.java    |  1 +
 .../service/sink/es/ElasticsearchSinkOperator.java |  1 +
 .../service/sink/iceberg/IcebergSinkOperator.java  |  1 +
 .../service/sink/kudu/KuduSinkOperator.java        |  1 +
 .../sink/starrocks/StarRocksSinkOperator.java      |  1 +
 7 files changed, 34 insertions(+), 19 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index 63d6af6c73..f5cce709e3 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -41,6 +41,7 @@
         <result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
         <result column="total" property="total" jdbcType="BIGINT"/>
         <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
+        <result column="total_size" property="totalSize" jdbcType="BIGINT"/>
     </resultMap>
 
     <resultMap id="SumGroupByIdResultMap" type="java.util.Map">
@@ -50,6 +51,7 @@
         <result column="ip" property="ip" jdbcType="VARCHAR"/>
         <result column="total" property="total" jdbcType="BIGINT"/>
         <result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
+        <result column="total_size" property="totalSize" jdbcType="BIGINT"/>
     </resultMap>
 
     <select id="sumByLogTs" resultMap="SumByLogTsResultMap">
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
index 79893ad9c0..75174f120a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.resource.sink;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -55,26 +56,33 @@ public abstract class 
AbstractStandaloneSinkResourceOperator implements SinkReso
 
     @VisibleForTesting
     protected void assignCluster(SinkInfo sinkInfo) {
-        if (StringUtils.isBlank(sinkInfo.getSinkType())) {
-            throw new 
IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
+        try {
+            if (StringUtils.isBlank(sinkInfo.getSinkType())) {
+                throw new 
IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
+            }
+
+            if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
+                String info = "no need to auto-assign cluster since the 
cluster has already assigned";
+                sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+                return;
+            }
+
+            String targetCluster = assignOneCluster(sinkInfo);
+            Preconditions.expectNotBlank(targetCluster,
+                    String.format("find no proper cluster assign to group=%s, 
stream=%s, sink type=%s, data node=%s ",
+                            sinkInfo.getInlongGroupId(), 
sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
+                            sinkInfo.getDataNodeName()));
+
+            StreamSinkEntity sink = 
sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
+            sink.setInlongClusterName(targetCluster);
+            sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode());
+            sinkEntityMapper.updateByIdSelective(sink);
+        } catch (Throwable e) {
+            String errMsg = "assign standalone cluster failed: " + 
e.getMessage();
+            log.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
         }
-
-        if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
-            String info = "no need to auto-assign cluster since the cluster 
has already assigned";
-            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
-            return;
-        }
-
-        String targetCluster = assignOneCluster(sinkInfo);
-        Preconditions.expectNotBlank(targetCluster,
-                String.format("find no proper cluster assign to group=%s, 
stream=%s, sink type=%s, data node=%s ",
-                        sinkInfo.getInlongGroupId(), 
sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
-                        sinkInfo.getDataNodeName()));
-
-        StreamSinkEntity sink = 
sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
-        sink.setInlongClusterName(targetCluster);
-        sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode());
-        sinkEntityMapper.updateByIdSelective(sink);
     }
 
     private String assignOneCluster(SinkInfo sinkInfo) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
index 52618005d7..fa8759d7b6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
@@ -126,6 +126,7 @@ public class ClickHouseSinkOperator extends 
AbstractSinkOperator {
         Integer sinkId = request.getId();
         for (SinkField fieldInfo : fieldList) {
             this.checkFieldInfo(fieldInfo);
+            fieldInfo.setExtParams(null);
             StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
             if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                 fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 7b2109c352..37b9202e8d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -139,6 +139,7 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
         Integer sinkId = request.getId();
         for (SinkField fieldInfo : fieldList) {
             this.checkFieldInfo(fieldInfo);
+            fieldInfo.setExtParams(null);
             StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
             if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                 fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index 4001b79981..9ee5fb6d3a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -147,6 +147,7 @@ public class IcebergSinkOperator extends 
AbstractSinkOperator {
         Integer sinkId = request.getId();
         for (SinkField fieldInfo : fieldList) {
             this.checkFieldInfo(fieldInfo);
+            fieldInfo.setExtParams(null);
             StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
             if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                 fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
index c547930a6d..949f864a50 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
@@ -119,6 +119,7 @@ public class KuduSinkOperator extends AbstractSinkOperator {
         Integer sinkId = request.getId();
         for (SinkField fieldInfo : fieldList) {
             this.checkFieldInfo(fieldInfo);
+            fieldInfo.setExtParams(null);
             StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
             if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                 fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index 29accea5e7..a3cc57a091 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -134,6 +134,7 @@ public class StarRocksSinkOperator extends 
AbstractSinkOperator {
         Integer sinkId = request.getId();
         for (SinkField fieldInfo : fieldList) {
             this.checkFieldInfo(fieldInfo);
+            fieldInfo.setExtParams(null);
             StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
             if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                 fieldEntity.setFieldComment(fieldEntity.getFieldName());

Reply via email to