This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin4_on_cloud
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin4_on_cloud by this push:
new 3f82213 # minor fix, add progress bar for logging detail
3f82213 is described below
commit 3f822137b08c5d875bb6c50073b117e945ab7748
Author: Mukvin <[email protected]>
AuthorDate: Sat Apr 2 14:53:53 2022 +0800
# minor fix, add progress bar for logging detail
---
clouds/aws.py | 2 +-
engine.py | 6 +----
instances/aws_instance.py | 25 ++++++++++++++++---
utils.py => utils/common_utils.py | 13 ++++++++++
engine_utils.py => utils/engine_utils.py | 2 +-
utils/progress_bar.py | 43 ++++++++++++++++++++++++++++++++
6 files changed, 81 insertions(+), 10 deletions(-)
diff --git a/clouds/aws.py b/clouds/aws.py
index 8b944ea..b082c1a 100644
--- a/clouds/aws.py
+++ b/clouds/aws.py
@@ -28,7 +28,7 @@ from constant.kylin_properties_params import KylinProperties
from constant.path import KYLIN_PROPERTIES_TEMPLATE_DIR
from constant.yaml_params import Params
from instances.aws_instance import AWSInstance
-from utils import Utils
+from utils.common_utils import Utils
logger = logging.getLogger(__name__)
diff --git a/engine.py b/engine.py
index f43d438..1b717ad 100644
--- a/engine.py
+++ b/engine.py
@@ -16,16 +16,12 @@
#
import logging
-import os
from typing import List
-import yaml
-
from constant.client import Client
from constant.config import Config
from constant.deployment import ScaleType, NodeType, Cluster
-from constant.yaml_files import File
-from engine_utils import EngineUtils
+from utils.engine_utils import EngineUtils
logger = logging.getLogger(__name__)
diff --git a/instances/aws_instance.py b/instances/aws_instance.py
index 002cc8e..f370d95 100644
--- a/instances/aws_instance.py
+++ b/instances/aws_instance.py
@@ -34,7 +34,8 @@ from constant.path import JARS_PATH, TARS_PATH, SCRIPTS_PATH,
KYLIN_PROPERTIES_T
from constant.server_mode import ServerMode
from constant.yaml_files import File
from constant.yaml_params import Params
-from utils import Utils
+from utils.common_utils import Utils
+from utils.progress_bar import ProgressBar
logger = logging.getLogger(__name__)
@@ -563,6 +564,7 @@ class AWSInstance:
# FIXME: it's hard code to make sure that zks were already initialized.
time.sleep(10)
+ logger.info(f"Current execute commands in `Zookeeper stack` which
named {zk_stack}.")
self.refresh_zks_cfg(zk_ips=zk_ips, zk_ids=zk_ids)
self.start_zks(zk_ids=zk_ids, zk_ips=zk_ips)
@@ -927,6 +929,7 @@ class AWSInstance:
raise Exception(msg)
start_command = Commands.START_SPARK_MASTER_COMMAND.value
+ logger.info(f"Current execute commands in `Spark master stack` which
named {self.spark_master_stack_name}.")
self.exec_script_instance_and_return(name_or_id=spark_master_id,
script=start_command)
def get_spark_master_instance_id(self) -> str:
@@ -1183,6 +1186,7 @@ class AWSInstance:
instance_id = self.get_instance_id(stack_name)
# spark decommission feature start to be supported in spark 3.1.x.
# refer: https://issues.apache.org/jira/browse/SPARK-20624.
+ logger.info(f"Current execute commands in `Spark Slave stack` which
named {stack_name}.")
try:
self.exec_script_instance_and_return(
name_or_id=instance_id,
script=Commands.SPARK_DECOMMISION_WORKER_COMMAND.value)
@@ -1206,11 +1210,13 @@ class AWSInstance:
def start_prometheus_server(self) -> None:
start_command = Commands.START_PROMETHEUS_COMMAND.value
instance_id = self.instance_id_of_static_services
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
self.exec_script_instance_and_return(name_or_id=instance_id,
script=start_command)
def stop_prometheus_server(self) -> None:
stop_command = Commands.STOP_PROMETHEUS_COMMAND.value
instance_id = self.instance_id_of_static_services
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
self.exec_script_instance_and_return(name_or_id=instance_id,
script=stop_command)
def restart_prometheus_server(self) -> None:
@@ -1225,11 +1231,13 @@ class AWSInstance:
raise Exception(f'Current static services stack was not create
complete, please check.')
refresh_config_commands = self.refresh_prometheus_commands()
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for command in refresh_config_commands:
self.exec_script_instance_and_return(name_or_id=static_services_id,
script=command)
# Special support spark metrics into prometheus
spark_config_commands = self.refresh_spark_metrics_commands()
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for command in spark_config_commands:
self.exec_script_instance_and_return(name_or_id=static_services_id,
script=command)
@@ -1294,12 +1302,14 @@ class AWSInstance:
def is_prometheus_configured(self, host: str) -> bool:
static_services_instance_id = self.instance_id_of_static_services
check_command =
Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=host)
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
output =
self.exec_script_instance_and_return(name_or_id=static_services_instance_id,
script=check_command)
return output['StandardOutputContent'] == '0\n'
def get_prometheus_configured_hosts(self, hosts: List) -> List:
static_services_instance_id = self.instance_id_of_static_services
configured_hosts = []
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for host in hosts:
check_command =
Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=host)
output =
self.exec_script_instance_and_return(name_or_id=static_services_instance_id,
script=check_command)
@@ -1359,6 +1369,7 @@ class AWSInstance:
def _check_prometheus_exists_nodes(self, params: Dict) -> Dict:
exists_nodes: Dict = {}
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for k, v in params.items():
command =
Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=k)
output = self.exec_script_instance_and_return(
@@ -1370,6 +1381,7 @@ class AWSInstance:
def _check_prometheus_not_exists_nodes(self, params: Dict) -> Dict:
not_exists_nodes: Dict = {}
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for k, v in params.items():
command =
Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=k)
output = self.exec_script_instance_and_return(
@@ -1392,16 +1404,19 @@ class AWSInstance:
def refresh_prometheus_config_after_scale_up(self, expected_nodes: Dict)
-> None:
commands = self.refresh_prometheus_commands_after_scale(expected_nodes)
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for command in commands:
self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services,
script=command)
def refresh_prometheus_spark_driver_of_kylin_after_scale_up(self,
expected_nodes: Dict) -> None:
commands =
self.refresh_prometheus_spark_driver_of_kylin_after_scale(expected_nodes)
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for command in commands:
self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services,
script=command)
def
refresh_prometheus_spark_metrics_of_kylin_in_cluster_after_scale_up(self,
expected_nodes: Dict) -> None:
commands =
self.refresh_prometheus_spark_driver_of_kylin_in_cluster_after_scale(expected_nodes)
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for command in commands:
self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services,
script=command)
@@ -1410,12 +1425,14 @@ class AWSInstance:
def refresh_prometheus_config_after_scale_down(self, exists_nodes: Dict)
-> None:
commands =
[Commands.PROMETHEUS_DELETE_CFG_COMMAND.value.format(node=worker) for worker in
exists_nodes.keys()]
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for command in commands:
self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services,
script=command)
def refresh_prometheus_spark_driver_of_kylin_after_scale_down(self,
exists_nodes: Dict) -> None:
commands =
[Commands.SPARK_DRIVER_METRIC_OF_KYLIN_DELETE_CFG_COMMAND.value.format(node=worker)
for worker in exists_nodes.keys()]
+ logger.info(f"Current execute commands in `static service stack` which
named {self.static_service_stack_name}.")
for command in commands:
self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services,
script=command)
@@ -1935,6 +1952,7 @@ class AWSInstance:
vm_name = None
if isinstance(name_or_id, str):
vm_name = [name_or_id]
+ logger.info(f"Current instance id: {name_or_id} is executing commands:
{script}.")
response = self.send_command(vm_name=vm_name, script=script)
command_id = response['Command']['CommandId']
time.sleep(5)
@@ -2134,8 +2152,9 @@ class AWSInstance:
def upload_file_to_s3(self, local_file_dir: str, filename: str, bucket:
str, bucket_dir: str) -> None:
logger.info(f'Uploading {filename} from {local_file_dir} to S3 bucket:
{bucket}/{bucket_dir}.')
- self.s3_client.upload_file(os.path.join(local_file_dir, filename),
bucket, bucket_dir + filename)
- logger.info(f'Uploaded {filename} successfully.')
+ full_filename = os.path.join(local_file_dir, filename)
+ self.s3_client.upload_file(full_filename, bucket, bucket_dir +
filename, Callback=ProgressBar(full_filename))
+ logger.info(f'\nUploaded {filename} successfully.')
def is_stack_deleted_complete(self, stack_name: str) -> bool:
if self.is_stack_create_complete(stack_name):
diff --git a/utils.py b/utils/common_utils.py
similarity index 95%
rename from utils.py
rename to utils/common_utils.py
index 833d4de..14e31f4 100644
--- a/utils.py
+++ b/utils/common_utils.py
@@ -18,6 +18,8 @@
import logging
import os
import shutil
+import sys
+from datetime import datetime
from typing import List, Tuple, Generator, Dict
import requests
@@ -117,10 +119,21 @@ class Utils:
return
logger.info(f"Downloading {os.path.abspath(file_path)}.")
with open(file_path, 'wb') as f:
+ # set downloading bar
+ total_length = int(r.headers.get('content-length'))
+ temp_done = 0
+ start = datetime.now()
for chunk in r.iter_content(chunk_size=1024 * 8):
if not chunk:
break
+
+ temp_done += len(chunk)
f.write(chunk)
+ end = datetime.now()
+ done = int(50 * temp_done / total_length)
+ sys.stdout.write(f"\r[{'=' * done}{' '* (50 - done)}] %
{temp_done} / {total_length} "
+ f"- Duration: {end - start}\r")
+ sys.stdout.flush()
f.flush()
os.fsync(f.fileno())
diff --git a/engine_utils.py b/utils/engine_utils.py
similarity index 99%
rename from engine_utils.py
rename to utils/engine_utils.py
index 5f432e7..4f6ff52 100644
--- a/engine_utils.py
+++ b/utils/engine_utils.py
@@ -23,7 +23,7 @@ from constant.config import Config
from constant.deployment import NodeType, ScaleType
from constant.yaml_files import Tar
from instances.kylin_utils import KylinUtils
-from utils import Utils
+from utils.common_utils import Utils
logger = logging.getLogger(__name__)
diff --git a/utils/progress_bar.py b/utils/progress_bar.py
new file mode 100644
index 0000000..e2051b1
--- /dev/null
+++ b/utils/progress_bar.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import threading
+from datetime import datetime
+
+
+class ProgressBar(object):
+
+ def __init__(self, filename):
+ self._filename = filename
+ self._size = int(os.path.getsize(filename))
+ self._seen_so_far = 0
+ self._lock = threading.Lock()
+ self._done = 0
+ self._start = datetime.now()
+ self._end = None
+
+ def __call__(self, bytes_amount):
+ # To simplify, assume this is hooked up to a single filename
+ with self._lock:
+ self._end = datetime.now()
+ self._seen_so_far += bytes_amount
+ self._done = int(50 * self._seen_so_far / self._size)
+ sys.stdout.write(f"\r[{'=' * self._done}{' ' * (50 - self._done)}]
% {self._seen_so_far} / {self._size} "
+ f"- Duration: {self._end - self._start}\r")
+ sys.stdout.flush()