This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 614b782d4d [feature](doris-on-es) Support es external table not assign 
schema (#9583)
614b782d4d is described below

commit 614b782d4df538ce4195db0b8190abd55da978a6
Author: Stalary <[email protected]>
AuthorDate: Sun Jul 3 23:19:05 2022 +0800

    [feature](doris-on-es) Support es external table not assign schema (#9583)
---
 .../docs/ecosystem/external-table/doris-on-es.md   |  10 ++
 .../docs/ecosystem/external-table/doris-on-es.md   |  10 ++
 fe/fe-core/pom.xml                                 |   1 -
 .../org/apache/doris/analysis/CreateTableStmt.java |   2 +-
 .../java/org/apache/doris/catalog/EsTable.java     | 167 +++++++++++++++------
 .../main/java/org/apache/doris/catalog/Table.java  |   4 +
 .../doris/datasource/InternalDataSource.java       |  15 +-
 .../doris/external/elasticsearch/EsRepository.java |   2 +-
 .../doris/external/elasticsearch/EsUtil.java       | 132 ++++++++++++++--
 .../doris/external/elasticsearch/MappingPhase.java | 109 +-------------
 .../doris/external/elasticsearch/EsTestCase.java   |  11 +-
 .../doris/external/elasticsearch/EsUtilTest.java   |  92 +++++++++++-
 .../external/elasticsearch/MappingPhaseTest.java   | 113 --------------
 13 files changed, 383 insertions(+), 285 deletions(-)

diff --git a/docs/en/docs/ecosystem/external-table/doris-on-es.md 
b/docs/en/docs/ecosystem/external-table/doris-on-es.md
index c9854373f8..50e8af69de 100644
--- a/docs/en/docs/ecosystem/external-table/doris-on-es.md
+++ b/docs/en/docs/ecosystem/external-table/doris-on-es.md
@@ -110,6 +110,16 @@ POST /_bulk
 Refer to the specific table syntax:[CREATE 
TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
 
 ```
+CREATE EXTERNAL TABLE `test` -- If no schema is specified, the ES mapping is automatically pulled to create the table
+ENGINE=ELASTICSEARCH 
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test",
+"type" = "doc",
+"user" = "root",
+"password" = "root"
+);
+
 CREATE EXTERNAL TABLE `test` (
   `k1` bigint(20) COMMENT "",
   `k2` datetime COMMENT "",
diff --git a/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md 
b/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md
index 39c73f2621..8bb07f1c2e 100644
--- a/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md
+++ b/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md
@@ -108,6 +108,16 @@ POST /_bulk
 具体建表语法参照:[CREATE 
TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
 
 ```
+CREATE EXTERNAL TABLE `test` -- 不指定schema,自动拉取es mapping进行建表
+ENGINE=ELASTICSEARCH 
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test",
+"type" = "doc",
+"user" = "root",
+"password" = "root"
+);
+
 CREATE EXTERNAL TABLE `test` (
   `k1` bigint(20) COMMENT "",
   `k2` datetime COMMENT "",
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index ccbc7e9c9e..db3cfa2e0b 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -603,7 +603,6 @@ under the License.
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
-            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index a70e3925e0..3a5e8be619 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -355,7 +355,7 @@ public class CreateTableStmt extends DdlStmt {
         }
 
         // analyze column def
-        if (!(engineName.equals("iceberg") || engineName.equals("hudi"))
+        if (!(engineName.equals("iceberg") || engineName.equals("hudi") || 
engineName.equals("elasticsearch"))
                 && (columnDefs == null || columnDefs.isEmpty())) {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index 7d7f05943a..b03ce6d0ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -28,14 +28,16 @@ import org.apache.doris.thrift.TEsTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
-import com.google.common.base.Strings;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.json.simple.JSONObject;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -43,8 +45,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * Elasticsearch table.
+ **/
 public class EsTable extends Table {
-    private static final Logger LOG = LogManager.getLogger(EsTable.class);
 
     public static final Set<String> DEFAULT_DOCVALUE_DISABLED_FIELDS = new 
HashSet<>(Arrays.asList("text"));
 
@@ -53,18 +57,34 @@ public class EsTable extends Table {
     public static final String PASSWORD = "password";
     public static final String INDEX = "index";
     public static final String TYPE = "type";
-    public static final String TRANSPORT = "transport";
     public static final String VERSION = "version";
     public static final String DOC_VALUES_MODE = "doc_values_mode";
 
     public static final String TRANSPORT_HTTP = "http";
-    public static final String TRANSPORT_THRIFT = "thrift";
     public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
     public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
     public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
     public static final String NODES_DISCOVERY = "nodes_discovery";
     public static final String HTTP_SSL_ENABLED = "http_ssl_enabled";
 
+    private static final Logger LOG = LogManager.getLogger(EsTable.class);
+
+    // Solr doc_values vs stored_fields performance-smackdown indicate:
+    // It is possible to notice that retrieving an high number of fields leads
+    // to a sensible worsening of performance if DocValues are used.
+    // Instead,  the (almost) surprising thing is that, by returning less than 
20 fields,
+    // DocValues performs better than stored fields and the difference
+    // gets little as the number of fields returned increases.
+    // Asking for 9 DocValues fields and 1 stored field takes an average query 
time is 6.86
+    // (more than returning 10 stored fields)
+    // Here we have a slightly conservative value of 20, but at the same time
+    // we also provide configurable parameters for expert-using
+    // @see `MAX_DOCVALUE_FIELDS`
+    private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
+
+    // version would be used to be compatible with different ES Cluster
+    public EsMajorVersion majorVersion = null;
+
     private String hosts;
     private String[] seeds;
     private String userName = "";
@@ -74,6 +94,7 @@ public class EsTable extends Table {
 
     // which type used for `indexName`
     private String mappingType = null;
+    // only support http
     private String transport = "http";
     // only save the partition definition, save the partition key,
     // partition list is got from es cluster dynamically and is saved in 
esTableState
@@ -92,37 +113,43 @@ public class EsTable extends Table {
 
     private boolean httpSslEnabled = false;
 
-    // Solr doc_values vs stored_fields performance-smackdown indicate:
-    // It is possible to notice that retrieving an high number of fields leads 
to a sensible worsening of performance
-    // if DocValues are used. Instead, the (almost) surprising thing is that, 
by returning less than 20 fields,
-    // DocValues performs better than stored fields and the difference gets 
little as the number of fields
-    // returned increases. Asking for 9 DocValues fields and 1 stored field 
takes an average query time is 6.86
-    // (more than returning 10 stored fields) Here we have a slightly 
conservative value of 20, but at the same time
-    // we also provide configurable parameters for expert-using
-    // @see `MAX_DOCVALUE_FIELDS`
-    private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
-
-    // version would be used to be compatible with different ES Cluster
-    public EsMajorVersion majorVersion = null;
-
     // tableContext is used for being convenient to persist some configuration 
parameters uniformly
     private Map<String, String> tableContext = new HashMap<>();
 
     // record the latest and recently exception when sync ES table metadata 
(mapping, shard location)
     private Throwable lastMetaDataSyncException = null;
 
+    // connect es.
+    private EsRestClient client = null;
+
+    // Periodically pull es metadata
+    private EsMetaStateTracker esMetaStateTracker;
+
     public EsTable() {
         super(TableType.ELASTICSEARCH);
     }
 
-    public EsTable(long id, String name, List<Column> schema, Map<String, 
String> properties,
-            PartitionInfo partitionInfo) throws DdlException {
+    /**
+     * Create an ES table from a user's CREATE TABLE properties.
+     *
+     * <p>Unlike the full constructor, this one does not take an id, schema or partition info;
+     * those are expected to be set afterwards through the corresponding setters.
+     *
+     * @param name table name
+     * @param properties table properties, validated by {@code validate}
+     * @throws DdlException if the properties are invalid
+     **/
+    public EsTable(String name, Map<String, String> properties) throws DdlException {
+        super(TableType.ELASTICSEARCH);
+        this.name = name;
+        // validate() fills seeds/userName/passwd from the properties, so it must run before the client is built.
+        validate(properties);
+        this.client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
+    }
+
+    /**
+     * Create table for test.
+     **/
+    public EsTable(long id, String name, List<Column> schema,
+            Map<String, String> properties, PartitionInfo partitionInfo) 
throws DdlException {
         super(id, name, TableType.ELASTICSEARCH, schema);
         this.partitionInfo = partitionInfo;
         validate(properties);
+        this.client = new EsRestClient(seeds, userName, passwd, 
httpSslEnabled);
     }
 
-
     public Map<String, String> fieldsContext() {
         return esMetaStateTracker.searchContext().fetchFieldsContext();
     }
@@ -156,26 +183,23 @@ public class EsTable extends Table {
             throw new DdlException(
                     "Please set properties of elasticsearch table, " + "they 
are: hosts, user, password, index");
         }
-
-        if (Strings.isNullOrEmpty(properties.get(HOSTS)) || 
Strings.isNullOrEmpty(properties.get(HOSTS).trim())) {
+        if (StringUtils.isBlank(properties.get(HOSTS))) {
             throw new DdlException("Hosts of ES table is null. "
                     + "Please add 
properties('hosts'='xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx') when create table");
         }
         hosts = properties.get(HOSTS).trim();
         seeds = hosts.split(",");
-
-        if (!Strings.isNullOrEmpty(properties.get(USER)) && 
!Strings.isNullOrEmpty(properties.get(USER).trim())) {
+        if (properties.containsKey(USER)) {
             userName = properties.get(USER).trim();
         }
 
-        if (!Strings.isNullOrEmpty(properties.get(PASSWORD)) && 
!Strings.isNullOrEmpty(
-                properties.get(PASSWORD).trim())) {
+        if (properties.containsKey(PASSWORD)) {
             passwd = properties.get(PASSWORD).trim();
         }
 
-        if (Strings.isNullOrEmpty(properties.get(INDEX)) || 
Strings.isNullOrEmpty(properties.get(INDEX).trim())) {
-            throw new DdlException(
-                    "Index of ES table is null. " + "Please add 
properties('index'='xxxx') when create table");
+        if (StringUtils.isBlank(properties.get(INDEX))) {
+            throw new DdlException("Index of ES table is null. "
+                    + "Please add properties('index'='xxxx') when create 
table");
         }
         indexName = properties.get(INDEX).trim();
 
@@ -218,19 +242,10 @@ public class EsTable extends Table {
             }
         }
 
-        if (!Strings.isNullOrEmpty(properties.get(TYPE)) && 
!Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
+        if (StringUtils.isNotBlank(properties.get(TYPE))) {
             mappingType = properties.get(TYPE).trim();
         }
 
-        if (!Strings.isNullOrEmpty(properties.get(TRANSPORT)) && 
!Strings.isNullOrEmpty(
-                properties.get(TRANSPORT).trim())) {
-            transport = properties.get(TRANSPORT).trim();
-            if (!(TRANSPORT_HTTP.equals(transport) || 
TRANSPORT_THRIFT.equals(transport))) {
-                throw new DdlException("transport of ES table must be 
http/https(recommend)"
-                        + " or thrift(reserved inner usage), but value is " + 
transport);
-            }
-        }
-
         if (properties.containsKey(MAX_DOCVALUE_FIELDS)) {
             try {
                 maxDocValueFields = 
Integer.parseInt(properties.get(MAX_DOCVALUE_FIELDS).trim());
@@ -259,6 +274,7 @@ public class EsTable extends Table {
         tableContext.put(HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled));
     }
 
+    @Override
     public TTableDescriptor toThrift() {
         TEsTable tEsTable = new TEsTable();
         TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.ES_TABLE, fullSchema.size(), 0,
@@ -359,7 +375,7 @@ public class EsTable extends Table {
         } else {
             throw new IOException("invalid partition type: " + partType);
         }
-
+        client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
     }
 
     public String getHosts() {
@@ -414,14 +430,14 @@ public class EsTable extends Table {
         this.lastMetaDataSyncException = lastMetaDataSyncException;
     }
 
-    private EsMetaStateTracker esMetaStateTracker;
+    /**
+     * Replace this table's partition info (the name-and-properties constructor leaves it unset).
+     *
+     * @param partitionInfo the partition info to use for this table
+     **/
+    public void setPartitionInfo(PartitionInfo partitionInfo) {
+        this.partitionInfo = partitionInfo;
+    }
 
     /**
-     * sync es index meta from remote ES Cluster
-     *
-     * @param client esRestClient
+     * Sync es index meta from remote ES Cluster.
      */
-    public void syncTableMetaData(EsRestClient client) {
+    public void syncTableMetaData() {
         if (esMetaStateTracker == null) {
             esMetaStateTracker = new EsMetaStateTracker(client, this);
         }
@@ -435,4 +451,65 @@ public class EsTable extends Table {
             this.lastMetaDataSyncException = e;
         }
     }
+
+    /**
+     * Build a Doris schema from the remote ES index mapping.
+     *
+     * <p>Every top-level mapping property that declares a "type" translatable by
+     * {@code toDorisType} becomes a nullable key column; all other properties are skipped.
+     *
+     * @return the generated columns, possibly empty
+     **/
+    public List<Column> genColumnsFromEs() {
+        String mappingJson = client.getMapping(indexName);
+        JSONObject mappingProps = EsUtil.getMappingProps(indexName, mappingJson, mappingType);
+        List<Column> generated = new ArrayList<>();
+        for (Object entryObj : mappingProps.entrySet()) {
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) entryObj;
+            String fieldName = (String) entry.getKey();
+            JSONObject fieldDef = (JSONObject) entry.getValue();
+            // Complex types are not currently supported: properties without a "type" are skipped.
+            if (!fieldDef.containsKey("type")) {
+                continue;
+            }
+            Type dorisType = toDorisType(fieldDef.get("type").toString());
+            if (dorisType.isInvalid()) {
+                continue;
+            }
+            Column column = new Column();
+            column.setName(fieldName);
+            column.setType(dorisType);
+            column.setIsKey(true);
+            column.setIsAllowNull(true);
+            generated.add(column);
+        }
+        return generated;
+    }
+
+    /**
+     * Translate an ES field type into a Doris column type.
+     *
+     * @param esType the "type" value of an ES mapping property
+     * @return the matching Doris type, or {@code Type.INVALID} when the ES type is unsupported
+     **/
+    private Type toDorisType(String esType) {
+        // reference https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
+        switch (esType) {
+            case "null":
+                return Type.NULL;
+            case "boolean":
+                return Type.BOOLEAN;
+            case "byte":
+                return Type.TINYINT;
+            case "short":
+                return Type.SMALLINT;
+            case "integer":
+                return Type.INT;
+            case "long":
+                return Type.BIGINT;
+            case "unsigned_long":
+                // unsigned_long ranges over [0, 2^64 - 1], which overflows BIGINT; LARGEINT holds the full range.
+                return Type.LARGEINT;
+            case "float":
+            case "half_float":
+                return Type.FLOAT;
+            case "double":
+            case "scaled_float":
+                return Type.DOUBLE;
+            case "keyword":
+            case "text":
+            case "ip":
+            case "nested":
+            case "object":
+                return Type.STRING;
+            case "date":
+                // NOTE(review): ES dates may carry a time-of-day part that DATE drops; confirm DATE vs DATETIME.
+                return Type.DATE;
+            default:
+                return Type.INVALID;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index b31c9b5b9c..124c453261 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -433,6 +433,10 @@ public class Table extends MetaObject implements Writable, 
TableIf {
         this.comment = Strings.nullToEmpty(comment);
     }
 
+    /**
+     * Assign this table's id after construction, for tables built before an id was allocated.
+     *
+     * @param id the table id
+     **/
+    public void setId(long id) {
+        this.id = id;
+    }
+
     public CreateTableStmt toCreateTableStmt(String dbName) {
         throw new NotImplementedException();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 1a76da5a76..8c1fa032bd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -2108,13 +2108,21 @@ public class InternalDataSource implements 
DataSourceIf<Database> {
     private Table createEsTable(Database db, CreateTableStmt stmt) throws 
DdlException {
         String tableName = stmt.getTableName();
 
+        // validate props to get column from es.
+        EsTable esTable = new EsTable(tableName, stmt.getProperties());
+
         // create columns
         List<Column> baseSchema = stmt.getColumns();
+
+        if (baseSchema.isEmpty()) {
+            baseSchema = esTable.genColumnsFromEs();
+        }
         validateColumns(baseSchema);
+        esTable.setNewFullSchema(baseSchema);
 
         // create partition info
         PartitionDesc partitionDesc = stmt.getPartitionDesc();
-        PartitionInfo partitionInfo = null;
+        PartitionInfo partitionInfo;
         Map<String, Long> partitionNameToId = Maps.newHashMap();
         if (partitionDesc != null) {
             partitionInfo = partitionDesc.toPartitionInfo(baseSchema, 
partitionNameToId, false);
@@ -2124,11 +2132,12 @@ public class InternalDataSource implements 
DataSourceIf<Database> {
             partitionNameToId.put(tableName, partitionId);
             partitionInfo = new SinglePartitionInfo();
         }
+        esTable.setPartitionInfo(partitionInfo);
 
         long tableId = Catalog.getCurrentCatalog().getNextId();
-        EsTable esTable = new EsTable(tableId, tableName, baseSchema, 
stmt.getProperties(), partitionInfo);
+        esTable.setId(tableId);
         esTable.setComment(stmt.getComment());
-
+        esTable.syncTableMetaData();
         if (!db.createTableWithLock(esTable, false, 
stmt.isSetIfNotExists()).first) {
             ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, 
tableName);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
index 95f54dceb7..4e5a79376d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
@@ -73,7 +73,7 @@ public class EsRepository extends MasterDaemon {
     protected void runAfterCatalogReady() {
         for (EsTable esTable : esTables.values()) {
             try {
-                esTable.syncTableMetaData(esClients.get(esTable.getId()));
+                esTable.syncTableMetaData();
             } catch (Throwable e) {
                 LOG.warn("Exception happens when fetch index [{}] meta data 
from remote es cluster",
                         esTable.getName(), e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index 4db3e96b0a..f6800d1c3c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -20,15 +20,26 @@ package org.apache.doris.external.elasticsearch;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.RangePartitionDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 
+import java.util.Iterator;
 import java.util.Map;
 
+/**
+ * Util for ES, some static method.
+ **/
 public class EsUtil {
 
+    /**
+     * Analyze partition and distributionDesc.
+     **/
     public static void analyzePartitionAndDistributionDesc(PartitionDesc 
partitionDesc,
             DistributionDesc distributionDesc) throws AnalysisException {
         if (partitionDesc == null && distributionDesc == null) {
@@ -49,25 +60,19 @@ public class EsUtil {
         }
     }
 
-    private static void analyzePartitionDesc(RangePartitionDesc partDesc)
-            throws AnalysisException {
+    private static void analyzePartitionDesc(RangePartitionDesc partDesc) 
throws AnalysisException {
         if (partDesc.getPartitionColNames() == null || 
partDesc.getPartitionColNames().isEmpty()) {
             throw new AnalysisException("No partition columns.");
         }
 
         if (partDesc.getPartitionColNames().size() > 1) {
-            throw new AnalysisException(
-                    "Elasticsearch table's partition column could only be a 
single column");
+            throw new AnalysisException("Elasticsearch table's partition 
column could only be a single column");
         }
     }
 
 
     /**
-     * get the json object from specified jsonObject
-     *
-     * @param jsonObject
-     * @param key
-     * @return
+     * Get the json object from specified jsonObject
      */
     public static JSONObject getJsonObject(JSONObject jsonObject, String key, 
int fromIndex) {
         int firstOccr = key.indexOf('.', fromIndex);
@@ -87,6 +92,9 @@ public class EsUtil {
         }
     }
 
+    /**
+     * Get boolean throw DdlException when parse error
+     **/
     public static boolean getBoolean(Map<String, String> properties, String 
name) throws DdlException {
         String property = properties.get(name).trim();
         try {
@@ -96,4 +104,110 @@ public class EsUtil {
                     + "value should be double quotation marks", name, name, 
property, name));
         }
     }
+
+    /**
+     * Get mapping properties JSONObject.
+     **/
+    public static JSONObject getMappingProps(String sourceIndex, String 
indexMapping, String mappingType) {
+        JSONObject jsonObject = (JSONObject) JSONValue.parse(indexMapping);
+        // the indexName use alias takes the first mapping
+        Iterator<String> keys = jsonObject.keySet().iterator();
+        String docKey = keys.next();
+        JSONObject docData = (JSONObject) jsonObject.get(docKey);
+        JSONObject mappings = (JSONObject) docData.get("mappings");
+        JSONObject rootSchema = (JSONObject) mappings.get(mappingType);
+        JSONObject properties;
+        // Elasticsearch 7.x, type was removed from ES mapping, default type 
is `_doc`
+        // 
https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html
+        // Elasticsearch 8.x, include_type_name parameter is removed
+        if (rootSchema == null) {
+            properties = (JSONObject) mappings.get("properties");
+        } else {
+            properties = (JSONObject) rootSchema.get("properties");
+        }
+        if (properties == null) {
+            throw new DorisEsException(
+                    "index[" + sourceIndex + "] type[" + mappingType + "] 
mapping not found for the ES Cluster");
+        }
+        return properties;
+    }
+
+    /**
+     * Parse the required field information from the json.
+     *
+     * @param searchContext the current associated column searchContext
+     * @param indexMapping the return value of _mapping
+     */
+    public static void resolveFields(SearchContext searchContext, String 
indexMapping) throws DorisEsException {
+        JSONObject properties = getMappingProps(searchContext.sourceIndex(), 
indexMapping, searchContext.type());
+        for (Column col : searchContext.columns()) {
+            String colName = col.getName();
+            // if column exists in Doris Table but no found in ES's mapping, 
we choose to ignore this situation?
+            if (!properties.containsKey(colName)) {
+                throw new DorisEsException(
+                        "index[" + searchContext.sourceIndex() + "] type[" + 
indexMapping + "] mapping not found column"
+                                + colName + " for the ES Cluster");
+            }
+            JSONObject fieldObject = (JSONObject) properties.get(colName);
+            resolveKeywordFields(searchContext, fieldObject, colName);
+            resolveDocValuesFields(searchContext, fieldObject, colName);
+        }
+    }
+
+    /**
+     * For a {@code text} column, record its keyword sub-field(s) in the fetch-fields context so that
+     * predicates can be generated against the keyword form. Non-text columns are left untouched.
+     */
+    private static void resolveKeywordFields(SearchContext searchContext, JSONObject fieldObject, String colName) {
+        String fieldType = (String) fieldObject.get("type");
+        if (!"text".equals(fieldType)) {
+            return;
+        }
+        JSONObject subFields = (JSONObject) fieldObject.get("fields");
+        if (subFields == null) {
+            return;
+        }
+        for (Object subName : subFields.keySet()) {
+            JSONObject subDef = (JSONObject) subFields.get((String) subName);
+            String subType = (String) subDef.get("type");
+            if ("keyword".equals(subType)) {
+                searchContext.fetchFieldsContext().put(colName, colName + "." + subName);
+            }
+        }
+    }
+
+    /**
+     * Record in the doc-value-fields context which field should be read through doc_values for a column.
+     *
+     * <p>If the column's type has no doc_values by default ({@code EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS}),
+     * a sub-field that does have them is used, e.g. {@code a: {fields: {c: {...}}}} resolves to {@code a.c}.
+     * Other columns use themselves unless {@code doc_values} is explicitly {@code false}.
+     * Nothing is recorded when no usable field is found.
+     */
+    private static void resolveDocValuesFields(SearchContext searchContext, JSONObject fieldObject, String colName) {
+        String fieldType = (String) fieldObject.get("type");
+        String docValueField = null;
+        if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
+            JSONObject fieldsObject = (JSONObject) fieldObject.get("fields");
+            if (fieldsObject != null) {
+                for (Object key : fieldsObject.keySet()) {
+                    JSONObject innerTypeObject = (JSONObject) fieldsObject.get((String) key);
+                    if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains((String) innerTypeObject.get("type"))) {
+                        continue;
+                    }
+                    // The doc values live on the sub-field, never on the parent field itself, so the
+                    // resolved name is always "<col>.<sub-field>" unless the sub-field disables doc_values.
+                    if (!Boolean.FALSE.equals(innerTypeObject.get("doc_values"))) {
+                        docValueField = colName + "." + key;
+                    }
+                }
+            }
+        } else if (!Boolean.FALSE.equals(fieldObject.get("doc_values"))) {
+            // Only an explicit `doc_values: false` opts the column out.
+            docValueField = colName;
+        }
+        // docValueField Cannot be null
+        if (StringUtils.isNotEmpty(docValueField)) {
+            searchContext.docValueFieldsContext().put(colName, docValueField);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
index 12fa4e37b4..560cebf1ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
@@ -17,15 +17,6 @@
 
 package org.apache.doris.external.elasticsearch;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EsTable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import java.util.Iterator;
-
 /**
  * Get index mapping from remote ES Cluster, and resolved `keyword` and 
`doc_values` field
  * Later we can use it to parse all relevant indexes
@@ -48,105 +39,7 @@ public class MappingPhase implements SearchPhase {
 
     @Override
     public void postProcess(SearchContext context) {
-        resolveFields(context, jsonMapping);
+        EsUtil.resolveFields(context, jsonMapping);
     }
 
-
-    /**
-     * Parse the required field information from the json
-     *
-     * @param searchContext the current associated column searchContext
-     * @param indexMapping  the return value of _mapping
-     * @return fetchFieldsContext and docValueFieldsContext
-     * @throws Exception
-     */
-    public void resolveFields(SearchContext searchContext, String 
indexMapping) throws DorisEsException {
-        JSONObject jsonObject = (JSONObject) JSONValue.parse(indexMapping);
-        // the indexName use alias takes the first mapping
-        Iterator<String> keys = jsonObject.keySet().iterator();
-        String docKey = keys.next();
-        JSONObject docData = (JSONObject) jsonObject.get(docKey);
-        JSONObject mappings = (JSONObject) docData.get("mappings");
-        JSONObject rootSchema = (JSONObject) 
mappings.get(searchContext.type());
-        JSONObject properties;
-        // Elasticsearch 7.x, type was removed from ES mapping, default type 
is `_doc`
-        // 
https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html
-        // Elasticsearch 8.x, include_type_name parameter is removed
-        if (rootSchema == null) {
-            properties = (JSONObject) mappings.get("properties");
-        } else {
-            properties = (JSONObject) rootSchema.get("properties");
-        }
-        if (properties == null) {
-            throw new DorisEsException("index[" + searchContext.sourceIndex() 
+ "] type[" + searchContext.type()
-                    + "] mapping not found for the ES Cluster");
-        }
-        for (Column col : searchContext.columns()) {
-            String colName = col.getName();
-            // if column exists in Doris Table but no found in ES's mapping, 
we choose to ignore this situation?
-            if (!properties.containsKey(colName)) {
-                continue;
-            }
-            JSONObject fieldObject = (JSONObject) properties.get(colName);
-            resolveKeywordFields(searchContext, fieldObject, colName);
-            resolveDocValuesFields(searchContext, fieldObject, colName);
-        }
-    }
-
-    // get a field of keyword type in the fields
-    private void resolveKeywordFields(SearchContext searchContext, JSONObject 
fieldObject, String colName) {
-        String fieldType = (String) fieldObject.get("type");
-        // string-type field used keyword type to generate predicate
-        // if text field type seen, we should use the `field` keyword type?
-        if ("text".equals(fieldType)) {
-            JSONObject fieldsObject = (JSONObject) fieldObject.get("fields");
-            if (fieldsObject != null) {
-                for (Object key : fieldsObject.keySet()) {
-                    JSONObject innerTypeObject = (JSONObject) 
fieldsObject.get((String) key);
-                    // just for text type
-                    if ("keyword".equals((String) 
innerTypeObject.get("type"))) {
-                        searchContext.fetchFieldsContext().put(colName, 
colName + "." + key);
-                    }
-                }
-            }
-        }
-    }
-
-    private void resolveDocValuesFields(SearchContext searchContext, 
JSONObject fieldObject, String colName) {
-        String fieldType = (String) fieldObject.get("type");
-        String docValueField = null;
-        if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
-            JSONObject fieldsObject = (JSONObject) fieldObject.get("fields");
-            if (fieldsObject != null) {
-                for (Object key : fieldsObject.keySet()) {
-                    JSONObject innerTypeObject = (JSONObject) 
fieldsObject.get((String) key);
-                    if 
(EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains((String) 
innerTypeObject.get("type"))) {
-                        continue;
-                    }
-                    if (innerTypeObject.containsKey("doc_values")) {
-                        boolean docValue = (Boolean) 
innerTypeObject.get("doc_values");
-                        if (docValue) {
-                            docValueField = colName;
-                        }
-                    } else {
-                        // a : {c : {}} -> a -> a.c
-                        docValueField = colName + "." + key;
-                    }
-                }
-            }
-        } else {
-            // set doc_value = false manually
-            if (fieldObject.containsKey("doc_values")) {
-                Boolean docValue = (Boolean) fieldObject.get("doc_values");
-                if (!docValue) {
-                    return;
-                }
-            }
-            docValueField = colName;
-        }
-        // docValueField Cannot be null
-        if (StringUtils.isNotEmpty(docValueField)) {
-            searchContext.docValueFieldsContext().put(colName, docValueField);
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
 
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
index 8be08db701..fe9033e0ff 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
@@ -41,13 +41,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+/**
+ * Test case for es.
+ **/
 public class EsTestCase {
 
     protected static FakeEditLog fakeEditLog;
     protected static FakeCatalog fakeCatalog;
     protected static Catalog masterCatalog;
-    protected static String mappingsStr = "";
 
+    /**
+     * Init
+     **/
     @BeforeClass
     public static void init() throws Exception {
         fakeEditLog = new FakeEditLog();
@@ -60,7 +65,7 @@ public class EsTestCase {
     }
 
     protected String loadJsonFromFile(String fileName) throws IOException, 
URISyntaxException {
-        File file = new 
File(MappingPhaseTest.class.getClassLoader().getResource(fileName).toURI());
+        File file = new 
File(EsUtil.class.getClassLoader().getResource(fileName).toURI());
         InputStream is = new FileInputStream(file);
         BufferedReader br = new BufferedReader(new InputStreamReader(is));
         StringBuilder jsonStr = new StringBuilder();
@@ -73,7 +78,7 @@ public class EsTestCase {
         return jsonStr.toString();
     }
 
-    public EsTable fakeEsTable(String table, String index, String type, 
List<Column> columns) throws DdlException {
+    protected EsTable fakeEsTable(String table, String index, String type, 
List<Column> columns) throws DdlException {
         Map<String, String> props = new HashMap<>();
         props.put(EsTable.HOSTS, "127.0.0.1:8200");
         props.put(EsTable.INDEX, index);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
index 28ed67507f..ca7608923b 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
@@ -17,12 +17,28 @@
 
 package org.apache.doris.external.elasticsearch;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.ExceptionChecker;
+
+import mockit.Expectations;
+import mockit.Injectable;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-public class EsUtilTest {
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for es util.
+ **/
+public class EsUtilTest extends EsTestCase {
+
+    private List<Column> columns = new ArrayList<>();
 
     private String jsonStr = "{\"settings\": {\n"
             + "               \"index\": {\n"
@@ -42,6 +58,80 @@ public class EsUtilTest {
             + "               }\n"
             + "            }}";
 
+    /**
+     * Init columns.
+     **/
+    @Before
+    public void setUp() {
+        Column k1 = new Column("k1", PrimitiveType.BIGINT);
+        Column k2 = new Column("k2", PrimitiveType.VARCHAR);
+        Column k3 = new Column("k3", PrimitiveType.VARCHAR);
+        columns.add(k1);
+        columns.add(k2);
+        columns.add(k3);
+    }
+
+    @Test
+    public void testExtractFieldsNormal() throws Exception {
+
+        // ES version < 7.0
+        EsTable esTableBefore7X = fakeEsTable("fake", "test", "doc", columns);
+        SearchContext searchContext = new SearchContext(esTableBefore7X);
+        EsUtil.resolveFields(searchContext, 
loadJsonFromFile("data/es/test_index_mapping.json"));
+        Assert.assertEquals("k3.keyword", 
searchContext.fetchFieldsContext().get("k3"));
+        Assert.assertEquals("k3.keyword", 
searchContext.docValueFieldsContext().get("k3"));
+        Assert.assertEquals("k1", 
searchContext.docValueFieldsContext().get("k1"));
+        Assert.assertEquals("k2", 
searchContext.docValueFieldsContext().get("k2"));
+
+        // ES version >= 7.0
+        EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
+        SearchContext searchContext1 = new SearchContext(esTableAfter7X);
+        EsUtil.resolveFields(searchContext1, 
loadJsonFromFile("data/es/test_index_mapping_after_7x.json"));
+        Assert.assertEquals("k3.keyword", 
searchContext1.fetchFieldsContext().get("k3"));
+        Assert.assertEquals("k3.keyword", 
searchContext1.docValueFieldsContext().get("k3"));
+        Assert.assertEquals("k1", 
searchContext1.docValueFieldsContext().get("k1"));
+        Assert.assertEquals("k2", 
searchContext1.docValueFieldsContext().get("k2"));
+    }
+
+    @Test
+    public void testTypeNotExist() throws Exception {
+        EsTable table = fakeEsTable("fake", "test", "not_exists", columns);
+        SearchContext searchContext = new SearchContext(table);
+        // type not exists
+        ExceptionChecker.expectThrows(DorisEsException.class,
+                () -> EsUtil.resolveFields(searchContext, 
loadJsonFromFile("data/es/test_index_mapping.json")));
+    }
+
+    @Test
+    public void testWorkFlow(@Injectable EsRestClient client) throws Exception 
{
+        EsTable table = fakeEsTable("fake", "test", "doc", columns);
+        SearchContext searchContext1 = new SearchContext(table);
+        String jsonMapping = 
loadJsonFromFile("data/es/test_index_mapping.json");
+        new Expectations(client) {
+            {
+                client.getMapping(anyString);
+                minTimes = 0;
+                result = jsonMapping;
+            }
+        };
+        MappingPhase mappingPhase = new MappingPhase(client);
+        ExceptionChecker.expectThrowsNoException(() -> 
mappingPhase.execute(searchContext1));
+        ExceptionChecker.expectThrowsNoException(() -> 
mappingPhase.postProcess(searchContext1));
+        Assert.assertEquals("k3.keyword", 
searchContext1.fetchFieldsContext().get("k3"));
+        Assert.assertEquals("k3.keyword", 
searchContext1.docValueFieldsContext().get("k3"));
+        Assert.assertEquals("k1", 
searchContext1.docValueFieldsContext().get("k1"));
+        Assert.assertEquals("k2", 
searchContext1.docValueFieldsContext().get("k2"));
+
+    }
+
+    @Test
+    public void testMultTextFields() throws Exception {
+        EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
+        SearchContext searchContext = new SearchContext(esTableAfter7X);
+        EsUtil.resolveFields(searchContext, 
loadJsonFromFile("data/es/test_index_mapping_field_mult_analyzer.json"));
+        
Assert.assertFalse(searchContext.docValueFieldsContext().containsKey("k3"));
+    }
+
     @Test
     public void testGetJsonObject() {
         JSONObject json = (JSONObject) JSONValue.parse(jsonStr);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java
deleted file mode 100644
index 9391f0b7b5..0000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.elasticsearch;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EsTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.ExceptionChecker;
-
-import mockit.Expectations;
-import mockit.Injectable;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class MappingPhaseTest extends EsTestCase {
-
-
-    List<Column> columns = new ArrayList<>();
-
-    @Before
-    public void setUp() {
-        Column k1 = new Column("k1", PrimitiveType.BIGINT);
-        Column k2 = new Column("k2", PrimitiveType.VARCHAR);
-        Column k3 = new Column("k3", PrimitiveType.VARCHAR);
-        columns.add(k1);
-        columns.add(k2);
-        columns.add(k3);
-    }
-
-    @Test
-    public void testExtractFieldsNormal() throws Exception {
-
-        MappingPhase mappingPhase = new MappingPhase(null);
-        // ES version < 7.0
-        EsTable esTableBefore7X = fakeEsTable("fake", "test", "doc", columns);
-        SearchContext searchContext = new SearchContext(esTableBefore7X);
-        mappingPhase.resolveFields(searchContext, 
loadJsonFromFile("data/es/test_index_mapping.json"));
-        Assert.assertEquals("k3.keyword", 
searchContext.fetchFieldsContext().get("k3"));
-        Assert.assertEquals("k3.keyword", 
searchContext.docValueFieldsContext().get("k3"));
-        Assert.assertEquals("k1", 
searchContext.docValueFieldsContext().get("k1"));
-        Assert.assertEquals("k2", 
searchContext.docValueFieldsContext().get("k2"));
-
-        // ES version >= 7.0
-        EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
-        SearchContext searchContext1 = new SearchContext(esTableAfter7X);
-        mappingPhase.resolveFields(searchContext1, 
loadJsonFromFile("data/es/test_index_mapping_after_7x.json"));
-        Assert.assertEquals("k3.keyword", 
searchContext1.fetchFieldsContext().get("k3"));
-        Assert.assertEquals("k3.keyword", 
searchContext1.docValueFieldsContext().get("k3"));
-        Assert.assertEquals("k1", 
searchContext1.docValueFieldsContext().get("k1"));
-        Assert.assertEquals("k2", 
searchContext1.docValueFieldsContext().get("k2"));
-    }
-
-    @Test
-    public void testTypeNotExist() throws Exception {
-        MappingPhase mappingPhase = new MappingPhase(null);
-        EsTable table = fakeEsTable("fake", "test", "not_exists", columns);
-        SearchContext searchContext = new SearchContext(table);
-        // type not exists
-        ExceptionChecker.expectThrows(DorisEsException.class,
-                () -> mappingPhase.resolveFields(searchContext, 
loadJsonFromFile("data/es/test_index_mapping.json")));
-    }
-
-    @Test
-    public void testWorkFlow(@Injectable EsRestClient client) throws Exception 
{
-        EsTable table = fakeEsTable("fake", "test", "doc", columns);
-        SearchContext searchContext1 = new SearchContext(table);
-        String jsonMapping = 
loadJsonFromFile("data/es/test_index_mapping.json");
-        new Expectations(client) {
-            {
-                client.getMapping(anyString);
-                minTimes = 0;
-                result = jsonMapping;
-            }
-        };
-        MappingPhase mappingPhase = new MappingPhase(client);
-        ExceptionChecker.expectThrowsNoException(() -> 
mappingPhase.execute(searchContext1));
-        ExceptionChecker.expectThrowsNoException(() -> 
mappingPhase.postProcess(searchContext1));
-        Assert.assertEquals("k3.keyword", 
searchContext1.fetchFieldsContext().get("k3"));
-        Assert.assertEquals("k3.keyword", 
searchContext1.docValueFieldsContext().get("k3"));
-        Assert.assertEquals("k1", 
searchContext1.docValueFieldsContext().get("k1"));
-        Assert.assertEquals("k2", 
searchContext1.docValueFieldsContext().get("k2"));
-
-    }
-
-    @Test
-    public void testMultTextFields() throws Exception {
-        MappingPhase mappingPhase = new MappingPhase(null);
-        EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
-        SearchContext searchContext = new SearchContext(esTableAfter7X);
-        mappingPhase.resolveFields(searchContext, 
loadJsonFromFile("data/es/test_index_mapping_field_mult_analyzer.json"));
-        
Assert.assertFalse(searchContext.docValueFieldsContext().containsKey("k3"));
-
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to