This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b410a90b4707be2e08dcb6c02b9473aa9ade8d96
Author: stiga-huang <[email protected]>
AuthorDate: Wed Jun 28 16:42:38 2023 +0800

    IMPALA-12152: Add query options to wait for HMS events sync up

    It's a common scenario to run Impala queries after dependent external
    changes are done, e.g. running COMPUTE STATS on a table after
    Hive/Spark jobs ingest new data into it. Currently, it's unsafe to run
    such Impala queries immediately after the Hive/Spark jobs finish,
    since EventProcessor might have a long lag in applying the HMS events.
    Note that running REFRESH/INVALIDATE on the table can also solve the
    problem, but one of the motivations of EventProcessor is to get rid of
    such Impala-specific commands.

    This patch adds a mechanism to let query planning wait until the
    metadata is synced up. Two new query options are added:
    - SYNC_HMS_EVENTS_WAIT_TIME_S configures the timeout in seconds for
      waiting. It's 0 by default, which disables the waiting mechanism.
    - SYNC_HMS_EVENTS_STRICT_MODE controls the behavior if we can't wait
      for the metadata to be synced up, e.g. when the waiting times out or
      EventProcessor is in the ERROR state. It defaults to false
      (non-strict mode). In strict mode, the coordinator fails the query.
      In non-strict mode, the coordinator starts planning with a warning
      message in the profile (and in the client output if the client
      consumes the get_log results, e.g. in impala-shell).

    Example usage - query the table after inserting into dynamic
    partitions in Hive. We don't know which partitions are modified, so
    running REFRESH in Impala is inefficient since it reloads all
    partitions:

      hive> insert into tbl partition(p) select * from tbl2;
      impala> set sync_hms_events_wait_time_s=300;
      impala> select * from tbl;

    With this new feature, catalogd reloads the updated partitions based
    on HMS events, which is more efficient than REFRESH. The wait time can
    be set to the largest event-processing lag that has been observed in
    the cluster.
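The semantics of the two options can be sketched as a polling loop, assuming a non-zero wait time (a simplified illustration only, not the actual coordinator code; the function and parameter names here are hypothetical):

```python
import time

def wait_for_hms_sync(get_last_synced_event_id, target_event_id,
                      wait_time_s, strict_mode, sleep_interval_ms=100):
    """Sketch of SYNC_HMS_EVENTS_WAIT_TIME_S / SYNC_HMS_EVENTS_STRICT_MODE.

    Polls until catalogd's last synced event id reaches the target id
    recorded when the request arrived, or until the timeout expires.
    Returns a list of warnings to surface in the query profile.
    """
    deadline = time.monotonic() + wait_time_s
    while time.monotonic() < deadline:
        if get_last_synced_event_id() >= target_event_id:
            return []  # Synced: planning proceeds with no warnings.
        # Mirrors the hidden hms_event_sync_sleep_interval_ms flag.
        time.sleep(sleep_interval_ms / 1000.0)
    if strict_mode:
        # Strict mode: the coordinator fails the query.
        raise RuntimeError("Timed out waiting for HMS events to be synced")
    # Non-strict mode: continue planning with a warning in the profile.
    return ["Continuing without syncing Metastore events"]
```

In the real patch this loop runs in the catalogd RPC thread rather than on the coordinator, but the timeout, sleep interval, and strict/non-strict outcomes follow the same shape.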
    Note the lag of event processing is shown as the "Lag time" in the
    /events page of the catalogd WebUI and as "events-processor.lag-time"
    in the /metrics page. Users can monitor it to get a sense of the lag.

    Some timeline items are added in the query profile for this waiting
    mechanism, e.g.

    A succeeded wait:
      Query Compilation: 937.279ms
        - Synced events from Metastore: 909.162ms (909.162ms)
        - Metadata of all 1 tables cached: 911.005ms (1.843ms)
        - Analysis finished: 919.600ms (8.595ms)

    A failed wait:
      Query Compilation: 1s321ms
        - Continuing without syncing Metastore events: 40.883ms (40.883ms)
        - Metadata load started: 41.618ms (735.633us)

    Added a histogram metric, impala-server.wait-for-hms-event-durations-ms,
    to track the duration of this waiting.

    --------
    Implementation

    A new catalogd RPC, WaitForHmsEvent, is added to the CatalogService
    API so that the coordinator can wait until catalogd processes the
    latest event as of when this RPC is triggered. Query planning starts
    or fails after this RPC returns.

    The RPC request contains the potential dbs/tables that are required by
    the query. Catalogd records the latest event id when it receives this
    RPC. When the last synced event id reaches it, catalogd returns the
    catalog updates to the coordinator in the RPC response. Before that,
    the RPC thread is in a waiting loop that sleeps at a configurable
    interval, controlled by a hidden flag, hms_event_sync_sleep_interval_ms
    (defaults to 100).

    Entry-point functions:
    - Frontend#waitForHmsEvents()
    - CatalogServiceCatalog#waitForHmsEvent()

    Some statements don't need to wait for HMS events, e.g. CREATE/DROP
    ROLE statements. This patch adds an overridden method,
    requiresHmsMetadata(), in each Statement to mark whether it can skip
    the HMS event sync.

    Test side changes:
    - Some test code uses EventProcessorUtils.wait_for_event_processing()
      to wait for HMS events to be synced up before running a query. It is
      now updated to just use these new query options in the query.
    - Note that we still need wait_for_event_processing() in test code
      that verifies metrics after HMS events are synced up.

    --------
    Limitation

    Currently, UPDATE_TBL_COL_STAT_EVENT, UPDATE_PART_COL_STAT_EVENT and
    OPEN_TXN events are ignored by the event processor. If the latest
    event happens to be of these types and no more events follow it, the
    last synced event id can never reach the latest event id. We need to
    fix the last synced event id to also consider ignored events
    (IMPALA-13623).

    The current implementation waits for the latest event id as of when
    the WaitForHmsEvent RPC is received on the catalogd side. We can
    improve it by leveraging HIVE-27499 to efficiently detect whether the
    given dbs/tables have unsynced events and just wait for the *largest*
    id of them. Dbs/tables without unsynced events don't need to block
    query planning. However, this only works for non-transactional tables.
    Transactional tables might be modified by COMMIT_TXN or ABORT_TXN
    events, which don't carry the table names. So even with HIVE-27499, we
    can't determine whether a transactional table has pending events.
    IMPALA-13684 will target improving this for non-transactional tables.

    Tests:
    - Add a test to verify that planning waits until catalogd is synced
      with HMS changes.
- Add test on the error handling when HMS event processing is disabled Change-Id: I36ac941bb2c2217b09fcfa2eb567b011b38efa2a Reviewed-on: http://gerrit.cloudera.org:8080/20131 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 23 ++ be/src/catalog/catalog-service-client-wrapper.h | 8 + be/src/catalog/catalog.cc | 9 + be/src/catalog/catalog.h | 7 + be/src/exec/catalog-op-executor.cc | 20 ++ be/src/exec/catalog-op-executor.h | 4 + be/src/service/fe-support.cc | 34 +++ be/src/service/impala-server.cc | 37 ++-- be/src/service/query-options.cc | 11 + be/src/service/query-options.h | 8 +- be/src/util/backend-gflag-util.cc | 2 + be/src/util/impalad-metrics.cc | 6 + be/src/util/impalad-metrics.h | 4 + common/thrift/BackendGflags.thrift | 2 + common/thrift/CatalogService.thrift | 32 +++ common/thrift/ImpalaService.thrift | 12 ++ common/thrift/Query.thrift | 6 + common/thrift/metrics.json | 10 + .../org/apache/impala/analysis/AdminFnStmt.java | 3 + .../org/apache/impala/analysis/AlterDbStmt.java | 3 + .../java/org/apache/impala/analysis/Analyzer.java | 9 + .../org/apache/impala/analysis/CreateDbStmt.java | 2 + .../apache/impala/analysis/CreateDropRoleStmt.java | 3 + .../org/apache/impala/analysis/DescribeDbStmt.java | 2 + .../org/apache/impala/analysis/DropDbStmt.java | 2 + .../impala/analysis/GrantRevokeRoleStmt.java | 3 + .../java/org/apache/impala/analysis/SetStmt.java | 3 + .../apache/impala/analysis/ShowDataSrcsStmt.java | 3 + .../apache/impala/analysis/ShowFunctionsStmt.java | 2 + .../impala/analysis/ShowGrantPrincipalStmt.java | 4 +- .../org/apache/impala/analysis/ShowRolesStmt.java | 3 + .../impala/analysis/ShowTablesOrViewsStmt.java | 1 + .../org/apache/impala/analysis/StatementBase.java | 13 ++ .../java/org/apache/impala/catalog/Catalog.java | 5 + .../org/apache/impala/catalog/CatalogDeltaLog.java | 51 ++++- .../impala/catalog/CatalogServiceCatalog.java | 189 
++++++++++++++++ .../org/apache/impala/catalog/ImpaladCatalog.java | 3 +- .../catalog/events/ExternalEventsProcessor.java | 5 + .../catalog/events/MetastoreEventsProcessor.java | 68 +++++- .../java/org/apache/impala/service/FeSupport.java | 22 ++ .../java/org/apache/impala/service/Frontend.java | 146 +++++++++++-- .../java/org/apache/impala/service/JniCatalog.java | 13 ++ tests/common/impala_test_suite.py | 64 +++--- .../custom_cluster/test_event_processing_error.py | 10 +- tests/custom_cluster/test_events_custom_configs.py | 107 ++++++++- .../test_hive_parquet_codec_interop.py | 3 +- tests/custom_cluster/test_kudu.py | 9 +- tests/metadata/test_event_processing.py | 150 +++++++++++-- tests/metadata/test_event_processing_base.py | 239 ++++++++++----------- tests/metadata/test_metadata_query_statements.py | 8 +- tests/query_test/test_hive_codec_interop.py | 2 +- 51 files changed, 1158 insertions(+), 227 deletions(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 3c64be3f3..41615005a 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -99,6 +99,10 @@ DEFINE_int32(max_wait_time_for_sync_ddl_s, 0, "Maximum time (in seconds) until " "before throwing an error indicating that not all the " "coordinators might have applied the changes caused due to the ddl."); +DEFINE_int32_hidden(hms_event_sync_sleep_interval_ms, 100, "Sleep interval (in ms) " + "used in the thread of catalogd processing the WaitForHmsEvent RPC. 
The thread " + "sleeps for such an interval when checking for HMS events to be synced."); + DECLARE_string(debug_actions); DEFINE_bool(start_hms_server, false, "When set to true catalog server starts a HMS " "server at a port specified by hms_port flag"); @@ -465,6 +469,25 @@ class CatalogServiceThriftIf : public CatalogServiceIf { VLOG_RPC << "SetEventProcessorStatus(): response=" << ThriftDebugStringNoThrow(resp); } + void WaitForHmsEvent(TWaitForHmsEventResponse& resp, + const TWaitForHmsEventRequest& req) override { + VLOG_RPC << "WaitForHmsEvent(): request=" << ThriftDebugString(req); + Status status = AcceptRequest(req.protocol_version); + if (status.ok()) { + status = catalog_server_->catalog()->WaitForHmsEvent(req, &resp); + } + if (!status.ok()) { + LOG(WARNING) << status.GetDetail(); + // Status in response is not set if the error is due to an exception. + if (resp.status.status_code == TErrorCode::OK) { + TStatus thrift_status; + status.ToThrift(&thrift_status); + resp.__set_status(thrift_status); + } + } + VLOG_RPC << "WaitForHmsEvent(): response.status=" << resp.status; + } + private: CatalogServer* catalog_server_; string server_address_; diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h index 1c1560493..6fe798b98 100644 --- a/be/src/catalog/catalog-service-client-wrapper.h +++ b/be/src/catalog/catalog-service-client-wrapper.h @@ -131,6 +131,14 @@ class CatalogServiceClientWrapper : public CatalogServiceClient { *send_done = true; recv_SetEventProcessorStatus(_return); } + + void WaitForHmsEvent(TWaitForHmsEventResponse& _return, + const TWaitForHmsEventRequest& req, bool* send_done) { + DCHECK(!*send_done); + send_WaitForHmsEvent(req); + *send_done = true; + recv_WaitForHmsEvent(_return); + } #pragma clang diagnostic pop }; diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc index a22d18ea5..cf329bcac 100644 --- a/be/src/catalog/catalog.cc +++ b/be/src/catalog/catalog.cc @@ 
-77,6 +77,7 @@ Catalog::Catalog() { {"getLatestCompactions", "([B)[B", &get_latest_compactions_id_}, {"getAllHadoopConfigs", "()[B", &get_hadoop_configs_id_}, {"setEventProcessorStatus", "([B)[B", &set_event_processor_status_id_}, + {"waitForHmsEvent", "([B)[B", &wait_for_hms_event_id_}, }; JNIEnv* jni_env = JniUtil::GetJNIEnv(); @@ -244,3 +245,11 @@ Status Catalog::SetEventProcessorStatus( const TSetEventProcessorStatusRequest& req, TSetEventProcessorStatusResponse* resp) { return JniUtil::CallJniMethod(catalog_, set_event_processor_status_id_, req, resp); } + +Status Catalog::WaitForHmsEvent(const TWaitForHmsEventRequest& req, + TWaitForHmsEventResponse* resp) { + if (req.header.__isset.query_id) { + GetThreadDebugInfo()->SetQueryId(req.header.query_id); + } + return JniUtil::CallJniMethod(catalog_, wait_for_hms_event_id_, req, resp); +} diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h index d871f26d2..17b95abb0 100644 --- a/be/src/catalog/catalog.h +++ b/be/src/catalog/catalog.h @@ -152,6 +152,7 @@ class Catalog { /// Returns OK if the refreshing was successful, otherwise a Status object with /// information on the error will be returned. Status RefreshDataSources(); + /// Returns all Hadoop configurations in key, value form in result. Status GetAllHadoopConfigs(TGetAllHadoopConfigsResponse* result); @@ -159,6 +160,11 @@ class Catalog { Status SetEventProcessorStatus( const TSetEventProcessorStatusRequest& req, TSetEventProcessorStatusResponse* resp); + /// Waits until catalogd processes the latest HMS event and get the catalog version + /// that catches up the metadata changes. 
+ Status WaitForHmsEvent(const TWaitForHmsEventRequest& req, + TWaitForHmsEventResponse* resp); + private: jobject catalog_; // instance of org.apache.impala.service.JniCatalog jmethodID update_metastore_id_; // JniCatalog.updateMetaastore() @@ -187,6 +193,7 @@ class Catalog { jmethodID get_latest_compactions_id_; // JniCatalog.getLatestCompactions() jmethodID get_hadoop_configs_id_; // JniCatalog.getAllHadoopConfigs() jmethodID set_event_processor_status_id_; // JniCatalog.setEventProcessorStatus() + jmethodID wait_for_hms_event_id_; // JniCatalog.waitForHmsEvent() }; } diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc index cbaaf2694..b14a57348 100644 --- a/be/src/exec/catalog-op-executor.cc +++ b/be/src/exec/catalog-op-executor.cc @@ -29,6 +29,8 @@ #include "service/impala-server.h" #include "service/hs2-util.h" #include "util/debug-util.h" +#include "util/histogram-metric.h" +#include "util/impalad-metrics.h" #include "util/runtime-profile-counters.h" #include "util/string-parser.h" #include "util/test-info.h" @@ -491,3 +493,21 @@ Status CatalogOpExecutor::SetEventProcessorStatus( if (result->status.status_code != TErrorCode::OK) return Status(result->status); return Status::OK(); } + +Status CatalogOpExecutor::WaitForHmsEvent(const TWaitForHmsEventRequest& req, + TWaitForHmsEventResponse* resp) { + int attempt = 0; // Used for debug action only. 
+ MonotonicStopWatch sw; + sw.Start(); + CatalogServiceConnection::RpcStatus rpc_status = + CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), + *ExecEnv::GetInstance()->GetCatalogdAddress().get(), + &CatalogServiceClientWrapper::WaitForHmsEvent, req, + FLAGS_catalog_client_connection_num_retries, + FLAGS_catalog_client_rpc_retry_interval_ms, + [&attempt]() { return CatalogRpcDebugFn(&attempt); }, resp); + RETURN_IF_ERROR(rpc_status.status); + ImpaladMetrics::WAIT_FOR_HMS_EVENT_DURATIONS->Update( + sw.ElapsedTime() / NANOS_PER_MICRO / MICROS_PER_MILLI); + return Status(resp->status); +} diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h index f93acffeb..33eabfb3a 100644 --- a/be/src/exec/catalog-op-executor.h +++ b/be/src/exec/catalog-op-executor.h @@ -110,6 +110,10 @@ class CatalogOpExecutor { /// a pointer to the profile of the execution in catalogd. const TRuntimeProfileNode* catalog_profile() const { return catalog_profile_.get(); } + /// Makes an RPC to the catalog server to wait until it processes the latest HMS event. + Status WaitForHmsEvent(const TWaitForHmsEventRequest& req, + TWaitForHmsEventResponse* resp); + private: /// Helper functions used in ExecComputeStats() for setting the thrift structs in params /// for the table/column stats based on the results of the corresponding child query. 
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 3182f320c..9c900ab0b 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -765,6 +765,36 @@ Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions( return result_bytes; } +extern "C" JNIEXPORT jbyteArray JNICALL +Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents(JNIEnv* env, + jclass fe_support_class, jbyteArray thrift_request, jbyteArray thrift_query_options) { + TWaitForHmsEventRequest request; + TQueryOptions query_options; + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_request, &request), env, + JniUtil::internal_exc_class(), nullptr); + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_query_options, &query_options), env, + JniUtil::internal_exc_class(), nullptr); + CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr); + TWaitForHmsEventResponse response; + Status status = catalog_op_executor.WaitForHmsEvent(request, &response); + TStatus result; + jbyteArray result_bytes = nullptr; + if (!status.ok()) { + status.ToThrift(&result); + THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env, + JniUtil::internal_exc_class(), result_bytes); + return result_bytes; + } + ImpalaServer* server = ExecEnv::GetInstance()->impala_server(); + DCHECK(server != nullptr); + status = server->ProcessCatalogUpdateResult(response.result, + /*wait_for_all_subscribers*/false, query_options, /*timeline*/nullptr); + status.ToThrift(&result); + THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env, + JniUtil::internal_exc_class(), result_bytes); + return result_bytes; +} + namespace impala { static JNINativeMethod native_methods[] = { @@ -856,6 +886,10 @@ static JNINativeMethod native_methods[] = { const_cast<char*>("NativeNumLiveQueries"), const_cast<char*>("()J"), (void*)::Java_org_apache_impala_service_FeSupport_NativeNumLiveQueries }, + { + 
const_cast<char*>("NativeWaitForHmsEvents"), const_cast<char*>("([B[B)[B"), + (void*)::Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents + }, }; void InitFeSupport(bool disable_codegen) { diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index dbc20bc4f..bd70838c4 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1583,8 +1583,9 @@ void ImpalaServer::UpdateExecSummary(const QueryHandle& query_handle) const { query_handle->summary_profile()->SetTExecSummary(t_exec_summary); string exec_summary = PrintExecSummary(t_exec_summary); query_handle->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary); - query_handle->summary_profile()->AddInfoStringRedacted("Errors", - query_handle->GetCoordinator()->GetErrorLog()); + vector<string> errors = query_handle->GetAnalysisWarnings(); + errors.emplace_back(query_handle->GetCoordinator()->GetErrorLog()); + query_handle->summary_profile()->AddInfoStringRedacted("Errors", join(errors, "\n")); } Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_inflight, @@ -2306,6 +2307,13 @@ void ImpalaServer::CatalogUpdateCallback( catalog_version_update_cv_.NotifyAll(); } +static inline void MarkTimelineEvent(RuntimeProfile::EventSequence* timeline, + const string& str) { + if (timeline != nullptr) { + timeline->MarkEvent(str); + } +} + void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version, const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) { unique_lock<mutex> unique_lock(catalog_version_lock_); @@ -2318,10 +2326,12 @@ void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version, } if (catalog_update_info_.catalog_service_id != catalog_service_id) { - timeline->MarkEvent("Detected change in catalog service ID"); - VLOG_QUERY << "Detected change in catalog service ID"; + MarkTimelineEvent(timeline, "Catalog service ID changed when waiting for " + "catalog update 
to arrive"); + VLOG_QUERY << "Detected catalog service ID changed when waiting for " + "catalog update to arrive"; } else { - timeline->MarkEvent(Substitute("Applied catalog version $0", + MarkTimelineEvent(timeline, Substitute("Applied catalog version $0", catalog_update_version)); VLOG_QUERY << "Applied catalog version: " << catalog_update_version; } @@ -2341,11 +2351,14 @@ void ImpalaServer::WaitForCatalogUpdateTopicPropagation( } if (catalog_update_info_.catalog_service_id != catalog_service_id) { - timeline->MarkEvent("Detected change in catalog service ID"); - VLOG_QUERY << "Detected change in catalog service ID"; + MarkTimelineEvent(timeline, "Catalog service ID changed when waiting for " + "catalog propagation"); + VLOG_QUERY << "Detected catalog service ID changed when waiting for " + "catalog propagation"; } else { - timeline->MarkEvent(Substitute("Min catalog topic version of coordinators reached $0", - min_req_subscriber_topic_version)); + MarkTimelineEvent(timeline, + Substitute("Min catalog topic version of coordinators reached $0", + min_req_subscriber_topic_version)); VLOG_QUERY << "Min catalog topic version of coordinators: " << min_req_subscriber_topic_version; } @@ -2368,10 +2381,10 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_ } if (catalog_update_info_.catalog_service_id != catalog_service_id) { - timeline->MarkEvent("Detected change in catalog service ID"); + MarkTimelineEvent(timeline, "Detected change in catalog service ID"); VLOG_QUERY << "Detected change in catalog service ID"; } else { - timeline->MarkEvent(Substitute("Local min catalog version reached $0", + MarkTimelineEvent(timeline, Substitute("Local min catalog version reached $0", min_req_catalog_object_version)); VLOG_QUERY << "Updated catalog object version lower bound: " << min_req_catalog_object_version; @@ -2429,7 +2442,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult( // Apply the changes to the local catalog cache. 
TUpdateCatalogCacheResponse resp; Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp); - timeline->MarkEvent("Applied catalog updates from DDL"); + MarkTimelineEvent(timeline, "Applied catalog updates from DDL"); if (!status.ok()) LOG(ERROR) << status.GetDetail(); RETURN_IF_ERROR(status); } else { diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 5da1c6eb8..5a58a09cf 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1334,6 +1334,17 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va query_options->__set_use_legacy_hive_timestamp_conversion(IsTrue(value)); break; } + case TImpalaQueryOptions::SYNC_HMS_EVENTS_WAIT_TIME_S: { + int32_t time_s = 0; + RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>( + option, value, &time_s)); + query_options->__set_sync_hms_events_wait_time_s(time_s); + break; + } + case TImpalaQueryOptions::SYNC_HMS_EVENTS_STRICT_MODE: { + query_options->__set_sync_hms_events_strict_mode(IsTrue(value)); + break; + } default: string key = to_string(option); if (IsRemovedQueryOption(key)) { diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 6ce06aea7..acd0e213f 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // plus one. Thus, the second argument to the DCHECK has to be updated every // time we add or remove a query option to/from the enum TImpalaQueryOptions. 
constexpr unsigned NUM_QUERY_OPTIONS = - TImpalaQueryOptions::USE_LEGACY_HIVE_TIMESTAMP_CONVERSION + 1; + TImpalaQueryOptions::SYNC_HMS_EVENTS_STRICT_MODE + 1; #define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ @@ -365,7 +365,11 @@ constexpr unsigned NUM_QUERY_OPTIONS = QUERY_OPT_FN(estimate_duplicate_in_preagg, \ ESTIMATE_DUPLICATE_IN_PREAGG, TQueryOptionLevel::ADVANCED) \ QUERY_OPT_FN(use_legacy_hive_timestamp_conversion, \ - USE_LEGACY_HIVE_TIMESTAMP_CONVERSION, TQueryOptionLevel::ADVANCED) + USE_LEGACY_HIVE_TIMESTAMP_CONVERSION, TQueryOptionLevel::ADVANCED) \ + QUERY_OPT_FN(sync_hms_events_wait_time_s, SYNC_HMS_EVENTS_WAIT_TIME_S, \ + TQueryOptionLevel::ADVANCED) \ + QUERY_OPT_FN(sync_hms_events_strict_mode, SYNC_HMS_EVENTS_STRICT_MODE, \ + TQueryOptionLevel::ADVANCED) \ ; /// Enforce practical limits on some query options to avoid undesired query state. 
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index ccc8feb95..cff239fa7 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -132,6 +132,7 @@ DECLARE_int32(dbcp_max_wait_millis_for_conn); DECLARE_int32(dbcp_data_source_idle_timeout_s); DECLARE_bool(enable_catalogd_ha); DECLARE_string(injected_group_members_debug_only); +DECLARE_int32(hms_event_sync_sleep_interval_ms); // HS2 SAML2.0 configuration // Defined here because TAG_FLAG caused issues in global-flags.cc @@ -509,6 +510,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_is_release_build(false); #endif cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha); + cfg.__set_hms_event_sync_sleep_interval_ms(FLAGS_hms_event_sync_sleep_interval_ms); return Status::OK(); } diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc index 2e85d76b2..8ff0fe1ef 100644 --- a/be/src/util/impalad-metrics.cc +++ b/be/src/util/impalad-metrics.cc @@ -178,6 +178,8 @@ const char* ImpaladMetricKeys::COMPLETED_QUERIES_MAX_RECORDS_WRITES = "impala-server.completed-queries.max-records-writes"; const char* ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS = "impala-server.completed-queries.write-durations"; +const char* ImpaladMetricKeys::WAIT_FOR_HMS_EVENT_DURATIONS = + "impala-server.wait-for-hms-event-durations-ms"; const char* ImpaladMetricKeys::DEBUG_ACTION_NUM_FAIL = "impala.debug_action.fail"; const char* ImpaladMetricKeys::QUERY_LOG_EST_TOTAL_BYTES = "impala-server.query-log-est-total-bytes"; @@ -275,6 +277,7 @@ StringProperty* ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS = nullptr; HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = nullptr; HistogramMetric* ImpaladMetrics::DDL_DURATIONS = nullptr; HistogramMetric* ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS = nullptr; +HistogramMetric* ImpaladMetrics::WAIT_FOR_HMS_EVENT_DURATIONS = nullptr; // Other StatsMetric<uint64_t, StatsType::MEAN>* @@ -465,6 +468,9 @@ 
void ImpaladMetrics::CreateMetrics(MetricGroup* m) { COMPLETED_QUERIES_WRITE_DURATIONS = m->RegisterMetric(new HistogramMetric( MetricDefs::Get(ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS), FIVE_HOURS_IN_MS, 3)); + WAIT_FOR_HMS_EVENT_DURATIONS = m->RegisterMetric(new HistogramMetric( + MetricDefs::Get(ImpaladMetricKeys::WAIT_FOR_HMS_EVENT_DURATIONS), + FIVE_HOURS_IN_MS, 3)); // Initialize Hedged read metrics HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0); diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h index 2dcf2c568..a383a1fca 100644 --- a/be/src/util/impalad-metrics.h +++ b/be/src/util/impalad-metrics.h @@ -287,6 +287,9 @@ class ImpaladMetricKeys { /// Time spent writing completed queries to the query log table. static const char* COMPLETED_QUERIES_WRITE_DURATIONS; + + /// Time spent in the WaitForHmsEvent catalogd RPC. + static const char* WAIT_FOR_HMS_EVENT_DURATIONS; }; /// Global impalad-wide metrics. This is useful for objects that want to update metrics @@ -387,6 +390,7 @@ class ImpaladMetrics { static HistogramMetric* QUERY_DURATIONS; static HistogramMetric* DDL_DURATIONS; static HistogramMetric* COMPLETED_QUERIES_WRITE_DURATIONS; + static HistogramMetric* WAIT_FOR_HMS_EVENT_DURATIONS; // Other static StatsMetric<uint64_t, StatsType::MEAN>* IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO; diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index cfa7111ef..c04e08754 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -316,4 +316,6 @@ struct TBackendGflags { 142: required bool enable_reading_puffin_stats 143: required string injected_group_members_debug_only + + 144: required i32 hms_event_sync_sleep_interval_ms } diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 807637ac1..f57f62ec9 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -772,6 +772,34 @@ 
struct TSetEventProcessorStatusResponse { 2: optional string info } +struct TWaitForHmsEventRequest { + 1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V2 + + // Common header included in all CatalogService requests. + 2: required TCatalogServiceRequestHeader header + + // Timeout in seconds for waiting catalogd to catch up the latest event when it + // receives this request. + 3: required i32 timeout_s + + // Set to true for SHOW DATABASES statements. + 4: required bool want_db_list + + // Set to true for SHOW TABLES/VIEWS statements. + 5: required bool want_table_list + + // Descriptors of catalog objects that might be used by the query. + 6: optional list<CatalogObjects.TCatalogObject> object_descs +} + +struct TWaitForHmsEventResponse { + // The status of the operation, OK if the operation was successful. + 1: required Status.TStatus status + + // Catalog updates that should be applied on coordinator side + 2: optional TCatalogUpdateResult result +} + // The CatalogService API service CatalogService { // Executes a DDL request and returns details on the result of the operation. @@ -816,4 +844,8 @@ service CatalogService { // Update the status of EventProcessor. TSetEventProcessorStatusResponse SetEventProcessorStatus( 1: TSetEventProcessorStatusRequest req); + + // Waits until catalogd processes the latest HMS event and get the catalog version + // to catch up the metadata changes. + TWaitForHmsEventResponse WaitForHmsEvent(1: TWaitForHmsEventRequest req); } diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index b28754848..f4ac57a95 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -986,6 +986,18 @@ enum TImpalaQueryOptions { // method and they produce the same results for modern time periods (post 1970, and in // most instances before that). 
USE_LEGACY_HIVE_TIMESTAMP_CONVERSION = 186 + + // Maximum time in seconds to wait for catalogd catching up HMS events before query + // planning. Only events generated before the query is submitted will be waited for. + // Defaults to 0 which disables waiting. Please only use this on queries that depend on + // external modifications. Don't set it cluster wide since it impacts performance. + SYNC_HMS_EVENTS_WAIT_TIME_S = 187 + + // Whether to fail the query if coordinator fails to wait for HMS events to be synced + // in catalogd, e.g. when event-processor is in ERROR state or timed out waiting for + // SYNC_HMS_EVENTS_WAIT_TIME_S seconds. Defaults to false, which means coordinator will + // start query planning regardless of the failure. + SYNC_HMS_EVENTS_STRICT_MODE = 188 } // The summary of a DML statement. diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 0e8458082..ccbff8f99 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -762,6 +762,12 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 187: optional bool use_legacy_hive_timestamp_conversion = false; + + // See comment in ImpalaService.thrift + 188: optional i32 sync_hms_events_wait_time_s = 0 + + // See comment in ImpalaService.thrift + 189: optional bool sync_hms_events_strict_mode = false } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index fd74c8e7e..f7d03d1a7 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -3033,6 +3033,16 @@ "kind": "HISTOGRAM", "key": "impala-server.ddl-durations-ms" }, + { + "description": "Distribution of WaitForHmsEvent RPC latencies", + "contexts": [ + "IMPALAD" + ], + "label": "WaitForHmsEvent RPC distribution", + "units": "TIME_MS", + "kind": "HISTOGRAM", + "key": "impala-server.wait-for-hms-event-durations-ms" + }, { "description": "Number of currently cached HDFS file handles in 
the IO manager.", "contexts": [ diff --git a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java index 90561df99..ac857db58 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java @@ -201,4 +201,7 @@ public class AdminFnStmt extends StatementBase { event_id_ = params_.get(1).evalToInteger(analyzer, "event_id"); } } + + @Override + public boolean requiresHmsMetadata() { return false; } } diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java index b69cc4968..e477f9795 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterDbStmt.java @@ -43,4 +43,7 @@ public abstract class AlterDbStmt extends StatementBase { public void analyze(Analyzer analyzer) throws AnalysisException { analyzer.getDb(dbName_, Privilege.ALTER); } + + @Override + public String getParsedDb() { return dbName_; } } diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 3d54f23a6..7ba4a488e 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -4164,6 +4164,15 @@ public class Analyzer { globalState_.warnings.put(msg, count + 1); } + /** + * Add a batch of warnings based on addWarning(). + */ + public void addWarnings(Iterable<String> warnings) { + for (String msg : warnings) { + addWarning(msg); + } + } + /** * 'addWarning' method may be called after the warnings are retrieved, e.g. in * analyzing some substituted/cloned predicates (IMPALA-11021). 
We need to make sure diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java index 538c314f1..40c6e9417 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateDbStmt.java @@ -64,6 +64,8 @@ public class CreateDbStmt extends StatementBase { } public String getComment() { return comment_; } + @Override + public String getParsedDb() { return dbName_; } public String getDb() { return dbName_; } public boolean getIfNotExists() { return ifNotExists_; } diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java index e85c0ab08..36fe0e362 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateDropRoleStmt.java @@ -52,4 +52,7 @@ public class CreateDropRoleStmt extends AuthorizationStmt { public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); } + + @Override + public boolean requiresHmsMetadata() { return false; } } \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java index af5182ab9..a3f04e1b7 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java @@ -55,6 +55,8 @@ public class DescribeDbStmt extends StatementBase { return sb.toString() + dbName_; } + @Override + public String getParsedDb() { return dbName_; } public String getDb() { return dbName_; } @Override diff --git a/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java index 7efd00c3e..daca8557c 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java +++ 
b/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java @@ -44,6 +44,8 @@ public class DropDbStmt extends StatementBase { this.cascade_ = cascade; } + @Override + public String getParsedDb() { return dbName_; } public String getDb() { return dbName_; } public boolean getIfExists() { return ifExists_; } public boolean getCascade() { return cascade_; } diff --git a/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java b/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java index 06d9347c1..170a24d7e 100644 --- a/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/GrantRevokeRoleStmt.java @@ -64,4 +64,7 @@ public class GrantRevokeRoleStmt extends AuthorizationStmt { throw new AnalysisException("Group name in GRANT/REVOKE ROLE cannot be empty."); } } + + @Override + public boolean requiresHmsMetadata() { return false; } } \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/analysis/SetStmt.java b/fe/src/main/java/org/apache/impala/analysis/SetStmt.java index 0d270b877..7a5518a5e 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SetStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/SetStmt.java @@ -79,4 +79,7 @@ public class SetStmt extends StatementBase { request.setQuery_option_type(queryOptionType_); return request; } + + @Override + public boolean requiresHmsMetadata() { return false; } } diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java index 7e7511401..97c44636f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowDataSrcsStmt.java @@ -69,4 +69,7 @@ public class ShowDataSrcsStmt extends StatementBase { params.setShow_pattern(getPattern()); return params; } + + @Override + public boolean requiresHmsMetadata() { return false; } } diff --git 
a/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java index cb74d4a0b..eefc6356f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowFunctionsStmt.java @@ -65,6 +65,8 @@ public class ShowFunctionsStmt extends StatementBase { return postAnalysisDb_; } + @Override + public String getParsedDb() { return parsedDb_; } public String getPattern() { return pattern_; } @Override diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java index 86f1b4960..1b0760159 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowGrantPrincipalStmt.java @@ -25,7 +25,6 @@ import org.apache.impala.common.InternalException; import org.apache.impala.thrift.TPrincipalType; import org.apache.impala.thrift.TShowGrantPrincipalParams; -import java.util.HashSet; import java.util.List; /** @@ -106,4 +105,7 @@ public class ShowGrantPrincipalStmt extends AuthorizationStmt { } return params; } + + @Override + public boolean requiresHmsMetadata() { return false; } } diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java index 8d4d62fb2..9c84d1366 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowRolesStmt.java @@ -68,4 +68,7 @@ public class ShowRolesStmt extends AuthorizationStmt { super.analyze(analyzer); requestingUser_ = analyzer.getUser(); } + + @Override + public boolean requiresHmsMetadata() { return false; } } diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java index 
85b7ffc5b..21d4d1daa 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowTablesOrViewsStmt.java @@ -66,6 +66,7 @@ public abstract class ShowTablesOrViewsStmt extends StatementBase { this.postAnalysisDb_ = null; } + @Override public String getParsedDb() { return parsedDb_; } public String getPattern() { return pattern_; } diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java index 7512e2026..69b5ddba1 100644 --- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java +++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java @@ -32,6 +32,8 @@ import org.apache.impala.rewrite.ExprRewriter; import com.google.common.base.Preconditions; +import javax.annotation.Nullable; + import static org.apache.impala.analysis.ToSqlOptions.DEFAULT; /** @@ -68,6 +70,17 @@ public abstract class StatementBase extends StmtNode { */ public void collectTableRefs(List<TableRef> tblRefs) { } + /** + * Returns the db name for CREATE/DROP/ALTER database or SHOW TABLES/VIEWS statements. + */ + @Nullable + public String getParsedDb() { return null; } + + /** + * Returns whether the analysis on the statement requires HMS metadata. + */ + public boolean requiresHmsMetadata() { return true; } + /** * Analyzes the statement and throws an AnalysisException if analysis fails. 
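The `StatementBase` changes above establish a safe-by-default pattern: every statement is assumed to require HMS metadata (and hence participates in the event wait), and only statements that touch no HMS metadata (SET, role DDL, admin functions, etc.) override `requiresHmsMetadata()` to opt out. A self-contained sketch of the pattern, with hypothetical statement classes standing in for Impala's:

```java
// Illustrative reduction of the StatementBase pattern in this patch; the
// nested classes are stand-ins, not Impala's real statement hierarchy.
public class StmtDemo {
    abstract static class Stmt {
        // Db name as written in the query string; null if the statement has none.
        public String getParsedDb() { return null; }
        // Default is the safe choice: assume HMS metadata is needed.
        public boolean requiresHmsMetadata() { return true; }
    }
    // A SELECT-like statement keeps the default and waits for event sync.
    static class SelectLikeStmt extends Stmt { }
    // A SET-like statement opts out: no HMS metadata is involved.
    static class SetLikeStmt extends Stmt {
        @Override public boolean requiresHmsMetadata() { return false; }
    }
    public static boolean needsWait(Stmt s) { return s.requiresHmsMetadata(); }
}
```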
A failure * could be due to a problem with the statement or because one or more tables/views diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java index 46ef622ae..cf90bc9b7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java @@ -745,6 +745,11 @@ public abstract class Catalog implements AutoCloseable { } } + public static String toCatalogObjectSummary(TCatalogObject catalogObject) { + return String.format("%s(%d)", toCatalogObjectKey(catalogObject), + catalogObject.catalog_version); + } + /** * Returns true if the two objects have the same object type and key (generated using * toCatalogObjectKey()). diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java index b4267c0c3..72eff61f5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java @@ -17,12 +17,15 @@ package org.apache.impala.catalog; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import org.apache.commons.lang3.StringUtils; import org.apache.impala.thrift.TCatalogObject; +import org.apache.impala.thrift.TCatalogObjectType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -57,8 +60,9 @@ import com.google.common.collect.ImmutableList; public class CatalogDeltaLog { // Map of the catalog version an object was removed from the catalog // to the catalog object, ordered by catalog version. - private SortedMap<Long, TCatalogObject> removedCatalogObjects_ = - new TreeMap<Long, TCatalogObject>(); + private SortedMap<Long, TCatalogObject> removedCatalogObjects_ = new TreeMap<>(); + // Map of the catalog object key to its latest removed version. 
+ private Map<String, Long> latestRemovedVersions_ = new HashMap<>(); /** * Adds a new item to the map of removed catalog objects. @@ -66,6 +70,8 @@ public class CatalogDeltaLog { public synchronized void addRemovedObject(TCatalogObject catalogObject) { Preconditions.checkNotNull(catalogObject); removedCatalogObjects_.put(catalogObject.getCatalog_version(), catalogObject); + String key = Catalog.toCatalogObjectKey(catalogObject); + latestRemovedVersions_.merge(key, catalogObject.catalog_version, Long::max); } /** @@ -76,7 +82,34 @@ public class CatalogDeltaLog { long toVersion) { SortedMap<Long, TCatalogObject> objects = removedCatalogObjects_.subMap(fromVersion + 1, toVersion + 1); - return ImmutableList.<TCatalogObject>copyOf(objects.values()); + return ImmutableList.copyOf(objects.values()); + } + + /** + * Retrieve all the removed db objects. + */ + public synchronized List<TCatalogObject> retrieveDbObjects() { + Map<String, TCatalogObject> res = new HashMap<>(); + for (TCatalogObject obj : removedCatalogObjects_.values()) { + if (obj.type != TCatalogObjectType.DATABASE) continue; + res.put(Catalog.toCatalogObjectKey(obj), obj); + } + return ImmutableList.copyOf(res.values()); + } + + /** + * Retrieve all the removed table objects from dbName. + */ + public synchronized List<TCatalogObject> retrieveTableObjects(String dbName) { + Map<String, TCatalogObject> res = new HashMap<>(); + for (TCatalogObject obj : removedCatalogObjects_.values()) { + if (obj.type != TCatalogObjectType.TABLE + || !StringUtils.equals(dbName, obj.table.db_name)) { + continue; + } + res.put(Catalog.toCatalogObjectKey(obj), obj); + } + return ImmutableList.copyOf(res.values()); } /** @@ -89,8 +122,10 @@ public class CatalogDeltaLog { // Nothing will be garbage collected so avoid creating a new object. 
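The new `latestRemovedVersions_` bookkeeping in `CatalogDeltaLog` relies on `Map.merge` with `Long::max` so that repeated removals of the same object key keep only the highest catalog version, and on `getOrDefault(..., 0L)` so that "never removed" reads as version 0. A minimal standalone sketch of that bookkeeping (the key format is simplified; Impala derives it via `Catalog.toCatalogObjectKey`):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the version bookkeeping CatalogDeltaLog adds in this
// patch; keys here are plain strings rather than derived TCatalogObject keys.
public class RemovedVersions {
    private final Map<String, Long> latest = new HashMap<>();

    // Mirrors addRemovedObject(): keep the maximum removed version per key.
    public void addRemoved(String key, long version) {
        latest.merge(key, version, Long::max);
    }

    // Mirrors getLatestRemovedVersion(): 0 means the key was never removed.
    public long latestRemovedVersion(String key) {
        return latest.getOrDefault(key, 0L);
    }
}
```

`merge` makes out-of-order arrival of removal events harmless: whichever order the versions come in, the map converges to the maximum.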
if (!removedCatalogObjects_.isEmpty() && removedCatalogObjects_.firstKey() < currentCatalogVersion) { - removedCatalogObjects_ = new TreeMap<Long, TCatalogObject>( + removedCatalogObjects_ = new TreeMap<>( removedCatalogObjects_.tailMap(currentCatalogVersion)); + latestRemovedVersions_.entrySet().removeIf( + e -> e.getValue() < currentCatalogVersion); } } @@ -111,4 +146,12 @@ public class CatalogDeltaLog { } return false; } + + /** + * Gets the recently removed version of a catalog object. + */ + public synchronized long getLatestRemovedVersion(TCatalogObject catalogObject) { + String key = Catalog.toCatalogObjectKey(catalogObject); + return latestRemovedVersions_.getOrDefault(key, 0L); + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 273497062..9bbd0e915 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -28,6 +28,7 @@ import static org.apache.impala.service.CatalogOpExecutor.FETCHED_HMS_TABLE; import static org.apache.impala.service.CatalogOpExecutor.FETCHED_LATEST_HMS_EVENT_ID; import static org.apache.impala.service.CatalogOpExecutor.GOT_TABLE_READ_LOCK; import static org.apache.impala.service.CatalogOpExecutor.GOT_TABLE_WRITE_LOCK; +import static org.apache.impala.thrift.TCatalogObjectType.DATABASE; import static org.apache.impala.thrift.TCatalogObjectType.HDFS_PARTITION; import static org.apache.impala.thrift.TCatalogObjectType.TABLE; @@ -131,6 +132,7 @@ import org.apache.impala.thrift.TResetMetadataRequest; import org.apache.impala.thrift.TSetEventProcessorStatusResponse; import org.apache.impala.thrift.TStatus; import org.apache.impala.thrift.TSystemTableName; +import org.apache.impala.thrift.TStatus; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableName; import org.apache.impala.thrift.TTableType; @@ 
-138,6 +140,8 @@ import org.apache.impala.thrift.TTableUsage; import org.apache.impala.thrift.TTableUsageMetrics; import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUpdateTableUsageRequest; +import org.apache.impala.thrift.TWaitForHmsEventRequest; +import org.apache.impala.thrift.TWaitForHmsEventResponse; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.CatalogBlacklistUtils; import org.apache.impala.util.DebugUtils; @@ -4192,6 +4196,191 @@ public class CatalogServiceCatalog extends Catalog { } } + private void addRemovedTableToWaitForHmsEventResponse(TWaitForHmsEventResponse res, + String dbName, String tblName) { + TCatalogObject tblDesc = new TCatalogObject(); + tblDesc.setType(TCatalogObjectType.TABLE); + tblDesc.setTable(new TTable(dbName, tblName)); + long version = deleteLog_.getLatestRemovedVersion(tblDesc); + if (version > 0) { + tblDesc.setCatalog_version(version); + res.result.addToRemoved_catalog_objects(tblDesc); + } + // If the table is not in the delete log, i.e. version <= 0, it comes from an + // illegal table name. It's OK to return nothing for it. + // Note that coordinators could send illegal table names since the request is + // sent before the query is analyzed. What we get are potential table names + // that could be used by a query. E.g. "SELECT c FROM a.b" might read table "a.b" + // or table "default.a" in the case when "b" is an array/map column. Coordinator + // will send the request for table "a.b" and "default.a". Catalogd just sends back + // info of the legal table. + } + + private boolean addTableToWaitForHmsEventResponse(TWaitForHmsEventResponse res, + Table tbl, boolean wantMinimalResponse) { + Preconditions.checkNotNull(tbl); + // Collect updates for the local-catalog mode. + if (wantMinimalResponse) { + res.result.addToUpdated_catalog_objects(tbl.toInvalidationObject()); + return true; + } + // Collect updates for the legacy catalog mode. 
+ String errMsg; + try { + // TODO: count the lock waiting time into the request's timeout? + if (tbl.readLock().tryLock(600000L, TimeUnit.MILLISECONDS)) { + res.result.addToUpdated_catalog_objects(tbl.toTCatalogObject()); + return true; + } + errMsg = "HMS events are synced as expected but timed out getting " + + "the update of table " + tbl.getFullName(); + } catch (InterruptedException e) { + errMsg = "HMS events are synced as expected but acquiring the read lock of table " + + tbl.getFullName() + " was interrupted"; + } finally { + if (tbl.isReadLockedByCurrentThread()) { + tbl.readLock().unlock(); + } + } + TStatus errStatus = new TStatus(TErrorCode.RPC_GENERAL_ERROR, + Lists.newArrayList(errMsg)); + res.setStatus(errStatus); + // 'status' is a required field of TCatalogUpdateResult, so we have to set + // it though it's unused. + res.result.setStatus(errStatus); + LOG.error(errMsg); + return false; + } + + private boolean addDbToWaitForHmsEventResponse(TWaitForHmsEventResponse res, + String dbName, boolean wantMinimalResponse, boolean wantTableList) { + Db db = getDb(dbName); + TCatalogObject resDb = new TCatalogObject(); + resDb.setType(TCatalogObjectType.DATABASE); + // If the db was removed, add deletions for it and all tables under it. 
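The `tryLock`-with-timeout code above follows the standard bounded-acquire pattern: attempt the read lock with a deadline, and in `finally` release it only if this thread actually holds it (Impala checks `isReadLockedByCurrentThread()`; a local flag works the same way). A self-contained sketch of the pattern with a plain `ReentrantReadWriteLock`:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Standalone sketch of the bounded read-lock acquisition used in
// addTableToWaitForHmsEventResponse(); the lock here is a local stand-in.
public class TryLockDemo {
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Returns true iff the read lock was acquired within the timeout.
    public static boolean withReadLock(long timeoutMs) {
        boolean locked = false;
        try {
            locked = lock.readLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS);
            return locked;  // caller would do its read-protected work here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore the interrupt flag
            return false;
        } finally {
            // Only unlock when this thread actually holds the lock; unlocking
            // a lock we never acquired would throw IllegalMonitorStateException.
            if (locked) lock.readLock().unlock();
        }
    }
}
```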
+ if (db == null) { + resDb.setDb(new TDatabase(dbName)); + long version = deleteLog_.getLatestRemovedVersion(resDb); + if (version > 0) { + resDb.setCatalog_version(version); + res.result.addToRemoved_catalog_objects(resDb); + } + for (TCatalogObject obj : deleteLog_.retrieveTableObjects(dbName)) { + res.result.addToRemoved_catalog_objects(obj); + } + return true; + } + if (wantMinimalResponse) { + res.result.addToUpdated_catalog_objects(db.toMinimalTCatalogObject()); + } else { + resDb.setCatalog_version(db.getCatalogVersion()); + resDb.setDb(db.toThrift()); + res.result.addToUpdated_catalog_objects(resDb); + } + if (wantTableList) { + // TODO: gets a list of known tables from the coordinator and only sends + // back unknown/updated/removed tables. + for (Table tbl : db.getTables()) { + if (!addTableToWaitForHmsEventResponse(res, tbl, wantMinimalResponse)) { + return false; + } + } + // Add tables that are removed. + for (TCatalogObject obj : deleteLog_.retrieveTableObjects(dbName)) { + // Ignore re-created tables + if (db.getTable(obj.table.tbl_name) != null) continue; + res.result.addToRemoved_catalog_objects(obj); + } + } + return true; + } + + public TWaitForHmsEventResponse waitForHmsEvent(TWaitForHmsEventRequest req) { + LOG.info("waitForHmsEvent request: want_minimal_response={}, coordinator={}, " + + "timeout_s={}, want_db_list={}, want_table_list={}, objects=[{}]", + req.header.want_minimal_response, req.header.coordinator_hostname, req.timeout_s, + req.want_db_list, req.want_table_list, !req.isSetObject_descs() ? "" : + req.object_descs.stream().map(Catalog::toCatalogObjectKey) + .collect(Collectors.joining(", "))); + TWaitForHmsEventResponse res = new TWaitForHmsEventResponse(); + if (!(metastoreEventProcessor_ instanceof MetastoreEventsProcessor)) { + res.setStatus(new TStatus(TErrorCode.RPC_GENERAL_ERROR, + Lists.newArrayList("HMS event processing is disabled"))); + LOG.error("HMS event processing is disabled. 
Return without waiting."); + return res; + } + MetastoreEventsProcessor eventsProcessor = + (MetastoreEventsProcessor) metastoreEventProcessor_; + TStatus status = eventsProcessor.waitForSyncUpToCurrentEvent(req.timeout_s * 1000L); + if (status.status_code != TErrorCode.OK) { + res.setStatus(status); + LOG.error(String.join("\n", status.error_msgs)); + return res; + } + res.setResult(new TCatalogUpdateResult()); + res.getResult().setCatalog_service_id(JniCatalog.getServiceId()); + + // Collect catalog objects required by the query + boolean wantMinimalResponse = req.header.want_minimal_response; + if (req.isSetObject_descs()) { + for (TCatalogObject catalogObject: req.getObject_descs()) { + if (catalogObject.isSetDb()) { + if (!addDbToWaitForHmsEventResponse(res, catalogObject.getDb().db_name, + wantMinimalResponse, req.want_table_list)) { + // Error status is already set in addDbToWaitForHmsEventResponse() + return res; + } + } else if (catalogObject.isSetTable()) { + TTable table = catalogObject.getTable(); + Db db = getDb(table.db_name); + // If the db was removed, add a deletion for it in the response + if (db == null) { + boolean success = addDbToWaitForHmsEventResponse( + res, table.db_name, wantMinimalResponse, /*wantTableList*/false); + Preconditions.checkState(success, + "should succeed since wantTableList is false"); + continue; + } + Table tbl = db.getTable(table.tbl_name); + if (tbl == null) { + addRemovedTableToWaitForHmsEventResponse(res, table.db_name, table.tbl_name); + } else if (!addTableToWaitForHmsEventResponse(res, tbl, wantMinimalResponse)) { + // Error status is already set in addTableToWaitForHmsEventResponse() + return res; + } + } + } + } else if (req.want_db_list) { + for (Db db : getAllDbs()) { + boolean success = addDbToWaitForHmsEventResponse(res, db.getName(), + wantMinimalResponse, /*wantTableList*/false); + Preconditions.checkState(success, + "should succeed since wantTableList is false"); + } + // Also add dbs that are recently 
deleted + for (TCatalogObject deletedDb : deleteLog_.retrieveDbObjects()) { + // Ignore re-created dbs + if (getDb(deletedDb.db.db_name) != null) continue; + res.result.addToRemoved_catalog_objects(deletedDb); + } + } + TStatus okStatus = new TStatus(TErrorCode.OK, Collections.emptyList()); + res.setStatus(okStatus); + // 'status' is a required field of TCatalogUpdateResult, so we have to set it though + // it's unused. + res.result.setStatus(okStatus); + LOG.info("waitForHmsEvent succeeds. updated_objects=[{}], removed_objects=[{}]", + getCatalogUpdateSummary(res.result.updated_catalog_objects), + getCatalogUpdateSummary(res.result.removed_catalog_objects)); + return res; + } + + private String getCatalogUpdateSummary(List<TCatalogObject> objs) { + if (objs == null) return ""; + return objs.stream().map(Catalog::toCatalogObjectSummary) + .collect(Collectors.joining(", ")); + } + /** * Marks write ids with corresponding status for the table if it is loaded HdfsTable. */ diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index a5237008d..896179ffe 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -392,7 +392,8 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { * catalog version is < the catalog version of this drop operation. */ private void removeCatalogObject(TCatalogObject catalogObject) { - Preconditions.checkState(catalogObject.getCatalog_version() != 0); + Preconditions.checkState(catalogObject.getCatalog_version() != 0, + "Dropped catalog version is 0. 
type: %s", catalogObject.getType()); long dropCatalogVersion = catalogObject.getCatalog_version(); switch(catalogObject.getType()) { case DATABASE: diff --git a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java index d9a70e7bf..ba98684ad 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java @@ -40,6 +40,11 @@ public interface ExternalEventsProcessor { */ long getCurrentEventId() throws MetastoreNotificationFetchException; + /** + * Get the latest event id that has been processed. + */ + default long getLastSyncedEventId() { return -1; } + /** * Pauses the event processing. Use <code>start(fromEventId)</code> method below to * restart the event processing diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index 5c7a5e4aa..4928e4b2b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.Uninterruptibles; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -60,12 +61,15 @@ import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory; import org.apache.impala.common.Metrics; import org.apache.impala.common.PrintUtils; +import org.apache.impala.common.Reference; import org.apache.impala.compat.MetastoreShim; import 
org.apache.impala.service.BackendConfig; import org.apache.impala.service.CatalogOpExecutor; +import org.apache.impala.thrift.TErrorCode; import org.apache.impala.thrift.TEventBatchProgressInfo; import org.apache.impala.thrift.TEventProcessorMetrics; import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse; +import org.apache.impala.thrift.TStatus; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.NoOpEventSequence; @@ -678,6 +682,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { // can ignore the drop events when they are received later. private final DeleteEventLog deleteEventLog_ = new DeleteEventLog(); + // Sleep interval when waiting for HMS events to be synced. + private final int hmsEventSyncSleepIntervalMs_; + @VisibleForTesting MetastoreEventsProcessor(CatalogOpExecutor catalogOpExecutor, long startSyncFromId, long pollingFrequencyInSec) throws CatalogException { @@ -689,6 +696,10 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { initMetrics(); metastoreEventFactory_ = new MetastoreEventFactory(catalogOpExecutor); pollingFrequencyInSec_ = pollingFrequencyInSec; + hmsEventSyncSleepIntervalMs_ = BackendConfig.INSTANCE + .getBackendCfg().hms_event_sync_sleep_interval_ms; + Preconditions.checkState(hmsEventSyncSleepIntervalMs_ > 0, + "hms_event_sync_sleep_interval_ms must be positive"); } /** @@ -804,10 +815,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { } /** - * returns the current value of LastSyncedEventId. This method is not thread-safe and - * only to be used for testing purposes + * returns the current value of LastSyncedEventId. 
*/ - @VisibleForTesting public long getLastSyncedEventId() { return lastSyncedEventId_.get(); } @@ -1521,4 +1530,57 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { return tableName_; } } + + public TStatus waitForSyncUpToCurrentEvent(long timeoutMs) { + TStatus res = new TStatus(); + // Only waits when event-processor is in ACTIVE/PAUSED states. PAUSED states happen + // at startup or when global invalidate is running, so it's ok to wait for. + if (!EventProcessorStatus.ACTIVE.equals(eventProcessorStatus_) + && !EventProcessorStatus.PAUSED.equals(eventProcessorStatus_)) { + res.setStatus_code(TErrorCode.GENERAL); + res.addToError_msgs( + "Current state of HMS event processor is " + eventProcessorStatus_); + return res; + } + long waitForEventId; + try { + waitForEventId = getCurrentEventId(); + } catch (MetastoreNotificationFetchException e) { + res.setStatus_code(TErrorCode.GENERAL); + res.addToError_msgs("Failed to fetch current HMS event id: " + e.getMessage()); + return res; + } + long lastSyncedEventId = getLastSyncedEventId(); + long startMs = System.currentTimeMillis(); + long sleepIntervalMs = Math.min(timeoutMs, hmsEventSyncSleepIntervalMs_); + // Avoid too many log entries if the waiting interval is smaller than 500ms. 
+ int logIntervals = Math.max(1, 1000 / hmsEventSyncSleepIntervalMs_); + int numIters = 0; + while (lastSyncedEventId < waitForEventId + && System.currentTimeMillis() - startMs < timeoutMs) { + if (numIters++ % logIntervals == 0) { + LOG.info("Waiting for last synced event id ({}) to reach {}", + lastSyncedEventId, waitForEventId); + } + Uninterruptibles.sleepUninterruptibly(sleepIntervalMs, TimeUnit.MILLISECONDS); + lastSyncedEventId = getLastSyncedEventId(); + if (!EventProcessorStatus.ACTIVE.equals(eventProcessorStatus_) + && !EventProcessorStatus.PAUSED.equals(eventProcessorStatus_)) { + res.setStatus_code(TErrorCode.GENERAL); + res.addToError_msgs( + "Current state of HMS event processor is " + eventProcessorStatus_); + return res; + } + } + if (lastSyncedEventId < waitForEventId) { + res.setStatus_code(TErrorCode.GENERAL); + res.addToError_msgs(String.format("Timeout waiting for HMS events to be synced. " + + "Event id to wait for: %d. Last synced event id: %d", + waitForEventId, lastSyncedEventId)); + return res; + } + LOG.info("Last synced event id ({}) reached {}", lastSyncedEventId, waitForEventId); + res.setStatus_code(TErrorCode.OK); + return res; + } } diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java index ed72ba2ff..baaf142eb 100644 --- a/fe/src/main/java/org/apache/impala/service/FeSupport.java +++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java @@ -47,10 +47,13 @@ import org.apache.impala.thrift.TPrioritizeLoadResponse; import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TResultRow; +import org.apache.impala.thrift.TStatus; import org.apache.impala.thrift.TSymbolLookupParams; import org.apache.impala.thrift.TSymbolLookupResult; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TUniqueId; +import org.apache.impala.thrift.TWaitForHmsEventRequest; +import 
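`waitForSyncUpToCurrentEvent()` above is a poll-until-target-or-deadline loop: snapshot the current HMS event id, then repeatedly sleep and re-read the last synced id until it catches up or the timeout elapses. A self-contained sketch of the core loop (the event ids come from a supplier here; Impala reads them from the event processor, and also re-checks the processor state each iteration, which this sketch omits):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Simplified sketch of the wait loop in waitForSyncUpToCurrentEvent();
// omits the state re-checks and log throttling of the real code.
public class SyncWaiter {
    /**
     * Polls lastSynced until it reaches target or timeoutMs elapses.
     * Returns true iff the target was reached in time.
     */
    public static boolean waitForSync(LongSupplier lastSynced, long target,
            long timeoutMs, long sleepMs) {
        long start = System.currentTimeMillis();
        long sleep = Math.min(sleepMs, timeoutMs);  // never sleep past the deadline
        while (lastSynced.getAsLong() < target
                && System.currentTimeMillis() - start < timeoutMs) {
            try {
                TimeUnit.MILLISECONDS.sleep(sleep);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return lastSynced.getAsLong() >= target;
    }
}
```

The final re-read after the loop matters: the deadline may expire while the last sleep was in flight, and the synced id may have crossed the target in that window.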
org.apache.impala.thrift.TWaitForHmsEventResponse; import org.apache.impala.util.NativeLibUtil; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -155,6 +158,9 @@ public class FeSupport { // Get the number of live queries. public native static long NativeNumLiveQueries(); + public native static byte[] NativeWaitForHmsEvents(byte[] thriftReq, + byte[] thriftQueryOptions); + /** * Locally caches the jar at the specified HDFS location. * @@ -556,6 +562,22 @@ public class FeSupport { return NativeNumLiveQueries(); } + public static TStatus WaitForHmsEvents(TWaitForHmsEventRequest req, + TQueryOptions queryOptions) throws InternalException { + try { + TSerializer serializer = new TSerializer(); + byte[] result = NativeWaitForHmsEvents(serializer.serialize(req), + serializer.serialize(queryOptions)); + Preconditions.checkNotNull(result, "result of NativeWaitForHmsEvents is null"); + TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + TStatus status = new TStatus(); + deserializer.deserialize(status, result); + return status; + } catch (TException e) { + throw new InternalException("Error waiting for HMS events", e); + } + } + /** * Calling this function before loadLibrary() causes external frontend * initialization to be used during NativeFeInit() diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index e677580d3..0a2710f86 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -92,13 +92,16 @@ import org.apache.impala.analysis.OptimizeStmt; import org.apache.impala.analysis.Parser; import org.apache.impala.analysis.QueryStmt; import org.apache.impala.analysis.ResetMetadataStmt; +import org.apache.impala.analysis.ShowDbsStmt; import org.apache.impala.analysis.ShowFunctionsStmt; import org.apache.impala.analysis.ShowGrantPrincipalStmt; import 
org.apache.impala.analysis.ShowRolesStmt; +import org.apache.impala.analysis.ShowTablesOrViewsStmt; import org.apache.impala.analysis.StatementBase; import org.apache.impala.analysis.StmtMetadataLoader; import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache; import org.apache.impala.analysis.TableName; +import org.apache.impala.analysis.TableRef; import org.apache.impala.analysis.TruncateStmt; import org.apache.impala.authentication.saml.ImpalaSamlClient; import org.apache.impala.authorization.AuthorizationChecker; @@ -133,6 +136,7 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.MetaStoreClientPool; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.StructType; +import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.local.InconsistentMetadataFetchException; @@ -160,6 +164,8 @@ import org.apache.impala.planner.ScanNode; import org.apache.impala.thrift.CatalogLookupStatus; import org.apache.impala.thrift.TAlterDbParams; import org.apache.impala.thrift.TBackendGflags; +import org.apache.impala.thrift.TCatalogObject; +import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TCatalogOpRequest; import org.apache.impala.thrift.TCatalogOpType; import org.apache.impala.thrift.TCatalogServiceRequestHeader; @@ -170,12 +176,14 @@ import org.apache.impala.thrift.TCommentOnParams; import org.apache.impala.thrift.TCopyTestCaseReq; import org.apache.impala.thrift.TCounter; import org.apache.impala.thrift.TCreateDropRoleParams; +import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TDdlExecRequest; import org.apache.impala.thrift.TDdlQueryOptions; import org.apache.impala.thrift.TDdlType; import org.apache.impala.thrift.TDescribeHistoryParams; import org.apache.impala.thrift.TDescribeOutputStyle; import 
org.apache.impala.thrift.TDescribeResult; +import org.apache.impala.thrift.TErrorCode; import org.apache.impala.thrift.TImpalaTableType; import org.apache.impala.thrift.TDescribeTableParams; import org.apache.impala.thrift.TIcebergDmlFinalizeParams; @@ -212,13 +220,17 @@ import org.apache.impala.thrift.TRuntimeProfileNode; import org.apache.impala.thrift.TShowFilesParams; import org.apache.impala.thrift.TShowStatsOp; import org.apache.impala.thrift.TSlotCountStrategy; +import org.apache.impala.thrift.TStatus; import org.apache.impala.thrift.TStmtType; +import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableName; import org.apache.impala.thrift.TTruncateParams; import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUnit; import org.apache.impala.thrift.TUpdateCatalogCacheRequest; import org.apache.impala.thrift.TUpdateCatalogCacheResponse; +import org.apache.impala.thrift.TWaitForHmsEventRequest; +import org.apache.impala.thrift.TWaitForHmsEventResponse; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.DebugUtils; import org.apache.impala.util.EventSequence; @@ -904,17 +916,9 @@ public class Frontend { ddl.setSync_ddl(result.getQuery_options().isSync_ddl()); result.setCatalog_op_request(ddl); if (ddl.getOp_type() == TCatalogOpType.DDL) { - TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader(); - header.setRequesting_user(analysis.getAnalyzer().getUser().getName()); - TQueryCtx queryCtx = analysis.getAnalyzer().getQueryCtx(); - header.setQuery_id(queryCtx.query_id); - header.setClient_ip(queryCtx.getSession().getNetwork_address().getHostname()); - TClientRequest clientRequest = queryCtx.getClient_request(); - header.setRedacted_sql_stmt(clientRequest.isSetRedacted_stmt() ? 
- clientRequest.getRedacted_stmt() : clientRequest.getStmt()); - header.setWant_minimal_response( - BackendConfig.INSTANCE.getBackendCfg().use_local_catalog); - header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname()); + TCatalogServiceRequestHeader header = createCatalogServiceRequestHeader( + analysis.getAnalyzer().getUser().getName(), + analysis.getAnalyzer().getQueryCtx()); ddl.getDdl_params().setHeader(header); // Forward relevant query options to the catalogd. TDdlQueryOptions ddlQueryOpts = new TDdlQueryOptions(); @@ -2185,6 +2189,119 @@ public class Frontend { expectedNumExecutor(execGroupSet), execGroupSet.getNum_cores_per_executor()); } + private static TCatalogServiceRequestHeader createCatalogServiceRequestHeader( + String requestingUser, TQueryCtx queryCtx) { + TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader(); + header.setRequesting_user(requestingUser); + header.setQuery_id(queryCtx.query_id); + header.setClient_ip(queryCtx.getSession().getNetwork_address().getHostname()); + TClientRequest clientRequest = queryCtx.getClient_request(); + header.setRedacted_sql_stmt(clientRequest.isSetRedacted_stmt() ? + clientRequest.getRedacted_stmt() : clientRequest.getStmt()); + header.setWant_minimal_response( + BackendConfig.INSTANCE.getBackendCfg().use_local_catalog); + header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname()); + return header; + } + + /** + * Collect required catalog objects for query planning of the statement and update + * the request. + */ + private void collectRequiredObjects(TWaitForHmsEventRequest req, StatementBase stmt, + String sessionDb) { + if (stmt instanceof ShowDbsStmt) { + req.want_db_list = true; + return; + } + // The statement hasn't been analyzed yet. We can only get the parsed db, i.e. the db + // name written in the query string. + String dbName = stmt.getParsedDb(); + // Use the session database if the db is not specified in SHOW TABLES/VIEWS/FUNCTIONS + // statement. 
+ if (dbName == null && + (stmt instanceof ShowFunctionsStmt || stmt instanceof ShowTablesOrViewsStmt)) { + dbName = sessionDb; + } + if (dbName != null) { + TCatalogObject dbDesc = new TCatalogObject(); + dbDesc.setType(TCatalogObjectType.DATABASE); + dbDesc.setDb(new TDatabase(dbName)); + req.addToObject_descs(dbDesc); + if (stmt instanceof ShowTablesOrViewsStmt) { + req.setWant_table_list(true); + LOG.info("Waiting for HMS events on database {} and underlying tables", dbName); + } else { + LOG.info("Waiting for HMS events on database " + dbName); + } + // 'dbName' is not null only for CREATE/DROP/ALTER database or SHOW TABLES/VIEWS + // statements. No more tables are needed. + return; + } + List<TableRef> tblRefs = new ArrayList<>(); + stmt.collectTableRefs(tblRefs); + Set<TableName> tableNames = new HashSet<>(); + for (TableRef ref : tblRefs) { + tableNames.addAll(org.apache.impala.analysis.Path.getCandidateTables( + ref.getPath(), sessionDb)); + } + Set<String> dbNames = new HashSet<>(); + for (TableName tblName : tableNames) { + dbNames.add(tblName.getDb()); + TCatalogObject tblDesc = new TCatalogObject(); + tblDesc.setType(TCatalogObjectType.TABLE); + tblDesc.setTable(new TTable(tblName.getDb(), tblName.getTbl())); + req.addToObject_descs(tblDesc); + } + // Add dbs needed by the tables. 
+ for (String name : dbNames) { + TCatalogObject dbDesc = new TCatalogObject(); + dbDesc.setType(TCatalogObjectType.DATABASE); + dbDesc.setDb(new TDatabase(name)); + req.addToObject_descs(dbDesc); + } + LOG.info("Waiting for HMS events on the dbs: {} and tables: {}", + String.join(", ", dbNames), + tableNames.stream().map(TableName::toString).collect(Collectors.joining(", "))); + } + + private void waitForHmsEvents(TQueryCtx queryCtx, TQueryOptions queryOptions, + List<String> warnings, EventSequence timeline) throws ImpalaException { + if (!queryOptions.isSetSync_hms_events_wait_time_s() + || queryOptions.sync_hms_events_wait_time_s <= 0) { + return; + } + StatementBase stmt = Parser.parse( + queryCtx.client_request.stmt, queryCtx.client_request.query_options); + // Return immediately for simple statements that don't require HMS metadata, e.g. + // SET query_option=value; + if (!stmt.requiresHmsMetadata()) return; + TWaitForHmsEventRequest req = new TWaitForHmsEventRequest(); + req.setTimeout_s(queryOptions.sync_hms_events_wait_time_s); + req.setHeader(createCatalogServiceRequestHeader( + TSessionStateUtil.getEffectiveUser(queryCtx.session), queryCtx)); + collectRequiredObjects(req, stmt, queryCtx.session.database); + // TODO: share 'timeline' with the BE so we know when the updates are applied + TStatus status = FeSupport.WaitForHmsEvents(req, queryOptions); + if (status.status_code != TErrorCode.OK) { + String errorMsg; + if (queryOptions.sync_hms_events_strict_mode) { + errorMsg = "Failed to sync events from Metastore"; + } else { + errorMsg = "Continuing without syncing Metastore events"; + } + timeline.markEvent(errorMsg); + errorMsg += ": " + String.join("", status.error_msgs); + LOG.error(errorMsg); + warnings.add(errorMsg); + if (queryOptions.sync_hms_events_strict_mode) { + throw new InternalException(errorMsg); + } + } else { + timeline.markEvent("Synced events from Metastore"); + } + } + private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence
timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); @@ -2193,6 +2310,8 @@ public class Frontend { + queryCtx.session.database); TQueryOptions queryOptions = queryCtx.client_request.getQuery_options(); + List<String> warnings = new ArrayList<>(); + waitForHmsEvents(queryCtx, queryOptions, warnings, timeline); boolean enable_replan = queryOptions.isEnable_replan(); final boolean clientSetRequestPool = queryOptions.isSetRequest_pool(); Preconditions.checkState( @@ -2288,7 +2407,7 @@ public class Frontend { String retryMsg = ""; while (true) { try { - req = doCreateExecRequest(planCtx, timeline); + req = doCreateExecRequest(planCtx, warnings, timeline); markTimelineRetries(attempt, retryMsg, timeline); break; } catch (InconsistentMetadataFetchException e) { @@ -2584,7 +2703,7 @@ public class Frontend { } private TExecRequest doCreateExecRequest(PlanCtx planCtx, - EventSequence timeline) throws ImpalaException { + List<String> warnings, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); // Parse stmt and collect/load metadata to populate a stmt-local table cache StatementBase stmt = Parser.parse( @@ -2628,6 +2747,7 @@ public class Frontend { LOG.info("Analysis finished."); } Preconditions.checkNotNull(analysisResult.getStmt()); + analysisResult.getAnalyzer().addWarnings(warnings); TExecRequest result = createBaseExecRequest(queryCtx, analysisResult); for (TableName table : stmtTableCache.tables.keySet()) { result.addToTables(table.toThrift()); diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index f6e80c5c1..cdf4d91ba 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -91,6 +91,8 @@ import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUpdateCatalogRequest; import 
org.apache.impala.thrift.TUpdateTableUsageRequest; import org.apache.impala.thrift.TGetAllHadoopConfigsResponse; +import org.apache.impala.thrift.TWaitForHmsEventRequest; +import org.apache.impala.thrift.TWaitForHmsEventResponse; import org.apache.impala.util.AuthorizationUtil; import org.apache.impala.util.CatalogOpUtil; import org.apache.impala.util.EventSequence; @@ -98,6 +100,7 @@ import org.apache.impala.util.GlogAppender; import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.NoOpEventSequence; import org.apache.impala.util.PatternMatcher; +import org.apache.impala.util.TUniqueIdUtil; import org.apache.impala.util.ThreadNameAnnotator; import org.apache.impala.util.TUniqueIdUtil; import org.apache.thrift.TBase; @@ -105,6 +108,7 @@ import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TSimpleJSONProtocol; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -637,4 +641,13 @@ public class JniCatalog { throw new InternalException(e.getMessage()); } } + + public byte[] waitForHmsEvent(byte[] req) throws TException, ImpalaException { + TWaitForHmsEventRequest request = new TWaitForHmsEventRequest(); + JniUtil.deserializeThrift(protocolFactory_, request, req); + String queryId = TUniqueIdUtil.PrintId(request.getHeader().getQuery_id()); + String shortDesc = "waitForHmsEvent " + queryId; + return execAndSerialize( + "waitForHmsEvent", shortDesc, () -> catalog_.waitForHmsEvent(request)); + } } diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 0cc5d4ea5..4addc0ecd 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -624,6 +624,26 @@ class ImpalaTestSuite(BaseTestSuite): return int(m['value']) assert False, "Could not find metric: %s" % name + def verify_timeline_item(self, section, label, profile): + 
in_section = False + for line in profile.split('\n'): + line = line.strip() + if line.startswith(section + ": "): + in_section = True + continue + if not in_section: + continue + if not line.startswith("- "): + # Not a label. The current timeline section ends. + break + if line.startswith("- {}: ".format(label)): + match = re.search(r'\((.*)\)', line) + assert match, "Value not found in '{}'".format(line) + duration = match.group(1) + assert duration != '0ns' and duration != '0' + return + assert False, "'- {}:' not found in\n{}".format(label, profile) + def __do_replacements(self, s, use_db=None, extra=None): globs = globals() # following assignment are purposefully redundant to avoid flake8 warnings (F401). @@ -1477,49 +1497,37 @@ class ImpalaTestSuite(BaseTestSuite): """Check if lines contain value.""" return any([line.find(value) != -1 for line in lines]) + def execute_query_with_hms_sync(self, query, timeout_s, strict=True): + """Execute the query with WaitForHmsSync enabled""" + with self.create_impala_client() as client: + client.set_configuration({"sync_hms_events_wait_time_s": timeout_s, + "sync_hms_events_strict_mode": strict}) + return self.__execute_query(client, query) + def wait_for_db_to_appear(self, db_name, timeout_s): """Wait until the database with 'db_name' is present in the impalad's local catalog. - Fail after timeout_s if the doesn't appear.""" - start_time = time.time() - while time.time() - start_time < timeout_s: - try: - # This will throw an exception if the database is not present. 
- self.client.execute("describe database `{db_name}`".format(db_name=db_name)) - return - except Exception: - time.sleep(0.2) - continue - raise Exception("DB {0} didn't show up after {1}s", db_name, timeout_s) + Fail after timeout_s if it doesn't appear.""" + result = self.execute_query_with_hms_sync( + "describe database `{0}`".format(db_name), timeout_s) + assert result.success def confirm_db_exists(self, db_name): """Confirm the database with 'db_name' is present in the impalad's local catalog. Fail if the db is not present""" # This will throw an exception if the database is not present. self.client.execute("describe database `{db_name}`".format(db_name=db_name)) - return def confirm_table_exists(self, db_name, tbl_name): """Confirms if the table exists. The describe table command will fail if the table does not exist.""" self.client.execute("describe `{0}`.`{1}`".format(db_name, tbl_name)) - return - def wait_for_table_to_appear(self, db_name, table_name, timeout_s): + def wait_for_table_to_appear(self, db_name, table_name, timeout_s=10): """Wait until the table with 'table_name' in 'db_name' is present in the - impalad's local catalog. Fail after timeout_s if the doesn't appear.""" - start_time = time.time() - while time.time() - start_time < timeout_s: - try: - # This will throw an exception if the table is not present. - self.client.execute("describe `{db_name}`.`{table_name}`".format( - db_name=db_name, table_name=table_name)) - return - except Exception as ex: - print(str(ex)) - time.sleep(0.2) - continue - raise Exception("Table {0}.{1} didn't show up after {2}s", db_name, table_name, - timeout_s) + impalad's local catalog. 
Fail after timeout_s if it doesn't appear.""" + result = self.execute_query_with_hms_sync( + "describe `{0}`.`{1}`".format(db_name, table_name), timeout_s) + assert result.success def assert_eventually(self, timeout_s, period_s, condition, error_msg=None): """Assert that the condition (a function with no parameters) returns True within the diff --git a/tests/custom_cluster/test_event_processing_error.py b/tests/custom_cluster/test_event_processing_error.py index afe954311..64516480d 100644 --- a/tests/custom_cluster/test_event_processing_error.py +++ b/tests/custom_cluster/test_event_processing_error.py @@ -356,21 +356,19 @@ class TestEventProcessingError(CustomClusterTestSuite): replication tests """ # inserts on transactional tables - TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client, - self.cluster, unique_database, True) + TestEventProcessingBase._run_test_insert_events_impl(unique_database, True) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" try: test_db = unique_database + "_no_transact" self.run_stmt_in_hive("""create database {}""".format(test_db)) # inserts on external tables - TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, - self.client, self.cluster, test_db, False) + TestEventProcessingBase._run_test_insert_events_impl(test_db, False) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" finally: self.run_stmt_in_hive("""drop database {} cascade""".format(test_db)) # replication related tests - TestEventProcessingBase._run_event_based_replication_tests_impl(self.hive_client, - self.client, self.cluster, self.filesystem_client) + TestEventProcessingBase._run_event_based_replication_tests_impl( + self.filesystem_client) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" def __create_table_and_load__(self, db_name, table_name, is_transactional, diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py index f8c3d0752..c3f9bd648 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -46,6 +46,10 @@ STATESTORED_ARGS = ( "-statestore_heartbeat_frequency_ms={freq_ms} " "-statestore_priority_update_frequency_ms={freq_ms}").format( freq_ms=STATESTORE_RPC_FREQUENCY_MS) +EVENT_SYNC_QUERY_OPTIONS = { + "sync_hms_events_wait_time_s": 10, + "sync_hms_events_strict_mode": True +} def wait_single_statestore_heartbeat(): @@ -428,11 +432,11 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): create table {db}.{tbl} (id int); insert into {db}.{tbl} values(1);""".format(db=db_name, tbl=tbl_name)) # With MetastoreEventProcessor running, the insert event will be processed. Query - # the table from Impala. - EventProcessorUtils.wait_for_event_processing(self, event_proc_timeout) - # Verify that the data is present in Impala. - data = self.execute_scalar("select * from %s.%s" % (db_name, tbl_name)) - assert data == '1' + # the table from Impala. Verify that the data is present in Impala. + result = self.execute_query_with_hms_sync( + "select * from %s.%s" % (db_name, tbl_name), event_proc_timeout) + assert len(result.data) == 1 + assert result.data[0] == '1' # Execute ALTER TABLE + DROP in quick succession so they will be processed in the # same event batch. 
self.run_stmt_in_hive(""" @@ -1491,3 +1495,96 @@ class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase): self.cluster.catalogd.set_jvm_log_level("org.apache.impala.util.DebugUtils", "trace") self._run_self_events_test(unique_database, vector.get_value('exec_option'), use_impala=True) + + [email protected] +class TestEventSyncFailures(TestEventProcessingCustomConfigsBase): + + @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0") + def test_hms_event_sync_with_event_processing_disabled(self, vector): + """Test with HMS event processing disabled. Verify the error message appears + correctly in non-strict mode. Verify the query fails in strict mode.""" + client = self.default_impala_client(vector.get_value('protocol')) + query = "select count(*) from functional.alltypes" + # Timeline label shown in the profile when the wait failed. + label = "Continuing without syncing Metastore events" + # Verify error messages in non-strict mode + client.set_configuration({"sync_hms_events_wait_time_s": 10}) + handle = client.execute_async(query) + client.wait_for_finished_timeout(handle, 5) + results = client.fetch(query, handle) + assert results.success + assert len(results.data) == 1 + assert int(results.data[0]) == 7300 + + client_log = client.get_log(handle) + expected_error = "Continuing without syncing Metastore events: " \ + "HMS event processing is disabled" + assert expected_error in client_log + profile = client.get_runtime_profile(handle) + assert "Errors: " + expected_error in profile, profile + self.verify_timeline_item("Query Compilation", label, profile) + client.close_query(handle) + + # Verify multi-line entries in the error log + client.set_configuration({"sync_hms_events_wait_time_s": 10, + "debug_action": "0:PREPARE:INJECT_ERROR_LOG"}) + handle = client.execute_async(query) + client.wait_for_finished_timeout(handle, 5) + client_log = client.get_log(handle) + expected_error += "\nDebug Action: INJECT_ERROR_LOG" + assert
expected_error in client_log + profile = client.get_runtime_profile(handle) + assert "Errors: " + expected_error in profile, profile + self.verify_timeline_item("Query Compilation", label, profile) + + # Verify failures in strict mode + err = self.execute_query_expect_failure( + client, query, EVENT_SYNC_QUERY_OPTIONS) + expected_error = "Failed to sync events from Metastore: " \ + "HMS event processing is disabled" + assert expected_error in str(err) + client.close_query(handle) + + @CustomClusterTestSuite.with_args( + catalogd_args="--debug_actions=catalogd_event_processing_delay:SLEEP@2000") + def test_hms_event_sync_timeout(self, vector, unique_database): + client = self.default_impala_client(vector.get_value('protocol')) + # Timeline label shown in the profile when the wait failed. + label = "Continuing without syncing Metastore events" + # Prepare a partitioned table and load it into catalogd + create_stmt = "create table {}.part (i int) partitioned by (p int)".format( + unique_database) + self.execute_query_expect_success(client, create_stmt) + self.execute_query_expect_success(client, "describe {}.part".format(unique_database)) + + # Add a partition in Hive which generates an ADD_PARTITION event + alter_stmt = "insert into {}.part partition (p=0) values (0)".format(unique_database) + self.run_stmt_in_hive(alter_stmt) + + # SELECT gets 0 rows since timeout waiting for HMS events to be synced + query = "select * from {}.part".format(unique_database) + client.set_configuration({"sync_hms_events_wait_time_s": 2}) + handle = client.execute_async(query) + client.wait_for_finished_timeout(handle, 60) + results = client.fetch(query, handle) + assert results.success + assert len(results.data) == 0 + + # Verify the warnings + client_log = client.get_log(handle) + expected_error = ( + "Continuing without syncing Metastore events: " + "Timeout waiting for HMS events to be synced. Event id to wait for:") + assert expected_error in client_log + assert ". 
Last synced event id: " in client_log + profile = client.get_runtime_profile(handle) + assert "Errors: " + expected_error in profile, profile + self.verify_timeline_item("Query Compilation", label, profile) + # The duration is something like "2s034ms". Just checking "2s" here. + assert "- Continuing without syncing Metastore events: 2s" in profile, profile + client.close_query(handle) + + # SELECT gets the new row if waiting for enough time + results = self.execute_query_expect_success(client, query, EVENT_SYNC_QUERY_OPTIONS) + assert len(results.data) == 1 diff --git a/tests/custom_cluster/test_hive_parquet_codec_interop.py b/tests/custom_cluster/test_hive_parquet_codec_interop.py index a020a2be7..4f54e36ea 100644 --- a/tests/custom_cluster/test_hive_parquet_codec_interop.py +++ b/tests/custom_cluster/test_hive_parquet_codec_interop.py @@ -99,8 +99,7 @@ class TestParquetInterop(CustomClusterTestSuite): # Make sure Impala's metadata is in sync. if cluster_properties.is_event_polling_enabled(): assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" - EventProcessorUtils.wait_for_event_processing(self) - self.confirm_table_exists(unique_database, "t1_hive") + self.wait_for_table_to_appear(unique_database, "t1_hive") else: self.client.execute("invalidate metadata {0}".format(hive_table)) diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py index e712b7421..aa380a951 100644 --- a/tests/custom_cluster/test_kudu.py +++ b/tests/custom_cluster/test_kudu.py @@ -28,7 +28,6 @@ from tests.common.kudu_test_suite import KuduTestSuite from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf from tests.common.test_dimensions import BEESWAX, add_mandatory_exec_option from tests.common.test_result_verifier import error_msg_expected -from tests.util.event_processor_utils import EventProcessorUtils KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts LOG = logging.getLogger(__name__) @@ -319,15 +318,13 @@ class 
TestKuduHMSIntegration(CustomKuduTest): kudu_client.delete_table(kudu_tbl_name) assert not kudu_client.table_exists(kudu_tbl_name) - # Wait for events to prevent race condition - EventProcessorUtils.wait_for_event_processing(self) - try: - cursor.execute("DROP TABLE %s" % kudu_tbl_name) + self.execute_query_with_hms_sync( + "DROP TABLE %s" % kudu_tbl_name, 10, strict=True) assert False except Exception as e: LOG.info(str(e)) - "Table does not exist: %s" % kudu_tbl_name in str(e) + assert "Table does not exist: %s" % kudu_tbl_name in str(e) @pytest.mark.execute_serially def test_drop_external_kudu_table(self, cursor, kudu_client, unique_database): diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py index 16f3e29cc..1d1d2cc19 100644 --- a/tests/metadata/test_event_processing.py +++ b/tests/metadata/test_event_processing.py @@ -20,33 +20,34 @@ import pytest import re import time -from tests.common.impala_cluster import ImpalaCluster +from tests.common.test_dimensions import ( + create_single_exec_option_dimension, + add_mandatory_exec_option) from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2 from tests.metadata.test_event_processing_base import TestEventProcessingBase from tests.util.event_processor_utils import EventProcessorUtils +PROCESSING_TIMEOUT_S = 10 + @SkipIfFS.hive @SkipIfCatalogV2.hms_event_polling_disabled() class TestEventProcessing(ImpalaTestSuite): """This class contains tests that exercise the event processing mechanism in the catalog.""" - CATALOG_URL = "http://localhost:25020" - PROCESSING_TIMEOUT_S = 10 @SkipIfHive2.acid def test_transactional_insert_events(self, unique_database): """Executes 'run_test_insert_events' for transactional tables. 
""" - TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client, - ImpalaCluster.get_e2e_test_cluster(), unique_database, is_transactional=True) + TestEventProcessingBase._run_test_insert_events_impl( + unique_database, is_transactional=True) def test_insert_events(self, unique_database): """Executes 'run_test_insert_events' for non-transactional tables. """ - TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client, - ImpalaCluster.get_e2e_test_cluster(), unique_database) + TestEventProcessingBase._run_test_insert_events_impl(unique_database) def test_iceberg_inserts(self): """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation @@ -75,8 +76,8 @@ class TestEventProcessing(ImpalaTestSuite): self._run_test_empty_partition_events(unique_database, False) def test_event_based_replication(self): - TestEventProcessingBase._run_event_based_replication_tests_impl(self.hive_client, - self.client, ImpalaCluster.get_e2e_test_cluster(), self.filesystem_client) + TestEventProcessingBase._run_event_based_replication_tests_impl( + self.filesystem_client) def _run_test_empty_partition_events(self, unique_database, is_transactional): test_tbl = unique_database + ".test_events" @@ -84,29 +85,28 @@ class TestEventProcessing(ImpalaTestSuite): is_transactional) self.run_stmt_in_hive("create table {0} (key string, value string) \ partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES)) - EventProcessorUtils.wait_for_event_processing(self) + self.client.set_configuration({ + "sync_hms_events_wait_time_s": PROCESSING_TIMEOUT_S, + "sync_hms_events_strict_mode": True + }) self.client.execute("describe {0}".format(test_tbl)) self.run_stmt_in_hive( "alter table {0} add partition (year=2019)".format(test_tbl)) - EventProcessorUtils.wait_for_event_processing(self) assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year') self.run_stmt_in_hive( "alter table {0} add if not exists 
partition (year=2019)".format(test_tbl)) - EventProcessorUtils.wait_for_event_processing(self) assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year') assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" self.run_stmt_in_hive( "alter table {0} drop partition (year=2019)".format(test_tbl)) - EventProcessorUtils.wait_for_event_processing(self) assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year') assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" self.run_stmt_in_hive( "alter table {0} drop if exists partition (year=2019)".format(test_tbl)) - EventProcessorUtils.wait_for_event_processing(self) assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year') assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" @@ -287,3 +287,125 @@ class TestEventProcessing(ImpalaTestSuite): assert warning in cmd_output # The cleanup method will drop 'unique_database' and tables in it, which generates # more than 2 self-events. It's OK for EP to skip them. 
+ + [email protected] [email protected]_event_polling_disabled() +class TestEventSyncWaiting(ImpalaTestSuite): + + @classmethod + def get_workload(cls): + return 'functional-planner' + + @classmethod + def add_test_dimensions(cls): + super(TestEventSyncWaiting, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) + add_mandatory_exec_option(cls, 'sync_hms_events_wait_time_s', PROCESSING_TIMEOUT_S) + add_mandatory_exec_option(cls, 'sync_hms_events_strict_mode', True) + + def test_hms_event_sync(self, vector, unique_database): + """Verify query option sync_hms_events_wait_time_s should protect the query by + waiting until Impala sync the HMS changes.""" + client = self.default_impala_client(vector.get_value('protocol')) + client.set_configuration(vector.get_exec_option_dict()) + tbl_name = unique_database + ".tbl" + label = "Synced events from Metastore" + # Test DESCRIBE on new table created in Hive + self.run_stmt_in_hive( + "create table {0} (i int) partitioned by (p int)".format(tbl_name)) + res = self.execute_query_expect_success(client, "describe " + tbl_name) + assert res.data == ["i\tint\t", 'p\tint\t'] + assert res.log == '' + self.verify_timeline_item("Query Compilation", label, res.runtime_profile) + + # Test SHOW TABLES gets new tables created in Hive + self.run_stmt_in_hive("create table {0}_2 (i int)".format(tbl_name)) + res = self.execute_query_expect_success(client, "show tables in " + unique_database) + assert res.data == ["tbl", "tbl_2"] + assert res.log == '' + self.verify_timeline_item("Query Compilation", label, res.runtime_profile) + + # Test SHOW VIEWS gets new views created in Hive + self.run_stmt_in_hive( + "create view {0}.v as select * from {1}".format(unique_database, tbl_name)) + res = self.execute_query_expect_success(client, "show views in " + unique_database) + assert res.data == ["v"] + assert res.log == '' + self.verify_timeline_item("Query Compilation", label, res.runtime_profile) + 
+    # Test DROP TABLE
+    try:
+      self.run_stmt_in_hive("""create database {0}_2;
+          create table {0}_2.tbl(i int);
+          create table {0}_2.tbl_2(i int);""".format(unique_database))
+      self.execute_query_expect_success(
+          client, "drop table {0}_2.tbl".format(unique_database))
+    finally:
+      self.run_stmt_in_hive(
+          "drop database if exists {0}_2 cascade".format(unique_database))
+
+    # Test SHOW DATABASES
+    res = self.execute_query_expect_success(client, "show databases")
+    assert unique_database + "\t" in res.data
+    assert unique_database + "_2\t" not in res.data
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test DROP DATABASE
+    try:
+      self.run_stmt_in_hive("create database {0}_3".format(unique_database))
+      self.execute_query_expect_success(
+          client, "drop database {0}_3".format(unique_database))
+    finally:
+      self.run_stmt_in_hive(
+          "drop database if exists {0}_3 cascade".format(unique_database))
+
+    # Test DESCRIBE DATABASE
+    try:
+      self.run_stmt_in_hive("create database {0}_4".format(unique_database))
+      self.execute_query_expect_success(
+          client, "describe database {0}_4".format(unique_database))
+    finally:
+      self.run_stmt_in_hive("drop database if exists {0}_4".format(unique_database))
+
+    # Test SHOW FUNCTIONS
+    try:
+      self.run_stmt_in_hive("create database {0}_5".format(unique_database))
+      self.execute_query_expect_success(
+          client, "show functions in {0}_5".format(unique_database))
+    finally:
+      self.run_stmt_in_hive("drop database if exists {0}_5".format(unique_database))
+
+    # Test SELECT gets new values inserted by Hive
+    self.run_stmt_in_hive(
+        "insert into table {0} partition (p=0) select 0".format(tbl_name))
+    res = self.execute_query_expect_success(client, "select * from " + tbl_name)
+    assert res.data == ["0\t0"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+    # Same case but using INSERT OVERWRITE in Hive
+    self.run_stmt_in_hive(
+        "insert overwrite table {0} partition (p=0) select 1".format(tbl_name))
+    res = self.execute_query_expect_success(client, "select * from " + tbl_name)
+    assert res.data == ["1\t0"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test SHOW PARTITIONS gets new partitions created by Hive
+    self.run_stmt_in_hive(
+        "insert into table {0} partition (p=2) select 2".format(tbl_name))
+    res = self.execute_query_expect_success(client, "show partitions " + tbl_name)
+    assert self.has_value('p=0', res.data)
+    assert self.has_value('p=2', res.data)
+    # 3 result lines: 2 for partitions, 1 for total info
+    assert len(res.data) == 3
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+    # Test CREATE TABLE on table dropped by Hive
+    self.run_stmt_in_hive("drop table " + tbl_name)
+    self.execute_query_expect_success(client, "create table {0} (j int)".format(tbl_name))
+    res = self.execute_query_expect_success(client, "describe " + tbl_name)
+    assert res.data == ["j\tint\t"]
+    assert res.log == ''
+    self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
diff --git a/tests/metadata/test_event_processing_base.py b/tests/metadata/test_event_processing_base.py
index 054fd2cfc..97725a2c0 100644
--- a/tests/metadata/test_event_processing_base.py
+++ b/tests/metadata/test_event_processing_base.py
@@ -17,137 +17,131 @@
 from __future__ import absolute_import, division, print_function
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfFS, SkipIfCatalogV2
-from tests.util.event_processor_utils import EventProcessorUtils
+
+EVENT_SYNC_QUERY_OPTIONS = {
+  "sync_hms_events_wait_time_s": 10,
+  "sync_hms_events_strict_mode": True
+}
[email protected]
[email protected]_event_polling_disabled()
 class TestEventProcessingBase(ImpalaTestSuite):
   @classmethod
-  def _run_test_insert_events_impl(cls, hive_client, impala_client, impala_cluster,
-                                   unique_database, is_transactional=False):
+  def _run_test_insert_events_impl(cls, unique_database, is_transactional=False):
     """Test for insert event processing. Events are created in Hive and processed in
     Impala. The following cases are tested :
     Insert into table --> for partitioned and non-partitioned table
     Insert overwrite table --> for partitioned and non-partitioned table
     Insert into partition --> for partitioned table
     """
-    # Test table with no partitions.
-    tbl_insert_nopart = 'tbl_insert_nopart'
-    cls.run_stmt_in_hive(
-        "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
-    tblproperties = ""
-    if is_transactional:
-      tblproperties = "tblproperties ('transactional'='true',"  \
-          "'transactional_properties'='insert_only')"
-    cls.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
-        % (unique_database, tbl_insert_nopart, tblproperties))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Test CTAS and insert by Impala with empty results (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
-        .format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
-    cls.execute_query_expect_success(impala_client,
-        "insert into {db}.ctas_tbl select * from {db}.{tbl}"
-        .format(db=unique_database, tbl=tbl_insert_nopart))
-    # Test insert into table, this will fire an insert event.
-    cls.run_stmt_in_hive("insert into %s.%s values(101, 200)"
-        % (unique_database, tbl_insert_nopart))
-    # With MetastoreEventProcessor running, the insert event will be processed. Query the
-    # table from Impala.
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
-        (unique_database, tbl_insert_nopart))
-    assert data.split('\t') == ['101', '200']
+    with cls.create_impala_client() as impala_client:
+      # Test table with no partitions.
+      tbl_insert_nopart = 'tbl_insert_nopart'
+      cls.run_stmt_in_hive(
+          "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
+      tblproperties = ""
+      if is_transactional:
+        tblproperties = "tblproperties ('transactional'='true',"  \
+            "'transactional_properties'='insert_only')"
+      cls.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
+          % (unique_database, tbl_insert_nopart, tblproperties))
+      impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
+      # Test CTAS and insert by Impala with empty results (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
+          .format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
+      cls.execute_query_expect_success(impala_client,
+          "insert into {db}.ctas_tbl select * from {db}.{tbl}"
+          .format(db=unique_database, tbl=tbl_insert_nopart))
+      # Test insert into table, this will fire an insert event.
+      cls.run_stmt_in_hive("insert into %s.%s values(101, 200)"
+          % (unique_database, tbl_insert_nopart))
+      # With MetastoreEventProcessor running, the insert event will be processed. Query
+      # the table from Impala. Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
+          (unique_database, tbl_insert_nopart))
+      assert data.split('\t') == ['101', '200']
 
-    # Test insert overwrite. Overwrite the existing value.
-    cls.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
-        % (unique_database, tbl_insert_nopart))
-    # Make sure the event has been processed.
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
-        (unique_database, tbl_insert_nopart))
-    assert data.split('\t') == ['101', '201']
-    # Test insert overwrite by Impala with empty results (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
-        .format(db=unique_database, tbl=tbl_insert_nopart))
-    result = cls.execute_query_expect_success(impala_client,
-        "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_nopart))
-    assert len(result.data) == 0
+      # Test insert overwrite. Overwrite the existing value.
+      cls.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
+          % (unique_database, tbl_insert_nopart))
+      # Make sure the event has been processed using sync_hms_events_wait_time_s.
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
+          (unique_database, tbl_insert_nopart))
+      assert data.split('\t') == ['101', '201']
+      # Test insert overwrite by Impala with empty results (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
+          .format(db=unique_database, tbl=tbl_insert_nopart))
+      result = cls.execute_query_expect_success(impala_client,
+          "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_nopart))
+      assert len(result.data) == 0
 
-    # Test partitioned table.
-    tbl_insert_part = 'tbl_insert_part'
-    cls.run_stmt_in_hive("drop table if exists %s.%s"
-        % (unique_database, tbl_insert_part))
-    cls.run_stmt_in_hive("create table %s.%s (id int, name string) "
-        "partitioned by(day int, month int, year int) %s"
-        % (unique_database, tbl_insert_part, tblproperties))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Test insert overwrite by Impala with empty results (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
-        "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
-        prop=tblproperties))
-    cls.execute_query_expect_success(impala_client,
-        "insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
-        "name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
-    # Insert data into partitions.
-    cls.run_stmt_in_hive(
-        "insert into %s.%s partition(day=28, month=03, year=2019)"
-        "values(101, 'x')" % (unique_database, tbl_insert_part))
-    # Make sure the event has been processed.
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client,
-        "select * from %s.%s" % (unique_database, tbl_insert_part))
-    assert data.split('\t') == ['101', 'x', '28', '3', '2019']
+      # Test partitioned table.
+      tbl_insert_part = 'tbl_insert_part'
+      cls.run_stmt_in_hive("drop table if exists %s.%s"
+          % (unique_database, tbl_insert_part))
+      cls.run_stmt_in_hive("create table %s.%s (id int, name string) "
+          "partitioned by(day int, month int, year int) %s"
+          % (unique_database, tbl_insert_part, tblproperties))
+      # Test insert overwrite by Impala with empty results (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
+          "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
+          prop=tblproperties))
+      cls.execute_query_expect_success(impala_client,
+          "insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
+          "name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
+      # Insert data into partitions.
+      cls.run_stmt_in_hive(
+          "insert into %s.%s partition(day=28, month=03, year=2019)"
+          "values(101, 'x')" % (unique_database, tbl_insert_part))
+      # Make sure the event has been processed using sync_hms_events_wait_time_s.
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client,
+          "select * from %s.%s" % (unique_database, tbl_insert_part))
+      assert data.split('\t') == ['101', 'x', '28', '3', '2019']
 
-    # Test inserting into existing partitions.
-    cls.run_stmt_in_hive(
-        "insert into %s.%s partition(day=28, month=03, year=2019)"
-        "values(102, 'y')" % (unique_database, tbl_insert_part))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client,
-        "select count(*) from %s.%s where day=28 and month=3 "
-        "and year=2019" % (unique_database, tbl_insert_part))
-    assert data.split('\t') == ['2']
-    # Test inserting into existing partitions by Impala with empty results
-    # (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client,
-        "insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
-        "select id, name from {db}.ctas_part"
-        .format(db=unique_database, tbl=tbl_insert_part))
+      # Test inserting into existing partitions.
+      cls.run_stmt_in_hive(
+          "insert into %s.%s partition(day=28, month=03, year=2019)"
+          "values(102, 'y')" % (unique_database, tbl_insert_part))
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client,
+          "select count(*) from %s.%s where day=28 and month=3 "
+          "and year=2019" % (unique_database, tbl_insert_part))
+      assert data.split('\t') == ['2']
+      # Test inserting into existing partitions by Impala with empty results
+      # (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client,
+          "insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
+          "select id, name from {db}.ctas_part"
+          .format(db=unique_database, tbl=tbl_insert_part))
 
-    # Test insert overwrite into existing partitions
-    cls.run_stmt_in_hive(
-        "insert overwrite table %s.%s partition(day=28, month=03, "
-        "year=2019)" "values(101, 'z')" % (unique_database, tbl_insert_part))
-    EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
-    # Verify that the data is present in Impala.
-    data = cls.execute_scalar_expect_success(impala_client,
-        "select * from %s.%s where day=28 and month=3 and"
-        " year=2019 and id=101" % (unique_database, tbl_insert_part))
-    assert data.split('\t') == ['101', 'z', '28', '3', '2019']
-    # Test insert overwrite into existing partitions by Impala with empty results
-    # (IMPALA-10765).
-    cls.execute_query_expect_success(impala_client, "insert overwrite {db}.{tbl} "
-        "partition(day=28, month=03, year=2019) "
-        "select id, name from {db}.ctas_part"
-        .format(db=unique_database, tbl=tbl_insert_part))
-    result = cls.execute_query_expect_success(impala_client, "select * from {db}.{tbl} "
-        "where day=28 and month=3 and year=2019"
-        .format(db=unique_database, tbl=tbl_insert_part))
-    assert len(result.data) == 0
+      # Test insert overwrite into existing partitions
+      cls.run_stmt_in_hive(
+          "insert overwrite table %s.%s partition(day=28, month=03, "
+          "year=2019)" "values(101, 'z')" % (unique_database, tbl_insert_part))
+      # Verify that the data is present in Impala.
+      data = cls.execute_scalar_expect_success(impala_client,
+          "select * from %s.%s where day=28 and month=3 and"
+          " year=2019 and id=101" % (unique_database, tbl_insert_part))
+      assert data.split('\t') == ['101', 'z', '28', '3', '2019']
+      impala_client.clear_configuration()
+      # Test insert overwrite into existing partitions by Impala with empty results
+      # (IMPALA-10765).
+      cls.execute_query_expect_success(impala_client, "insert overwrite {db}.{tbl} "
+          "partition(day=28, month=03, year=2019) "
+          "select id, name from {db}.ctas_part"
+          .format(db=unique_database, tbl=tbl_insert_part))
+      result = cls.execute_query_expect_success(impala_client, "select * from {db}.{tbl} "
+          "where day=28 and month=3 and year=2019"
+          .format(db=unique_database, tbl=tbl_insert_part))
+      assert len(result.data) == 0
 
   @classmethod
-  def _run_event_based_replication_tests_impl(cls, hive_client, impala_client,
-      impala_cluster, filesystem_client, transactional=True):
+  def _run_event_based_replication_tests_impl(cls, filesystem_client, transactional=True):
     """Hive Replication relies on the insert events generated on the tables.
     This test issues some basic replication commands from Hive and makes sure
     that the replicated table has correct data."""
@@ -156,12 +150,13 @@ class TestEventProcessingBase(ImpalaTestSuite):
     target_db = ImpalaTestSuite.get_random_name("repl_target_")
     unpartitioned_tbl = "unpart_tbl"
     partitioned_tbl = "part_tbl"
+    impala_client = cls.create_impala_client()
     try:
       cls.run_stmt_in_hive("create database {0}".format(source_db))
       cls.run_stmt_in_hive(
          "alter database {0} set dbproperties ('repl.source.for'='xyz')"
          .format(source_db))
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
+      impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
       # explicit create table command since create table like doesn't allow tblproperties
       impala_client.execute("create table {0}.{1} (a string, b string) stored as parquet"
          " {2}".format(source_db, unpartitioned_tbl, TBLPROPERTIES))
@@ -195,7 +190,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       assert unpartitioned_tbl in impala_client.execute(
          "show tables in {0}".format(target_db)).get_data()
       assert partitioned_tbl in impala_client.execute(
@@ -221,9 +215,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
@@ -246,9 +237,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
@@ -270,9 +258,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_source = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from "
@@ -298,9 +283,6 @@ class TestEventProcessingBase(ImpalaTestSuite):
       # replicate the table from source to target
       cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
-      # we wait until the events catch up in case repl command above did some HMS
-      # operations.
-      EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
       # confirm the number of rows in target match with the source table.
       rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
          "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
@@ -328,6 +310,7 @@ class TestEventProcessingBase(ImpalaTestSuite):
       if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
         filesystem_client.delete_file_dir(
            target_db_obj.managedLocationUri, True)
+      impala_client.close()
 
   @classmethod
   def __get_db_nothrow(self, name):
diff --git a/tests/metadata/test_metadata_query_statements.py b/tests/metadata/test_metadata_query_statements.py
index 89eea1a4e..d31d169d4 100644
--- a/tests/metadata/test_metadata_query_statements.py
+++ b/tests/metadata/test_metadata_query_statements.py
@@ -186,8 +186,14 @@ class TestMetadataQueryStatements(ImpalaTestSuite):
     if cluster_properties.is_event_polling_enabled():
       # Using HMS event processor - wait until the database shows up.
       assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
-      EventProcessorUtils.wait_for_event_processing(self)
+      self.client.set_configuration({
+        "sync_hms_events_wait_time_s": 10,
+        "sync_hms_events_strict_mode": True
+      })
+      # Waits for the externally created dbs to appear.
       self.confirm_db_exists("hive_test_desc_db")
+      self.confirm_db_exists("hive_test_desc_db2")
+      self.client.clear_configuration()
     else:
       # Invalidate metadata to pick up hive-created db.
       self.client.execute("invalidate metadata")
diff --git a/tests/query_test/test_hive_codec_interop.py b/tests/query_test/test_hive_codec_interop.py
index 957c8ca1d..9d245aeb6 100644
--- a/tests/query_test/test_hive_codec_interop.py
+++ b/tests/query_test/test_hive_codec_interop.py
@@ -81,7 +81,7 @@ class TestFileCodecInterop(ImpalaTestSuite):
     # Make sure Impala's metadata is in sync.
     if cluster_properties.is_catalog_v2_cluster():
-      self.wait_for_table_to_appear(unique_database, hive_table, timeout_s=10)
+      self.wait_for_table_to_appear(unique_database, "t1_hive", timeout_s=10)
     else:
       self.client.execute("invalidate metadata {0}".format(hive_table))
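The tests above all follow the usage pattern from the commit message: set the two new query options, run the query that depends on external HMS changes, then clear them. A minimal sketch of that pattern as client-side statements — the helper function below is hypothetical and only the option names `SYNC_HMS_EVENTS_WAIT_TIME_S` and `SYNC_HMS_EVENTS_STRICT_MODE` (and their defaults of 0 and false) come from this commit:

```python
def event_sync_set_stmts(wait_time_s, strict=False):
    """Build the SET statements a client such as impala-shell would issue
    before a query that must observe recent Hive/Spark changes.
    Hypothetical helper for illustration only."""
    return [
        # Timeout in seconds for waiting on HMS event sync; 0 disables waiting.
        "set sync_hms_events_wait_time_s={0};".format(int(wait_time_s)),
        # Strict mode fails the query if the wait times out or the
        # EventProcessor is in ERROR state; non-strict only warns.
        "set sync_hms_events_strict_mode={0};".format("true" if strict else "false"),
    ]


# Mirror the commit-message example: wait up to 300s in non-strict mode
# before querying a table Hive just inserted into.
for stmt in event_sync_set_stmts(300):
    print(stmt)
```

As the commit message notes, the wait time can be sized to the largest event-processing lag observed in the cluster, which avoids a full `REFRESH` that would reload every partition.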
