This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bad73345 [INLONG-6988][Manager] Use the data node info for StarRocks (#6989)
6bad73345 is described below

commit 6bad73345a13c9f8f8ad7f2b83e73ccfd7fc4653
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Dec 21 10:34:12 2022 +0800

    [INLONG-6988][Manager] Use the data node info for StarRocks (#6989)
---
 .../inlong/manager/common/consts/DataNodeType.java |  1 +
 .../pojo/node/starrocks/StarRocksDataNodeDTO.java  | 64 ++++++++++++++++
 .../pojo/node/starrocks/StarRocksDataNodeInfo.java | 51 +++++++++++++
 .../node/starrocks/StarRocksDataNodeRequest.java   | 46 ++++++++++++
 .../node/starrocks/StarRocksDataNodeOperator.java  | 86 ++++++++++++++++++++++
 .../sink/starrocks/StarRocksResourceOperator.java  | 24 +++++-
 .../sink/starrocks/StarRocksSinkOperator.java      | 10 +++
 7 files changed, 281 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 16fa92c77..a32adac37 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -29,5 +29,6 @@ public class DataNodeType {
     public static final String CLICKHOUSE = "CLICKHOUSE";
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
     public static final String MYSQL = "MYSQL";
+    public static final String STARROCKS = "STARROCKS";
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java
new file mode 100644
index 000000000..c01d99c6b
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.starrocks;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * StarRocks data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("StarRocks data node info")
+public class StarRocksDataNodeDTO {
+
+    @ApiModelProperty("StarRocks FE http address")
+    private String loadUrl;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static StarRocksDataNodeDTO getFromRequest(StarRocksDataNodeRequest request) throws Exception {
+        return CommonBeanUtils.copyProperties(request, StarRocksDataNodeDTO::new, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static StarRocksDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, StarRocksDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java
new file mode 100644
index 000000000..8595445e8
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.starrocks;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * StarRocks data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.STARROCKS)
+@ApiModel("StarRocks data node info")
+public class StarRocksDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("StarRocks FE http address")
+    private String loadUrl;
+
+    public StarRocksDataNodeInfo() {
+        this.setType(DataNodeType.STARROCKS);
+    }
+
+    @Override
+    public StarRocksDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, StarRocksDataNodeRequest::new);
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java
new file mode 100644
index 000000000..164b94479
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.starrocks;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * StarRocks data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.STARROCKS)
+@ApiModel("StarRocks data node request")
+public class StarRocksDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("StarRocks FE http address")
+    private String loadUrl;
+
+    public StarRocksDataNodeRequest() {
+        this.setType(DataNodeType.STARROCKS);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
new file mode 100644
index 000000000..fb82d894c
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.starrocks;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(StarRocksDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.STARROCKS;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        StarRocksDataNodeInfo starRocksDataNodeInfo = new StarRocksDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, starRocksDataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            StarRocksDataNodeDTO dto = StarRocksDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, starRocksDataNodeInfo);
+        }
+
+        LOGGER.debug("success to get starRocks data node from entity");
+        return starRocksDataNodeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        StarRocksDataNodeRequest starRocksDataNodeRequest = (StarRocksDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(starRocksDataNodeRequest, targetEntity, true);
+        try {
+            StarRocksDataNodeDTO dto = StarRocksDataNodeDTO.getFromRequest(starRocksDataNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            LOGGER.debug("success to set entity for starRocks data node");
+        } catch (Exception e) {
+            LOGGER.error("failed to set entity for starRocks data node: ", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
index b657481b7..828bae027 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
@@ -24,12 +24,15 @@ import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.resource.sink.mysql.MySQLResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -55,6 +58,9 @@ public class StarRocksResourceOperator implements SinkResourceOperator {
     @Autowired
     private StreamSinkFieldEntityMapper fieldEntityMapper;
 
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
     @Override
     public Boolean accept(String sinkType) {
         return SinkType.STARROCKS.equals(sinkType);
@@ -87,7 +93,7 @@ public class StarRocksResourceOperator implements SinkResourceOperator {
         // get columns
         List<StarRocksColumnInfo> columnList = getStarRocksColumnInfoFromSink(fieldList);
 
-        StarRocksSinkDTO sinkDTO = StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams());
+        StarRocksSinkDTO sinkDTO = getStarRocksInfo(sinkInfo);
         StarRocksTableInfo tableInfo = StarRocksSinkDTO.getTableInfo(sinkDTO, columnList);
         String url = sinkDTO.getJdbcUrl();
         String username = sinkDTO.getUsername();
@@ -131,4 +137,20 @@ public class StarRocksResourceOperator implements SinkResourceOperator {
         }
         return columnInfoList;
     }
+
+    private StarRocksSinkDTO getStarRocksInfo(SinkInfo sinkInfo) {
+        StarRocksSinkDTO starRocksInfo = StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+        // read from data node if not supplied by user
+        if (StringUtils.isBlank(starRocksInfo.getJdbcUrl())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "starRocks jdbc url not specified and data node is empty");
+            StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                    dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, starRocksInfo);
+            starRocksInfo.setJdbcUrl(dataNodeInfo.getUrl());
+            starRocksInfo.setPassword(dataNodeInfo.getToken());
+        }
+        return starRocksInfo;
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index e9ff58717..5b690fc19 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -88,6 +89,15 @@ public class StarRocksSinkOperator extends AbstractSinkOperator {
         }
 
         StarRocksSinkDTO dto = StarRocksSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            Preconditions.checkNotEmpty(entity.getDataNodeName(),
+                    "starRocks jdbc url unspecified and data node is empty");
+            StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         Preconditions.checkNotEmpty(dto.getLoadUrl(), "StarRocks load url is empty");
         Preconditions.checkNotEmpty(dto.getJdbcUrl(), "StarRocks jdbc url is empty");
         CommonBeanUtils.copyProperties(entity, sink, true);

Reply via email to