This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-3.4.2
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3cf30008cfe3321917ba9146b96aee8250ce3dd2
Author: Bikramjeet Vig <[email protected]>
AuthorDate: Wed Aug 5 16:31:08 2020 -0700

    IMPALA-10052: Expose daemon health endpoint for statestore and catalog
    
    This change exposes the daemon health of statestored and catalogd via
    an HTTP endpoint '/healthz'. If the server is healthy, this endpoint
    will return HTTP code 200 (OK). If it is unhealthy, it will return
    503 (Service Unavailable). This is consistent with the endpoint added
    for impalads in IMPALA-8895.
    
    Testing:
    - Extended test in test_web_pages.py
    
    Change-Id: I7714734df8e50dabbbebcb77a86a5a00bd13bf7c
    Reviewed-on: http://gerrit.cloudera.org:8080/16295
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/21261
    Reviewed-by: Zihao Ye <[email protected]>
    Tested-by: Quanlong Huang <[email protected]>
---
 be/src/catalog/catalog-server.cc  | 19 +++++++++++++++++++
 be/src/catalog/catalog-server.h   | 14 ++++++++++++++
 be/src/catalog/catalogd-main.cc   |  1 +
 be/src/statestore/statestore.cc   | 20 ++++++++++++++++++--
 be/src/statestore/statestore.h    | 10 ++++++++--
 tests/webserver/test_web_pages.py |  6 ++++--
 6 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index ad140b86f..01173ca47 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -97,6 +97,7 @@ const string TABLE_METRICS_WEB_PAGE = "/table_metrics";
 const string TABLE_METRICS_TEMPLATE = "table_metrics.tmpl";
 const string EVENT_WEB_PAGE = "/events";
 const string EVENT_METRICS_TEMPLATE = "events.tmpl";
+const string CATALOG_SERVICE_HEALTH_WEB_PAGE = "/healthz";
 
 const int REFRESH_METRICS_INTERVAL_MS = 1000;
 
