This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d3ae4a416e1bd6c0bf98fdcf635063d2ab7233a2
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Nov 26 14:33:18 2024 -0800

    IMPALA-13585: Make pip_download.py interruptible
    
    infra/python/deps/pip_download.py uses multiprocessing.pool.ThreadPool,
    where each thread calls wget. It also wraps the download_package
    function with a retry wrapper. When a network issue happens,
    pressing Ctrl+C does not immediately terminate pip_download.py and all
    its children, so the script appears to hang.
    
    This patch makes pip_download.py capture SIGINT and pass it as a
    cancellation event to all threads. The script is also changed to run
    with Python 3, and all flake8 issues are fixed.
    
    Testing:
    
    - Manually run `buildall.sh -cmake_only` and interrupt it in the middle
      of pip_download.py execution. Verify that the script terminates
      immediately.
    
    Change-Id: I6f293dd8f3fcf3cffa17a4a44627a41d67b7dc91
    Reviewed-on: http://gerrit.cloudera.org:8080/22128
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 infra/python/deps/pip_download.py | 49 ++++++++++++++++++++++++++++-----------
 1 file changed, 35 insertions(+), 14 deletions(-)

diff --git a/infra/python/deps/pip_download.py b/infra/python/deps/pip_download.py
index bbbc4431d..f9d442f23 100755
--- a/infra/python/deps/pip_download.py
+++ b/infra/python/deps/pip_download.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -19,9 +19,8 @@
 
 # Implement the basic 'pip download' functionality in a way that gives us more 
control
 # over which archive type is downloaded and what post-download steps are 
executed.
-# This script requires Python 2.7+.
+# This script requires Python 3.
 
-from __future__ import absolute_import, division, print_function
 import hashlib
 import multiprocessing.pool
 import os
@@ -29,7 +28,9 @@ import os.path
 import re
 import sys
 from random import randint
+from threading import Event
 from time import sleep
+import signal
 import subprocess
 
 NUM_DOWNLOAD_ATTEMPTS = 8
@@ -63,7 +64,10 @@ def retry(func):
 
   def wrapper(*args, **kwargs):
     for try_num in range(NUM_DOWNLOAD_ATTEMPTS):
-      if try_num > 0:
+      if kwargs['is_canceled'] and kwargs['is_canceled'].is_set():
+        print('Cancelling {} download after {} tries.'.format(args, try_num))
+        return False
+      elif try_num > 0:
         sleep_len = randint(5, 10 * 2 ** try_num)
         print('Sleeping for {0} seconds before retrying'.format(sleep_len))
         sleep(sleep_len)
@@ -78,8 +82,12 @@ def retry(func):
 
   return wrapper
 
-def get_package_info(pkg_name, pkg_version):
+
+def get_package_info(pkg_name, pkg_version, is_canceled=None):
   '''Returns the file name, path, hash algorithm and digest of the package.'''
+  fail_return_value = (None, None, None, None)
+  if is_canceled and is_canceled.is_set():
+    return fail_return_value
   # We store the matching result in the candidates list instead of returning 
right away
   # to sort them and return the first value in alphabetical order. This 
ensures that the
   # same result is always returned even if the ordering changed on the server.
@@ -100,17 +108,18 @@ def get_package_info(pkg_name, pkg_version):
     digest = match.group(3)
     file_name = match.group(4)
     # Make sure that we consider only non Wheel archives, because those are 
not supported.
-    if (file_name.endswith('-{0}.tar.gz'.format(pkg_version)) or
-        file_name.endswith('-{0}.tar.bz2'.format(pkg_version)) or
-        file_name.endswith('-{0}.zip'.format(pkg_version))):
+    if (file_name.endswith('-{0}.tar.gz'.format(pkg_version))
+        or file_name.endswith('-{0}.tar.bz2'.format(pkg_version))
+        or file_name.endswith('-{0}.zip'.format(pkg_version))):
       candidates.append((file_name, path, hash_algorithm, digest))
   if not candidates:
     print('Could not find archive to download for {0} {1}'.format(pkg_name, 
pkg_version))
-    return (None, None, None, None)
+    return fail_return_value
   return sorted(candidates)[0]
 
+
 @retry
-def download_package(pkg_name, pkg_version):
+def download_package(pkg_name, pkg_version, is_canceled=None):
   file_name, path, hash_algorithm, expected_digest = get_package_info(pkg_name,
       pkg_version)
   if not file_name:
@@ -119,6 +128,8 @@ def download_package(pkg_name, pkg_version):
       expected_digest):
     print('File with matching digest already exists, skipping 
{0}'.format(file_name))
     return True
+  if is_canceled and is_canceled.is_set():
+    return False
   pkg_url = '{0}/packages/{1}'.format(PYPI_MIRROR, path)
   print('Downloading {0} from {1}'.format(file_name, pkg_url))
   if 0 != subprocess.check_call(["wget", pkg_url, "-q", "-O", file_name]):
@@ -129,6 +140,7 @@ def download_package(pkg_name, pkg_version):
     print('Hash digest check failed in file {0}.'.format(file_name))
     return False
 
+
 def main():
   if len(sys.argv) > 1:
     _, pkg_name, pkg_version = sys.argv
@@ -137,6 +149,13 @@ def main():
 
   pool = 
multiprocessing.pool.ThreadPool(processes=min(multiprocessing.cpu_count(), 4))
   results = []
+  is_canceled = Event()
+
+  def handler(signum, frame):
+    print('Ctrl+C was pressed.')
+    is_canceled.set()
+
+  signal.signal(signal.SIGINT, handler)
 
   for requirements_file in REQUIREMENTS_FILES:
     # If the package name and version are not specified in the command line 
arguments,
@@ -149,15 +168,17 @@ def main():
       # should be installed (for example a specific OS). We can ignore this 
and download
       # the package anyways because the installation 
script(bootstrap_virtualenv.py) can
       # take it into account.
-      l = line.split(";")[0].strip()
-      if not l:
+      first_token = line.split(";")[0].strip()
+      if not first_token:
         continue
-      pkg_name, pkg_version = l.split('==')
+      pkg_name, pkg_version = first_token.split('==')
       results.append(pool.apply_async(
-        download_package, args=[pkg_name.strip(), pkg_version.strip()]))
+        download_package, args=[pkg_name.strip(), pkg_version.strip()],
+        kwds={'is_canceled': is_canceled}))
 
     for x in results:
       x.get()
 
+
 if __name__ == '__main__':
   main()

Reply via email to