This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 614b782d4d [feature](doris-on-es) Support es external table not assign
schema (#9583)
614b782d4d is described below
commit 614b782d4df538ce4195db0b8190abd55da978a6
Author: Stalary <[email protected]>
AuthorDate: Sun Jul 3 23:19:05 2022 +0800
[feature](doris-on-es) Support es external table not assign schema (#9583)
---
.../docs/ecosystem/external-table/doris-on-es.md | 10 ++
.../docs/ecosystem/external-table/doris-on-es.md | 10 ++
fe/fe-core/pom.xml | 1 -
.../org/apache/doris/analysis/CreateTableStmt.java | 2 +-
.../java/org/apache/doris/catalog/EsTable.java | 167 +++++++++++++++------
.../main/java/org/apache/doris/catalog/Table.java | 4 +
.../doris/datasource/InternalDataSource.java | 15 +-
.../doris/external/elasticsearch/EsRepository.java | 2 +-
.../doris/external/elasticsearch/EsUtil.java | 132 ++++++++++++++--
.../doris/external/elasticsearch/MappingPhase.java | 109 +-------------
.../doris/external/elasticsearch/EsTestCase.java | 11 +-
.../doris/external/elasticsearch/EsUtilTest.java | 92 +++++++++++-
.../external/elasticsearch/MappingPhaseTest.java | 113 --------------
13 files changed, 383 insertions(+), 285 deletions(-)
diff --git a/docs/en/docs/ecosystem/external-table/doris-on-es.md
b/docs/en/docs/ecosystem/external-table/doris-on-es.md
index c9854373f8..50e8af69de 100644
--- a/docs/en/docs/ecosystem/external-table/doris-on-es.md
+++ b/docs/en/docs/ecosystem/external-table/doris-on-es.md
@@ -110,6 +110,16 @@ POST /_bulk
Refer to the specific table syntax:[CREATE
TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
```
+CREATE EXTERNAL TABLE `test` // If no schema is specified, es mapping is
automatically pulled to create a table
+ENGINE=ELASTICSEARCH
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test",
+"type" = "doc",
+"user" = "root",
+"password" = "root"
+);
+
CREATE EXTERNAL TABLE `test` (
`k1` bigint(20) COMMENT "",
`k2` datetime COMMENT "",
diff --git a/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md
b/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md
index 39c73f2621..8bb07f1c2e 100644
--- a/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md
+++ b/docs/zh-CN/docs/ecosystem/external-table/doris-on-es.md
@@ -108,6 +108,16 @@ POST /_bulk
具体建表语法参照:[CREATE
TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)
```
+CREATE EXTERNAL TABLE `test` // 不指定schema,自动拉取es mapping进行建表
+ENGINE=ELASTICSEARCH
+PROPERTIES (
+"hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
+"index" = "test",
+"type" = "doc",
+"user" = "root",
+"password" = "root"
+);
+
CREATE EXTERNAL TABLE `test` (
`k1` bigint(20) COMMENT "",
`k2` datetime COMMENT "",
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index ccbc7e9c9e..db3cfa2e0b 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -603,7 +603,6 @@ under the License.
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
- <scope>provided</scope>
</dependency>
<dependency>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index a70e3925e0..3a5e8be619 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -355,7 +355,7 @@ public class CreateTableStmt extends DdlStmt {
}
// analyze column def
- if (!(engineName.equals("iceberg") || engineName.equals("hudi"))
+ if (!(engineName.equals("iceberg") || engineName.equals("hudi") ||
engineName.equals("elasticsearch"))
&& (columnDefs == null || columnDefs.isEmpty())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index 7d7f05943a..b03ce6d0ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -28,14 +28,16 @@ import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
-import com.google.common.base.Strings;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.json.simple.JSONObject;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -43,8 +45,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+/**
+ * Elasticsearch table.
+ **/
public class EsTable extends Table {
- private static final Logger LOG = LogManager.getLogger(EsTable.class);
public static final Set<String> DEFAULT_DOCVALUE_DISABLED_FIELDS = new
HashSet<>(Arrays.asList("text"));
@@ -53,18 +57,34 @@ public class EsTable extends Table {
public static final String PASSWORD = "password";
public static final String INDEX = "index";
public static final String TYPE = "type";
- public static final String TRANSPORT = "transport";
public static final String VERSION = "version";
public static final String DOC_VALUES_MODE = "doc_values_mode";
public static final String TRANSPORT_HTTP = "http";
- public static final String TRANSPORT_THRIFT = "thrift";
public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
public static final String NODES_DISCOVERY = "nodes_discovery";
public static final String HTTP_SSL_ENABLED = "http_ssl_enabled";
+ private static final Logger LOG = LogManager.getLogger(EsTable.class);
+
+ // The Solr doc_values vs stored_fields performance smackdown indicates:
+ // retrieving a high number of fields leads
+ // to a noticeable worsening of performance if DocValues are used.
+ // Instead, the (almost) surprising thing is that, when returning fewer than
20 fields,
+ // DocValues perform better than stored fields, and the difference
+ // shrinks as the number of fields returned increases.
+ // Asking for 9 DocValues fields and 1 stored field takes an average query
time of 6.86
+ // (more than returning 10 stored fields).
+ // Here we use a slightly conservative value of 20, but at the same time
+ // we also provide a configurable parameter for expert users,
+ // @see `MAX_DOCVALUE_FIELDS`
+ private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
+
+ // version would be used to be compatible with different ES Cluster
+ public EsMajorVersion majorVersion = null;
+
private String hosts;
private String[] seeds;
private String userName = "";
@@ -74,6 +94,7 @@ public class EsTable extends Table {
// which type used for `indexName`
private String mappingType = null;
+ // only support http
private String transport = "http";
// only save the partition definition, save the partition key,
// partition list is got from es cluster dynamically and is saved in
esTableState
@@ -92,37 +113,43 @@ public class EsTable extends Table {
private boolean httpSslEnabled = false;
- // Solr doc_values vs stored_fields performance-smackdown indicate:
- // It is possible to notice that retrieving an high number of fields leads
to a sensible worsening of performance
- // if DocValues are used. Instead, the (almost) surprising thing is that,
by returning less than 20 fields,
- // DocValues performs better than stored fields and the difference gets
little as the number of fields
- // returned increases. Asking for 9 DocValues fields and 1 stored field
takes an average query time is 6.86
- // (more than returning 10 stored fields) Here we have a slightly
conservative value of 20, but at the same time
- // we also provide configurable parameters for expert-using
- // @see `MAX_DOCVALUE_FIELDS`
- private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
-
- // version would be used to be compatible with different ES Cluster
- public EsMajorVersion majorVersion = null;
-
// tableContext is used for being convenient to persist some configuration
parameters uniformly
private Map<String, String> tableContext = new HashMap<>();
// record the latest and recently exception when sync ES table metadata
(mapping, shard location)
private Throwable lastMetaDataSyncException = null;
+ // REST client used to connect to the ES cluster.
+ private EsRestClient client = null;
+
+ // Periodically pull es metadata
+ private EsMetaStateTracker esMetaStateTracker;
+
public EsTable() {
super(TableType.ELASTICSEARCH);
}
- public EsTable(long id, String name, List<Column> schema, Map<String,
String> properties,
- PartitionInfo partitionInfo) throws DdlException {
+ /**
+ * Create table for user.
+ **/
+ public EsTable(String name, Map<String, String> properties) throws
DdlException {
+ super(TableType.ELASTICSEARCH);
+ this.name = name;
+ validate(properties);
+ this.client = new EsRestClient(seeds, userName, passwd,
httpSslEnabled);
+ }
+
+ /**
+ * Create table for test.
+ **/
+ public EsTable(long id, String name, List<Column> schema,
+ Map<String, String> properties, PartitionInfo partitionInfo)
throws DdlException {
super(id, name, TableType.ELASTICSEARCH, schema);
this.partitionInfo = partitionInfo;
validate(properties);
+ this.client = new EsRestClient(seeds, userName, passwd,
httpSslEnabled);
}
-
public Map<String, String> fieldsContext() {
return esMetaStateTracker.searchContext().fetchFieldsContext();
}
@@ -156,26 +183,23 @@ public class EsTable extends Table {
throw new DdlException(
"Please set properties of elasticsearch table, " + "they
are: hosts, user, password, index");
}
-
- if (Strings.isNullOrEmpty(properties.get(HOSTS)) ||
Strings.isNullOrEmpty(properties.get(HOSTS).trim())) {
+ if (StringUtils.isBlank(properties.get(HOSTS))) {
throw new DdlException("Hosts of ES table is null. "
+ "Please add
properties('hosts'='xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx') when create table");
}
hosts = properties.get(HOSTS).trim();
seeds = hosts.split(",");
-
- if (!Strings.isNullOrEmpty(properties.get(USER)) &&
!Strings.isNullOrEmpty(properties.get(USER).trim())) {
+ if (properties.containsKey(USER)) {
userName = properties.get(USER).trim();
}
- if (!Strings.isNullOrEmpty(properties.get(PASSWORD)) &&
!Strings.isNullOrEmpty(
- properties.get(PASSWORD).trim())) {
+ if (properties.containsKey(PASSWORD)) {
passwd = properties.get(PASSWORD).trim();
}
- if (Strings.isNullOrEmpty(properties.get(INDEX)) ||
Strings.isNullOrEmpty(properties.get(INDEX).trim())) {
- throw new DdlException(
- "Index of ES table is null. " + "Please add
properties('index'='xxxx') when create table");
+ if (StringUtils.isBlank(properties.get(INDEX))) {
+ throw new DdlException("Index of ES table is null. "
+ + "Please add properties('index'='xxxx') when create
table");
}
indexName = properties.get(INDEX).trim();
@@ -218,19 +242,10 @@ public class EsTable extends Table {
}
}
- if (!Strings.isNullOrEmpty(properties.get(TYPE)) &&
!Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
+ if (StringUtils.isNotBlank(properties.get(TYPE))) {
mappingType = properties.get(TYPE).trim();
}
- if (!Strings.isNullOrEmpty(properties.get(TRANSPORT)) &&
!Strings.isNullOrEmpty(
- properties.get(TRANSPORT).trim())) {
- transport = properties.get(TRANSPORT).trim();
- if (!(TRANSPORT_HTTP.equals(transport) ||
TRANSPORT_THRIFT.equals(transport))) {
- throw new DdlException("transport of ES table must be
http/https(recommend)"
- + " or thrift(reserved inner usage), but value is " +
transport);
- }
- }
-
if (properties.containsKey(MAX_DOCVALUE_FIELDS)) {
try {
maxDocValueFields =
Integer.parseInt(properties.get(MAX_DOCVALUE_FIELDS).trim());
@@ -259,6 +274,7 @@ public class EsTable extends Table {
tableContext.put(HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled));
}
+ @Override
public TTableDescriptor toThrift() {
TEsTable tEsTable = new TEsTable();
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.ES_TABLE, fullSchema.size(), 0,
@@ -359,7 +375,7 @@ public class EsTable extends Table {
} else {
throw new IOException("invalid partition type: " + partType);
}
-
+ client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
}
public String getHosts() {
@@ -414,14 +430,14 @@ public class EsTable extends Table {
this.lastMetaDataSyncException = lastMetaDataSyncException;
}
- private EsMetaStateTracker esMetaStateTracker;
+ public void setPartitionInfo(PartitionInfo partitionInfo) {
+ this.partitionInfo = partitionInfo;
+ }
/**
- * sync es index meta from remote ES Cluster
- *
- * @param client esRestClient
+ * Sync es index meta from remote ES Cluster.
*/
- public void syncTableMetaData(EsRestClient client) {
+ public void syncTableMetaData() {
if (esMetaStateTracker == null) {
esMetaStateTracker = new EsMetaStateTracker(client, this);
}
@@ -435,4 +451,65 @@ public class EsTable extends Table {
this.lastMetaDataSyncException = e;
}
}
+
+ /**
+ * Generate columns from ES Cluster.
+ **/
+ public List<Column> genColumnsFromEs() {
+ String mapping = client.getMapping(indexName);
+ JSONObject mappingProps = EsUtil.getMappingProps(indexName, mapping,
mappingType);
+ Set<String> keys = (Set<String>) mappingProps.keySet();
+ List<Column> columns = new ArrayList<>();
+ for (String key : keys) {
+ JSONObject field = (JSONObject) mappingProps.get(key);
+ // Complex types are not currently supported.
+ if (field.containsKey("type")) {
+ Type type = toDorisType(field.get("type").toString());
+ if (!type.isInvalid()) {
+ Column column = new Column();
+ column.setName(key);
+ column.setType(type);
+ column.setIsKey(true);
+ column.setIsAllowNull(true);
+ columns.add(column);
+ }
+ }
+ }
+ return columns;
+ }
+
+ private Type toDorisType(String esType) {
+ // reference
https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
+ switch (esType) {
+ case "null":
+ return Type.NULL;
+ case "boolean":
+ return Type.BOOLEAN;
+ case "byte":
+ return Type.TINYINT;
+ case "short":
+ return Type.SMALLINT;
+ case "integer":
+ return Type.INT;
+ case "long":
+ case "unsigned_long":
+ return Type.BIGINT;
+ case "float":
+ case "half_float":
+ return Type.FLOAT;
+ case "double":
+ case "scaled_float":
+ return Type.DOUBLE;
+ case "keyword":
+ case "text":
+ case "ip":
+ case "nested":
+ case "object":
+ return Type.STRING;
+ case "date":
+ return Type.DATE;
+ default:
+ return Type.INVALID;
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index b31c9b5b9c..124c453261 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -433,6 +433,10 @@ public class Table extends MetaObject implements Writable,
TableIf {
this.comment = Strings.nullToEmpty(comment);
}
+ public void setId(long id) {
+ this.id = id;
+ }
+
public CreateTableStmt toCreateTableStmt(String dbName) {
throw new NotImplementedException();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 1a76da5a76..8c1fa032bd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -2108,13 +2108,21 @@ public class InternalDataSource implements
DataSourceIf<Database> {
private Table createEsTable(Database db, CreateTableStmt stmt) throws
DdlException {
String tableName = stmt.getTableName();
+ // validate props to get column from es.
+ EsTable esTable = new EsTable(tableName, stmt.getProperties());
+
// create columns
List<Column> baseSchema = stmt.getColumns();
+
+ if (baseSchema.isEmpty()) {
+ baseSchema = esTable.genColumnsFromEs();
+ }
validateColumns(baseSchema);
+ esTable.setNewFullSchema(baseSchema);
// create partition info
PartitionDesc partitionDesc = stmt.getPartitionDesc();
- PartitionInfo partitionInfo = null;
+ PartitionInfo partitionInfo;
Map<String, Long> partitionNameToId = Maps.newHashMap();
if (partitionDesc != null) {
partitionInfo = partitionDesc.toPartitionInfo(baseSchema,
partitionNameToId, false);
@@ -2124,11 +2132,12 @@ public class InternalDataSource implements
DataSourceIf<Database> {
partitionNameToId.put(tableName, partitionId);
partitionInfo = new SinglePartitionInfo();
}
+ esTable.setPartitionInfo(partitionInfo);
long tableId = Catalog.getCurrentCatalog().getNextId();
- EsTable esTable = new EsTable(tableId, tableName, baseSchema,
stmt.getProperties(), partitionInfo);
+ esTable.setId(tableId);
esTable.setComment(stmt.getComment());
-
+ esTable.syncTableMetaData();
if (!db.createTableWithLock(esTable, false,
stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR,
tableName);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
index 95f54dceb7..4e5a79376d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java
@@ -73,7 +73,7 @@ public class EsRepository extends MasterDaemon {
protected void runAfterCatalogReady() {
for (EsTable esTable : esTables.values()) {
try {
- esTable.syncTableMetaData(esClients.get(esTable.getId()));
+ esTable.syncTableMetaData();
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data
from remote es cluster",
esTable.getName(), e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index 4db3e96b0a..f6800d1c3c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -20,15 +20,26 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
+import org.apache.commons.lang3.StringUtils;
import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import java.util.Iterator;
import java.util.Map;
+/**
+ * Util for ES, some static method.
+ **/
public class EsUtil {
+ /**
+ * Analyze partition and distributionDesc.
+ **/
public static void analyzePartitionAndDistributionDesc(PartitionDesc
partitionDesc,
DistributionDesc distributionDesc) throws AnalysisException {
if (partitionDesc == null && distributionDesc == null) {
@@ -49,25 +60,19 @@ public class EsUtil {
}
}
- private static void analyzePartitionDesc(RangePartitionDesc partDesc)
- throws AnalysisException {
+ private static void analyzePartitionDesc(RangePartitionDesc partDesc)
throws AnalysisException {
if (partDesc.getPartitionColNames() == null ||
partDesc.getPartitionColNames().isEmpty()) {
throw new AnalysisException("No partition columns.");
}
if (partDesc.getPartitionColNames().size() > 1) {
- throw new AnalysisException(
- "Elasticsearch table's partition column could only be a
single column");
+ throw new AnalysisException("Elasticsearch table's partition
column could only be a single column");
}
}
/**
- * get the json object from specified jsonObject
- *
- * @param jsonObject
- * @param key
- * @return
+ * Get the json object from specified jsonObject
*/
public static JSONObject getJsonObject(JSONObject jsonObject, String key,
int fromIndex) {
int firstOccr = key.indexOf('.', fromIndex);
@@ -87,6 +92,9 @@ public class EsUtil {
}
}
+ /**
+ * Get a boolean property value; throws DdlException when the value cannot be parsed.
+ **/
public static boolean getBoolean(Map<String, String> properties, String
name) throws DdlException {
String property = properties.get(name).trim();
try {
@@ -96,4 +104,110 @@ public class EsUtil {
+ "value should be double quotation marks", name, name,
property, name));
}
}
+
+ /**
+ * Get mapping properties JSONObject.
+ **/
+ public static JSONObject getMappingProps(String sourceIndex, String
indexMapping, String mappingType) {
+ JSONObject jsonObject = (JSONObject) JSONValue.parse(indexMapping);
+ // when the index name is an alias, take the first mapping in the response
+ Iterator<String> keys = jsonObject.keySet().iterator();
+ String docKey = keys.next();
+ JSONObject docData = (JSONObject) jsonObject.get(docKey);
+ JSONObject mappings = (JSONObject) docData.get("mappings");
+ JSONObject rootSchema = (JSONObject) mappings.get(mappingType);
+ JSONObject properties;
+ // Elasticsearch 7.x, type was removed from ES mapping, default type
is `_doc`
+ //
https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html
+ // Elasticsearch 8.x, include_type_name parameter is removed
+ if (rootSchema == null) {
+ properties = (JSONObject) mappings.get("properties");
+ } else {
+ properties = (JSONObject) rootSchema.get("properties");
+ }
+ if (properties == null) {
+ throw new DorisEsException(
+ "index[" + sourceIndex + "] type[" + mappingType + "]
mapping not found for the ES Cluster");
+ }
+ return properties;
+ }
+
+ /**
+ * Parse the required field information from the json.
+ *
+ * @param searchContext the current associated column searchContext
+ * @param indexMapping the return value of _mapping
+ */
+ public static void resolveFields(SearchContext searchContext, String
indexMapping) throws DorisEsException {
+ JSONObject properties = getMappingProps(searchContext.sourceIndex(),
indexMapping, searchContext.type());
+ for (Column col : searchContext.columns()) {
+ String colName = col.getName();
+ // a column that exists in the Doris table but is not found in the ES
mapping is reported as an error
+ if (!properties.containsKey(colName)) {
+ throw new DorisEsException(
+ "index[" + searchContext.sourceIndex() + "] type[" +
indexMapping + "] mapping not found column"
+ + colName + " for the ES Cluster");
+ }
+ JSONObject fieldObject = (JSONObject) properties.get(colName);
+ resolveKeywordFields(searchContext, fieldObject, colName);
+ resolveDocValuesFields(searchContext, fieldObject, colName);
+ }
+ }
+
+ // get a field of keyword type in the fields
+ private static void resolveKeywordFields(SearchContext searchContext,
JSONObject fieldObject, String colName) {
+ String fieldType = (String) fieldObject.get("type");
+ // string-type field used keyword type to generate predicate
+ // if text field type seen, we should use the `field` keyword type?
+ if ("text".equals(fieldType)) {
+ JSONObject fieldsObject = (JSONObject) fieldObject.get("fields");
+ if (fieldsObject != null) {
+ for (Object key : fieldsObject.keySet()) {
+ JSONObject innerTypeObject = (JSONObject)
fieldsObject.get((String) key);
+ // just for text type
+ if ("keyword".equals((String)
innerTypeObject.get("type"))) {
+ searchContext.fetchFieldsContext().put(colName,
colName + "." + key);
+ }
+ }
+ }
+ }
+ }
+
+ private static void resolveDocValuesFields(SearchContext searchContext,
JSONObject fieldObject, String colName) {
+ String fieldType = (String) fieldObject.get("type");
+ String docValueField = null;
+ if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
+ JSONObject fieldsObject = (JSONObject) fieldObject.get("fields");
+ if (fieldsObject != null) {
+ for (Object key : fieldsObject.keySet()) {
+ JSONObject innerTypeObject = (JSONObject)
fieldsObject.get((String) key);
+ if
(EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains((String)
innerTypeObject.get("type"))) {
+ continue;
+ }
+ if (innerTypeObject.containsKey("doc_values")) {
+ boolean docValue = (Boolean)
innerTypeObject.get("doc_values");
+ if (docValue) {
+ docValueField = colName;
+ }
+ } else {
+ // a : {c : {}} -> a -> a.c
+ docValueField = colName + "." + key;
+ }
+ }
+ }
+ } else {
+ // set doc_value = false manually
+ if (fieldObject.containsKey("doc_values")) {
+ Boolean docValue = (Boolean) fieldObject.get("doc_values");
+ if (!docValue) {
+ return;
+ }
+ }
+ docValueField = colName;
+ }
+ // only record the column when a doc_value field was actually resolved
+ if (StringUtils.isNotEmpty(docValueField)) {
+ searchContext.docValueFieldsContext().put(colName, docValueField);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
index 12fa4e37b4..560cebf1ff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java
@@ -17,15 +17,6 @@
package org.apache.doris.external.elasticsearch;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EsTable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import java.util.Iterator;
-
/**
* Get index mapping from remote ES Cluster, and resolved `keyword` and
`doc_values` field
* Later we can use it to parse all relevant indexes
@@ -48,105 +39,7 @@ public class MappingPhase implements SearchPhase {
@Override
public void postProcess(SearchContext context) {
- resolveFields(context, jsonMapping);
+ EsUtil.resolveFields(context, jsonMapping);
}
-
- /**
- * Parse the required field information from the json
- *
- * @param searchContext the current associated column searchContext
- * @param indexMapping the return value of _mapping
- * @return fetchFieldsContext and docValueFieldsContext
- * @throws Exception
- */
- public void resolveFields(SearchContext searchContext, String
indexMapping) throws DorisEsException {
- JSONObject jsonObject = (JSONObject) JSONValue.parse(indexMapping);
- // the indexName use alias takes the first mapping
- Iterator<String> keys = jsonObject.keySet().iterator();
- String docKey = keys.next();
- JSONObject docData = (JSONObject) jsonObject.get(docKey);
- JSONObject mappings = (JSONObject) docData.get("mappings");
- JSONObject rootSchema = (JSONObject)
mappings.get(searchContext.type());
- JSONObject properties;
- // Elasticsearch 7.x, type was removed from ES mapping, default type
is `_doc`
- //
https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html
- // Elasticsearch 8.x, include_type_name parameter is removed
- if (rootSchema == null) {
- properties = (JSONObject) mappings.get("properties");
- } else {
- properties = (JSONObject) rootSchema.get("properties");
- }
- if (properties == null) {
- throw new DorisEsException("index[" + searchContext.sourceIndex()
+ "] type[" + searchContext.type()
- + "] mapping not found for the ES Cluster");
- }
- for (Column col : searchContext.columns()) {
- String colName = col.getName();
- // if column exists in Doris Table but no found in ES's mapping,
we choose to ignore this situation?
- if (!properties.containsKey(colName)) {
- continue;
- }
- JSONObject fieldObject = (JSONObject) properties.get(colName);
- resolveKeywordFields(searchContext, fieldObject, colName);
- resolveDocValuesFields(searchContext, fieldObject, colName);
- }
- }
-
- // get a field of keyword type in the fields
- private void resolveKeywordFields(SearchContext searchContext, JSONObject
fieldObject, String colName) {
- String fieldType = (String) fieldObject.get("type");
- // string-type field used keyword type to generate predicate
- // if text field type seen, we should use the `field` keyword type?
- if ("text".equals(fieldType)) {
- JSONObject fieldsObject = (JSONObject) fieldObject.get("fields");
- if (fieldsObject != null) {
- for (Object key : fieldsObject.keySet()) {
- JSONObject innerTypeObject = (JSONObject)
fieldsObject.get((String) key);
- // just for text type
- if ("keyword".equals((String)
innerTypeObject.get("type"))) {
- searchContext.fetchFieldsContext().put(colName,
colName + "." + key);
- }
- }
- }
- }
- }
-
- private void resolveDocValuesFields(SearchContext searchContext,
JSONObject fieldObject, String colName) {
- String fieldType = (String) fieldObject.get("type");
- String docValueField = null;
- if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
- JSONObject fieldsObject = (JSONObject) fieldObject.get("fields");
- if (fieldsObject != null) {
- for (Object key : fieldsObject.keySet()) {
- JSONObject innerTypeObject = (JSONObject)
fieldsObject.get((String) key);
- if
(EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains((String)
innerTypeObject.get("type"))) {
- continue;
- }
- if (innerTypeObject.containsKey("doc_values")) {
- boolean docValue = (Boolean)
innerTypeObject.get("doc_values");
- if (docValue) {
- docValueField = colName;
- }
- } else {
- // a : {c : {}} -> a -> a.c
- docValueField = colName + "." + key;
- }
- }
- }
- } else {
- // set doc_value = false manually
- if (fieldObject.containsKey("doc_values")) {
- Boolean docValue = (Boolean) fieldObject.get("doc_values");
- if (!docValue) {
- return;
- }
- }
- docValueField = colName;
- }
- // docValueField Cannot be null
- if (StringUtils.isNotEmpty(docValueField)) {
- searchContext.docValueFieldsContext().put(colName, docValueField);
- }
- }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
index 8be08db701..fe9033e0ff 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsTestCase.java
@@ -41,13 +41,18 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+/**
+ * Test case for es.
+ **/
public class EsTestCase {
protected static FakeEditLog fakeEditLog;
protected static FakeCatalog fakeCatalog;
protected static Catalog masterCatalog;
- protected static String mappingsStr = "";
+ /**
+ * Init
+ **/
@BeforeClass
public static void init() throws Exception {
fakeEditLog = new FakeEditLog();
@@ -60,7 +65,7 @@ public class EsTestCase {
}
protected String loadJsonFromFile(String fileName) throws IOException,
URISyntaxException {
- File file = new
File(MappingPhaseTest.class.getClassLoader().getResource(fileName).toURI());
+ File file = new
File(EsUtil.class.getClassLoader().getResource(fileName).toURI());
InputStream is = new FileInputStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(is));
StringBuilder jsonStr = new StringBuilder();
@@ -73,7 +78,7 @@ public class EsTestCase {
return jsonStr.toString();
}
- public EsTable fakeEsTable(String table, String index, String type,
List<Column> columns) throws DdlException {
+ protected EsTable fakeEsTable(String table, String index, String type,
List<Column> columns) throws DdlException {
Map<String, String> props = new HashMap<>();
props.put(EsTable.HOSTS, "127.0.0.1:8200");
props.put(EsTable.INDEX, index);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
index 28ed67507f..ca7608923b 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
@@ -17,12 +17,28 @@
package org.apache.doris.external.elasticsearch;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EsTable;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.ExceptionChecker;
+
+import mockit.Expectations;
+import mockit.Injectable;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-public class EsUtilTest {
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for es util.
+ **/
+public class EsUtilTest extends EsTestCase {
+
+ private List<Column> columns = new ArrayList<>();
private String jsonStr = "{\"settings\": {\n"
+ " \"index\": {\n"
@@ -42,6 +58,80 @@ public class EsUtilTest {
+ " }\n"
+ " }}";
+ /**
+ * Init columns.
+ **/
+ @Before
+ public void setUp() {
+ Column k1 = new Column("k1", PrimitiveType.BIGINT);
+ Column k2 = new Column("k2", PrimitiveType.VARCHAR);
+ Column k3 = new Column("k3", PrimitiveType.VARCHAR);
+ columns.add(k1);
+ columns.add(k2);
+ columns.add(k3);
+ }
+
+ @Test
+ public void testExtractFieldsNormal() throws Exception {
+
+ // ES version < 7.0
+ EsTable esTableBefore7X = fakeEsTable("fake", "test", "doc", columns);
+ SearchContext searchContext = new SearchContext(esTableBefore7X);
+ EsUtil.resolveFields(searchContext,
loadJsonFromFile("data/es/test_index_mapping.json"));
+ Assert.assertEquals("k3.keyword",
searchContext.fetchFieldsContext().get("k3"));
+ Assert.assertEquals("k3.keyword",
searchContext.docValueFieldsContext().get("k3"));
+ Assert.assertEquals("k1",
searchContext.docValueFieldsContext().get("k1"));
+ Assert.assertEquals("k2",
searchContext.docValueFieldsContext().get("k2"));
+
+ // ES version >= 7.0
+ EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
+ SearchContext searchContext1 = new SearchContext(esTableAfter7X);
+ EsUtil.resolveFields(searchContext1,
loadJsonFromFile("data/es/test_index_mapping_after_7x.json"));
+ Assert.assertEquals("k3.keyword",
searchContext1.fetchFieldsContext().get("k3"));
+ Assert.assertEquals("k3.keyword",
searchContext1.docValueFieldsContext().get("k3"));
+ Assert.assertEquals("k1",
searchContext1.docValueFieldsContext().get("k1"));
+ Assert.assertEquals("k2",
searchContext1.docValueFieldsContext().get("k2"));
+ }
+
+ @Test
+ public void testTypeNotExist() throws Exception {
+ EsTable table = fakeEsTable("fake", "test", "not_exists", columns);
+ SearchContext searchContext = new SearchContext(table);
+ // type not exists
+ ExceptionChecker.expectThrows(DorisEsException.class,
+ () -> EsUtil.resolveFields(searchContext,
loadJsonFromFile("data/es/test_index_mapping.json")));
+ }
+
+ @Test
+ public void testWorkFlow(@Injectable EsRestClient client) throws Exception
{
+ EsTable table = fakeEsTable("fake", "test", "doc", columns);
+ SearchContext searchContext1 = new SearchContext(table);
+ String jsonMapping =
loadJsonFromFile("data/es/test_index_mapping.json");
+ new Expectations(client) {
+ {
+ client.getMapping(anyString);
+ minTimes = 0;
+ result = jsonMapping;
+ }
+ };
+ MappingPhase mappingPhase = new MappingPhase(client);
+ ExceptionChecker.expectThrowsNoException(() ->
mappingPhase.execute(searchContext1));
+ ExceptionChecker.expectThrowsNoException(() ->
mappingPhase.postProcess(searchContext1));
+ Assert.assertEquals("k3.keyword",
searchContext1.fetchFieldsContext().get("k3"));
+ Assert.assertEquals("k3.keyword",
searchContext1.docValueFieldsContext().get("k3"));
+ Assert.assertEquals("k1",
searchContext1.docValueFieldsContext().get("k1"));
+ Assert.assertEquals("k2",
searchContext1.docValueFieldsContext().get("k2"));
+
+ }
+
+ @Test
+ public void testMultTextFields() throws Exception {
+ EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
+ SearchContext searchContext = new SearchContext(esTableAfter7X);
+ EsUtil.resolveFields(searchContext,
loadJsonFromFile("data/es/test_index_mapping_field_mult_analyzer.json"));
+ Assert.assertFalse(searchContext.docValueFieldsContext().containsKey("k3"));
+ }
+
@Test
public void testGetJsonObject() {
JSONObject json = (JSONObject) JSONValue.parse(jsonStr);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java
deleted file mode 100644
index 9391f0b7b5..0000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.elasticsearch;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EsTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.ExceptionChecker;
-
-import mockit.Expectations;
-import mockit.Injectable;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class MappingPhaseTest extends EsTestCase {
-
-
- List<Column> columns = new ArrayList<>();
-
- @Before
- public void setUp() {
- Column k1 = new Column("k1", PrimitiveType.BIGINT);
- Column k2 = new Column("k2", PrimitiveType.VARCHAR);
- Column k3 = new Column("k3", PrimitiveType.VARCHAR);
- columns.add(k1);
- columns.add(k2);
- columns.add(k3);
- }
-
- @Test
- public void testExtractFieldsNormal() throws Exception {
-
- MappingPhase mappingPhase = new MappingPhase(null);
- // ES version < 7.0
- EsTable esTableBefore7X = fakeEsTable("fake", "test", "doc", columns);
- SearchContext searchContext = new SearchContext(esTableBefore7X);
- mappingPhase.resolveFields(searchContext,
loadJsonFromFile("data/es/test_index_mapping.json"));
- Assert.assertEquals("k3.keyword",
searchContext.fetchFieldsContext().get("k3"));
- Assert.assertEquals("k3.keyword",
searchContext.docValueFieldsContext().get("k3"));
- Assert.assertEquals("k1",
searchContext.docValueFieldsContext().get("k1"));
- Assert.assertEquals("k2",
searchContext.docValueFieldsContext().get("k2"));
-
- // ES version >= 7.0
- EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
- SearchContext searchContext1 = new SearchContext(esTableAfter7X);
- mappingPhase.resolveFields(searchContext1,
loadJsonFromFile("data/es/test_index_mapping_after_7x.json"));
- Assert.assertEquals("k3.keyword",
searchContext1.fetchFieldsContext().get("k3"));
- Assert.assertEquals("k3.keyword",
searchContext1.docValueFieldsContext().get("k3"));
- Assert.assertEquals("k1",
searchContext1.docValueFieldsContext().get("k1"));
- Assert.assertEquals("k2",
searchContext1.docValueFieldsContext().get("k2"));
- }
-
- @Test
- public void testTypeNotExist() throws Exception {
- MappingPhase mappingPhase = new MappingPhase(null);
- EsTable table = fakeEsTable("fake", "test", "not_exists", columns);
- SearchContext searchContext = new SearchContext(table);
- // type not exists
- ExceptionChecker.expectThrows(DorisEsException.class,
- () -> mappingPhase.resolveFields(searchContext,
loadJsonFromFile("data/es/test_index_mapping.json")));
- }
-
- @Test
- public void testWorkFlow(@Injectable EsRestClient client) throws Exception
{
- EsTable table = fakeEsTable("fake", "test", "doc", columns);
- SearchContext searchContext1 = new SearchContext(table);
- String jsonMapping =
loadJsonFromFile("data/es/test_index_mapping.json");
- new Expectations(client) {
- {
- client.getMapping(anyString);
- minTimes = 0;
- result = jsonMapping;
- }
- };
- MappingPhase mappingPhase = new MappingPhase(client);
- ExceptionChecker.expectThrowsNoException(() ->
mappingPhase.execute(searchContext1));
- ExceptionChecker.expectThrowsNoException(() ->
mappingPhase.postProcess(searchContext1));
- Assert.assertEquals("k3.keyword",
searchContext1.fetchFieldsContext().get("k3"));
- Assert.assertEquals("k3.keyword",
searchContext1.docValueFieldsContext().get("k3"));
- Assert.assertEquals("k1",
searchContext1.docValueFieldsContext().get("k1"));
- Assert.assertEquals("k2",
searchContext1.docValueFieldsContext().get("k2"));
-
- }
-
- @Test
- public void testMultTextFields() throws Exception {
- MappingPhase mappingPhase = new MappingPhase(null);
- EsTable esTableAfter7X = fakeEsTable("fake", "test", "_doc", columns);
- SearchContext searchContext = new SearchContext(esTableAfter7X);
- mappingPhase.resolveFields(searchContext,
loadJsonFromFile("data/es/test_index_mapping_field_mult_analyzer.json"));
- Assert.assertFalse(searchContext.docValueFieldsContext().containsKey("k3"));
-
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org