This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6bad73345 [INLONG-6988][Manager] Use the data node info for StarRocks (#6989) 6bad73345 is described below commit 6bad73345a13c9f8f8ad7f2b83e73ccfd7fc4653 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Dec 21 10:34:12 2022 +0800 [INLONG-6988][Manager] Use the data node info for StarRocks (#6989) --- .../inlong/manager/common/consts/DataNodeType.java | 1 + .../pojo/node/starrocks/StarRocksDataNodeDTO.java | 64 ++++++++++++++++ .../pojo/node/starrocks/StarRocksDataNodeInfo.java | 51 +++++++++++++ .../node/starrocks/StarRocksDataNodeRequest.java | 46 ++++++++++++ .../node/starrocks/StarRocksDataNodeOperator.java | 86 ++++++++++++++++++++++ .../sink/starrocks/StarRocksResourceOperator.java | 24 +++++- .../sink/starrocks/StarRocksSinkOperator.java | 10 +++ 7 files changed, 281 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index 16fa92c77..a32adac37 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -29,5 +29,6 @@ public class DataNodeType { public static final String CLICKHOUSE = "CLICKHOUSE"; public static final String ELASTICSEARCH = "ELASTICSEARCH"; public static final String MYSQL = "MYSQL"; + public static final String STARROCKS = "STARROCKS"; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java new file mode 100644 index 000000000..c01d99c6b --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeDTO.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.starrocks; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import javax.validation.constraints.NotNull; + +/** + * StarRocks data node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("StarRocks data node info") +public class StarRocksDataNodeDTO { + + @ApiModelProperty("StarRocks FE http address") + private String loadUrl; + + /** + * Get the dto instance from the request + */ + public static StarRocksDataNodeDTO getFromRequest(StarRocksDataNodeRequest request) throws Exception { + return CommonBeanUtils.copyProperties(request, StarRocksDataNodeDTO::new, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static StarRocksDataNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, StarRocksDataNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java new file mode 100644 index 000000000..8595445e8 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeInfo.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.starrocks; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; + +/** + * StarRocks data node info + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.STARROCKS) +@ApiModel("StarRocks data node info") +public class StarRocksDataNodeInfo extends DataNodeInfo { + + @ApiModelProperty("StarRocks FE http address") + private String loadUrl; + + public StarRocksDataNodeInfo() { + this.setType(DataNodeType.STARROCKS); + } + + @Override + public StarRocksDataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, StarRocksDataNodeRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java new file mode 100644 index 000000000..164b94479 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/starrocks/StarRocksDataNodeRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.starrocks; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +/** + * StarRocks data node request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.STARROCKS) +@ApiModel("StarRocks data node request") +public class StarRocksDataNodeRequest extends DataNodeRequest { + + @ApiModelProperty("StarRocks FE http address") + private String loadUrl; + + public StarRocksDataNodeRequest() { + this.setType(DataNodeType.STARROCKS); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java new file mode 100644 index 000000000..fb82d894c --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node.starrocks; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeDTO; +import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo; +import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeRequest; +import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class StarRocksDataNodeOperator extends AbstractDataNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(StarRocksDataNodeOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String dataNodeType) { + return getDataNodeType().equals(dataNodeType); + } + + @Override + public String getDataNodeType() { + return DataNodeType.STARROCKS; + } + + @Override + public DataNodeInfo getFromEntity(DataNodeEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); + } + + StarRocksDataNodeInfo starRocksDataNodeInfo = new StarRocksDataNodeInfo(); + CommonBeanUtils.copyProperties(entity, starRocksDataNodeInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + StarRocksDataNodeDTO dto = StarRocksDataNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, starRocksDataNodeInfo); + } + + LOGGER.debug("success to get starRocks data node from entity"); + return starRocksDataNodeInfo; + } + + @Override + protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { + StarRocksDataNodeRequest starRocksDataNodeRequest = (StarRocksDataNodeRequest) request; + CommonBeanUtils.copyProperties(starRocksDataNodeRequest, targetEntity, true); + try { + StarRocksDataNodeDTO dto = StarRocksDataNodeDTO.getFromRequest(starRocksDataNodeRequest); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + LOGGER.debug("success to set entity for starRocks data node"); + } catch (Exception e) { + LOGGER.error("failed to set entity for starRocks data node: ", e); + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage()); + } + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java index b657481b7..828bae027 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java @@ -24,12 +24,15 @@ import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.exceptions.WorkflowException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo; +import org.apache.inlong.manager.service.node.DataNodeOperateHelper; import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; import org.apache.inlong.manager.service.resource.sink.mysql.MySQLResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; @@ -55,6 +58,9 @@ public class StarRocksResourceOperator implements SinkResourceOperator { @Autowired private StreamSinkFieldEntityMapper fieldEntityMapper; + @Autowired + private DataNodeOperateHelper dataNodeHelper; + @Override public Boolean accept(String sinkType) { return SinkType.STARROCKS.equals(sinkType); @@ -87,7 +93,7 @@ public class StarRocksResourceOperator implements SinkResourceOperator { // get columns List<StarRocksColumnInfo> columnList = getStarRocksColumnInfoFromSink(fieldList); - StarRocksSinkDTO sinkDTO = StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams()); + StarRocksSinkDTO sinkDTO = getStarRocksInfo(sinkInfo); StarRocksTableInfo tableInfo = StarRocksSinkDTO.getTableInfo(sinkDTO, columnList); String url = sinkDTO.getJdbcUrl(); String username = sinkDTO.getUsername(); @@ -131,4 +137,20 @@ public class StarRocksResourceOperator implements SinkResourceOperator { } return columnInfoList; } + + private StarRocksSinkDTO getStarRocksInfo(SinkInfo sinkInfo) { + StarRocksSinkDTO starRocksInfo = StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams()); + + // read from data node if not supplied by user + if (StringUtils.isBlank(starRocksInfo.getJdbcUrl())) { + String dataNodeName = sinkInfo.getDataNodeName(); + Preconditions.checkNotEmpty(dataNodeName, "starRocks jdbc url not specified and data node is empty"); + StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo) dataNodeHelper.getDataNodeInfo( + dataNodeName, sinkInfo.getSinkType()); + CommonBeanUtils.copyProperties(dataNodeInfo, starRocksInfo); + starRocksInfo.setJdbcUrl(dataNodeInfo.getUrl()); + starRocksInfo.setPassword(dataNodeInfo.getToken()); + } + return starRocksInfo; + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java index e9ff58717..5b690fc19 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java @@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; +import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -88,6 +89,15 @@ public class StarRocksSinkOperator extends AbstractSinkOperator { } StarRocksSinkDTO dto = StarRocksSinkDTO.getFromJson(entity.getExtParams()); + if (StringUtils.isBlank(dto.getJdbcUrl())) { + Preconditions.checkNotEmpty(entity.getDataNodeName(), + "starRocks jdbc url unspecified and data node is empty"); + StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo) dataNodeHelper.getDataNodeInfo( + entity.getDataNodeName(), entity.getSinkType()); + CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); + dto.setJdbcUrl(dataNodeInfo.getUrl()); + dto.setPassword(dataNodeInfo.getToken()); + } Preconditions.checkNotEmpty(dto.getLoadUrl(), "StarRocks load url is empty"); Preconditions.checkNotEmpty(dto.getJdbcUrl(), "StarRocks jdbc url is empty"); CommonBeanUtils.copyProperties(entity, sink, true);