On Fri, Dec 3, 2021 at 7:22 AM Vladimir Sementsov-Ogievskiy <
vsement...@virtuozzo.com> wrote:

> Add -j <JOBS> parameter, to run tests in several jobs simultaneously.
> For realization - simply utilize multiprocessing.Pool class.
>
> Notes:
>
> 1. Of course, tests can't run simultaneously in same TEST_DIR. So,
>    use subdirectories TEST_DIR/testname/ and SOCK_DIR/testname/
>    instead of simply TEST_DIR and SOCK_DIR
>
>
SOCK_DIR was introduced because TEST_DIR was getting too long, and the
length of the filepath was causing problems on some platforms. I hope that
we aren't pushing our luck by making the directory longer here. Using the
test name is nice because a human operator can quickly correlate
directories to the tests that populated them, but if test names get kind of
long, I wonder if we'll cause problems again.

Just a stray thought.


> 2. multiprocessing.Pool.starmap function doesn't support passing
>    context managers, so we can't simply pass "self". Happily, we need
>    self only for read-only access, and it just works if it is defined
>    in global space. So, add a temporary link TestRunner.shared_self
>    during run_tests().
>

I'm a little confused on this point; see below.


> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com>
> ---
>  tests/qemu-iotests/check         |  4 +-
>  tests/qemu-iotests/testrunner.py | 69 ++++++++++++++++++++++++++++----
>  2 files changed, 64 insertions(+), 9 deletions(-)
>
> diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check
> index 43a4b694cc..0c27721a41 100755
> --- a/tests/qemu-iotests/check
> +++ b/tests/qemu-iotests/check
> @@ -34,6 +34,8 @@ def make_argparser() -> argparse.ArgumentParser:
>                     help='show me, do not run tests')
>      p.add_argument('-makecheck', action='store_true',
>                     help='pretty print output for make check')
> +    p.add_argument('-j', dest='jobs', type=int, default=1,
> +                   help='run tests in multiple parallel jobs')
>
>      p.add_argument('-d', dest='debug', action='store_true', help='debug')
>      p.add_argument('-p', dest='print', action='store_true',
> @@ -165,6 +167,6 @@ if __name__ == '__main__':
>          with TestRunner(env, makecheck=args.makecheck,
>                          color=args.color) as tr:
>              paths = [os.path.join(env.source_iotests, t) for t in tests]
> -            ok = tr.run_tests(paths)
> +            ok = tr.run_tests(paths, args.jobs)
>              if not ok:
>                  sys.exit(1)
>

(OK)


> diff --git a/tests/qemu-iotests/testrunner.py b/tests/qemu-iotests/testrunner.py
> index a9f2feb58c..0feaa396d0 100644
> --- a/tests/qemu-iotests/testrunner.py
> +++ b/tests/qemu-iotests/testrunner.py
> @@ -26,6 +26,7 @@
>  import json
>  import termios
>  import sys
> +from multiprocessing import Pool
>  from contextlib import contextmanager
>  from typing import List, Optional, Iterator, Any, Sequence, Dict, \
>          ContextManager
> @@ -126,6 +127,31 @@ def __init__(self, status: str, description: str = '',
>
>
>  class TestRunner(ContextManager['TestRunner']):
> +    shared_self = None
> +
> +    @staticmethod
> +    def proc_run_test(test: str, test_field_width: int) -> TestResult:
> +        # We are in a subprocess, we can't change the runner object!
>

*can't*, or shouldn't? I thought changing anything would just result in the
child process diverging, but nothing harmful overall. Am I mistaken?


> +        runner = TestRunner.shared_self
> +        assert runner is not None
> +        return runner.run_test(test, test_field_width, mp=True)
> +
> +    def run_tests_pool(self, tests: List[str],
> +                       test_field_width: int, jobs: int) -> List[TestResult]:
> +
> +        # passing self directly to Pool.starmap() just doesn't work, because
> +        # it's a context manager.
>

Why, what happens? (Or what doesn't happen?)

(I believe you that it doesn't work, but it's not immediately obvious to me
why.)


> +        assert TestRunner.shared_self is None
> +        TestRunner.shared_self = self
> +
> +        with Pool(jobs) as p:
> +            results = p.starmap(self.proc_run_test,
> +                                zip(tests, [test_field_width] * len(tests)))
> +
> +        TestRunner.shared_self = None
> +
> +        return results
> +
>      def __init__(self, env: TestEnv, makecheck: bool = False,
>                   color: str = 'auto') -> None:
>          self.env = env
> @@ -219,11 +245,16 @@ def find_reference(self, test: str) -> str:
>
>          return f'{test}.out'
>
> -    def do_run_test(self, test: str) -> TestResult:
> +    def do_run_test(self, test: str, mp: bool) -> TestResult:
>          """
>          Run one test
>
>          :param test: test file path
> +        :param mp: if true, we are in a multiprocessing environment, use
> +                   personal subdirectories for test run
> +
> +        Note: this method may be called from subprocess, so it does not
> +        change ``self`` object in any way!
>          """
>

Maybe worth mentioning that it *does* change environment variables, but
because this is "mp", it won't affect the parent execution environment.


>
>          f_test = Path(test)
> @@ -249,6 +280,12 @@ def do_run_test(self, test: str) -> TestResult:
>
>          args = [str(f_test.resolve())]
>          env = self.env.prepare_subprocess(args)
> +        if mp:
> +            # Split test directories, so that tests running in parallel don't
> +            # break each other.
> +            for d in ['TEST_DIR', 'SOCK_DIR']:
> +                env[d] = os.path.join(env[d], f_test.name)
> +                Path(env[d]).mkdir(parents=True, exist_ok=True)
>
>          t0 = time.time()
>          with f_bad.open('w', encoding="utf-8") as f:
> @@ -291,23 +328,32 @@ def do_run_test(self, test: str) -> TestResult:
>                                casenotrun=casenotrun)
>
>      def run_test(self, test: str,
> -                 test_field_width: Optional[int] = None) -> TestResult:
> +                 test_field_width: Optional[int] = None,
> +                 mp: bool = False) -> TestResult:
>          """
>          Run one test and print short status
>
>          :param test: test file path
>          :param test_field_width: width for first field of status format
> +        :param mp: if true, we are in a multiprocessing environment, don't try
> +                   to rewrite things in stdout
> +
> +        Note: this method may be called from subprocess, so it does not
> +        change ``self`` object in any way!
>          """
>
>          last_el = self.last_elapsed.get(test)
>          start = datetime.datetime.now().strftime('%H:%M:%S')
>
>          if not self.makecheck:
> -            self.test_print_one_line(test=test, starttime=start,
> -                                     lasttime=last_el, end='\r',
> +            self.test_print_one_line(test=test,
> +                                     status = 'started' if mp else '...',
> +                                     starttime=start,
> +                                     lasttime=last_el,
> +                                     end = '\n' if mp else '\r',
>                                       test_field_width=test_field_width)
>
> -        res = self.do_run_test(test)
> +        res = self.do_run_test(test, mp)
>
>          end = datetime.datetime.now().strftime('%H:%M:%S')
>          self.test_print_one_line(test=test, status=res.status,
> @@ -321,7 +367,7 @@ def run_test(self, test: str,
>
>          return res
>
> -    def run_tests(self, tests: List[str]) -> bool:
> +    def run_tests(self, tests: List[str], jobs: int = 1) -> bool:
>          n_run = 0
>          failed = []
>          notrun = []
> @@ -332,9 +378,16 @@ def run_tests(self, tests: List[str]) -> bool:
>
>          test_field_width = max(len(os.path.basename(t)) for t in tests) + 2
>
> -        for t in tests:
> +        if jobs > 1:
> +            results = self.run_tests_pool(tests, test_field_width, jobs)
> +
> +        for i, t in enumerate(tests):
>              name = os.path.basename(t)
> -            res = self.run_test(t, test_field_width=test_field_width)
> +
> +            if jobs > 1:
> +                res = results[i]
> +            else:
> +                res = self.run_test(t, test_field_width)
>
>              assert res.status in ('pass', 'fail', 'not run')
>
>
Looks good and surprisingly minimal; I'm just curious about the
nature of the workaround here.

Either way, I believe this will probably work as written, so I can give it
an ACK at a minimum while I wait for answers.

Acked-by: John Snow <js...@redhat.com>

Reply via email to