@@ -287,6 +288,11 @@ Status CatalogServer::Start() {
 }
 
 void CatalogServer::RegisterWebpages(Webserver* webserver) {
+  Webserver::RawUrlCallback healthz_callback =
+      [this](const auto& req, auto* data, auto* response) {
+        return this->HealthzHandler(req, data, response);
+      };
+  webserver->RegisterUrlCallback(CATALOG_SERVICE_HEALTH_WEB_PAGE, healthz_callback);
   webserver->RegisterUrlCallback(CATALOG_WEB_PAGE, CATALOG_TEMPLATE,
       [this](const auto& args, auto* doc) { this->CatalogUrlCallback(args, doc); }, true);
   webserver->RegisterUrlCallback(CATALOG_OBJECT_WEB_PAGE, CATALOG_OBJECT_TEMPLATE,
@@ -740,3 +746,16 @@ bool CatalogServer::AddPendingTopicItem(std::string key, int64_t version,
               Substitute(", compressed size=$0", item.value.size()) : string());
   return true;
 }
+
+void CatalogServer::MarkServiceAsStarted() { service_started_ = true; }
+
+void CatalogServer::HealthzHandler(
+    const Webserver::WebRequest& req, std::stringstream* data, HttpStatusCode* response) {
+  if (service_started_) {
+    (*data) << "OK";
+    *response = HttpStatusCode::Ok;
+    return;
+  }
+  *(data) << "Not Available";
+  *response = HttpStatusCode::ServiceUnavailable;
+}
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index dec22eaf3..5cb3a2f29 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -27,11 +27,14 @@
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "catalog/catalog.h"
+#include "kudu/util/web_callback_registry.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
 #include "util/metrics-fwd.h"
 #include "rapidjson/rapidjson.h"
 
+using kudu::HttpStatusCode;
+
 namespace impala {
 
 class StatestoreSubscriber;
@@ -78,7 +81,14 @@ class CatalogServer {
   bool AddPendingTopicItem(std::string key, int64_t version, const uint8_t* item_data,
       uint32_t size, bool deleted);
 
+  /// Mark service as started. Should be called only after the thrift server hosting this
+  /// service has started.
+  void MarkServiceAsStarted();
+
  private:
+  /// Indicates whether the catalog service is ready.
+  std::atomic_bool service_started_{false};
+
   /// Thrift API implementation which proxies requests onto this CatalogService.
   boost::shared_ptr<CatalogServiceIf> thrift_iface_;
   ThriftSerializer thrift_serializer_;
@@ -236,6 +246,10 @@ class CatalogServer {
   // metastore event processor metrics and adds it to the document
   void EventMetricsUrlCallback(
       const Webserver::WebRequest& req, rapidjson::Document* document);
+
+  /// Raw callback to indicate whether the service is ready.
+  void HealthzHandler(const Webserver::WebRequest& req, std::stringstream* data,
+      HttpStatusCode* response);
 };
 
 }
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index 847932642..5c2e8eb8c 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -107,6 +107,7 @@ int CatalogdMain(int argc, char** argv) {
   }
   ABORT_IF_ERROR(builder.metrics(metrics.get()).Build(&server));
   ABORT_IF_ERROR(server->Start());
+  catalog_server.MarkServiceAsStarted();
   LOG(INFO) << "CatalogService started on port: " << FLAGS_catalog_service_port;
   server->Join();
 
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 7df60b212..2c9e209f6 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -453,7 +453,7 @@ Statestore::Statestore(MetricGroup* metrics)
 }
 
 Statestore::~Statestore() {
-  CHECK(initialized_) << "Cannot shutdown Statestore once initialized.";
+  CHECK(service_started_) << "Cannot shutdown Statestore once initialized and started.";
 }
 
 Status Statestore::Init(int32_t state_store_port) {
@@ -482,12 +482,17 @@ Status Statestore::Init(int32_t state_store_port) {
   RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
   RETURN_IF_ERROR(Thread::Create("statestore-heartbeat", "heartbeat-monitoring-thread",
       &Statestore::MonitorSubscriberHeartbeat, this, &heartbeat_monitoring_thread_));
-  initialized_ = true;
+  service_started_ = true;
 
   return Status::OK();
 }
 
 void Statestore::RegisterWebpages(Webserver* webserver) {
+  Webserver::RawUrlCallback healthz_callback =
+      [this](const auto& req, auto* data, auto* response) {
+        return this->HealthzHandler(req, data, response);
+      };
+  webserver->RegisterUrlCallback("/healthz", healthz_callback);
   Webserver::UrlCallback topics_callback =
       bind<void>(mem_fn(&Statestore::TopicsHandler), this, _1, _2);
   webserver->RegisterUrlCallback("/topics", "statestore_topics.tmpl",
@@ -1075,3 +1080,14 @@ void Statestore::ShutdownForTesting() {
 int64_t Statestore::FailedExecutorDetectionTimeMs() {
   return FLAGS_statestore_max_missed_heartbeats * FLAGS_statestore_heartbeat_frequency_ms;
 }
+
+void Statestore::HealthzHandler(
+    const Webserver::WebRequest& req, std::stringstream* data, HttpStatusCode* response) {
+  if (service_started_) {
+    (*data) << "OK";
+    *response = HttpStatusCode::Ok;
+    return;
+  }
+  *(data) << "Not Available";
+  *response = HttpStatusCode::ServiceUnavailable;
+}
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index a08d3388e..f6644eff4 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -44,6 +44,8 @@
 #include "util/thread-pool.h"
 #include "util/webserver.h"
 
+using kudu::HttpStatusCode;
+
 namespace impala {
 
 class Status;
@@ -565,8 +567,8 @@ class Statestore : public CacheLineAligned {
   /// Thread that monitors the heartbeats of all subscribers.
   std::unique_ptr<Thread> heartbeat_monitoring_thread_;
 
-  /// Flag to indicate that the statestore has been initialized.
-  bool initialized_ = false;
+  /// Indicates whether the statestore has been initialized and the service is ready.
+  std::atomic_bool service_started_{false};
 
   /// Cache of subscriber clients used for UpdateState() RPCs. Only one client per
   /// subscriber should be used, but the cache helps with the client lifecycle on failure.
@@ -722,6 +724,10 @@ class Statestore : public CacheLineAligned {
   /// last_heartbeat_ts_ has not been updated in that interval, it logs the subscriber's
   /// id.
   [[noreturn]] void MonitorSubscriberHeartbeat();
+
+  /// Raw callback to indicate whether the service is ready.
+  void HealthzHandler(const Webserver::WebRequest& req, std::stringstream* data,
+      HttpStatusCode* response);
 };
 
 } // namespace impala
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 07eaf9bf2..07388e468 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -50,6 +50,7 @@ class TestWebPage(ImpalaTestSuite):
   RESET_RESOURCE_POOL_STATS_URL = "http://localhost:{0}/resource_pool_reset"
   BACKENDS_URL = "http://localhost:{0}/backends"
   PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
+  HEALTHZ_URL = "http://localhost:{0}/healthz"
 
   # log4j changes do not apply to the statestore since it doesn't
   # have an embedded JVM. So we make two sets of ports to test the
@@ -729,8 +730,9 @@ class TestWebPage(ImpalaTestSuite):
 
   def test_healthz_endpoint(self):
     """Test to check that the /healthz endpoint returns 200 OK."""
-    page = requests.get("http://localhost:25000/healthz")
-    assert page.status_code == requests.codes.ok
+    for port in self.TEST_PORTS_WITH_SS:
+      page = requests.get(self.HEALTHZ_URL.format(port))
+      assert page.status_code == requests.codes.ok
 
   def test_knox_compatibility(self):
     """Checks that the template files conform to the requirements for compatibility with

Reply via email to