[
https://issues.apache.org/jira/browse/IMPALA-13850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17934084#comment-17934084
]
Wenzhe Zhou commented on IMPALA-13850:
--------------------------------------
CatalogServer::GatherCatalogUpdatesThread() acquires "catalog_lock_" and holds
it while calling catalog_->GetCatalogDelta() to gather catalog updates. If an IM
(invalidate metadata) command is issued to invalidate all metadata, this call
can take a long time to collect the catalog updates.
{code:java}
[[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
  while (true) {
    unique_lock<mutex> unique_lock(catalog_lock_);
    // Protect against spurious wake-ups by checking the value of topic_updates_ready_.
    // It is only safe to continue on and update the shared pending_topic_updates_
    // when topic_updates_ready_ is false, otherwise we may be in the middle of
    // processing a heartbeat.
    while (topic_updates_ready_) {
      catalog_update_cv_.Wait(unique_lock);
    }
    MonotonicStopWatch sw;
    sw.Start();
    // Clear any pending topic updates. They will have been processed by the heartbeat
    // thread by the time we make it here.
    pending_topic_updates_.clear();
    long current_catalog_version;
    Status status = catalog_->GetCatalogVersion(&current_catalog_version);
    if (!status.ok()) {
      LOG(ERROR) << status.GetDetail();
    } else if (current_catalog_version != last_sent_catalog_version_) {
      // If there has been a change since the last time the catalog was queried,
      // call into the Catalog to find out what has changed.
      TGetCatalogDeltaResponse resp;
      status = catalog_->GetCatalogDelta(this, last_sent_catalog_version_, &resp);
      if (!status.ok()) {
        LOG(ERROR) << status.GetDetail();
      } else {
        catalog_objects_max_version_ = resp.max_catalog_version;
      }
    }
    topic_processing_time_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
    topic_updates_ready_ = true;
  }
}
{code}
CatalogServer::IsCatalogInitialized() returns true once catalogd initialization
is finished, but this function is also blocked by "catalog_lock_".
When an IM command is invoked on an Impala cluster with CatalogD HA enabled, we
should call CatalogServer::IsCatalogInitialized() to check whether catalogd
initialization is finished, and reject the IM request if it is not.
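The proposed check could look roughly like the sketch below. It is not Impala code: the class CatalogServerSketch, its members, and the error message are hypothetical. It also assumes the initialization state is kept in an atomic flag so the check itself never waits on "catalog_lock_", which is a design choice of this sketch rather than something the current IsCatalogInitialized() does.

```cpp
#include <atomic>
#include <string>

// Hypothetical stand-in for the catalog server; not the real Impala class.
class CatalogServerSketch {
 public:
  explicit CatalogServerSketch(bool ha_enabled) : ha_enabled_(ha_enabled) {}

  // Called once initialization (including active catalogd assignment) is done.
  void MarkInitialized() { initialized_.store(true, std::memory_order_release); }

  // Lock-free check (assumption of this sketch), so a thread holding a mutex
  // for a long time cannot block it.
  bool IsCatalogInitialized() const {
    return initialized_.load(std::memory_order_acquire);
  }

  // Returns an empty string on success, or an error message if the request
  // is rejected because HA is enabled and initialization is not finished.
  std::string InvalidateMetadata() {
    if (ha_enabled_ && !IsCatalogInitialized()) {
      return "catalogd initialization is not finished; retry later";
    }
    ++invalidate_count_;  // Placeholder for the real invalidation work.
    return "";
  }

  int invalidate_count() const { return invalidate_count_; }

 private:
  const bool ha_enabled_;
  std::atomic<bool> initialized_{false};
  int invalidate_count_ = 0;
};
```

With HA enabled, InvalidateMetadata() returns an error until MarkInitialized() has been called. With HA disabled, the request goes through as before.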
> Catalogd should not start metadata operation until initialization is done if
> HA is enabled
> ------------------------------------------------------------------------------------------
>
> Key: IMPALA-13850
> URL: https://issues.apache.org/jira/browse/IMPALA-13850
> Project: IMPALA
> Issue Type: Bug
> Components: Catalog
> Reporter: Wenzhe Zhou
> Priority: Major
>
> In a case reported by a user, catalogd initialization failed to complete.
> Log messages showed that catalog HA was enabled. catalogd was blocked
> trying to acquire "CatalogServer.catalog_lock_" when calling
> CatalogServer::UpdateActiveCatalogd() during statestore subscriber
> registration.
> Log messages showed that an IM command was issued before catalogd tried to
> register with the statestore.
> {code:java}
> I0310 12:21:34.093617 1 CatalogServiceCatalog.java:2188] Invalidated all metadata.
> I0310 12:21:34.094341 1 thrift-server.cc:419] ThriftServer 'StatestoreSubscriber' started on port: 23020
> I0310 12:21:34.094341 1816 TAcceptQueueServer.cpp:329] connection_setup_thread_pool_size is set to 2
> I0310 12:21:34.094586 1 thrift-util.cc:198] TSocket::open() error on socket (after THRIFT_POLL) <Host: localhost Port: 23020>: Connection refused
> I0310 12:21:34.094790 1 statestore-subscriber.cc:745] Starting statestore subscriber
> {code}
> We should not allow any metadata operation until initialization is done. When
> HA is enabled, catalog-server should not hold "CatalogServer.catalog_lock_"
> for a long time before the active catalogd is assigned.