On Wed, 08/06 17:12, Maria Kustova wrote: > The purpose of the test runner is to prepare the test environment (e.g. create > a work directory, a test image, etc), execute a program under test with > parameters, indicate a test failure if the program was killed during the test > execution and collect core dumps, logs and other test artifacts. > > The test runner doesn't depend on an image format or a program will be tested, > so it can be used with any external image generator and program under test. > > Signed-off-by: Maria Kustova <mari...@catit.be> > --- > tests/image-fuzzer/runner/runner.py | 405 > ++++++++++++++++++++++++++++++++++++ > 1 file changed, 405 insertions(+) > create mode 100755 tests/image-fuzzer/runner/runner.py > > diff --git a/tests/image-fuzzer/runner/runner.py > b/tests/image-fuzzer/runner/runner.py > new file mode 100755 > index 0000000..3fa7fca > --- /dev/null > +++ b/tests/image-fuzzer/runner/runner.py > @@ -0,0 +1,405 @@ > +#!/usr/bin/env python > + > +# Tool for running fuzz tests > +# > +# Copyright (C) 2014 Maria Kustova <mari...@catit.be> > +# > +# This program is free software: you can redistribute it and/or modify > +# it under the terms of the GNU General Public License as published by > +# the Free Software Foundation, either version 2 of the License, or > +# (at your option) any later version. > +# > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. > +# > +# You should have received a copy of the GNU General Public License > +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
> +# > + > +import sys > +import os > +import signal > +import subprocess > +import random > +import shutil > +from itertools import count > +import getopt > +import StringIO > +import resource > + > +try: > + import json > +except ImportError: > + try: > + import simplejson as json > + except ImportError: > + print >>sys.stderr, \ > + "Warning: Module for JSON processing is not found.\n" \ > + "'--config' and '--command' options are not supported." > + > +# Backing file sizes in MB > +MAX_BACKING_FILE_SIZE = 10 > +MIN_BACKING_FILE_SIZE = 1 > + > + > +def multilog(msg, *output): > + """ Write an object to all of specified file descriptors.""" > + for fd in output: > + fd.write(msg) > + fd.flush() > + > + > +def str_signal(sig): > + """ Convert a numeric value of a system signal to the string one > + defined by the current operational system. > + """ > + for k, v in signal.__dict__.items(): > + if v == sig: > + return k > + > + > +def run_app(fd, q_args): > + """Start an application with specified arguments and return its exit code > + or kill signal depending on the result of execution. > + """ > + devnull = open('/dev/null', 'r+') > + process = subprocess.Popen(q_args, stdin=devnull, > + stdout=subprocess.PIPE, > + stderr=subprocess.PIPE) > + out, err = process.communicate() > + fd.write(out) > + fd.write(err) > + return process.returncode > + > + > +class TestException(Exception): > + """Exception for errors risen by TestEnv objects.""" > + pass > + > + > +class TestEnv(object): > + > + """Test object. > + > + The class sets up test environment, generates backing and test images > + and executes application under tests with specified arguments and a test > + image provided. > + > + All logs are collected. > + > + The summary log will contain short descriptions and statuses of tests in > + a run. > + > + The test log will include application (e.g. 'qemu-img') logs besides info > + sent to the summary log. 
> + """ > + > + def __init__(self, test_id, seed, work_dir, run_log, > + cleanup=True, log_all=False): > + """Set test environment in a specified work directory. > + > + Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and > + 'QEMU_IO' environment variables. > + """ > + if seed is not None: > + self.seed = seed > + else: > + self.seed = str(random.randint(0, sys.maxint)) > + random.seed(self.seed) > + > + self.init_path = os.getcwd() > + self.work_dir = work_dir > + self.current_dir = os.path.join(work_dir, 'test-' + test_id) > + self.qemu_img = os.environ.get('QEMU_IMG', 'qemu-img')\ > + .strip().split(' ')
Nitpicking: I think split(' ') doesn't make sense here; this could instead be: self.qemu_img = [os.environ.get('QEMU_IMG', 'qemu-img').strip()] Otherwise the user won't be able to pass in a QEMU_IMG path containing a space. It's a corner case, though. Otherwise looks good, Fam > + self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' > ') > + self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'], > + ['qemu-img', 'info', '-f', 'qcow2', '$test_img'], > + ['qemu-io', '$test_img', '-c', 'read $off $len'], > + ['qemu-io', '$test_img', '-c', 'write $off $len'], > + ['qemu-io', '$test_img', '-c', > + 'aio_read $off $len'], > + ['qemu-io', '$test_img', '-c', > + 'aio_write $off $len'], > + ['qemu-io', '$test_img', '-c', 'flush'], > + ['qemu-io', '$test_img', '-c', > + 'discard $off $len'], > + ['qemu-io', '$test_img', '-c', > + 'truncate $off']] > + for fmt in ['raw', 'vmdk', 'vdi', 'cow', 'qcow2', 'file', > + 'qed', 'vpc']: > + self.commands.append( > + ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt, > + '$test_img', 'converted_image.' + fmt]) > + > + try: > + os.makedirs(self.current_dir) > + except OSError, e: > + print >>sys.stderr, \ > + "Error: The working directory '%s' cannot be used. Reason: > %s"\ > + % (self.work_dir, e[1]) > + raise TestException > + self.log = open(os.path.join(self.current_dir, "test.log"), "w") > + self.parent_log = open(run_log, "a") > + self.failed = False > + self.cleanup = cleanup > + self.log_all = log_all > + > + def _create_backing_file(self): > + """Create a backing file in the current directory. > + > + Return a tuple of a backing file name and format. > + > + Format of a backing file is randomly chosen from all formats > supported > + by 'qemu-img create'. > + """ > + # All formats supported by the 'qemu-img create' command. > + backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'cow', > 'qcow2', > + 'file', 'qed', 'vpc']) > + backing_file_name = 'backing_img.' 
+ backing_file_fmt > + backing_file_size = random.randint(MIN_BACKING_FILE_SIZE, > + MAX_BACKING_FILE_SIZE) * (1 << 20) > + cmd = self.qemu_img + ['create', '-f', backing_file_fmt, > + backing_file_name, str(backing_file_size)] > + temp_log = StringIO.StringIO() > + retcode = run_app(temp_log, cmd) > + if retcode == 0: > + temp_log.close() > + return (backing_file_name, backing_file_fmt) > + else: > + multilog("Warning: The %s backing file was not created.\n\n" > + % backing_file_fmt, sys.stderr, self.log, > self.parent_log) > + self.log.write("Log for the failure:\n" + temp_log.getvalue() + > + '\n\n') > + temp_log.close() > + return (None, None) > + > + def execute(self, input_commands=None, fuzz_config=None): > + """ Execute a test. > + > + The method creates backing and test images, runs test app and > analyzes > + its exit status. If the application was killed by a signal, the test > + is marked as failed. > + """ > + if input_commands is None: > + commands = self.commands > + else: > + commands = input_commands > + > + os.chdir(self.current_dir) > + backing_file_name, backing_file_fmt = self._create_backing_file() > + img_size = image_generator.create_image('test.img', > + backing_file_name, > + backing_file_fmt, > + fuzz_config) > + for item in commands: > + shutil.copy('test.img', 'copy.img') > + # 'off' and 'len' are multiple of the sector size > + sector_size = 512 > + start = random.randrange(0, img_size + 1, sector_size) > + end = random.randrange(start, img_size + 1, sector_size) > + > + if item[0] == 'qemu-img': > + current_cmd = list(self.qemu_img) > + elif item[0] == 'qemu-io': > + current_cmd = list(self.qemu_io) > + else: > + multilog("Warning: test command '%s' is not defined.\n" \ > + % item[0], sys.stderr, self.log, self.parent_log) > + continue > + # Replace all placeholders with their real values > + for v in item[1:]: > + c = (v > + .replace('$test_img', 'copy.img') > + .replace('$off', str(start)) > + .replace('$len', str(end - start))) > + 
current_cmd.append(c) > + > + # Log string with the test header > + test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \ > + "Backing file: %s\n" \ > + % (self.seed, " ".join(current_cmd), > + self.current_dir, backing_file_name) > + > + temp_log = StringIO.StringIO() > + try: > + retcode = run_app(temp_log, current_cmd) > + except OSError, e: > + multilog(test_summary + "Error: Start of '%s' failed. " \ > + "Reason: %s\n\n" % (os.path.basename( > + current_cmd[0]), e[1]), > + sys.stderr, self.log, self.parent_log) > + raise TestException > + > + if retcode < 0: > + self.log.write(temp_log.getvalue()) > + multilog(test_summary + "FAIL: Test terminated by signal " + > + "%s\n\n" % str_signal(-retcode), sys.stderr, > self.log, > + self.parent_log) > + self.failed = True > + else: > + if self.log_all: > + self.log.write(temp_log.getvalue()) > + multilog(test_summary + "PASS: Application exited with" + > + " the code '%d'\n\n" % retcode, sys.stdout, > + self.log, self.parent_log) > + temp_log.close() > + os.remove('copy.img') > + > + def finish(self): > + """Restore the test environment after a test execution.""" > + self.log.close() > + self.parent_log.close() > + os.chdir(self.init_path) > + if self.cleanup and not self.failed: > + shutil.rmtree(self.current_dir) > + > +if __name__ == '__main__': > + > + def usage(): > + print """ > + Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR > + > + Set up test environment in TEST_DIR and run a test in it. A module > for > + test image generation should be specified via IMG_GENERATOR. 
> + Example: > + runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test ../qcow2 > + > + Optional arguments: > + -h, --help display this help and exit > + -c, --command=JSON run tests for all commands specified > in > + the JSON array > + -s, --seed=STRING seed for a test image generation, > + by default will be generated randomly > + --config=JSON take fuzzer configuration from the > JSON > + array > + -k, --keep_passed don't remove folders of passed tests > + -v, --verbose log information about passed tests > + > + JSON: > + > + '--command' accepts a JSON array of commands. Each command presents > + an application under test with all its paramaters as a list of > strings, > + e.g. > + ["qemu-io", "$test_img", "-c", "write $off $len"] > + > + Supported application aliases: 'qemu-img' and 'qemu-io'. > + Supported argument aliases: $test_img for the fuzzed image, $off > + for an offset, $len for length. > + > + Values for $off and $len will be generated based on the virtual disk > + size of the fuzzed image > + Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and > + 'QEMU_IO' environment variables > + > + '--config' accepts a JSON array of fields to be fuzzed, e.g. > + '[["header"], ["header", "version"]]' > + Each of the list elements can consist of a complex image element only > + as ["header"] or ["feature_name_table"] or an exact field as > + ["header", "version"]. In the first case random portion of the > element > + fields will be fuzzed, in the second one the specified field will be > + fuzzed always. > + > + If '--config' argument is specified, fields not listed in > + the configuration array will not be fuzzed. 
> + """ > + > + def run_test(test_id, seed, work_dir, run_log, cleanup, log_all, > + command, fuzz_config): > + """Setup environment for one test and execute this test.""" > + try: > + test = TestEnv(test_id, seed, work_dir, run_log, cleanup, > + log_all) > + except TestException: > + sys.exit(1) > + > + # Python 2.4 doesn't support 'finally' and 'except' in the same 'try' > + # block > + try: > + try: > + test.execute(command, fuzz_config) > + except TestException: > + sys.exit(1) > + finally: > + test.finish() > + > + try: > + opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kv', > + ['command=', 'help', 'seed=', > 'config=', > + 'keep_passed', 'verbose']) > + except getopt.error, e: > + print >>sys.stderr, \ > + "Error: %s\n\nTry 'runner.py --help' for more information" % e > + sys.exit(1) > + > + command = None > + cleanup = True > + log_all = False > + seed = None > + config = None > + for opt, arg in opts: > + if opt in ('-h', '--help'): > + usage() > + sys.exit() > + elif opt in ('-c', '--command'): > + try: > + command = json.loads(arg) > + except (TypeError, ValueError, NameError), e: > + print >>sys.stderr, \ > + "Error: JSON array of test commands cannot be loaded.\n" > \ > + "Reason: %s" % e > + sys.exit(1) > + elif opt in ('-k', '--keep_passed'): > + cleanup = False > + elif opt in ('-v', '--verbose'): > + log_all = True > + elif opt in ('-s', '--seed'): > + seed = arg > + elif opt == '--config': > + try: > + config = json.loads(arg) > + except (TypeError, ValueError, NameError), e: > + print >>sys.stderr, \ > + "Error: JSON array with the fuzzer configuration cannot" > \ > + " be loaded\nReason: %s" % e > + sys.exit(1) > + > + if not len(args) == 2: > + print >>sys.stderr, \ > + "Expected two parameters\nTry 'runner.py --help'" \ > + " for more information." 
> + sys.exit(1) > + > + work_dir = os.path.realpath(args[0]) > + # run_log is created in 'main', because multiple tests are expected to > + # log in it > + run_log = os.path.join(work_dir, 'run.log') > + > + # Add the path to the image generator module to sys.path > + sys.path.append(os.path.realpath(os.path.dirname(args[1]))) > + # Remove a script extension from image generator module if any > + generator_name = os.path.splitext(os.path.basename(args[1]))[0] > + > + try: > + image_generator = __import__(generator_name) > + except ImportError, e: > + print >>sys.stderr, \ > + "Error: The image generator '%s' cannot be imported.\n" \ > + "Reason: %s" % (generator_name, e) > + sys.exit(1) > + > + # Enable core dumps > + resource.setrlimit(resource.RLIMIT_CORE, (-1, -1)) > + # If a seed is specified, only one test will be executed. > + # Otherwise runner will terminate after a keyboard interruption > + for test_id in count(1): > + try: > + run_test(str(test_id), seed, work_dir, run_log, cleanup, > + log_all, command, config) > + except (KeyboardInterrupt, SystemExit): > + sys.exit(1) > + > + if seed is not None: > + break > -- > 1.9.3 >