This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new fcc6ee24a [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020) fcc6ee24a is described below commit fcc6ee24a70364d5a232dc782112f1f6917dccb4 Author: woofyzhao <490467...@qq.com> AuthorDate: Tue Sep 27 20:15:01 2022 +0800 [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020) --- .../resources/mappers/StreamSinkEntityMapper.xml | 1 + .../manager/pojo/node/hive/HiveDataNodeInfo.java | 3 -- .../apache/inlong/manager/pojo/sink/SinkInfo.java | 1 + .../service/node/DataNodeOperateHelper.java | 54 ++++++++++++++++++++++ .../resource/sink/hive/HiveResourceOperator.java | 31 ++++++++++++- 5 files changed, 86 insertions(+), 4 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml index bd9b72330..5d1d3ea25 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml @@ -324,6 +324,7 @@ sink.status, sink.creator, sink.sink_name, + sink.data_node_name, stream.mq_resource, stream.data_type, stream.data_separator as sourceSeparator, diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java index 34b50ed06..8602762e4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java @@ -37,9 +37,6 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo; @ApiModel("Hive data node info") public class HiveDataNodeInfo extends DataNodeInfo { - @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}") - private String jdbcUrl; - @ApiModelProperty("Version for Hive, such as: 3.2.1") private String hiveVersion; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java index 916ae199e..c2d2c3fff 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java @@ -32,6 +32,7 @@ public class SinkInfo { private String inlongStreamId; private String sinkType; private String sinkName; + private String dataNodeName; private String description; private Integer enableCreateResource; private String extParams; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java new file mode 100644 index 000000000..a77af7b66 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node; + +import lombok.extern.slf4j.Slf4j; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Data node helper service + */ +@Slf4j +@Service +public class DataNodeOperateHelper { + + @Autowired + private DataNodeEntityMapper dataNodeMapper; + @Autowired + private DataNodeOperatorFactory operatorFactory; + + /** + * Get data node info by name and type + */ + public DataNodeInfo getDataNodeInfo(String dataNodeName, String dataNodeType) { + DataNodeEntity entity = dataNodeMapper.selectByNameAndType(dataNodeName, dataNodeType); + if (entity == null) { + log.error("data node not found by name={}, type={}", dataNodeName, dataNodeType); + throw new BusinessException("data node not found"); + } + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType); + DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity); + log.debug("success to get data node info by name={}, type={}", dataNodeName, dataNodeType); + return dataNodeInfo; + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java index c67559140..9542c7e03 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java @@ -18,16 +18,21 @@ package org.apache.inlong.manager.service.resource.sink.hive; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.exceptions.WorkflowException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.sink.hive.HiveColumnInfo; import org.apache.inlong.manager.pojo.sink.hive.HiveSinkDTO; import org.apache.inlong.manager.pojo.sink.hive.HiveTableInfo; +import org.apache.inlong.manager.service.node.DataNodeOperateHelper; import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; import org.slf4j.Logger; @@ -51,6 +56,8 @@ public class HiveResourceOperator implements SinkResourceOperator { private StreamSinkService sinkService; @Autowired private StreamSinkFieldEntityMapper sinkFieldMapper; + @Autowired + private DataNodeOperateHelper dataNodeHelper; @Override public Boolean accept(String sinkType) { @@ -78,6 +85,28 @@ public class HiveResourceOperator implements SinkResourceOperator { this.createTable(sinkInfo); } + private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) { + HiveSinkDTO hiveInfo = new HiveSinkDTO(); + + if (StringUtils.isNotBlank(sinkInfo.getExtParams())) { + HiveSinkDTO userSinkInfo = HiveSinkDTO.getFromJson(sinkInfo.getExtParams()); + CommonBeanUtils.copyProperties(userSinkInfo, hiveInfo); + } + + // read from data node if not supplied by user + if (StringUtils.isBlank(hiveInfo.getJdbcUrl())) { + String dataNodeName = sinkInfo.getDataNodeName(); + Preconditions.checkNotEmpty(dataNodeName, "hive jdbc url not specified and data node is empty"); + HiveDataNodeInfo dataNodeInfo = (HiveDataNodeInfo) dataNodeHelper.getDataNodeInfo( + dataNodeName, sinkInfo.getSinkType()); + CommonBeanUtils.copyProperties(dataNodeInfo, hiveInfo); + hiveInfo.setJdbcUrl(dataNodeInfo.getUrl()); + hiveInfo.setUsername(dataNodeInfo.getUsername()); + hiveInfo.setPassword(dataNodeInfo.getToken()); + } + return hiveInfo; + } + private void createTable(SinkInfo sinkInfo) { LOGGER.info("begin to create hive table for sinkId={}", sinkInfo.getId()); @@ -97,7 +126,7 @@ public class HiveResourceOperator implements SinkResourceOperator { } try { - HiveSinkDTO hiveInfo = HiveSinkDTO.getFromJson(sinkInfo.getExtParams()); + HiveSinkDTO hiveInfo = this.getHiveInfo(sinkInfo); HiveTableInfo tableInfo = HiveSinkDTO.getHiveTableInfo(hiveInfo, columnList); String url = hiveInfo.getJdbcUrl(); String user = hiveInfo.getUsername();