This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cb23e18fa [INLONG-9259][Manager] Optimize Elasticsearch sink and datanode (#9276)
5cb23e18fa is described below

commit 5cb23e18faa514f4838efd6a55c9abf579b25cf4
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Nov 13 22:19:34 2023 +0800

    [INLONG-9259][Manager] Optimize Elasticsearch sink and datanode (#9276)
---
 .../pojo/node/es/ElasticsearchDataNodeDTO.java     | 12 +++
 .../manager/pojo/sink/es/ElasticsearchSink.java    | 13 ++++
 .../manager/pojo/sink/es/ElasticsearchSinkDTO.java | 57 +++-----------
 .../pojo/sink/es/ElasticsearchSinkRequest.java     | 37 ++-------
 .../node/es/ElasticsearchDataNodeOperator.java     |  5 ++
 .../sink/es/ElasticsearchResourceOperator.java     | 87 ----------------------
 .../service/sink/es/ElasticsearchSinkOperator.java | 17 -----
 .../service/sink/ElasticsearchSinkServiceTest.java |  7 --
 8 files changed, 47 insertions(+), 188 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java
index 79fba20888..e55e1ef338 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java
@@ -73,6 +73,18 @@ public class ElasticsearchDataNodeDTO {
     @ApiModelProperty("audit set name")
     private String auditSetName;
 
+    @ApiModelProperty("http hosts")
+    private String httpHosts;
+
+    @ApiModelProperty("user name")
+    private String username;
+
+    @ApiModelProperty("token")
+    private String token;
+
+    @ApiModelProperty("password")
+    private String password;
+
     /**
      * Get the dto instance from the request
      */
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
index 5f92dddbeb..a99c148790 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
@@ -83,6 +83,19 @@ public class ElasticsearchSink extends StreamSink {
     @ApiModelProperty("The multiple index-pattern of sink")
     private String indexPattern;
 
+    // sortstandalone
+    @ApiModelProperty("indexNamePattern")
+    private String indexNamePattern;
+
+    @ApiModelProperty("contentOffset")
+    private Integer contentOffset;
+
+    @ApiModelProperty("fieldOffset")
+    private Integer fieldOffset;
+
+    @ApiModelProperty("separator")
+    private String separator;
+
     public ElasticsearchSink() {
         this.setSinkType(SinkType.ELASTICSEARCH);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index 8d08988fc8..35565f2cd5 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.pojo.sink.es;
 
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 
@@ -32,9 +31,6 @@ import org.apache.commons.lang3.StringUtils;
 
 import javax.validation.constraints.NotNull;
 
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
 /**
  * Sink info of Elasticsearch
  */
@@ -44,44 +40,17 @@ import java.util.Map;
 @AllArgsConstructor
 public class ElasticsearchSinkDTO {
 
-    @ApiModelProperty("Host of the Elasticsearch server")
-    private String hosts;
-
-    @ApiModelProperty("Username of the Elasticsearch server")
-    private String username;
-
-    @ApiModelProperty("User password of the Elasticsearch server")
-    private String password;
-
-    @ApiModelProperty("Elasticsearch index name")
-    private String indexName;
-
-    @ApiModelProperty("Flush interval, unit: second, default is 1s")
-    private Integer flushInterval;
-
-    @ApiModelProperty("Flush when record number reaches flushRecord")
-    private Integer flushRecord;
-
-    @ApiModelProperty("Write max retry times, default is 3")
-    private Integer retryTimes;
+    @ApiModelProperty("indexNamePattern")
+    private String indexNamePattern;
 
-    @ApiModelProperty("Key field names, separate with commas")
-    private String keyFieldNames;
+    @ApiModelProperty("contentOffset")
+    private Integer contentOffset;
 
-    @ApiModelProperty("Document Type")
-    private String documentType;
+    @ApiModelProperty("fieldOffset")
+    private Integer fieldOffset;
 
-    @ApiModelProperty("Primary Key")
-    private String primaryKey;
-
-    @ApiModelProperty("Elasticsearch version")
-    private Integer esVersion;
-
-    @ApiModelProperty("Password encrypt version")
-    private Integer encryptVersion;
-
-    @ApiModelProperty("Properties for elasticsearch")
-    private Map<String, Object> properties;
+    @ApiModelProperty("separator")
+    private String separator;
 
     /**
      * Get the dto instance from the request
@@ -98,19 +67,11 @@ public class ElasticsearchSinkDTO {
      */
     public static ElasticsearchSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            return JsonUtils.parseObject(extParams, ElasticsearchSinkDTO.class).decryptPassword();
+            return JsonUtils.parseObject(extParams, ElasticsearchSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
                     String.format("parse extParams of Elasticsearch SinkDTO failure: %s", e.getMessage()));
         }
     }
 
-    private ElasticsearchSinkDTO decryptPassword() throws Exception {
-        if (StringUtils.isNotEmpty(this.password)) {
-            byte[] passwordBytes = AESUtils.decryptAsString(this.password, this.encryptVersion);
-            this.password = new String(passwordBytes, StandardCharsets.UTF_8);
-        }
-        return this;
-    }
-
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
index 74d009641c..182308fe0a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
@@ -37,37 +37,16 @@ import lombok.ToString;
 @JsonTypeDefine(value = SinkType.ELASTICSEARCH)
 public class ElasticsearchSinkRequest extends SinkRequest {
 
-    @ApiModelProperty("Host of the Elasticsearch server")
-    private String hosts;
+    @ApiModelProperty("indexNamePattern")
+    private String indexNamePattern;
 
-    @ApiModelProperty("Username of the Elasticsearch server")
-    private String username;
+    @ApiModelProperty("contentOffset")
+    private Integer contentOffset;
 
-    @ApiModelProperty("User password of the Elasticsearch server")
-    private String password;
+    @ApiModelProperty("fieldOffset")
+    private Integer fieldOffset;
 
-    @ApiModelProperty("Elasticsearch index name")
-    private String indexName;
-
-    @ApiModelProperty("Flush interval, unit: second, default is 1s")
-    private Integer flushInterval;
-
-    @ApiModelProperty("Flush when record number reaches flushRecord")
-    private Integer flushRecord;
-
-    @ApiModelProperty("Write max retry times, default is 3")
-    private Integer retryTimes;
-
-    @ApiModelProperty("Key field names, separate with commas")
-    private String keyFieldNames;
-
-    @ApiModelProperty("Document Type")
-    private String documentType;
-
-    @ApiModelProperty("Primary Key")
-    private String primaryKey;
-
-    @ApiModelProperty("Elasticsearch version")
-    private Integer esVersion;
+    @ApiModelProperty("separator")
+    private String separator;
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 272af930bc..7c25f0502a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -45,6 +45,9 @@ public class ElasticsearchDataNodeOperator extends AbstractDataNodeOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDataNodeOperator.class);
 
+    // in order to compatible with the old sortstandalone version
+    public static final String KEY_PASSWORD = "password";
+
     @Autowired
     private ObjectMapper objectMapper;
 
@@ -65,6 +68,8 @@ public class ElasticsearchDataNodeOperator extends AbstractDataNodeOperator {
         try {
             ElasticsearchDataNodeDTO dto =
                     ElasticsearchDataNodeDTO.getFromRequest(esRequest, targetEntity.getExtParams());
+            dto.setHttpHosts(request.getUrl());
+            dto.setPassword(request.getToken());
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
index 70165bf541..87c1086836 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
@@ -20,27 +20,13 @@ package org.apache.inlong.manager.service.resource.sink.es;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.SinkStatus;
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
-import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
-import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
-import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Elasticsearch's resource operator
  */
@@ -48,12 +34,6 @@ import java.util.List;
 public class ElasticsearchResourceOperator extends AbstractStandaloneSinkResourceOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchResourceOperator.class);
-    @Autowired
-    private StreamSinkService sinkService;
-    @Autowired
-    private StreamSinkFieldEntityMapper sinkFieldMapper;
-    @Autowired
-    private DataNodeOperateHelper dataNodeHelper;
 
     @Override
     public Boolean accept(String sinkType) {
@@ -75,74 +55,7 @@ public class ElasticsearchResourceOperator extends 
AbstractStandaloneSinkResourc
             return;
         }
 
-        this.createIndex(sinkInfo);
         this.assignCluster(sinkInfo);
     }
 
-    private void createIndex(SinkInfo sinkInfo) {
-        LOGGER.info("begin to create es index for sinkId={}", sinkInfo.getId());
-
-        List<StreamSinkFieldEntity> sinkList = sinkFieldMapper.selectBySinkId(sinkInfo.getId());
-        if (CollectionUtils.isEmpty(sinkList)) {
-            LOGGER.warn("no es fields found, skip to create es index for sinkId={}", sinkInfo.getId());
-        }
-
-        // set fields
-        List<ElasticsearchFieldInfo> fieldList = getElasticsearchFieldFromSink(sinkList);
-
-        try {
-            ElasticsearchApi client = new ElasticsearchApi();
-            ElasticsearchSinkDTO esInfo = ElasticsearchSinkDTO.getFromJson(sinkInfo.getExtParams());
-            client.setEsConfig(getElasticsearchConfig(sinkInfo, esInfo));
-            String indexName = esInfo.getIndexName();
-            boolean indexExists = client.indexExists(indexName);
-
-            // 3. index not exists, create it
-            if (!indexExists) {
-                client.createIndexAndMapping(indexName, fieldList);
-            } else {
-                // 4. index exists, add fields - skip the exists fields
-                client.addNotExistFields(indexName, fieldList);
-            }
-
-            // 5. update the sink status to success
-            String info = "success to create es resource";
-            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
-            LOGGER.info(info + " for sinkInfo={}", sinkInfo);
-        } catch (Throwable e) {
-            String errMsg = "Create Elasticsearch index failed: " + e.getMessage();
-            LOGGER.error(errMsg, e);
-            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
-            throw new WorkflowException(errMsg);
-        }
-    }
-
-    public List<ElasticsearchFieldInfo> getElasticsearchFieldFromSink(List<StreamSinkFieldEntity> sinkList) {
-        List<ElasticsearchFieldInfo> esFieldList = new ArrayList<>();
-        for (StreamSinkFieldEntity fieldEntity : sinkList) {
-            if (StringUtils.isNotBlank(fieldEntity.getExtParams())) {
-                ElasticsearchFieldInfo elasticsearchFieldInfo = ElasticsearchFieldInfo.getFromJson(
-                        fieldEntity.getExtParams());
-                CommonBeanUtils.copyProperties(fieldEntity, elasticsearchFieldInfo, true);
-                esFieldList.add(elasticsearchFieldInfo);
-            } else {
-                ElasticsearchFieldInfo esFieldInfo = new ElasticsearchFieldInfo();
-                CommonBeanUtils.copyProperties(fieldEntity, esFieldInfo, true);
-                esFieldList.add(esFieldInfo);
-            }
-        }
-        return esFieldList;
-    }
-
-    private ElasticsearchConfig getElasticsearchConfig(SinkInfo sinkInfo, ElasticsearchSinkDTO esInfo) {
-        ElasticsearchConfig config = new ElasticsearchConfig();
-        if (StringUtils.isNotEmpty(esInfo.getUsername())) {
-            config.setAuthEnable(true);
-            config.setUsername(esInfo.getUsername());
-            config.setPassword(esInfo.getPassword());
-        }
-        config.setHosts(esInfo.getHosts());
-        return config;
-    }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 06ea6fac5f..fba388029f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -17,12 +17,10 @@
 
 package org.apache.inlong.manager.service.sink.es;
 
-import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
@@ -44,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -81,20 +78,6 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator {
         try {
             ElasticsearchSinkDTO dto = ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());
 
-            DataNodeInfo dataNodeInfo =
-                    dataNodeHelper.getDataNodeInfo(request.getDataNodeName(), DataNodeType.ELASTICSEARCH);
-            String esUrl = dataNodeInfo.getUrl();
-            dto.setHosts(esUrl);
-
-            dto.setUsername(dataNodeInfo.getUsername());
-            Integer encryptVersion = AESUtils.getCurrentVersion(null);
-            String passwd = null;
-            if (StringUtils.isNotEmpty(dataNodeInfo.getToken())) {
-                passwd = AESUtils.encryptToString(dataNodeInfo.getToken().getBytes(StandardCharsets.UTF_8),
-                        encryptVersion);
-            }
-            dto.setPassword(passwd);
-            dto.setEncryptVersion(encryptVersion);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
index ebf74d9574..964f216c9c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
@@ -61,13 +61,6 @@ public class ElasticsearchSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setInlongStreamId(globalStreamId);
         sinkInfo.setSinkType(SinkType.ELASTICSEARCH);
 
-        sinkInfo.setHosts("http://127.0.0.1:9200");
-        sinkInfo.setUsername("elasticsearch");
-        sinkInfo.setPassword("inlong");
-        sinkInfo.setDocumentType("public");
-        sinkInfo.setIndexName("index");
-        sinkInfo.setPrimaryKey("name,age");
-        sinkInfo.setEsVersion(7);
         sinkInfo.setDataNodeName(dataNodeName);
 
         sinkInfo.setSinkName(sinkName);

Reply via email to