This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit deee153c76d2856dd5405137f45fb47bc267f99b
Author: jasonmfehr <[email protected]>
AuthorDate: Tue Mar 26 14:15:01 2024 -0700

    IMPALA-12426: Skip Inserting HS2 Operation Queries into the Completed Queries Table
    
    Prevents queries associated with HS2 metadata operations
    from being written to the completed queries table. These
    operations are identified by values of the TMetadataOpcode enum.
    
    A custom cluster test has been added that makes an HS2
    connection to Impala and runs these operations. The test
    asserts that none of the operations have their queries
    written to the completed queries table.
    
    Change-Id: Ie19cf5953522fa85941e6c0b9c15a9c9ba9dc362
    Reviewed-on: http://gerrit.cloudera.org:8080/21207
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/client-request-state.h  |   1 +
 be/src/service/impala-hs2-server.cc    |  10 ++-
 be/src/service/workload-management.cc  |  22 ++++-
 common/thrift/Query.thrift             |   3 +
 tests/custom_cluster/test_query_log.py | 142 +++++++++++++++++++++++++++++++++
 5 files changed, 173 insertions(+), 5 deletions(-)

diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index c96fcc73e..d5c582a57 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -332,6 +332,7 @@ class ClientRequestState {
   const TQueryOptions& query_options() const {
     return query_ctx_.client_request.query_options;
   }
+  bool hs2_metadata_op() const { return 
query_ctx_.client_request.hs2_metadata_op; }
   /// Returns 0:0 if this is a root query
   TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
   const vector<TTableName>& tables() const { return exec_request().tables; }
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 9392e1a02..e651a8439 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -145,9 +145,13 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   // from an RPC. As a best effort, we use the type of the operation.
   map<int, const char*>::const_iterator query_text_it =
       _TMetadataOpcode_VALUES_TO_NAMES.find(request->opcode);
-  const string& query_text = query_text_it == 
_TMetadataOpcode_VALUES_TO_NAMES.end() ?
-      "N/A" : query_text_it->second;
-  query_ctx.client_request.stmt = query_text;
+  if (query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end()) {
+    query_ctx.client_request.stmt = "N/A";
+  } else {
+    query_ctx.client_request.stmt = query_text_it->second;
+    query_ctx.client_request.__set_hs2_metadata_op(true);
+  }
+
   QueryHandle query_handle;
   QueryDriver::CreateNewDriver(this, &query_handle, query_ctx, session);
   Status register_status = RegisterQuery(query_ctx.query_id, session, 
&query_handle);
diff --git a/be/src/service/workload-management.cc b/be/src/service/workload-management.cc
index 6394c508d..611395071 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -225,12 +225,27 @@ void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle,
       case TCatalogOpType::DESCRIBE_TABLE:
       case TCatalogOpType::DESCRIBE_DB:
       case TCatalogOpType::DESCRIBE_HISTORY:
-        return;  // Note: early return
+        VLOG_QUERY << "skipping enqueue of completed query '" <<
+            PrintId(query_handle->query_id()) << "' with type '" <<
+          query_handle->stmt_type() << "' and catalog op type '" <<
+          query_handle->catalog_op_type() << "'";
+        return;  // Note: early return, query will not be added to the queue
       case TCatalogOpType::RESET_METADATA:
       case TCatalogOpType::DDL:
+        // No-op, continue execution of this function.
         break;
       default:
-        LOG(FATAL) << "unknown ddl type: " << 
to_string(query_handle->catalog_op_type());
+        LOG(WARNING) << "unknown ddl type: " <<
+            to_string(query_handle->catalog_op_type());
+        DCHECK(false);
+        return;  // Note: early return
+    }
+  } else if(query_handle->stmt_type() == TStmtType::UNKNOWN) {
+    if (query_handle->hs2_metadata_op()) {
+      VLOG_QUERY << "skipping enqueue of completed query '" <<
+          PrintId(query_handle->query_id()) << "' with type '" <<
+          query_handle->stmt_type() << "'";
+      return;  // Note: early return, query will not be added to the queue
     }
   }
 
