This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit deee153c76d2856dd5405137f45fb47bc267f99b Author: jasonmfehr <[email protected]> AuthorDate: Tue Mar 26 14:15:01 2024 -0700 IMPALA-12426: Skip Inserting HS2 Operation Queries into the Completed Queries Table Prevents queries associated with HS2 metadata operations from being written to the completed queries table. These operations are represented by the TMetadataOpcode enum. A custom cluster test that makes an HS2 connection to Impala and runs these operations has been added. This test asserts that none of the operations have their queries written to the completed queries table. Change-Id: Ie19cf5953522fa85941e6c0b9c15a9c9ba9dc362 Reviewed-on: http://gerrit.cloudera.org:8080/21207 Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/client-request-state.h | 1 + be/src/service/impala-hs2-server.cc | 10 ++- be/src/service/workload-management.cc | 22 ++++- common/thrift/Query.thrift | 3 + tests/custom_cluster/test_query_log.py | 142 +++++++++++++++++++++++++++++++++ 5 files changed, 173 insertions(+), 5 deletions(-) diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index c96fcc73e..d5c582a57 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -332,6 +332,7 @@ class ClientRequestState { const TQueryOptions& query_options() const { return query_ctx_.client_request.query_options; } + bool hs2_metadata_op() const { return query_ctx_.client_request.hs2_metadata_op; } /// Returns 0:0 if this is a root query TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; } const vector<TTableName>& tables() const { return exec_request().tables; } diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 9392e1a02..e651a8439 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -145,9 +145,13 @@ void ImpalaServer::ExecuteMetadataOp(const 
THandleIdentifier& session_handle, // from an RPC. As a best effort, we use the type of the operation. map<int, const char*>::const_iterator query_text_it = _TMetadataOpcode_VALUES_TO_NAMES.find(request->opcode); - const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ? - "N/A" : query_text_it->second; - query_ctx.client_request.stmt = query_text; + if (query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end()) { + query_ctx.client_request.stmt = "N/A"; + } else { + query_ctx.client_request.stmt = query_text_it->second; + query_ctx.client_request.__set_hs2_metadata_op(true); + } + QueryHandle query_handle; QueryDriver::CreateNewDriver(this, &query_handle, query_ctx, session); Status register_status = RegisterQuery(query_ctx.query_id, session, &query_handle); diff --git a/be/src/service/workload-management.cc b/be/src/service/workload-management.cc index 6394c508d..611395071 100644 --- a/be/src/service/workload-management.cc +++ b/be/src/service/workload-management.cc @@ -225,12 +225,27 @@ void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle, case TCatalogOpType::DESCRIBE_TABLE: case TCatalogOpType::DESCRIBE_DB: case TCatalogOpType::DESCRIBE_HISTORY: - return; // Note: early return + VLOG_QUERY << "skipping enqueue of completed query '" << + PrintId(query_handle->query_id()) << "' with type '" << + query_handle->stmt_type() << "' and catalog op type '" << + query_handle->catalog_op_type() << "'"; + return; // Note: early return, query will not be added to the queue case TCatalogOpType::RESET_METADATA: case TCatalogOpType::DDL: + // No-op, continue execution of this function. 
break; default: - LOG(FATAL) << "unknown ddl type: " << to_string(query_handle->catalog_op_type()); + LOG(WARNING) << "unknown ddl type: " << + to_string(query_handle->catalog_op_type()); + DCHECK(false); + return; // Note: early return + } + } else if(query_handle->stmt_type() == TStmtType::UNKNOWN) { + if (query_handle->hs2_metadata_op()) { + VLOG_QUERY << "skipping enqueue of completed query '" << + PrintId(query_handle->query_id()) << "' with type '" << + query_handle->stmt_type() << "'"; + return; // Note: early return, query will not be added to the queue } } @@ -246,6 +261,9 @@ void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle, completed_queries_cv_.notify_all(); } } + + VLOG_QUERY << "enqueued completed '" << query_handle->stmt_type() << "' query '" << + PrintId(query_handle->query_id()) << "'"; } // ImpalaServer::EnqueueCompletedQuery void ImpalaServer::CompletedQueriesThread() { diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 75043a489..57f322e7d 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -723,6 +723,9 @@ struct TClientRequest { // Redacted SQL stmt 3: optional string redacted_stmt + + // Indicates if an HS2 metadata operation code was provided in the client request + 4: optional bool hs2_metadata_op } // Per-client session state diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index 1d39b2da2..e2f9a08fb 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -22,9 +22,16 @@ import pytest import string import tempfile +from getpass import getuser +from ImpalaService import ImpalaHiveServer2Service from random import choice, randint from signal import SIGRTMIN +from TCLIService import TCLIService +from thrift.transport.TSocket import TSocket +from thrift.transport.TTransport import TBufferedTransport +from thrift.protocol import TBinaryProtocol from tests.common.custom_cluster_test_suite 
import CustomClusterTestSuite +from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT from tests.common.test_dimensions import create_single_exec_option_dimension from tests.common.test_vector import ImpalaTestDimension from tests.util.retry import retry @@ -441,12 +448,147 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): """Tests to assert the query log table is correctly populated when using the HS2 client protocol.""" + HS2_OPERATIONS_CLUSTER_ID = "hs2-operations-" + str(int(time())) + @classmethod def add_test_dimensions(cls): super(TestQueryLogTableHS2, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('protocol') == 'hs2') + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--query_log_write_interval_s=1 " + "--cluster_id={} " + "--shutdown_grace_period_s=10 " + "--shutdown_deadline_s=60" + .format(HS2_OPERATIONS_CLUSTER_ID), + catalogd_args="--enable_workload_mgmt", + cluster_size=2, + impalad_graceful_shutdown=True) + def test_query_log_table_hs2_operations(self, vector): + """Certain HS2 operations appear to Impala as a special kind of query. Specifically, + these operations have a type of unknown and a normally invalid sql syntax. This + test asserts those queries are not written to the completed queries table since + they are trivial.""" + host, port = IMPALAD_HS2_HOST_PORT.split(":") + socket = TSocket(host, port) + transport = TBufferedTransport(socket) + transport.open() + protocol = TBinaryProtocol.TBinaryProtocol(transport) + hs2_client = ImpalaHiveServer2Service.Client(protocol) + + # Asserts the response from an HS2 operation indicates success. + def assert_resp(resp): + assert resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS + + # Closes an HS2 operation. 
+ def close_op(client, resp): + close_operation_req = TCLIService.TCloseOperationReq() + close_operation_req.operationHandle = resp.operationHandle + assert_resp(hs2_client.CloseOperation(close_operation_req)) + + try: + # Open a new HS2 session. + open_session_req = TCLIService.TOpenSessionReq() + open_session_req.username = getuser() + open_session_req.configuration = dict() + open_sess_resp = hs2_client.OpenSession(open_session_req) + assert_resp(open_sess_resp) + + # Test the get_type_info query. + get_typeinfo_req = TCLIService.TGetTypeInfoReq() + get_typeinfo_req.sessionHandle = open_sess_resp.sessionHandle + get_typeinfo_resp = hs2_client.GetTypeInfo(get_typeinfo_req) + assert_resp(get_typeinfo_resp) + close_op(hs2_client, get_typeinfo_resp) + + # Test the get_catalogs query. + get_cats_req = TCLIService.TGetCatalogsReq() + get_cats_req.sessionHandle = open_sess_resp.sessionHandle + get_cats_resp = hs2_client.GetCatalogs(get_cats_req) + assert_resp(get_cats_resp) + close_op(hs2_client, get_cats_resp) + + # Test the get_schemas query. + get_schemas_req = TCLIService.TGetSchemasReq() + get_schemas_req.sessionHandle = open_sess_resp.sessionHandle + get_schemas_resp = hs2_client.GetSchemas(get_schemas_req) + assert_resp(get_schemas_resp) + close_op(hs2_client, get_schemas_resp) + + # Test the get_tables query. + get_tables_req = TCLIService.TGetTablesReq() + get_tables_req.sessionHandle = open_sess_resp.sessionHandle + get_tables_req.schemaName = self.WM_DB + get_tables_resp = hs2_client.GetTables(get_tables_req) + assert_resp(get_tables_resp) + close_op(hs2_client, get_tables_resp) + + # Test the get_table_types query. + get_tbl_typ_req = TCLIService.TGetTableTypesReq() + get_tbl_typ_req.sessionHandle = open_sess_resp.sessionHandle + get_tbl_typ_req.schemaName = self.WM_DB + get_tbl_typ_resp = hs2_client.GetTableTypes(get_tbl_typ_req) + assert_resp(get_tbl_typ_resp) + close_op(hs2_client, get_tbl_typ_resp) + + # Test the get_columns query. 
+ get_cols_req = TCLIService.TGetColumnsReq() + get_cols_req.sessionHandle = open_sess_resp.sessionHandle + get_cols_req.schemaName = 'functional' + get_cols_req.tableName = 'parent_table' + get_cols_resp = hs2_client.GetColumns(get_cols_req) + assert_resp(get_cols_resp) + close_op(hs2_client, get_cols_resp) + + # Test the get_primary_keys query. + get_pk_req = TCLIService.TGetPrimaryKeysReq() + get_pk_req.sessionHandle = open_sess_resp.sessionHandle + get_pk_req.schemaName = 'functional' + get_pk_req.tableName = 'parent_table' + get_pk_resp = hs2_client.GetPrimaryKeys(get_pk_req) + assert_resp(get_pk_resp) + close_op(hs2_client, get_pk_resp) + + # Test the get_cross_reference query. + get_cr_req = TCLIService.TGetCrossReferenceReq() + get_cr_req.sessionHandle = open_sess_resp.sessionHandle + get_cr_req.parentSchemaName = "functional" + get_cr_req.foreignSchemaName = "functional" + get_cr_req.parentTableName = "parent_table" + get_cr_req.foreignTableName = "child_table" + get_cr_resp = hs2_client.GetCrossReference(get_cr_req) + assert_resp(get_cr_resp) + close_op(hs2_client, get_cr_resp) + + close_session_req = TCLIService.TCloseSessionReq() + close_session_req.sessionHandle = open_sess_resp.sessionHandle + resp = hs2_client.CloseSession(close_session_req) + assert resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS + finally: + socket.close() + + # Assert none of the queries were written to the completed queries table. + client = self.create_impala_client(protocol=vector.get_value('protocol')) + + # Execute a general query and wait for it to appear in the completed queries table to + # ensure there are no false positives caused by the assertion query executing before + # Impala has a chance to write queued queries to the completed queries table. 
+ assert client.execute("select 1").success + self.cluster.get_first_impalad().service.wait_for_metric_value( + "impala-server.completed-queries.written", 1, 30) + + # Force Impala to process the inserts to the completed queries table. + client.execute("refresh {}".format(self.QUERY_TBL)) + + try: + assert_results = client.execute("select count(*) from {} where cluster_id='{}'" + .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID)) + assert assert_results.success + assert assert_results.data[0] == "1" + finally: + client.close() + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " "--query_log_write_interval_s=1 " "--cluster_id=test_query_hist_mult "
