This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d3ae4a416e1bd6c0bf98fdcf635063d2ab7233a2 Author: Riza Suminto <[email protected]> AuthorDate: Tue Nov 26 14:33:18 2024 -0800 IMPALA-13585: Make pip_download.py interruptible infra/python/deps/pip_download.py uses multiprocessing.pool.ThreadPool where each thread calls wget. It also wraps the download_package function with a retry wrapper. When a network issue happens, pressing Ctrl+C does not immediately terminate pip_download.py and all its children. Thus, the script appears to hang. This patch makes pip_download.py capture SIGINT and pass it as a cancellation event to all threads. The script now runs with python3, and all flake8 issues are fixed. Testing: - Manually run `buildall.sh -cmake_only` and interrupt it in the middle of pip_download.py execution. Verify that the script terminates immediately. Change-Id: I6f293dd8f3fcf3cffa17a4a44627a41d67b7dc91 Reviewed-on: http://gerrit.cloudera.org:8080/22128 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- infra/python/deps/pip_download.py | 49 ++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/infra/python/deps/pip_download.py b/infra/python/deps/pip_download.py index bbbc4431d..f9d442f23 100755 --- a/infra/python/deps/pip_download.py +++ b/infra/python/deps/pip_download.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/python3 # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -19,9 +19,8 @@ # Implement the basic 'pip download' functionality in a way that gives us more control # over which archive type is downloaded and what post-download steps are executed. -# This script requires Python 2.7+. +# This script requires Python 3. 
-from __future__ import absolute_import, division, print_function import hashlib import multiprocessing.pool import os @@ -29,7 +28,9 @@ import os.path import re import sys from random import randint +from threading import Event from time import sleep +import signal import subprocess NUM_DOWNLOAD_ATTEMPTS = 8 @@ -63,7 +64,10 @@ def retry(func): def wrapper(*args, **kwargs): for try_num in range(NUM_DOWNLOAD_ATTEMPTS): - if try_num > 0: + if kwargs['is_canceled'] and kwargs['is_canceled'].is_set(): + print('Cancelling {} download after {} tries.'.format(args, try_num)) + return False + elif try_num > 0: sleep_len = randint(5, 10 * 2 ** try_num) print('Sleeping for {0} seconds before retrying'.format(sleep_len)) sleep(sleep_len) @@ -78,8 +82,12 @@ def retry(func): return wrapper -def get_package_info(pkg_name, pkg_version): + +def get_package_info(pkg_name, pkg_version, is_canceled=None): '''Returns the file name, path, hash algorithm and digest of the package.''' + fail_return_value = (None, None, None, None) + if is_canceled and is_canceled.is_set(): + return fail_return_value # We store the matching result in the candidates list instead of returning right away # to sort them and return the first value in alphabetical order. This ensures that the # same result is always returned even if the ordering changed on the server. @@ -100,17 +108,18 @@ def get_package_info(pkg_name, pkg_version): digest = match.group(3) file_name = match.group(4) # Make sure that we consider only non Wheel archives, because those are not supported. 
- if (file_name.endswith('-{0}.tar.gz'.format(pkg_version)) or - file_name.endswith('-{0}.tar.bz2'.format(pkg_version)) or - file_name.endswith('-{0}.zip'.format(pkg_version))): + if (file_name.endswith('-{0}.tar.gz'.format(pkg_version)) + or file_name.endswith('-{0}.tar.bz2'.format(pkg_version)) + or file_name.endswith('-{0}.zip'.format(pkg_version))): candidates.append((file_name, path, hash_algorithm, digest)) if not candidates: print('Could not find archive to download for {0} {1}'.format(pkg_name, pkg_version)) - return (None, None, None, None) + return fail_return_value return sorted(candidates)[0] + @retry -def download_package(pkg_name, pkg_version): +def download_package(pkg_name, pkg_version, is_canceled=None): file_name, path, hash_algorithm, expected_digest = get_package_info(pkg_name, pkg_version) if not file_name: @@ -119,6 +128,8 @@ def download_package(pkg_name, pkg_version): expected_digest): print('File with matching digest already exists, skipping {0}'.format(file_name)) return True + if is_canceled and is_canceled.is_set(): + return False pkg_url = '{0}/packages/{1}'.format(PYPI_MIRROR, path) print('Downloading {0} from {1}'.format(file_name, pkg_url)) if 0 != subprocess.check_call(["wget", pkg_url, "-q", "-O", file_name]): @@ -129,6 +140,7 @@ def download_package(pkg_name, pkg_version): print('Hash digest check failed in file {0}.'.format(file_name)) return False + def main(): if len(sys.argv) > 1: _, pkg_name, pkg_version = sys.argv @@ -137,6 +149,13 @@ def main(): pool = multiprocessing.pool.ThreadPool(processes=min(multiprocessing.cpu_count(), 4)) results = [] + is_canceled = Event() + + def handler(signum, frame): + print('Ctrl+C was pressed.') + is_canceled.set() + + signal.signal(signal.SIGINT, handler) for requirements_file in REQUIREMENTS_FILES: # If the package name and version are not specified in the command line arguments, @@ -149,15 +168,17 @@ def main(): # should be installed (for example a specific OS). 
We can ignore this and download # the package anyways because the installation script(bootstrap_virtualenv.py) can # take it into account. - l = line.split(";")[0].strip() - if not l: + first_token = line.split(";")[0].strip() + if not first_token: continue - pkg_name, pkg_version = l.split('==') + pkg_name, pkg_version = first_token.split('==') results.append(pool.apply_async( - download_package, args=[pkg_name.strip(), pkg_version.strip()])) + download_package, args=[pkg_name.strip(), pkg_version.strip()], + kwds={'is_canceled': is_canceled})) for x in results: x.get() + if __name__ == '__main__': main()