@@ -246,6 +261,9 @@ void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle,
       completed_queries_cv_.notify_all();
     }
   }
+
+  VLOG_QUERY << "enqueued completed '" << query_handle->stmt_type() << "' 
query '" <<
+      PrintId(query_handle->query_id()) << "'";
 } // ImpalaServer::EnqueueCompletedQuery
 
 void ImpalaServer::CompletedQueriesThread() {
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 75043a489..57f322e7d 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -723,6 +723,9 @@ struct TClientRequest {
 
   // Redacted SQL stmt
   3: optional string redacted_stmt
+
+  // Indicates if an HS2 metadata operation code was provided in the client 
request
+  4: optional bool hs2_metadata_op
 }
 
 // Per-client session state
diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py
index 1d39b2da2..e2f9a08fb 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -22,9 +22,16 @@ import pytest
 import string
 import tempfile
 
+from getpass import getuser
+from ImpalaService import ImpalaHiveServer2Service
 from random import choice, randint
 from signal import SIGRTMIN
+from TCLIService import TCLIService
+from thrift.transport.TSocket import TSocket
+from thrift.transport.TTransport import TBufferedTransport
+from thrift.protocol import TBinaryProtocol
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.retry import retry
@@ -441,12 +448,147 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
   """Tests to assert the query log table is correctly populated when using the 
HS2
      client protocol."""
 
+  HS2_OPERATIONS_CLUSTER_ID = "hs2-operations-" + str(int(time()))
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestQueryLogTableHS2, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('protocol') == 'hs2')
 
