This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 72aaa6dc27bf32c973055a782aeaa2270c66c038 Author: Csaba Ringhofer <[email protected]> AuthorDate: Sat Aug 10 00:21:19 2024 +0200 IMPALA-11729: Speed up start-impala-cluster.py The change reduces cluster startup time by 1-2 seconds. This also makes custom cluster tests a bit quicker. Most of the improvement comes from removing an unneeded sleep from wait_for_catalog(): it also slept after successful connections, and once the first coordinator is up, all others are likely to be up as well, so this meant 3*0.5s of extra sleep in the dev cluster. Other changes: - wait_for_catalog is cleaned up and renamed to wait_for_coordinator_services - also wait for hs2_http port to be open - decreased some sleep intervals - removed some non-informative logging - wait for hs2/beeswax/webui ports to be open before trying to actually connect to them to avoid extra logging from failed Thrift/http connections - reordered startup to first wait for coordinators to be up, then wait for num_known_live_backends in each impalad - this better reflects what the cluster actually waits for (the 1st catalog update before starting coordinator services) Change-Id: Ic4dd8c2bc7056443373ceb256a03ce562fea38a0 Reviewed-on: http://gerrit.cloudera.org:8080/21656 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Michael Smith <[email protected]> Reviewed-by: Laszlo Gaal <[email protected]> --- bin/start-impala-cluster.py | 2 +- tests/common/impala_cluster.py | 97 +++++++++++++++++++++++++++--------------- tests/common/impala_service.py | 30 +++++++++++-- 3 files changed, 89 insertions(+), 40 deletions(-) diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index ad0e41d66..917c17971 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -1159,7 +1159,7 @@ if __name__ == "__main__": cluster_ops.start_impalads(options.cluster_size, options.num_coordinators, options.use_exclusive_coordinators) # Sleep briefly to reduce log spam: the cluster takes some time 
to start up. - sleep(3) + sleep(2) impala_cluster = cluster_ops.get_cluster() expected_catalog_delays = 0 diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 1465ab31b..707765182 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -104,7 +104,7 @@ class ImpalaCluster(object): the environment.""" return ImpalaCluster(docker_network=tests.common.environ.docker_network) - def refresh(self): + def refresh(self, silent=False): """ Re-loads the impalad/statestored/catalogd processes if they exist. Helpful to confirm that processes have been killed. @@ -119,9 +119,10 @@ class ImpalaCluster(object): if self.use_admission_service: admissiond_str = "/%d admissiond" % (1 if self.__admissiond else 0) - LOG.debug("Found %d impalad/%d statestored/%d catalogd%s process(es)" % - (len(self.__impalads), len(self.__statestoreds), len(self.__catalogds), - admissiond_str)) + if not silent: + LOG.debug("Found %d impalad/%d statestored/%d catalogd%s process(es)" % + (len(self.__impalads), len(self.__statestoreds), len(self.__catalogds), + admissiond_str)) @property def statestored(self): @@ -201,8 +202,10 @@ class ImpalaCluster(object): one impalad is up). - expected_num_ready_impalads backends are registered with the statestore. expected_num_ready_impalads defaults to expected_num_impalads. + - Each impalad's debug webserver is ready. + - Each coordinator impalad's hs2/beeswax port is open (this happens after catalog + cache is ready). - All impalads knows about all other ready impalads. - - Each coordinator impalad's catalog cache is ready. This information is retrieved by querying the statestore debug webpage and each individual impalad's metrics webpage. """ @@ -215,20 +218,35 @@ class ImpalaCluster(object): def check_processes_still_running(): """Check that the processes we waited for above (i.e. impalads, statestored, catalogd) are still running. 
Throw an exception otherwise.""" - self.refresh() + self.refresh(silent=True) # The number of impalad processes may temporarily increase if breakpad forked a # process to write a minidump. assert len(self.impalads) >= expected_num_impalads assert self.statestored is not None assert self.catalogd is not None + sleep_interval = 0.5 + # Wait for each webserver to be ready. + for impalad in self.impalads: + impalad.wait_for_webserver(sleep_interval, check_processes_still_running) + + # Wait for coordinators to start. + for impalad in self.impalads: + if impalad._get_arg_value("is_coordinator", default="true") != "true": continue + if impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) != 0: continue + impalad.wait_for_coordinator_services(sleep_interval, check_processes_still_running) + # Decrease sleep_interval after first coordinator ready as the others are also + # likely to be (nearly) ready. + sleep_interval = 0.2 + + # Restore sleep interval to avoid potential log spew. At this point it is unlikely + # that any more sleeps are actually needed. + sleep_interval = 0.5 + # Wait till all impalads consider all backends ready. for impalad in self.impalads: impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads, - timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2, + timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=sleep_interval, early_abort_fn=check_processes_still_running) - if (impalad._get_arg_value("is_coordinator", default="true") == "true" - and impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0): - impalad.wait_for_catalog() def wait_for_num_impalads(self, num_impalads, retries=10): """Checks that at least 'num_impalads' impalad processes are running, along with @@ -589,32 +607,41 @@ class ImpaladProcess(BaseImpalaProcess): self.service.wait_for_metric_value('impala-server.ready', expected_value=1, timeout=timeout) - def wait_for_catalog(self): - """Waits for a catalog copy to be received by the impalad. 
When its received, - additionally waits for client ports to be opened.""" + def wait_for_webserver(self, sleep_interval, early_abort_fn): start_time = time.time() - beeswax_port_is_open = False - hs2_port_is_open = False - num_dbs = 0 - num_tbls = 0 - while ((time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS) - and not (beeswax_port_is_open and hs2_port_is_open)): - try: - num_dbs, num_tbls = self.service.get_metric_values( - ["catalog.num-databases", "catalog.num-tables"]) - beeswax_port_is_open = self.service.beeswax_port_is_open() - hs2_port_is_open = self.service.hs2_port_is_open() - except Exception: - LOG.exception(("Client services not ready. Waiting for catalog cache: " - "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format( - num_dbs=num_dbs, - num_tbls=num_tbls)) - sleep(0.5) - - if not hs2_port_is_open or not beeswax_port_is_open: - raise RuntimeError( - "Unable to open client ports within {num_seconds} seconds.".format( - num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)) + while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS: + LOG.info("Waiting for Impalad webserver port %s", self.service.webserver_port) + if self.service.webserver_port_is_open(): return + early_abort_fn() + sleep(sleep_interval) + + def wait_for_coordinator_services(self, sleep_interval, early_abort_fn): + """Waits for client ports to be opened. 
Assumes that the webservice ports are open.""" + start_time = time.time() + LOG.info( + "Waiting for coordinator client services " + + "- hs2 port: %d hs2-http port: %d beeswax port: %d", + self.service.hs2_port, self.service.hs2_http_port, self.service.beeswax_port) + while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS: + beeswax_port_is_open = self.service.beeswax_port_is_open() + hs2_port_is_open = self.service.hs2_port_is_open() + hs2_http_port_is_open = self.service.hs2_http_port_is_open() + if beeswax_port_is_open and hs2_port_is_open and hs2_http_port_is_open: + return + early_abort_fn() + # The coordinator is likely to wait for the catalog update. Fetch the number + # of catalog objects. + num_dbs, num_tbls = self.service.get_metric_values( + ["catalog.num-databases", "catalog.num-tables"]) + LOG.info(("Client services not ready. Waiting for catalog cache: " + "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format( + num_dbs=num_dbs, + num_tbls=num_tbls)) + sleep(sleep_interval) + + raise RuntimeError( + "Unable to open client ports within {num_seconds} seconds.".format( + num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)) def set_jvm_log_level(self, class_name, level): """Helper method to set JVM log level for certain class name.""" diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index d46f67256..fdee26536 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -26,6 +26,7 @@ import logging import os import re import requests +import socket import subprocess from datetime import datetime from time import sleep, time @@ -370,7 +371,7 @@ class ImpaladService(BaseImpalaService): else: LOG.info("Waiting for num_known_live_backends=%s. 
Current value: %s" % (expected_value, value)) - sleep(1) + sleep(interval) assert 0, 'num_known_live_backends did not reach expected value in time' def read_query_profile_page(self, query_id, timeout=10, interval=1): @@ -425,6 +426,17 @@ class ImpaladService(BaseImpalaService): sleep(interval) return False + def is_port_open(self, port): + try: + sock = socket.create_connection((self.hostname, port), timeout=1) + sock.close() + return True + except Exception: + return False + + def webserver_port_is_open(self): + return self.is_port_open(self.webserver_port) + def create_beeswax_client(self, use_kerberos=False): """Creates a new beeswax client connection to the impalad""" client = create_connection('%s:%d' % (self.hostname, self.beeswax_port), @@ -434,12 +446,16 @@ class ImpaladService(BaseImpalaService): def beeswax_port_is_open(self): """Test if the beeswax port is open. Does not need to authenticate.""" + # Check if the port is open first to avoid chatty logging of Thrift connection. + if not self.is_port_open(self.beeswax_port): return False + try: # The beeswax client will connect successfully even if not authenticated. client = self.create_beeswax_client() client.close() return True - except Exception: + except Exception as e: + LOG.info(e) return False def create_ldap_beeswax_client(self, user, password, use_ssl=False): @@ -456,11 +472,14 @@ class ImpaladService(BaseImpalaService): def hs2_port_is_open(self): """Test if the HS2 port is open. Does not need to authenticate.""" + # Check if the port is open first to avoid chatty logging of Thrift connection. + if not self.is_port_open(self.hs2_port): return False + # Impyla will try to authenticate as part of connecting, so preserve previous logic # that uses the HS2 thrift code directly. 
try: - socket = TSocket(self.hostname, self.hs2_port) - transport = TBufferedTransport(socket) + sock = TSocket(self.hostname, self.hs2_port) + transport = TBufferedTransport(sock) transport.open() transport.close() return True @@ -468,6 +487,9 @@ class ImpaladService(BaseImpalaService): LOG.info(e) return False + def hs2_http_port_is_open(self): + # Only check if the port is open, do not create Thrift transport. + return self.is_port_open(self.hs2_http_port) # Allows for interacting with the StateStore service to perform operations such as # accessing the debug webpage.
