This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ef176deafa31ba55b89a962f0083ccd408fd65c2 Author: Stalary <stal...@163.com> AuthorDate: Sun Jun 26 09:51:29 2022 +0800 [fix](doe) fix doe on es v8 (#10391) doris on es8 can not work, because type change. The use of type is no longer recommended in es7, and support for type has been removed from es8. 1. /_mapping not support include_type_name 2. /_search not support use type --- be/src/exec/es/es_scan_reader.cpp | 32 +++++++++---- docs/en/extending-doris/doris-on-es.md | 14 +----- docs/zh-CN/extending-doris/doris-on-es.md | 14 +----- .../java/org/apache/doris/catalog/Catalog.java | 12 ++--- .../java/org/apache/doris/catalog/EsTable.java | 53 +++++++++++----------- .../external/elasticsearch/EsMajorVersion.java | 7 +-- .../doris/external/elasticsearch/EsRestClient.java | 5 +- .../doris/external/elasticsearch/MappingPhase.java | 20 ++------ .../java/org/apache/doris/planner/EsScanNode.java | 4 +- .../external/elasticsearch/MappingPhaseTest.java | 2 +- 10 files changed, 71 insertions(+), 92 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index 87131a7647..5d67d995cf 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -47,7 +47,9 @@ ESScanReader::ESScanReader(const std::string& target, _doc_value_mode(doc_value_mode) { _target = target; _index = props.at(KEY_INDEX); - _type = props.at(KEY_TYPE); + if (props.find(KEY_TYPE) != props.end()) { + _type = props.at(KEY_TYPE); + } if (props.find(KEY_USER_NAME) != props.end()) { _user_name = props.at(KEY_USER_NAME); } @@ -73,20 +75,32 @@ ESScanReader::ESScanReader(const std::string& target, _exactly_once = true; std::stringstream scratch; // just send a normal search against the elasticsearch with additional terminate_after param to achieve terminate early effect when limit take effect - scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type - << "/_search?" 
- << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << REQUEST_PREFERENCE_PREFIX - << _shards << "&" << filter_path; + if (_type.empty()) { + scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" + << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) + << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path; + } else { + scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type + << "/_search?" + << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) + << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path; + } _search_url = scratch.str(); } else { _exactly_once = false; std::stringstream scratch; // scroll request for scanning // add terminate_after for the first scroll to avoid decompress all postings list - scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type - << "/_search?" - << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards << "&" - << filter_path << "&terminate_after=" << batch_size_str; + if (_type.empty()) { + scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" + << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards + << "&" << filter_path << "&terminate_after=" << batch_size_str; + } else { + scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type + << "/_search?" + << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards + << "&" << filter_path << "&terminate_after=" << batch_size_str; + } _init_scroll_url = scratch.str(); _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" 
+ filter_path; } diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index a653986c35..8a3a104b4a 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -119,7 +119,6 @@ PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", "type" = "doc", - "user" = "root", "password" = "root" ); @@ -131,7 +130,7 @@ Parameter | Description ---|--- **hosts** | ES Cluster Connection Address, maybe one or more node, load-balance is also accepted **index** | the related ES index name, alias is supported, and if you use doc_value, you need to use the real name -**type** | the type for this index, If not specified, `_doc` will be used +**type** | the type for this index, ES 7.x and later versions do not pass this parameter **user** | username for ES **password** | password for the user @@ -188,10 +187,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "enable_docvalue_scan" = "true" ); ``` @@ -229,10 +226,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "enable_keyword_sniff" = "true" ); ``` @@ -341,10 +336,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "nodes_discovery" = "true" ); ``` @@ -370,10 +363,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "http_ssl_enabled" = "true" ); ``` @@ -561,8 +552,7 @@ PROPERTIES ( "hosts" = "http://127.0.0.1:8200", "user" = "root", "password" = "root", -"index" = "doe", -"type" = "doc" +"index" 
= "doe" } ``` `Notice`: diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index 7840b6be27..1ca84bf4c5 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -117,7 +117,6 @@ PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", "type" = "doc", - "user" = "root", "password" = "root" ); @@ -129,7 +128,7 @@ PROPERTIES ( ---|--- **hosts** | ES集群地址,可以是一个或多个,也可以是ES前端的负载均衡地址 **index** | 对应的ES的index名字,支持alias,如果使用doc_value,需要使用真实的名称 -**type** | index的type,不指定的情况会使用_doc +**type** | index的type,ES 7.x及以后的版本不传此参数 **user** | ES集群用户名 **password** | 对应用户的密码信息 @@ -185,10 +184,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "enable_docvalue_scan" = "true" ); ``` @@ -226,10 +223,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "enable_keyword_sniff" = "true" ); ``` @@ -338,10 +333,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "nodes_discovery" = "true" ); ``` @@ -367,10 +360,8 @@ CREATE EXTERNAL TABLE `test` ( PROPERTIES ( "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200", "index" = "test", -"type" = "doc", "user" = "root", "password" = "root", - "http_ssl_enabled" = "true" ); ``` @@ -559,8 +550,7 @@ PROPERTIES ( "hosts" = "http://127.0.0.1:8200", "user" = "root", "password" = "root", -"index" = "doe", -"type" = "doc" +"index" = "doe" } ``` diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 22e532f78f..69832b139a 100755 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4362,12 +4362,8 @@ public class Catalog { if (partitionInfo.getType() == PartitionType.RANGE) { sb.append("\n"); sb.append("PARTITION BY RANGE("); - idx = 0; RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; for (Column column : rangePartitionInfo.getPartitionColumns()) { - if (idx != 0) { - sb.append(", "); - } sb.append("`").append(column.getName()).append("`"); } sb.append(")\n()"); @@ -4379,12 +4375,14 @@ public class Catalog { sb.append("\"user\" = \"").append(esTable.getUserName()).append("\",\n"); sb.append("\"password\" = \"").append(hidePassword ? "" : esTable.getPasswd()).append("\",\n"); sb.append("\"index\" = \"").append(esTable.getIndexName()).append("\",\n"); - sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n"); + if (esTable.getMappingType() != null) { + sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n"); + } sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n"); sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n"); - sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); - sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\"\n"); + sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\",\n"); + sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\",\n"); sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n"); sb.append(")"); } else if (table.getType() == TableType.HIVE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index df19a86d96..cc5d902080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -73,8 +73,8 @@ public class EsTable extends Table { // index name can be specific index、wildcard matched or alias. private String indexName; - // which type used for `indexName`, default to `_doc` - private String mappingType = "_doc"; + // which type used for `indexName` + private String mappingType = null; private String transport = "http"; // only save the partition definition, save the partition key, // partition list is got from es cluster dynamically and is saved in esTableState @@ -116,8 +116,8 @@ public class EsTable extends Table { super(TableType.ELASTICSEARCH); } - public EsTable(long id, String name, List<Column> schema, - Map<String, String> properties, PartitionInfo partitionInfo) throws DdlException { + public EsTable(long id, String name, List<Column> schema, Map<String, String> properties, + PartitionInfo partitionInfo) throws DdlException { super(id, name, TableType.ELASTICSEARCH, schema); this.partitionInfo = partitionInfo; validate(properties); @@ -154,32 +154,29 @@ public class EsTable extends Table { private void validate(Map<String, String> properties) throws DdlException { if (properties == null) { - throw new DdlException("Please set properties of elasticsearch table, " - + "they are: hosts, user, password, index"); + throw new DdlException( + "Please set properties of elasticsearch table, " + "they are: hosts, user, password, index"); } - if (Strings.isNullOrEmpty(properties.get(HOSTS)) - || Strings.isNullOrEmpty(properties.get(HOSTS).trim())) { + if (Strings.isNullOrEmpty(properties.get(HOSTS)) || Strings.isNullOrEmpty(properties.get(HOSTS).trim())) { throw new DdlException("Hosts of ES table is null. 
" + "Please add properties('hosts'='xxx.xxx.xxx.xxx,xxx.xxx.xxx.xxx') when create table"); } hosts = properties.get(HOSTS).trim(); seeds = hosts.split(","); - if (!Strings.isNullOrEmpty(properties.get(USER)) - && !Strings.isNullOrEmpty(properties.get(USER).trim())) { + if (!Strings.isNullOrEmpty(properties.get(USER)) && !Strings.isNullOrEmpty(properties.get(USER).trim())) { userName = properties.get(USER).trim(); } - if (!Strings.isNullOrEmpty(properties.get(PASSWORD)) - && !Strings.isNullOrEmpty(properties.get(PASSWORD).trim())) { + if (!Strings.isNullOrEmpty(properties.get(PASSWORD)) && !Strings.isNullOrEmpty( + properties.get(PASSWORD).trim())) { passwd = properties.get(PASSWORD).trim(); } - if (Strings.isNullOrEmpty(properties.get(INDEX)) - || Strings.isNullOrEmpty(properties.get(INDEX).trim())) { - throw new DdlException("Index of ES table is null. " - + "Please add properties('index'='xxxx') when create table"); + if (Strings.isNullOrEmpty(properties.get(INDEX)) || Strings.isNullOrEmpty(properties.get(INDEX).trim())) { + throw new DdlException( + "Index of ES table is null. 
" + "Please add properties('index'='xxxx') when create table"); } indexName = properties.get(INDEX).trim(); @@ -191,8 +188,8 @@ public class EsTable extends Table { throw new DdlException("Unsupported/Unknown ES Cluster version [" + properties.get(VERSION) + "] "); } } catch (Exception e) { - throw new DdlException("fail to parse ES major version, version= " - + properties.get(VERSION).trim() + ", should be like '6.5.3' "); + throw new DdlException("fail to parse ES major version, version= " + properties.get(VERSION).trim() + + ", should be like '6.5.3' "); } } @@ -222,13 +219,12 @@ public class EsTable extends Table { } } - if (!Strings.isNullOrEmpty(properties.get(TYPE)) - && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { + if (!Strings.isNullOrEmpty(properties.get(TYPE)) && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { mappingType = properties.get(TYPE).trim(); } - if (!Strings.isNullOrEmpty(properties.get(TRANSPORT)) - && !Strings.isNullOrEmpty(properties.get(TRANSPORT).trim())) { + if (!Strings.isNullOrEmpty(properties.get(TRANSPORT)) && !Strings.isNullOrEmpty( + properties.get(TRANSPORT).trim())) { transport = properties.get(TRANSPORT).trim(); if (!(TRANSPORT_HTTP.equals(transport) || TRANSPORT_THRIFT.equals(transport))) { throw new DdlException("transport of ES table must be http/https(recommend) or thrift(reserved inner usage)," @@ -250,7 +246,9 @@ public class EsTable extends Table { tableContext.put("userName", userName); tableContext.put("passwd", passwd); tableContext.put("indexName", indexName); - tableContext.put("mappingType", mappingType); + if (mappingType != null) { + tableContext.put("mappingType", mappingType); + } tableContext.put("transport", transport); if (majorVersion != null) { tableContext.put("majorVersion", majorVersion.toString()); @@ -264,8 +262,8 @@ public class EsTable extends Table { public TTableDescriptor toThrift() { TEsTable tEsTable = new TEsTable(); - TTableDescriptor tTableDescriptor = new 
TTableDescriptor(getId(), TTableType.ES_TABLE, - fullSchema.size(), 0, getName(), ""); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE, fullSchema.size(), 0, + getName(), ""); tTableDescriptor.setEsTable(tEsTable); return tTableDescriptor; } @@ -280,7 +278,9 @@ public class EsTable extends Table { sb.append(userName); sb.append(passwd); sb.append(indexName); - sb.append(mappingType); + if (mappingType != null) { + sb.append(mappingType); + } sb.append(transport); } else { for (Map.Entry<String, String> entry : tableContext.entrySet()) { @@ -305,6 +305,7 @@ public class EsTable extends Table { partitionInfo.write(out); } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); int size = in.readInt(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java index d16fbd81d2..41801687f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java @@ -32,7 +32,7 @@ public class EsMajorVersion { public static final EsMajorVersion V_6_X = new EsMajorVersion((byte) 6, "6.x"); public static final EsMajorVersion V_7_X = new EsMajorVersion((byte) 7, "7.x"); public static final EsMajorVersion V_8_X = new EsMajorVersion((byte) 8, "8.x"); - public static final EsMajorVersion LATEST = V_7_X; + public static final EsMajorVersion LATEST = V_8_X; public final byte major; private final String version; @@ -89,8 +89,9 @@ public class EsMajorVersion { if (version.startsWith("8.")) { return new EsMajorVersion((byte) 8, version); } - throw new DorisEsException("Unsupported/Unknown ES Cluster version [" + version + "]." 
+ - "Highest supported version is [" + LATEST.version + "]."); + throw new DorisEsException( + "Unsupported/Unknown ES Cluster version [" + version + "]. " + "Highest supported version is [" + + LATEST.version + "]."); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 467047ac8c..5a00719154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -127,11 +127,8 @@ public class EsRestClient { * @return * @throws Exception */ - public String getMapping(String indexName, boolean includeTypeName) throws DorisEsException { + public String getMapping(String indexName) throws DorisEsException { String path = indexName + "/_mapping"; - if (includeTypeName) { - path += "?include_type_name=true"; - } String indexMapping = execute(path); if (indexMapping == null) { throw new DorisEsException("index[" + indexName + "] not found"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java index f736a9ee2f..4057348e66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java @@ -37,22 +37,13 @@ public class MappingPhase implements SearchPhase { // json response for `{index}/_mapping` API private String jsonMapping; - private boolean includeTypeName = false; - public MappingPhase(EsRestClient client) { this.client = client; } - @Override - public void preProcess(SearchContext context) { - if (context.version() != null && context.version().onOrAfter(EsMajorVersion.V_7_X)) { - includeTypeName = true; - } - } - @Override public void execute(SearchContext context)
throws DorisEsException { - jsonMapping = client.getMapping(context.sourceIndex(), includeTypeName); + jsonMapping = client.getMapping(context.sourceIndex()); } @Override @@ -78,15 +69,10 @@ public class MappingPhase implements SearchPhase { JSONObject mappings = (JSONObject) docData.get("mappings"); JSONObject rootSchema = (JSONObject) mappings.get(searchContext.type()); JSONObject properties; - // After (include) 7.x, type was removed from ES mapping, default type is `_doc` + // Elasticsearch 7.x, type was removed from ES mapping, default type is `_doc` // https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html + // Elasticsearch 8.x, include_type_name parameter is removed if (rootSchema == null) { - if (searchContext.type().equals("_doc") == false) { - throw new DorisEsException("index[" + searchContext.sourceIndex() + "]'s type must be exists, " - + " and after ES7.x type must be `_doc`, but found [" - + searchContext.type() + "], for table [" - + searchContext.esTable().getName() + "]"); - } properties = (JSONObject) mappings.get("properties"); } else { properties = (JSONObject) rootSchema.get("properties"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index 2ed1a4bde8..42f3934a9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -247,7 +247,9 @@ public class EsScanNode extends ScanNode { TEsScanRange esScanRange = new TEsScanRange(); esScanRange.setEsHosts(shardAllocations); esScanRange.setIndex(shardRouting.get(0).getIndexName()); - esScanRange.setType(table.getMappingType()); + if (table.getMappingType() != null) { + esScanRange.setType(table.getMappingType()); + } esScanRange.setShardId(shardRouting.get(0).getShardId()); // Scan range TScanRange scanRange = new TScanRange(); diff --git
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java index d229d7c924..18330eb55a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/MappingPhaseTest.java @@ -89,7 +89,7 @@ public class MappingPhaseTest extends EsTestCase { String jsonMapping = loadJsonFromFile("data/es/test_index_mapping.json"); new Expectations(client) { { - client.getMapping(anyString, anyBoolean); + client.getMapping(anyString); minTimes = 0; result = jsonMapping; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org