This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 863a652011e [fix](ES Catalog)Make sure ES meta is synced before using
(#46781)
863a652011e is described below
commit 863a652011eaf81c92cddc8f5e76a7103468cf90
Author: qiye <[email protected]>
AuthorDate: Mon Feb 10 17:48:28 2025 +0800
[fix](ES Catalog)Make sure ES meta is synced before using (#46781)
### What problem does this PR solve?
Issue Number: close #46780
Problem Summary:
1. Added calls to `EsResource.fillUrlsWithSchema` and
`initEsMetaStateTracker` to ensure ES table URLs are correctly parsed and
the metadata tracker is initialized before use.
2. Introduced an `executeWithRetry` helper in the ES regression tests to
retry queries that fail because the ES metadata has not been synced yet.
3. Removed useless ES tests and config.
---
.../java/org/apache/doris/catalog/EsTable.java | 18 +++++--
regression-test/conf/regression-conf.groovy | 7 ---
.../external_table_p0/es/test_es_query.groovy | 30 +++++++++++-
.../es/test_es_query_no_http_url.groovy | 31 ++++++++++--
.../es/test_external_catalog_es.groovy | 52 --------------------
.../external_table_p2/es/test_external_es.groovy | 56 ----------------------
6 files changed, 72 insertions(+), 122 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index a3ed061ed48..a9ee65ef74b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -152,17 +152,26 @@ public class EsTable extends Table implements
GsonPostProcessable {
}
public Map<String, String> fieldsContext() throws UserException {
+ initEsMetaStateTracker();
return esMetaStateTracker.searchContext().fetchFieldsContext();
}
public Map<String, String> docValueContext() throws UserException {
+ initEsMetaStateTracker();
return esMetaStateTracker.searchContext().docValueFieldsContext();
}
public List<String> needCompatDateFields() throws UserException {
+ initEsMetaStateTracker();
return esMetaStateTracker.searchContext().needCompatDateFields();
}
+ private void initEsMetaStateTracker() {
+ if (esMetaStateTracker == null) {
+ esMetaStateTracker = new EsMetaStateTracker(client, this);
+ }
+ }
+
private void validate(Map<String, String> properties) throws DdlException {
EsResource.valid(properties, false);
if (properties.containsKey(EsResource.USER)) {
@@ -315,9 +324,12 @@ public class EsTable extends Table implements
GsonPostProcessable {
} else {
throw new IOException("invalid partition type: " + partType);
}
+ // parse httpSslEnabled before use it here.
+ EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);
client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
}
+ @Override
public void gsonPostProcess() throws IOException {
hosts = tableContext.get("hosts");
seeds = hosts.split(",");
@@ -346,6 +358,8 @@ public class EsTable extends Table implements
GsonPostProcessable {
includeHiddenIndex =
Boolean.parseBoolean(tableContext.getOrDefault(EsResource.INCLUDE_HIDDEN_INDEX,
EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE));
+ // parse httpSslEnabled before use it here.
+ EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);
client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
}
@@ -353,9 +367,7 @@ public class EsTable extends Table implements
GsonPostProcessable {
* Sync es index meta from remote ES Cluster.
*/
public void syncTableMetaData() {
- if (esMetaStateTracker == null) {
- esMetaStateTracker = new EsMetaStateTracker(client, this);
- }
+ initEsMetaStateTracker();
try {
esMetaStateTracker.run();
this.esTablePartitions =
esMetaStateTracker.searchContext().tablePartitions();
diff --git a/regression-test/conf/regression-conf.groovy
b/regression-test/conf/regression-conf.groovy
index fd4259fbdd9..c30d04674f5 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -199,13 +199,6 @@ extPgPort = 5432
extPgUser = "****"
extPgPassword = "***********"
-// elasticsearch external test config for bigdata
-enableExternalEsTest = false
-extEsHost = "***********"
-extEsPort = 9200
-extEsUser = "*******"
-extEsPassword = "***********"
-
// minio external test config
enableExternalMinioTest = false
extMinioHost = "***.**.**.**"
diff --git a/regression-test/suites/external_table_p0/es/test_es_query.groovy
b/regression-test/suites/external_table_p0/es/test_es_query.groovy
index 1645fa6af51..22789b9ebe1 100644
--- a/regression-test/suites/external_table_p0/es/test_es_query.groovy
+++ b/regression-test/suites/external_table_p0/es/test_es_query.groovy
@@ -28,6 +28,8 @@ suite("test_es_query",
"p0,external,es,external_docker,external_docker_es") {
sql """drop catalog if exists test_es_query_es6;"""
sql """drop catalog if exists test_es_query_es7;"""
sql """drop catalog if exists test_es_query_es8;"""
+ sql """drop catalog if exists es6_hide;"""
+ sql """drop catalog if exists es7_hide;"""
sql """drop table if exists test_v1;"""
sql """drop table if exists test_v2;"""
@@ -166,9 +168,35 @@ suite("test_es_query",
"p0,external,es,external_docker,external_docker_es") {
);
"""
+ def executeWithRetry = { query, queryName, maxRetries ->
+ def retryCount = 0
+ def success = false
+
+ while (!success && retryCount < maxRetries) {
+ try {
+ sql query
+ success = true
+ } catch (Exception e) {
+ if (e.getMessage().contains("EsTable metadata has not been
synced, Try it later")) {
+ logger.error("Failed to execute ${queryName}:
${e.getMessage()}")
+ logger.info("Retrying... Attempt ${retryCount + 1}")
+ retryCount++
+ sleep(1000) // Sleep for 1 second
+ } else {
+ throw e // Rethrow if it's a different exception
+ }
+ }
+ }
+
+ if (!success) {
+ throw new RuntimeException("Failed to execute ${queryName}
after ${maxRetries} attempts")
+ }
+ }
+
def query_catalogs = { ->
sql """switch internal"""
sql """use regression_test_external_table_p0_es"""
+ executeWithRetry("""select * from test_v1 where test2='text#1'""",
"sql01", 30)
order_qt_sql01 """select * from test_v1 where test2='text#1'"""
order_qt_sql02 """select * from test_v1 where esquery(test2,
'{"match":{"test2":"text#1"}}')"""
order_qt_sql03 """select test4,test5,test6,test7,test8 from
test_v1 order by test8"""
@@ -182,7 +210,7 @@ suite("test_es_query",
"p0,external,es,external_docker,external_docker_es") {
order_qt_sql11 """select test6 from test_v1;"""
order_qt_sql12 """select test9 from test_v1;"""
- order_qt_sql20 """select * from test_v2 where test2='text#1'"""
+ executeWithRetry("""select * from test_v2 where test2='text#1'""",
"sql20", 30)
order_qt_sql21 """select * from test_v2 where esquery(test2,
'{"match":{"test2":"text#1"}}')"""
order_qt_sql22 """select test4,test5,test6,test7,test8 from
test_v2 order by test8"""
order_qt_sql23 """select * from test_v2 where esquery(c_long,
'{"term":{"c_long":"-1"}}');"""
diff --git
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
index 004c1aea31e..f5219c1509e 100644
---
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
+++
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
@@ -23,6 +23,31 @@ suite("test_es_query_no_http_url",
"p0,external,es,external_docker,external_dock
String es_7_port = context.config.otherConfigs.get("es_7_port")
String es_8_port = context.config.otherConfigs.get("es_8_port")
+ def executeWithRetry = { query, queryName, maxRetries ->
+ def retryCount = 0
+ def success = false
+
+ while (!success && retryCount < maxRetries) {
+ try {
+ sql query
+ success = true
+ } catch (Exception e) {
+ if (e.getMessage().contains("EsTable metadata has not been
synced, Try it later")) {
+ logger.error("Failed to execute ${queryName}:
${e.getMessage()}")
+ logger.info("Retrying... Attempt ${retryCount + 1}")
+ retryCount++
+ sleep(1000) // Sleep for 1 second
+ } else {
+ throw e // Rethrow if it's a different exception
+ }
+ }
+ }
+
+ if (!success) {
+ throw new RuntimeException("Failed to execute ${queryName}
after ${maxRetries} attempts")
+ }
+ }
+
sql """drop catalog if exists es6_no_http_url;"""
sql """drop catalog if exists es7_no_http_url;"""
sql """drop catalog if exists es8_no_http_url;"""
@@ -95,9 +120,9 @@ suite("test_es_query_no_http_url",
"p0,external,es,external_docker,external_dock
"http_ssl_enabled"="false"
);
"""
- order_qt_sql51 """select * from test_v1_no_http_url where
test2='text#1'"""
+ executeWithRetry("""select * from test_v1_no_http_url where
test2='text#1'""", "sql51", 30)
- sql """
+ sql """
CREATE TABLE `test_v2_no_http_url` (
`c_datetime` array<datev2> NULL,
`c_long` array<bigint(20)> NULL,
@@ -133,7 +158,7 @@ suite("test_es_query_no_http_url",
"p0,external,es,external_docker,external_dock
"http_ssl_enabled"="false"
);
"""
- order_qt_sql52 """select * from test_v2_no_http_url where
test2='text#1'"""
+ executeWithRetry("""select * from test_v2_no_http_url where
test2='text#1'""", "sql52", 30)
// es6
sql """switch es6_no_http_url"""
diff --git
a/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy
b/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy
deleted file mode 100644
index 5412bc736c7..00000000000
---
a/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//import org.postgresql.Driver
-suite("test_external_catalog_es",
"p2,external,es,external_remote,external_remote_es") {
- Boolean ignoreP2 = true;
- if (ignoreP2) {
- logger.info("disable p2 test");
- return;
- }
-
- String enabled = context.config.otherConfigs.get("enableExternalEsTest")
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
- String extEsHost = context.config.otherConfigs.get("extEsHost")
- String extEsPort = context.config.otherConfigs.get("extEsPort")
- String extEsUser = context.config.otherConfigs.get("extEsUser")
- String extEsPassword = context.config.otherConfigs.get("extEsPassword")
- String esCatalogName ="es7_catalog_name"
-
- String jdbcPg14Table1 = "accounts"
-
- sql """drop catalog if exists ${esCatalogName}"""
-
- sql """
- CREATE CATALOG ${esCatalogName} PROPERTIES (
- "type"="es",
- "elasticsearch.hosts"="http://${extEsHost}:${extEsPort}",
- "elasticsearch.nodes_discovery"="false",
- "elasticsearch.username"="${extEsUser}",
- "elasticsearch.password"="${extEsPassword}"
- );
- """
-
- qt_sql "select * from ${esCatalogName}.default_db.${jdbcPg14Table1}
order by account_number limit 10;"
-
- sql """drop catalog if exists ${esCatalogName};"""
-
- }
-}
diff --git
a/regression-test/suites/external_table_p2/es/test_external_es.groovy
b/regression-test/suites/external_table_p2/es/test_external_es.groovy
deleted file mode 100644
index fcec9b7de3e..00000000000
--- a/regression-test/suites/external_table_p2/es/test_external_es.groovy
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//import org.postgresql.Driver
-suite("test_external_es", "p2,external,es,external_remote,external_remote_es")
{
-
- String enabled = context.config.otherConfigs.get("enableExternalEsTest")
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
- String extEsHost = context.config.otherConfigs.get("extEsHost")
- String extEsPort = context.config.otherConfigs.get("extEsPort")
- String extEsUser = context.config.otherConfigs.get("extEsUser")
- String extEsPassword = context.config.otherConfigs.get("extEsPassword")
- String jdbcPg14Database1 = "jdbc_es_14_database1"
- String jdbcPg14Table1 = "jdbc_es_14_table1"
-
-
- sql """drop database if exists ${jdbcPg14Database1};"""
- sql """create database ${jdbcPg14Database1};"""
- sql """use ${jdbcPg14Database1};"""
- sql """drop table if exists ${jdbcPg14Table1};"""
-
- sql """
- CREATE EXTERNAL TABLE `${jdbcPg14Table1}` (
- `name` varchar(20) COMMENT "",
- `age` varchar(20) COMMENT ""
- ) ENGINE=ELASTICSEARCH
- PROPERTIES (
- "hosts" = "http://${extEsHost}:${extEsPort}",
- "index" = "helloworld",
- "user" = "${extEsUser}",
- "password" = "${extEsPassword}"
- );
- """
- def res=sql """show create table ${jdbcPg14Table1};"""
- logger.info("recoding desc res: "+ res.toString())
-
- def res1=sql "select * from ${jdbcPg14Table1};"
- logger.info("recoding all: " + res1.toString())
-
- sql """drop table if exists ${jdbcPg14Table1};"""
- sql """drop database if exists ${jdbcPg14Database1};"""
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]