+  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+                                                 "--query_log_write_interval_s=1 "
+                                                 "--cluster_id={} "
+                                                 "--shutdown_grace_period_s=10 "
+                                                 "--shutdown_deadline_s=60"
+                                                 .format(HS2_OPERATIONS_CLUSTER_ID),
+                                    catalogd_args="--enable_workload_mgmt",
+                                    cluster_size=2,
+                                    impalad_graceful_shutdown=True)
+  def test_query_log_table_hs2_operations(self, vector):
+    """HS2 metadata operations (GetTypeInfo, GetTables, ...) run in Impala as queries
+       with statement type UNKNOWN whose text is the operation name, not valid SQL.
+       This test asserts that such queries are not written to the completed queries
+       table, since they are trivial."""
+    host, port = IMPALAD_HS2_HOST_PORT.split(":")
+    socket = TSocket(host, int(port))
+    transport = TBufferedTransport(socket)
+    transport.open()
+    protocol = TBinaryProtocol.TBinaryProtocol(transport)
+    hs2_client = ImpalaHiveServer2Service.Client(protocol)
+
+    # Asserts the response from an HS2 operation indicates success.
+    def assert_resp(resp):
+      assert resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS
+
+    # Closes the HS2 operation referenced by resp using the provided client.
+    def close_op(client, resp):
+      close_operation_req = TCLIService.TCloseOperationReq()
+      close_operation_req.operationHandle = resp.operationHandle
+      assert_resp(client.CloseOperation(close_operation_req))
+
+    try:
+      # Open a new HS2 session.
+      open_session_req = TCLIService.TOpenSessionReq()
+      open_session_req.username = getuser()
+      open_session_req.configuration = dict()
+      open_sess_resp = hs2_client.OpenSession(open_session_req)
+      assert_resp(open_sess_resp)
+
+      # Test the get_type_info query.
+      get_typeinfo_req = TCLIService.TGetTypeInfoReq()
+      get_typeinfo_req.sessionHandle = open_sess_resp.sessionHandle
+      get_typeinfo_resp = hs2_client.GetTypeInfo(get_typeinfo_req)
+      assert_resp(get_typeinfo_resp)
+      close_op(hs2_client, get_typeinfo_resp)
+
+      # Test the get_catalogs query.
+      get_cats_req = TCLIService.TGetCatalogsReq()
+      get_cats_req.sessionHandle = open_sess_resp.sessionHandle
+      get_cats_resp = hs2_client.GetCatalogs(get_cats_req)
+      assert_resp(get_cats_resp)
+      close_op(hs2_client, get_cats_resp)
+
+      # Test the get_schemas query.
+      get_schemas_req = TCLIService.TGetSchemasReq()
+      get_schemas_req.sessionHandle = open_sess_resp.sessionHandle
+      get_schemas_resp = hs2_client.GetSchemas(get_schemas_req)
+      assert_resp(get_schemas_resp)
+      close_op(hs2_client, get_schemas_resp)
+
+      # Test the get_tables query.
+      get_tables_req = TCLIService.TGetTablesReq()
+      get_tables_req.sessionHandle = open_sess_resp.sessionHandle
+      get_tables_req.schemaName = self.WM_DB
+      get_tables_resp = hs2_client.GetTables(get_tables_req)
+      assert_resp(get_tables_resp)
+      close_op(hs2_client, get_tables_resp)
+
+      # Test the get_table_types query.
+      get_tbl_typ_req = TCLIService.TGetTableTypesReq()
+      get_tbl_typ_req.sessionHandle = open_sess_resp.sessionHandle
+      get_tbl_typ_req.schemaName = self.WM_DB
+      get_tbl_typ_resp = hs2_client.GetTableTypes(get_tbl_typ_req)
+      assert_resp(get_tbl_typ_resp)
+      close_op(hs2_client, get_tbl_typ_resp)
+
+      # Test the get_columns query.
+      get_cols_req = TCLIService.TGetColumnsReq()
+      get_cols_req.sessionHandle = open_sess_resp.sessionHandle
+      get_cols_req.schemaName = 'functional'
+      get_cols_req.tableName = 'parent_table'
+      get_cols_resp = hs2_client.GetColumns(get_cols_req)
+      assert_resp(get_cols_resp)
+      close_op(hs2_client, get_cols_resp)
+
+      # Test the get_primary_keys query.
+      get_pk_req = TCLIService.TGetPrimaryKeysReq()
+      get_pk_req.sessionHandle = open_sess_resp.sessionHandle
+      get_pk_req.schemaName = 'functional'
+      get_pk_req.tableName = 'parent_table'
+      get_pk_resp = hs2_client.GetPrimaryKeys(get_pk_req)
+      assert_resp(get_pk_resp)
+      close_op(hs2_client, get_pk_resp)
+
+      # Test the get_cross_reference query.
+      get_cr_req = TCLIService.TGetCrossReferenceReq()
+      get_cr_req.sessionHandle = open_sess_resp.sessionHandle
+      get_cr_req.parentSchemaName = "functional"
+      get_cr_req.foreignSchemaName = "functional"
+      get_cr_req.parentTableName = "parent_table"
+      get_cr_req.foreignTableName = "child_table"
+      get_cr_resp = hs2_client.GetCrossReference(get_cr_req)
+      assert_resp(get_cr_resp)
+      close_op(hs2_client, get_cr_resp)
+
+      close_session_req = TCLIService.TCloseSessionReq()
+      close_session_req.sessionHandle = open_sess_resp.sessionHandle
+      resp = hs2_client.CloseSession(close_session_req)
+      assert_resp(resp)
+    finally:
+      socket.close()
+
+    # Assert none of the queries were written to the completed queries table.
+    client = self.create_impala_client(protocol=vector.get_value('protocol'))
+    try:
+      # Execute a general query and wait for it to be written to the completed queries
+      # table. This ensures there are no false positives caused by the assertion query
+      # running before Impala has written the queued queries to that table.
+      assert client.execute("select 1").success
+      self.cluster.get_first_impalad().service.wait_for_metric_value(
+          "impala-server.completed-queries.written", 1, 30)
+
+      # Force Impala to process the inserts to the completed queries table.
+      client.execute("refresh {}".format(self.QUERY_TBL))
+
+      # Only the "select 1" query above is expected for this cluster id.
+      assert_results = client.execute("select count(*) from {} where cluster_id='{}'"
+          .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID))
+      assert assert_results.success
+      assert assert_results.data[0] == "1"
+    finally:
+      client.close()
+
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_mult "

Reply via email to