This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5cb23e18fa [INLONG-9259][Manager] Optimize Elasticsearch sink and datanode (#9276) 5cb23e18fa is described below commit 5cb23e18faa514f4838efd6a55c9abf579b25cf4 Author: vernedeng <verned...@apache.org> AuthorDate: Mon Nov 13 22:19:34 2023 +0800 [INLONG-9259][Manager] Optimize Elasticsearch sink and datanode (#9276) --- .../pojo/node/es/ElasticsearchDataNodeDTO.java | 12 +++ .../manager/pojo/sink/es/ElasticsearchSink.java | 13 ++++ .../manager/pojo/sink/es/ElasticsearchSinkDTO.java | 57 +++----------- .../pojo/sink/es/ElasticsearchSinkRequest.java | 37 ++------- .../node/es/ElasticsearchDataNodeOperator.java | 5 ++ .../sink/es/ElasticsearchResourceOperator.java | 87 ---------------------- .../service/sink/es/ElasticsearchSinkOperator.java | 17 ----- .../service/sink/ElasticsearchSinkServiceTest.java | 7 -- 8 files changed, 47 insertions(+), 188 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java index 79fba20888..e55e1ef338 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java @@ -73,6 +73,18 @@ public class ElasticsearchDataNodeDTO { @ApiModelProperty("audit set name") private String auditSetName; + @ApiModelProperty("http hosts") + private String httpHosts; + + @ApiModelProperty("user name") + private String username; + + @ApiModelProperty("token") + private String token; + + @ApiModelProperty("password") + private String password; + /** * Get the dto instance from the request */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java index 5f92dddbeb..a99c148790 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java @@ -83,6 +83,19 @@ public class ElasticsearchSink extends StreamSink { @ApiModelProperty("The multiple index-pattern of sink") private String indexPattern; + // sortstandalone + @ApiModelProperty("indexNamePattern") + private String indexNamePattern; + + @ApiModelProperty("contentOffset") + private Integer contentOffset; + + @ApiModelProperty("fieldOffset") + private Integer fieldOffset; + + @ApiModelProperty("separator") + private String separator; + public ElasticsearchSink() { this.setSinkType(SinkType.ELASTICSEARCH); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java index 8d08988fc8..35565f2cd5 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java @@ -19,7 +19,6 @@ package org.apache.inlong.manager.pojo.sink.es; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; @@ -32,9 +31,6 @@ import org.apache.commons.lang3.StringUtils; import javax.validation.constraints.NotNull; -import java.nio.charset.StandardCharsets; -import java.util.Map; - /** * Sink info of Elasticsearch */ @@ -44,44 +40,17 @@ import 
java.util.Map; @AllArgsConstructor public class ElasticsearchSinkDTO { - @ApiModelProperty("Host of the Elasticsearch server") - private String hosts; - - @ApiModelProperty("Username of the Elasticsearch server") - private String username; - - @ApiModelProperty("User password of the Elasticsearch server") - private String password; - - @ApiModelProperty("Elasticsearch index name") - private String indexName; - - @ApiModelProperty("Flush interval, unit: second, default is 1s") - private Integer flushInterval; - - @ApiModelProperty("Flush when record number reaches flushRecord") - private Integer flushRecord; - - @ApiModelProperty("Write max retry times, default is 3") - private Integer retryTimes; + @ApiModelProperty("indexNamePattern") + private String indexNamePattern; - @ApiModelProperty("Key field names, separate with commas") - private String keyFieldNames; + @ApiModelProperty("contentOffset") + private Integer contentOffset; - @ApiModelProperty("Document Type") - private String documentType; + @ApiModelProperty("fieldOffset") + private Integer fieldOffset; - @ApiModelProperty("Primary Key") - private String primaryKey; - - @ApiModelProperty("Elasticsearch version") - private Integer esVersion; - - @ApiModelProperty("Password encrypt version") - private Integer encryptVersion; - - @ApiModelProperty("Properties for elasticsearch") - private Map<String, Object> properties; + @ApiModelProperty("separator") + private String separator; /** * Get the dto instance from the request @@ -98,19 +67,11 @@ public class ElasticsearchSinkDTO { */ public static ElasticsearchSinkDTO getFromJson(@NotNull String extParams) { try { - return JsonUtils.parseObject(extParams, ElasticsearchSinkDTO.class).decryptPassword(); + return JsonUtils.parseObject(extParams, ElasticsearchSinkDTO.class); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, String.format("parse extParams of Elasticsearch SinkDTO failure: %s", e.getMessage())); } } - private 
ElasticsearchSinkDTO decryptPassword() throws Exception { - if (StringUtils.isNotEmpty(this.password)) { - byte[] passwordBytes = AESUtils.decryptAsString(this.password, this.encryptVersion); - this.password = new String(passwordBytes, StandardCharsets.UTF_8); - } - return this; - } - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java index 74d009641c..182308fe0a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java @@ -37,37 +37,16 @@ import lombok.ToString; @JsonTypeDefine(value = SinkType.ELASTICSEARCH) public class ElasticsearchSinkRequest extends SinkRequest { - @ApiModelProperty("Host of the Elasticsearch server") - private String hosts; + @ApiModelProperty("indexNamePattern") + private String indexNamePattern; - @ApiModelProperty("Username of the Elasticsearch server") - private String username; + @ApiModelProperty("contentOffset") + private Integer contentOffset; - @ApiModelProperty("User password of the Elasticsearch server") - private String password; + @ApiModelProperty("fieldOffset") + private Integer fieldOffset; - @ApiModelProperty("Elasticsearch index name") - private String indexName; - - @ApiModelProperty("Flush interval, unit: second, default is 1s") - private Integer flushInterval; - - @ApiModelProperty("Flush when record number reaches flushRecord") - private Integer flushRecord; - - @ApiModelProperty("Write max retry times, default is 3") - private Integer retryTimes; - - @ApiModelProperty("Key field names, separate with commas") - private String keyFieldNames; - - @ApiModelProperty("Document Type") - private String documentType; - - @ApiModelProperty("Primary Key") - private 
String primaryKey; - - @ApiModelProperty("Elasticsearch version") - private Integer esVersion; + @ApiModelProperty("separator") + private String separator; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java index 272af930bc..7c25f0502a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java @@ -45,6 +45,9 @@ public class ElasticsearchDataNodeOperator extends AbstractDataNodeOperator { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDataNodeOperator.class); + // in order to be compatible with the old sortstandalone version + public static final String KEY_PASSWORD = "password"; + @Autowired private ObjectMapper objectMapper; @@ -65,6 +68,8 @@ public class ElasticsearchDataNodeOperator extends AbstractDataNodeOperator { try { ElasticsearchDataNodeDTO dto = ElasticsearchDataNodeDTO.getFromRequest(esRequest, targetEntity.getExtParams()); + dto.setHttpHosts(request.getUrl()); + dto.setPassword(request.getToken()); targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java index 70165bf541..87c1086836 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java +++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java @@ -20,27 +20,13 @@ package org.apache.inlong.manager.service.resource.sink.es; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.SinkStatus; -import org.apache.inlong.manager.common.exceptions.WorkflowException; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; -import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; import org.apache.inlong.manager.pojo.sink.SinkInfo; -import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo; -import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO; -import org.apache.inlong.manager.service.node.DataNodeOperateHelper; import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator; -import org.apache.inlong.manager.service.sink.StreamSinkService; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; - /** * Elasticsearch's resource operator */ @@ -48,12 +34,6 @@ import java.util.List; public class ElasticsearchResourceOperator extends AbstractStandaloneSinkResourceOperator { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchResourceOperator.class); - @Autowired - private StreamSinkService sinkService; - @Autowired - private StreamSinkFieldEntityMapper sinkFieldMapper; - @Autowired - private DataNodeOperateHelper dataNodeHelper; @Override public Boolean accept(String sinkType) { @@ -75,74 +55,7 @@ public class ElasticsearchResourceOperator extends 
AbstractStandaloneSinkResourc return; } - this.createIndex(sinkInfo); this.assignCluster(sinkInfo); } - private void createIndex(SinkInfo sinkInfo) { - LOGGER.info("begin to create es index for sinkId={}", sinkInfo.getId()); - - List<StreamSinkFieldEntity> sinkList = sinkFieldMapper.selectBySinkId(sinkInfo.getId()); - if (CollectionUtils.isEmpty(sinkList)) { - LOGGER.warn("no es fields found, skip to create es index for sinkId={}", sinkInfo.getId()); - } - - // set fields - List<ElasticsearchFieldInfo> fieldList = getElasticsearchFieldFromSink(sinkList); - - try { - ElasticsearchApi client = new ElasticsearchApi(); - ElasticsearchSinkDTO esInfo = ElasticsearchSinkDTO.getFromJson(sinkInfo.getExtParams()); - client.setEsConfig(getElasticsearchConfig(sinkInfo, esInfo)); - String indexName = esInfo.getIndexName(); - boolean indexExists = client.indexExists(indexName); - - // 3. index not exists, create it - if (!indexExists) { - client.createIndexAndMapping(indexName, fieldList); - } else { - // 4. index exists, add fields - skip the exists fields - client.addNotExistFields(indexName, fieldList); - } - - // 5. 
update the sink status to success - String info = "success to create es resource"; - sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); - LOGGER.info(info + " for sinkInfo={}", sinkInfo); - } catch (Throwable e) { - String errMsg = "Create Elasticsearch index failed: " + e.getMessage(); - LOGGER.error(errMsg, e); - sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); - throw new WorkflowException(errMsg); - } - } - - public List<ElasticsearchFieldInfo> getElasticsearchFieldFromSink(List<StreamSinkFieldEntity> sinkList) { - List<ElasticsearchFieldInfo> esFieldList = new ArrayList<>(); - for (StreamSinkFieldEntity fieldEntity : sinkList) { - if (StringUtils.isNotBlank(fieldEntity.getExtParams())) { - ElasticsearchFieldInfo elasticsearchFieldInfo = ElasticsearchFieldInfo.getFromJson( - fieldEntity.getExtParams()); - CommonBeanUtils.copyProperties(fieldEntity, elasticsearchFieldInfo, true); - esFieldList.add(elasticsearchFieldInfo); - } else { - ElasticsearchFieldInfo esFieldInfo = new ElasticsearchFieldInfo(); - CommonBeanUtils.copyProperties(fieldEntity, esFieldInfo, true); - esFieldList.add(esFieldInfo); - } - } - return esFieldList; - } - - private ElasticsearchConfig getElasticsearchConfig(SinkInfo sinkInfo, ElasticsearchSinkDTO esInfo) { - ElasticsearchConfig config = new ElasticsearchConfig(); - if (StringUtils.isNotEmpty(esInfo.getUsername())) { - config.setAuthEnable(true); - config.setUsername(esInfo.getUsername()); - config.setPassword(esInfo.getPassword()); - } - config.setHosts(esInfo.getHosts()); - return config; - } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java index 06ea6fac5f..fba388029f 100644 --- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java @@ -17,12 +17,10 @@ package org.apache.inlong.manager.service.sink.es; -import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; @@ -44,7 +42,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -81,20 +78,6 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator { try { ElasticsearchSinkDTO dto = ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); - DataNodeInfo dataNodeInfo = - dataNodeHelper.getDataNodeInfo(request.getDataNodeName(), DataNodeType.ELASTICSEARCH); - String esUrl = dataNodeInfo.getUrl(); - dto.setHosts(esUrl); - - dto.setUsername(dataNodeInfo.getUsername()); - Integer encryptVersion = AESUtils.getCurrentVersion(null); - String passwd = null; - if (StringUtils.isNotEmpty(dataNodeInfo.getToken())) { - passwd = AESUtils.encryptToString(dataNodeInfo.getToken().getBytes(StandardCharsets.UTF_8), - encryptVersion); - } - dto.setPassword(passwd); - dto.setEncryptVersion(encryptVersion); targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); } catch (Exception e) { 
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java index ebf74d9574..964f216c9c 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java @@ -61,13 +61,6 @@ public class ElasticsearchSinkServiceTest extends ServiceBaseTest { sinkInfo.setInlongStreamId(globalStreamId); sinkInfo.setSinkType(SinkType.ELASTICSEARCH); - sinkInfo.setHosts("http://127.0.0.1:9200"); - sinkInfo.setUsername("elasticsearch"); - sinkInfo.setPassword("inlong"); - sinkInfo.setDocumentType("public"); - sinkInfo.setIndexName("index"); - sinkInfo.setPrimaryKey("name,age"); - sinkInfo.setEsVersion(7); sinkInfo.setDataNodeName(dataNodeName); sinkInfo.setSinkName(sinkName);