This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2d5307418b57efd16196cf0538095339d3b8d96d
Author: stiga-huang <[email protected]>
AuthorDate: Wed Jan 10 13:58:38 2024 +0800

    IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations

    In-flight catalog operations are tracked in a map using the query id as the
    key. This is fine by default, since catalog clients use 0 as the timeout
    (see --catalog_client_rpc_timeout_ms), i.e. catalog RPCs never time out, so
    each query has at most one in-flight catalog RPC at a time. However, when
    catalog_client_rpc_timeout_ms is set to a non-zero value, impalad may retry
    the catalog RPC once it is considered timed out. That can leave several
    in-flight catalog operations from the same query, all using the same query
    id as the map key.

    To fix the key conflicts, this patch uses the pair (queryId, threadId) as
    the key of the in-flight operations map. 'threadId' comes from the thrift
    thread that handles the RPC, so it is unique across different retries.

    Tests:
     - Add a custom-cluster test to verify all retries are shown in the
       /operations page.

    Change-Id: Icd94ac7532fe7f3d68028c2da82298037be706c4
    Reviewed-on: http://gerrit.cloudera.org:8080/20877
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../catalog/monitor/CatalogOperationTracker.java | 115 ++++++++++++++-------
 tests/custom_cluster/test_web_pages.py           |  70 +++++++++++--
 2 files changed, 136 insertions(+), 49 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
index 47dc8d18f..f66f0cfd7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
@@ -29,6 +29,9 @@ import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
+import org.apache.impala.util.TUniqueIdUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,29 +51,59 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  * are also kept in memory and the size is controlled by 'catalog_operation_log_size'.
  */
 public final class CatalogOperationTracker {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CatalogOperationTracker.class);
   public final static CatalogOperationTracker INSTANCE = new CatalogOperationTracker();

   // Keeps track of the on-going DDL operations
-  CatalogDdlCounter catalogDdlCounter;
+  CatalogDdlCounter catalogDdlCounter_;

   // Keeps track of the on-going reset metadata requests (refresh/invalidate)
-  CatalogResetMetadataCounter catalogResetMetadataCounter;
+  CatalogResetMetadataCounter catalogResetMetadataCounter_;

   // Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
-  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
+  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter_;

-  private final Map<TUniqueId, TCatalogOpRecord> inFlightOperations =
+  /**
+   * Key to track in-flight catalog operations. Each operation is triggered by an RPC.
+   * Each RPC is identified by the query id and the thrift thread id that handles it.
+   * Note that the thread id is important to identify different RPC retries.
+   */
+  private static class RpcKey {
+    private final TUniqueId queryId_;
+    private final long threadId_;
+
+    public RpcKey(TUniqueId queryId) {
+      queryId_ = queryId;
+      threadId_ = Thread.currentThread().getId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof RpcKey)) return false;
+      RpcKey key = (RpcKey) o;
+      return queryId_.equals(key.queryId_) && threadId_ == key.threadId_;
+    }
+
+    @Override
+    public int hashCode() {
+      return queryId_.hashCode() * 31 + Long.hashCode(threadId_);
+    }
+  }
+
+  private final Map<RpcKey, TCatalogOpRecord> inFlightOperations_ =
       new ConcurrentHashMap<>();
-  private final Queue<TCatalogOpRecord> finishedOperations =
+  private final Queue<TCatalogOpRecord> finishedOperations_ =
       new ConcurrentLinkedQueue<>();
-  private final int catalogOperationLogSize;
+  private final int catalogOperationLogSize_;

   private CatalogOperationTracker() {
-    catalogDdlCounter = new CatalogDdlCounter();
-    catalogResetMetadataCounter = new CatalogResetMetadataCounter();
-    catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
-    catalogOperationLogSize = BackendConfig.INSTANCE.catalogOperationLogSize();
-    Preconditions.checkState(catalogOperationLogSize >= 0);
+    catalogDdlCounter_ = new CatalogDdlCounter();
+    catalogResetMetadataCounter_ = new CatalogResetMetadataCounter();
+    catalogFinalizeDmlCounter_ = new CatalogFinalizeDmlCounter();
+    catalogOperationLogSize_ = BackendConfig.INSTANCE.catalogOperationLogSize();
+    Preconditions.checkState(catalogOperationLogSize_ >= 0);
   }

   private void addRecord(TCatalogServiceRequestHeader header,
@@ -91,29 +124,33 @@ public final class CatalogOperationTracker {
     if (queryId != null) {
       TCatalogOpRecord record = new TCatalogOpRecord(Thread.currentThread().getId(),
           queryId, clientIp, coordinator, catalogOpName,
-          catalogDdlCounter.getTableName(tTableName), user,
+          catalogDdlCounter_.getTableName(tTableName), user,
           System.currentTimeMillis(), -1, "STARTED", details);
-      inFlightOperations.put(queryId, record);
+      inFlightOperations_.put(new RpcKey(queryId), record);
     }
   }

   private void archiveRecord(TUniqueId queryId, String errorMsg) {
-    if (queryId != null && inFlightOperations.containsKey(queryId)) {
-      TCatalogOpRecord record = inFlightOperations.remove(queryId);
-      if (catalogOperationLogSize == 0) return;
-      record.setFinish_time_ms(System.currentTimeMillis());
-      if (errorMsg != null) {
-        record.setStatus("FAILED");
-        record.setDetails(record.getDetails() + ", error=" + errorMsg);
-      } else {
-        record.setStatus("FINISHED");
-      }
-      synchronized (finishedOperations) {
-        if (finishedOperations.size() >= catalogOperationLogSize) {
-          finishedOperations.poll();
-        }
-        finishedOperations.add(record);
+    if (queryId == null) return;
+    RpcKey key = new RpcKey(queryId);
+    TCatalogOpRecord record = inFlightOperations_.remove(key);
+    if (record == null) {
+      LOG.error("Null record for query {}", TUniqueIdUtil.PrintId(queryId));
+      return;
+    }
+    if (catalogOperationLogSize_ == 0) return;
+    record.setFinish_time_ms(System.currentTimeMillis());
+    if (errorMsg != null) {
+      record.setStatus("FAILED");
+      record.setDetails(record.getDetails() + ", error=" + errorMsg);
+    } else {
+      record.setStatus("FINISHED");
+    }
+    synchronized (finishedOperations_) {
+      if (finishedOperations_.size() >= catalogOperationLogSize_) {
+        finishedOperations_.poll();
       }
+      finishedOperations_.add(record);
     }
   }

@@ -129,13 +166,13 @@ public final class CatalogOperationTracker {
       String details = "query_options=" + ddlRequest.query_options.toString();
       addRecord(ddlRequest.getHeader(), getDdlType(ddlRequest), tTableName, details);
     }
-    catalogDdlCounter.incrementOperation(ddlRequest.ddl_type, tTableName);
+    catalogDdlCounter_.incrementOperation(ddlRequest.ddl_type, tTableName);
   }

   public void decrement(TDdlType tDdlType, TUniqueId queryId,
       Optional<TTableName> tTableName, String errorMsg) {
     archiveRecord(queryId, errorMsg);
-    catalogDdlCounter.decrementOperation(tDdlType, tTableName);
+    catalogDdlCounter_.decrementOperation(tDdlType, tTableName);
   }

   public void increment(TResetMetadataRequest req) {
@@ -152,14 +189,14 @@ public final class CatalogOperationTracker {
           CatalogResetMetadataCounter.getResetMetadataType(req, tTableName).name(),
           tTableName, details);
     }
-    catalogResetMetadataCounter.incrementOperation(req);
+    catalogResetMetadataCounter_.incrementOperation(req);
   }

   public void decrement(TResetMetadataRequest req, String errorMsg) {
     if (req.isSetHeader()) {
       archiveRecord(req.getHeader().getQuery_id(), errorMsg);
     }
-    catalogResetMetadataCounter.decrementOperation(req);
+    catalogResetMetadataCounter_.decrementOperation(req);
   }

   public void increment(TUpdateCatalogRequest req) {
@@ -181,14 +218,14 @@ public final class CatalogOperationTracker {
           CatalogFinalizeDmlCounter.getDmlType(req.getHeader().redacted_sql_stmt).name(),
           tTableName, details);
     }
-    catalogFinalizeDmlCounter.incrementOperation(req);
+    catalogFinalizeDmlCounter_.incrementOperation(req);
   }

   public void decrement(TUpdateCatalogRequest req, String errorMsg) {
     if (req.isSetHeader()) {
       archiveRecord(req.getHeader().getQuery_id(), errorMsg);
     }
-    catalogFinalizeDmlCounter.decrementOperation(req);
+    catalogFinalizeDmlCounter_.decrementOperation(req);
   }

   /**
@@ -197,14 +234,14 @@
    */
   public TGetOperationUsageResponse getOperationMetrics() {
     List<TOperationUsageCounter> merged = new ArrayList<>();
-    merged.addAll(catalogDdlCounter.getOperationUsage());
-    merged.addAll(catalogResetMetadataCounter.getOperationUsage());
-    merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
+    merged.addAll(catalogDdlCounter_.getOperationUsage());
+    merged.addAll(catalogResetMetadataCounter_.getOperationUsage());
+    merged.addAll(catalogFinalizeDmlCounter_.getOperationUsage());
     TGetOperationUsageResponse res = new TGetOperationUsageResponse(merged);
-    for (TCatalogOpRecord record : inFlightOperations.values()) {
+    for (TCatalogOpRecord record : inFlightOperations_.values()) {
       res.addToIn_flight_catalog_operations(record);
     }
-    List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations);
+    List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations_);
     // Reverse the list to show recent operations first.
     Collections.reverse(records);
     res.setFinished_catalog_operations(records);
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 2644f70a2..2c5b87015 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -22,7 +22,9 @@ import re
 import requests
 import psutil
 import pytest
+import time

+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import (
   DEFAULT_CLUSTER_SIZE, CustomClusterTestSuite)

@@ -260,6 +262,22 @@ class TestWebPage(CustomClusterTestSuite):
     assert 'Content-Security-Policy' not in response.headers, \
         "CSP header present despite being disabled (port %s)" % port

+  @staticmethod
+  def _get_inflight_catalog_operations():
+    response = requests.get("http://localhost:25020/operations?json")
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "inflight_catalog_operations" in operations
+    return operations["inflight_catalog_operations"]
+
+  @staticmethod
+  def _get_finished_catalog_operations():
+    response = requests.get("http://localhost:25020/operations?json")
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "finished_catalog_operations" in operations
+    return operations["finished_catalog_operations"]
+
   @CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=2")
   def test_catalog_operations_limit(self, unique_database):
     tbl = unique_database + ".tbl"
@@ -267,11 +285,7 @@ class TestWebPage(CustomClusterTestSuite):
     self.execute_query("create table {0}_2 (id int)".format(tbl))
     self.execute_query("create table {0}_3 (id int)".format(tbl))
     self.execute_query("drop table {0}_1".format(tbl))
-    response = requests.get("http://localhost:25020/operations?json")
-    assert response.status_code == requests.codes.ok
-    operations = json.loads(response.text)
-    assert "finished_catalog_operations" in operations
-    finished_operations = operations["finished_catalog_operations"]
+    finished_operations = self._get_finished_catalog_operations()
     # Verify only 2 operations are shown
     assert len(finished_operations) == 2
     op = finished_operations[0]
@@ -293,11 +307,7 @@ class TestWebPage(CustomClusterTestSuite):
     num = 500
     for i in range(num):
       self.execute_query("invalidate metadata " + tbl)
-    response = requests.get("http://localhost:25020/operations?json")
-    assert response.status_code == requests.codes.ok
-    operations = json.loads(response.text)
-    assert "finished_catalog_operations" in operations
-    finished_operations = operations["finished_catalog_operations"]
+    finished_operations = self._get_finished_catalog_operations()
     # Verify all operations are in the history. There are one DROP_DATABASE, one
     # CREATE_DATABASE, one CREATE_TABLE and 'num' INVALIDATEs in the list.
     assert len(finished_operations) == 3 + num
@@ -319,6 +329,46 @@ class TestWebPage(CustomClusterTestSuite):
     assert op["catalog_op_name"] == "DROP_DATABASE"
     assert op["target_name"] == unique_database

+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=10 "
+                 "--catalog_client_rpc_retry_interval_ms=10 "
+                 "--catalog_client_connection_num_retries=2")
+  def test_catalog_operations_with_rpc_retry(self):
+    """Test that catalog RPC retries are all shown in the /operations page"""
+    # Run a DESCRIBE to ensure the table is loaded. So the first RPC attempt will
+    # time out in its real work.
+    self.execute_query("describe functional.alltypes")
+    try:
+      self.execute_query("refresh functional.alltypes", {
+          "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
+      })
+    except ImpalaBeeswaxException as e:
+      assert "RPC recv timed out" in str(e)
+    # In impalad side, the query fails by the above error. However, in catalogd side,
+    # the RPCs are still running. Check the in-flight operations.
+    inflight_operations = self._get_inflight_catalog_operations()
+    assert len(inflight_operations) == 2
+    for op in inflight_operations:
+      assert op["status"] == "STARTED"
+      assert op["catalog_op_name"] == "REFRESH"
+      assert op["target_name"] == "functional.alltypes"
+    assert inflight_operations[0]["query_id"] == inflight_operations[1]["query_id"]
+    assert inflight_operations[0]["thread_id"] != inflight_operations[1]["thread_id"]
+
+    # Wait until the catalog operations finish
+    while len(self._get_inflight_catalog_operations()) != 0:
+      time.sleep(1)
+
+    # Verify both RPC attempts are shown as finished operations.
+    finished_operations = self._get_finished_catalog_operations()
+    assert len(finished_operations) == 2
+    for op in finished_operations:
+      assert op["status"] == "FINISHED"
+      assert op["catalog_op_name"] == "REFRESH"
+      assert op["target_name"] == "functional.alltypes"
+    assert finished_operations[0]["query_id"] == finished_operations[1]["query_id"]
+    assert finished_operations[0]["thread_id"] != finished_operations[1]["thread_id"]
+
   def _verify_topic_size_metrics(self):
     # Calculate the total topic metrics from the /topics page
     response = requests.get("http://localhost:25010/topics?json")
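
The keying problem described in the commit message can be seen in isolation with the following
standalone sketch. It is not taken from the patch: the RpcKey class mirrors the shape of the one
added above, but the String query id, the record strings, and the thread ids are made-up stand-ins
for TUniqueId and TCatalogOpRecord. It shows how a query-id-only map collapses two retried RPC
attempts into a single record, while the composite (queryId, threadId) key keeps both records in
the in-flight map that backs the /operations page.

// Illustrative only, not part of the patch: stand-in types showing why the map key matters.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RpcKeyDemo {
  // Same shape as the RpcKey added by the patch, with String in place of TUniqueId.
  static final class RpcKey {
    final String queryId;
    final long threadId;

    RpcKey(String queryId, long threadId) {
      this.queryId = queryId;
      this.threadId = threadId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof RpcKey)) return false;
      RpcKey other = (RpcKey) o;
      return queryId.equals(other.queryId) && threadId == other.threadId;
    }

    @Override
    public int hashCode() {
      return queryId.hashCode() * 31 + Long.hashCode(threadId);
    }
  }

  public static void main(String[] args) {
    // Both RPC attempts of a retried catalog operation carry the same query id,
    // but each attempt is handled by a different thrift thread (ids are made up here).
    String queryId = "q1";

    // Old scheme: query id alone. The retry overwrites the first attempt's record.
    Map<String, String> byQueryId = new ConcurrentHashMap<>();
    byQueryId.put(queryId, "REFRESH, attempt 1, thread 11");
    byQueryId.put(queryId, "REFRESH, attempt 2, thread 12");
    System.out.println("query-id key: " + byQueryId.size() + " in-flight record(s)");  // 1

    // New scheme: (queryId, threadId). Both attempts stay visible until they finish.
    Map<RpcKey, String> byRpcKey = new ConcurrentHashMap<>();
    byRpcKey.put(new RpcKey(queryId, 11L), "REFRESH, attempt 1");
    byRpcKey.put(new RpcKey(queryId, 12L), "REFRESH, attempt 2");
    System.out.println("(query id, thread id) key: " + byRpcKey.size() + " in-flight record(s)");  // 2
  }
}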
