On Fri, Dec 3, 2021 at 7:22 AM Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com> wrote:
> Add -j <JOBS> parameter, to run tests in several jobs simultaneously. > For realization - simply utilize multiprocessing.Pool class. > > Notes: > > 1. Of course, tests can't run simultaneously in same TEST_DIR. So, > use subdirectories TEST_DIR/testname/ and SOCK_DIR/testname/ > instead of simply TEST_DIR and SOCK_DIR > > SOCK_DIR was introduced because TEST_DIR was getting too long, and the length of the socket file paths created under it was causing problems on some platforms (UNIX socket paths are limited to roughly 104-108 bytes). I hope that we aren't pushing our luck by making the directory longer here. Using the test name is nice because a human operator can quickly correlate directories to the tests that populated them, but if test names get long, I wonder if we'll run into that limit again. Just a stray thought. > 2. multiprocessing.Pool.starmap function doesn't support passing > context managers, so we can't simply pass "self". Happily, we need > self only for read-only access, and it just works if it is defined > in global space. So, add a temporary link TestRunner.shared_self > during run_tests(). 
> I'm a little confused on this point, see below > Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com> > --- > tests/qemu-iotests/check | 4 +- > tests/qemu-iotests/testrunner.py | 69 ++++++++++++++++++++++++++++---- > 2 files changed, 64 insertions(+), 9 deletions(-) > > diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check > index 43a4b694cc..0c27721a41 100755 > --- a/tests/qemu-iotests/check > +++ b/tests/qemu-iotests/check > @@ -34,6 +34,8 @@ def make_argparser() -> argparse.ArgumentParser: > help='show me, do not run tests') > p.add_argument('-makecheck', action='store_true', > help='pretty print output for make check') > + p.add_argument('-j', dest='jobs', type=int, default=1, > + help='run tests in multiple parallel jobs') > > p.add_argument('-d', dest='debug', action='store_true', help='debug') > p.add_argument('-p', dest='print', action='store_true', > @@ -165,6 +167,6 @@ if __name__ == '__main__': > with TestRunner(env, makecheck=args.makecheck, > color=args.color) as tr: > paths = [os.path.join(env.source_iotests, t) for t in tests] > - ok = tr.run_tests(paths) > + ok = tr.run_tests(paths, args.jobs) > if not ok: > sys.exit(1) > (OK) > diff --git a/tests/qemu-iotests/testrunner.py > b/tests/qemu-iotests/testrunner.py > index a9f2feb58c..0feaa396d0 100644 > --- a/tests/qemu-iotests/testrunner.py > +++ b/tests/qemu-iotests/testrunner.py > @@ -26,6 +26,7 @@ > import json > import termios > import sys > +from multiprocessing import Pool > from contextlib import contextmanager > from typing import List, Optional, Iterator, Any, Sequence, Dict, \ > ContextManager > @@ -126,6 +127,31 @@ def __init__(self, status: str, description: str = '', > > > class TestRunner(ContextManager['TestRunner']): > + shared_self = None > + > + @staticmethod > + def proc_run_test(test: str, test_field_width: int) -> TestResult: > + # We are in a subprocess, we can't change the runner object! > *can't*, or shouldn't? 
I thought changing anything would just result in the child process diverging, but nothing harmful overall. Am I mistaken? > + runner = TestRunner.shared_self > + assert runner is not None > + return runner.run_test(test, test_field_width, mp=True) > + > + def run_tests_pool(self, tests: List[str], > + test_field_width: int, jobs: int) -> > List[TestResult]: > + > + # passing self directly to Pool.starmap() just doesn't work, > because > + # it's a context manager. > Why, what happens? (Or what doesn't happen?) (I believe you that it doesn't work, but it's not immediately obvious to me why.) > + assert TestRunner.shared_self is None > + TestRunner.shared_self = self > + > + with Pool(jobs) as p: > + results = p.starmap(self.proc_run_test, > + zip(tests, [test_field_width] * > len(tests))) > + > + TestRunner.shared_self = None > + > + return results > + > def __init__(self, env: TestEnv, makecheck: bool = False, > color: str = 'auto') -> None: > self.env = env > @@ -219,11 +245,16 @@ def find_reference(self, test: str) -> str: > > return f'{test}.out' > > - def do_run_test(self, test: str) -> TestResult: > + def do_run_test(self, test: str, mp: bool) -> TestResult: > """ > Run one test > > :param test: test file path > + :param mp: if true, we are in a multiprocessing environment, use > + personal subdirectories for test run > + > + Note: this method may be called from subprocess, so it does not > + change ``self`` object in any way! > """ > Maybe worth mentioning that it *does* change environment variables, but because this is "mp", it won't affect the parent execution environment. > > f_test = Path(test) > @@ -249,6 +280,12 @@ def do_run_test(self, test: str) -> TestResult: > > args = [str(f_test.resolve())] > env = self.env.prepare_subprocess(args) > + if mp: > + # Split test directories, so that tests running in parallel > don't > + # break each other. 
> + for d in ['TEST_DIR', 'SOCK_DIR']: > + env[d] = os.path.join(env[d], f_test.name) > + Path(env[d]).mkdir(parents=True, exist_ok=True) > > t0 = time.time() > with f_bad.open('w', encoding="utf-8") as f: > @@ -291,23 +328,32 @@ def do_run_test(self, test: str) -> TestResult: > casenotrun=casenotrun) > > def run_test(self, test: str, > - test_field_width: Optional[int] = None) -> TestResult: > + test_field_width: Optional[int] = None, > + mp: bool = False) -> TestResult: > """ > Run one test and print short status > > :param test: test file path > :param test_field_width: width for first field of status format > + :param mp: if true, we are in a multiprocessing environment, > don't try > + to rewrite things in stdout > + > + Note: this method may be called from subprocess, so it does not > + change ``self`` object in any way! > """ > > last_el = self.last_elapsed.get(test) > start = datetime.datetime.now().strftime('%H:%M:%S') > > if not self.makecheck: > - self.test_print_one_line(test=test, starttime=start, > - lasttime=last_el, end='\r', > + self.test_print_one_line(test=test, > + status = 'started' if mp else '...', > + starttime=start, > + lasttime=last_el, > + end = '\n' if mp else '\r', > test_field_width=test_field_width) > > - res = self.do_run_test(test) > + res = self.do_run_test(test, mp) > > end = datetime.datetime.now().strftime('%H:%M:%S') > self.test_print_one_line(test=test, status=res.status, > @@ -321,7 +367,7 @@ def run_test(self, test: str, > > return res > > - def run_tests(self, tests: List[str]) -> bool: > + def run_tests(self, tests: List[str], jobs: int = 1) -> bool: > n_run = 0 > failed = [] > notrun = [] > @@ -332,9 +378,16 @@ def run_tests(self, tests: List[str]) -> bool: > > test_field_width = max(len(os.path.basename(t)) for t in tests) + > 2 > > - for t in tests: > + if jobs > 1: > + results = self.run_tests_pool(tests, test_field_width, jobs) > + > + for i, t in enumerate(tests): > name = os.path.basename(t) > - res = 
self.run_test(t, test_field_width=test_field_width) > + > + if jobs > 1: > + res = results[i] > + else: > + res = self.run_test(t, test_field_width) > > assert res.status in ('pass', 'fail', 'not run') > > Looks good and surprisingly minimal; I'm just curious about the nature of the workaround here. Either way, I believe this will probably work as written, so I can give it an ACK at a minimum while I wait for answers. Acked-by: John Snow <js...@redhat.com>