On 01/13/2012 07:32 PM, Anthony Liguori wrote:
This also includes a qtest wrapper script to make it easier to launch qtest
tests directly.

Signed-off-by: Anthony Liguori<aligu...@us.ibm.com>

Here is a Python test harness for qtest. I haven't tried merging them with the makefiles.

Feel free to add my s-o-b and include the files in your patches.

Paolo
#! /usr/bin/env python
#
# GTester-compatible main program for pyunit
#
#    Copyright (C) 2011 Red Hat, Inc.
#    Author: Paolo Bonzini <pbonz...@redhat.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.


# This file defines replacements for unittest.main and unittest.TextTestRunner.
# They are command-line compatible with glib test suites, including gtester
# support, and even in standalone mode the output is similar to gtest-based
# C unit tests.  The only unsupported features are -m and --GTestSkipCount.
#
# Use this instead of unittest.main:
#
#   if __name__ == "__main__":
#       gtest_main.main() # run all tests

import sys
import os
import types

import fnmatch
import random
import re
import struct
import time
import traceback

import unittest
from unittest.signals import registerResult, installHandler

# Ugly implementation dependency... used by TestResult to skip internal
# frames in the traceback
__unittest = True


def test_id(test, module='__main__'):
    """Retrieve a GTest-like path from a TestCase."""
    id = test.id().replace('.', '/')
    if module is not None:
        id = id[len(module.__name__):]
    return id


class SelectTestsSuite(unittest.TestSuite):
    """A wrapper for other TestSuites that processes -p and -s options.
       Perhaps it could also use decorators to implement -m?"""

    re = None
    module = None

    def __init__(self, tests=(), module='__main__', path='*', skipPath=None):
        super(unittest.TestSuite, self).__init__(tests=tests)
        self.module = module

        regex = ''
        if path != '*':
            regex = fnmatch.translate(path)
        if skipPath is not None:
            regex = '(?!' + fnmatch.translate(skipPath) + ')' + regex
        if regex != '':
            self.re = re.compile(regex)

    def __iter__(self):
        iter = super(unittest.TestSuite, self).__iter__()
        if self.re is not None:
            iter = (x for x in iter if self.accept(x))
        return iter

    def addTest(self, test):
        if isinstance(test, unittest.TestSuite):
            self.addTests(test)
        else:
            unittest.TestSuite.addTest(self, test)

    def accept(self, test):
        id = test_id(test, self.module)
        return self.re is None or self.re.match(id)


class GTestResult(unittest.TestResult):
    """A test result class that can print formatted text results to a stream
       and can talk to gtester using the glib test protocol.  Roughly based on
       TextTestResult, used internally by instances of GAbstractTestRunner."""

    separator1 = '=' * 70 + "\n"
    separator2 = '-' * 70 + "\n"

    def defaultSeed(self):
        """Return a default random number seed.  GLib expects this to be
           the string 'R02S' followed by four zero-padded 32-bit integers.
           We need to return a properly formatted seed so that the value
           can be passed to gtester even when Python and C tests are mixed
           on the command line."""
        try:
            s = os.urandom(16)
            a, b, c, d = struct.unpack(">IIII", s)
        except NotImplementedError:
            t = time.time()
            a = int(t)
            b = int((t - a) * 1000000)
            c = os.getpid()
            d = os.name = 'posix' and os.getppid() or 0
        return "R02S{0:>08x}{1:>08x}{2:>08x}{3:>08x}".format(a,b,c,d)

    def __init__(self, log, stream, progName=None, module='__main__', verbosity=1,
                 seed=None):
        super(GTestResult, self).__init__()
        self.stream = stream
        self.log = log
        self.showAll = verbosity > 1
        self.quiet = verbosity == 0
        self.module = module
        self.seed = seed or self.defaultSeed()
        self.progName = progName or os.path.basename(sys.argv[0])

    # These methods implement the glib test protocol.
    G_TEST_LOG_NONE = 0
    G_TEST_LOG_ERROR = 1           # s:msg
    G_TEST_LOG_START_BINARY = 2    # s:binaryname s:seed
    G_TEST_LOG_LIST_CASE = 3       # s:testpath
    G_TEST_LOG_SKIP_CASE = 4       # s:testpath, TODO
    G_TEST_LOG_START_CASE = 5      # s:testpath
    G_TEST_LOG_STOP_CASE = 6       # d:status d:nforks d:elapsed
    G_TEST_LOG_MIN_RESULT = 7      # s:blurb d:result
    G_TEST_LOG_MAX_RESULT = 8      # s:blurb d:result
    G_TEST_LOG_MESSAGE = 9         # s:blurb

    def pack_log(self, typ, strings=(), nums=()):
        out = struct.pack(">iiii", typ, len(strings), len(nums), 0)
        for s in strings:
            out = out + struct.pack(">i", len(s)) + s
        for n in nums:
            out = out + struct.pack(">d", float(n))
        out = struct.pack(">i", len(out) + 4) + out
        self.log.write(out)

    def logStartBinary(self):
        self.pack_log(self.G_TEST_LOG_START_BINARY, (self.progName, self.seed))

    def logStartCase(self, test):
        id = test_id(test, self.module)
        self.pack_log(self.G_TEST_LOG_START_CASE, (id,))

    def logListCase(self, test):
        id = test_id(test, self.module)
        self.pack_log(self.G_TEST_LOG_LIST_CASE, (id,))

    def logError(self, msg):
        self.pack_log(self.G_TEST_LOG_ERROR, (msg,))

    def logMessage(self, msg):
        self.pack_log(self.G_TEST_LOG_MESSAGE, (msg,))

    def logStopCase(self, status):
        try:
            elapsed = time.clock() - self.startCaseTime
            self.pack_log(self.G_TEST_LOG_STOP_CASE, nums=(status,0,elapsed))
        except:
            # This happens when class setup fails.  startCaseTime has not
            # been set, do nothing
            assert status != 0
            pass

    def logException(self, test, kind, err, skip=1):
        """Return a representation for the exception passed to addError
           or addFailure."""
        type, inst, trace=err
        try:
            file, line, func, text = traceback.extract_tb(trace, skip+1)[skip]
            msg = "%s:%s:%s:%s: %s\n" % (kind, file, line, type.__name__, inst)
        except:
            msg = "%s:%s: %s\n" % (kind, type.__name__, inst)
        self.write("**\n" + msg)
        self.logError(msg + self.separator1 + self._exc_info_to_string(err, test))

    # These methods implement text output.
    def write(self, str):
        self.stream.write(str)

    def flush(self):
        self.stream.flush()

    # These methods implement the standard TestResult protocol.
    def startTestRun(self):
        self.logStartBinary()
        self.startTime = time.clock()

    def stopTestRun(self):
        self.stopTime = time.clock()

    def startTest(self, test):
        super(GTestResult, self).startTest(test)
        random.seed(self.seed)
        self.startCaseTime = time.clock()
        self.logStartCase(test)
        if not self.quiet:
            self.write(test_id(test, self.module))
            self.write(": ")
            self.flush()

    def addSuccess(self, test):
        super(GTestResult, self).addSuccess(test)
        self.logStopCase(0)
        if not self.quiet:
            self.write("OK\n")

    def addError(self, test, err):
        self.logException(test, "ERROR", err)
        self.logStopCase(1)
        super(GTestResult, self).addError(test, err)

    def addFailure(self, test, err):
        self.logException(test, "FAIL", err)
        self.logStopCase(1)
        super(GTestResult, self).addFailure(test, err)

    def addSkip(self, test, reason):
        self.logStopCase(0)
        super(GTestResult, self).addSkip(test, reason)
        if not self.quiet:
            self.write("SKIP {0!r}\n".format(reason))

    def addExpectedFailure(self, test, err):
        self.logException("XFAIL", err)
        self.logStopCase(0)
        super(GTestResult, self).addExpectedFailure(test, err)
        if not self.quiet:
            self.write("XFAIL\n")

    def addUnexpectedSuccess(self, test):
        self.logError("unexpected success")
        self.logStopCase(1)
        super(GTestResult, self).addUnexpectedSuccess(test)
        if not self.quiet:
            self.write("XPASS\n")

    # Additional methods used by GTestLister and GTestRunner.
    def listTest(self, test):
        super(GTestResult, self).startTest(test)
        self.logListCase(test)
        if not self.quiet:
            self.write(test_id(test, self.module))
            self.write("\n")
            self.flush()
        super(GTestResult, self).addSuccess(test)

    def printErrors(self):
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)
        self.write(self.separator2)

    def printErrorList(self, kind, errors):
        for test, err in errors:
            self.write(self.separator1)
            self.write("%s: %s\n" % (kind,test_id(test, self.module)))
            self.write(self.separator2)
            self.write("%s\n" % err)

    def printSummary(self):
        run = self.testsRun
        timeTaken = self.stopTime - self.startTime
        if not self.quiet:
            self.write("Ran %d test%s in %.3fs\n\n" %
                       (run, run != 1 and "s" or "", timeTaken))

        infos = []
        failed = len(self.failures)
        if failed:
            infos.append("failures=%d" % failed)

        errored = len(self.errors)
        if errored:
            infos.append("errors=%d" % errored)

        skipped = len(self.skipped)
        if skipped:
            infos.append("skipped=%d" % skipped)

        expectedFails = len(self.expectedFailures)
        if expectedFails:
            infos.append("expected failures=%d" % expectedFails)

        unexpectedSuccesses = len(self.unexpectedSuccesses)
        if unexpectedSuccesses:
            infos.append("unexpected successes=%d" % unexpectedSuccesses)

        if not self.wasSuccessful():
            self.write("FAILED (%s)\n" % (", ".join(infos),))
        elif infos:
            self.write("OK (%s)\n" % (", ".join(infos),))
        elif not self.quiet:
            self.write("OK\n")

    def printResults(self):
        if self.quiet or self.showAll:
            if self.showAll:
                self.write("\n")
                self.printErrors()
            self.printSummary()

class GAbstractTestRunner(object):
    """A test runner class that interfaces to a GTestResult.  Compared
       to e.g. TextTestRunner, it can pass down some data that is required
       by the glib test protocol (program name, random number seed)."""

    def __init__(self, log=None, stream=sys.stdout, verbosity=1, progName=None,
                 module='__main__', failfast=True, seed=None, buffer=False):
        self.module = module
        self.verbosity = verbosity
        self.failfast = failfast
        self.buffer = buffer
        self.progName = progName
        self.seed = seed

        class _DummyStream(object):
            def write(self, s):
                pass

        self.log = log or _DummyStream()
        self.stream = log and _DummyStream() or stream

    def _makeResult(self):
        return GTestResult(stream=self.stream, log=self.log, module=self.module,
                           seed=self.seed, progName=self.progName,
                           verbosity=self.verbosity)

    def run(self, test):
        "Run the given test case or test suite."
        result = self._makeResult()
        registerResult(result)
        result.failfast = self.failfast
        result.buffer = self.buffer
        startTestRun = getattr(result, 'startTestRun', None)
        if startTestRun is not None:
            startTestRun()
        try:
            self.doRun(test, result)
        finally:
            stopTestRun = getattr(result, 'stopTestRun', None)
            if stopTestRun is not None:
                stopTestRun()
        return result

    def doRun(self, test, result):
        "Run the given test case or test suite."
        test(result)

class GTestLister(GAbstractTestRunner):
    """A test runner class that only prints the names of test cases.
       in the suite.

       When run in standalone mode it prints out the names of tests
       that have been selected.  When run in gtester mode, it logs
       the same information using the glib test protocol."""

    def doRun(self, test, result):
        """Run the given test case or test suite (actually just ask the
           GTestResult to list the test case)."""
        for t in test:
            result.listTest(t)

class GTestRunner(GAbstractTestRunner):
    """A test runner class that emits results in textual and GLib form.

       When run in standalone mode it prints out the names of tests as
       they are run, errors as they occur, and a summary of the results
       at the end of the test run, depending on the verbosity level.
       When run in gtester mode, it logs the entire run using the glib
       test protocol."""

    def run(self, test):
        "Run the given test case or test suite."
        result = super(GTestRunner, self).run(test)
        result.printResults()
        return result


USAGE = """\
Usage: %(progName)s [options]

Help Options:
  -?, --help                     Show help options
Test Options:
  -l                             List test cases available in a test executable
  -seed=RANDOMSEED               Provide a random seed to reproduce test
                                 runs using random numbers
  -v, --verbose                  Run tests verbosely
  -q, --quiet                    Run tests quietly
  -p TESTPATH                    execute all tests matching TESTPATH
  -s TESTPATH                    skip all tests matching TESTPATH
  -m {perf|slow|thorough|quick}  (unsupported) Execute tests according to mode
  -k, --keep-going               Keep running after the first failure
  --runner=CLASS                 Use an alternative test runner
                                 (e.g. unittest.TextTestRunner)
  --GTestLogFD=N                 file descriptor for communication with GTester
  --GTestSkipCount=N             (unsupported) gtester-specific argument

-p and -s accept slash-separated paths.  They work even in combination with
--runner.  -l and --GTestLogFD are only supported without --runner.
"""

class GTestProgram(object):
    """A command-line program that runs a set of tests, used to make test
       modules conveniently executable and to interface them with gtester."""

    failfast = True
    progName = None
    seed = None
    exit = True
    logFD = None
    verbosity = 1
    path = '*'
    skipPath = None

    def getModule(self, module):
        if isinstance(module, basestring):
            m = __import__(module)
            for part in module.split('.')[1:]:
                m = getattr(m, part)
            return m
        else:
            return module

    def getClass(self, name):
        if isinstance(name, basestring):
            parts = name.split('.')
            module = ".".join(parts[:-1])
            m = self.getModule(module)
            return getattr(m, parts[-1])
        else:
            return name

    def __init__(self, module='__main__', argv=None,
                    testRunner=None, testLoader=unittest.defaultTestLoader,
                    exit=True, seed=None):
        argv = argv or sys.argv
        self.module = self.getModule(module)
        self.exit = exit
        self.seed = seed
        self.testRunner = testRunner
        self.testLoader = testLoader
        self.progName = os.path.basename(argv[0])
        self.parseArgs(argv)
        self.suite = self.createTests()
        self.runTests()

    def usageExit(self, msg=None):
        if msg:
            print msg
        print USAGE % {'progName': self.progName}
        if self.exit:
            sys.exit(2)

    def parseArgs(self, argv):
        import getopt
        long_opts = ['seed=', 'keep-going', 'verbose', 'quiet', 'GTestLogFD=',
                     'runner=', 'GTestSkipCount=']
        list = False
        try:
            options, args = getopt.getopt(argv[1:], '?hHlvqs:p:m:k', long_opts)
            for opt, value in options:
                # quirk in the gtest option parser...
                if opt[1] != '-' and value != '' and value[0] == '=':
                    value = value[1:]

                if opt in ('-h','-H','-?','--help'):
                    self.usageExit()
                if opt in ('-q','--quiet'):
                    self.verbosity = 0
                if opt in ('-v','--verbose'):
                    self.verbosity = 2
                if opt in ('-k','--keep-going'):
                    self.failfast = False
                if opt in ('-l'):
                    list = True
                if opt in ('-p'):
                    self.path = value
                if opt in ('-s'):
                    self.skipPath = value
                if opt in ('--seed'):
                    self.seed = value
                if opt in ('--runner'):
                    self.testRunner = self.getClass(value)
                if opt in ('--GTestLogFD'):
                    self.logFD = int(value)
        except getopt.error, msg:
            self.usageExit(msg)

        if self.testRunner is None:
            self.testRunner = list and GTestLister or GTestRunner
        else:
            # Set the seed now for user-specified runners.  GTestRunners
            # resets it for each testcase, so that results are reproducible
            # even if other tests change.
            random.seed(self.seed)

    def createTests(self):
        allTests = self.testLoader.loadTestsFromModule(self.module)
        return SelectTestsSuite(tests=allTests, path=self.path, skipPath=self.skipPath,
                                module=self.module)

    def runTests(self):
        installHandler()
        if isinstance(self.testRunner, type) and \
               issubclass(self.testRunner, GAbstractTestRunner):
            # pass GTest-specific options to GAbstractTestRunner subclasses
            testRunner = self.testRunner(verbosity=self.verbosity,
                                         failfast=self.failfast,
                                         module=self.module,
                                         progName=self.progName,
                                         seed=self.seed,
                                         log=self.logFD and os.fdopen(self.logFD, 'w'))
        elif isinstance(self.testRunner, (type, types.ClassType)):
            try:
                testRunner = self.testRunner(verbosity=self.verbosity,
                                             failfast=self.failfast)
            except TypeError:
                testRunner = self.testRunner()
        else:
            # it is assumed to be a TestRunner instance
            testRunner = self.testRunner

        self.result = testRunner.run(self.suite)
        if self.exit:
            sys.exit(not self.result.wasSuccessful())

main = GTestProgram
#! /usr/bin/env python
#
# qtest harness for PyUnit
#
#    Copyright (C) 2011 Red Hat, Inc.
#    Author: Paolo Bonzini <pbonz...@redhat.com>
#
# This file is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.

import os
import struct
import socket
import unittest

class ClientDisconnected(Exception):
    pass

class QTestCase(unittest.TestCase):
    @classmethod
    def supportedArches():
        return ['all']

    @classmethod
    def extraArgs():
        return ()

    @classmethod
    def isSupportedArch(self, arch):
        return any(x == 'all' or x == arch for x in self.supportedArches())

    @classmethod
    def _raiseIRQ(self, irq):
        self._irqs.add(irq)

    @classmethod
    def _lowerIRQ(self, irq):
        self._irqs.discard(irq)

    @classmethod
    def setUpClass(self):
        qemu_binary = os.getenv('QTEST_QEMU_BINARY')
        arch = os.path.basename(qemu_binary).replace('qemu-system-','')
        if not self.isSupportedArch(arch):
            self._classSetupFailed = True
            return

        socket_path = '/tmp/qtest-%d.sock' % os.getpid()
        server_socket = socket.socket(socket.AF_UNIX)
        server_socket.bind(socket_path)
        server_socket.listen(1)

        self.pid = os.spawnl(os.P_NOWAIT, qemu_binary,
                             qemu_binary,
                             '-qtest', 'unix:%s' % socket_path,
                             '-qtest-log', '/dev/null',
                             '-machine', 'accel=qtest',
                             *self.extraArgs())

        self._socket, addr = server_socket.accept()
        self._irqs = set()
        self._buffer = ''
        self._reply = None
        server_socket.close()

    @classmethod
    def tearDownClass(self):
        self._socket.close()
        self._socket = None
        os.kill(self.pid, 15)
        os.waitpid(self.pid, 0)

    @classmethod
    def _qtestResponse(self, buf):
        tokens = buf.split()
        if tokens[0] == 'OK':
            self._reply = tokens
            return

        if tokens[0] == 'IRQ':
            if tokens[1] == 'raise':
                self._raiseIRQ(int(tokens[2]))
            else:
                self._lowerIRQ(int(tokens[2]))
            return

    @classmethod
    def waitForEvents(self, blocking=True):
        self._socket.setblocking(int(blocking))
        while True:
            i = self._buffer.find('\n')
            if i != -1:
                response, self._buffer = self._buffer[:i], self._buffer[i+1:]
                self._qtestResponse(response)
                self._socket.setblocking(0)
                continue

            try:
                data = self._socket.recv(4096)
                self._buffer = self._buffer + data
                if (len(data) == 0):
                    raise ClientDisconnected
            except:
                return

    @classmethod
    def _send(self, str):
        assert self._reply is None
        self._socket.sendall(str + '\n')
        while self._reply is None:
            self.waitForEvents()
        reply, self._reply = self._reply, None
        return reply

    # IRQ access
    def getIRQ(self, irq):
        self.waitForEvents(False)
        return irq in self._irqs

    def waitIRQ(self, irq, level=True):
        self.waitForEvents(False)
        while (irq in self._irqs) != level:
            self.waitForEvents(True)

    # PIO reads
    def inb(self, port):
        return int(self._send('inb 0x%x' % port)[1][2:], 16)

    def inw(self, port):
        return int(self._send('inw 0x%x' % port)[1][2:], 16)

    def inl(self, port):
        return int(self._send('inl 0x%x' % port)[1][2:], 16)

    # Memory reads
    def memread_unpack(self, addr, format):
        size = struct.calcsize(format)
        str = 'read 0x%x 0x%x' % (addr, size)
        reply = self._send(str)[1]
        bytes = reply[2:].decode('hex_codec')
        return struct.unpack(format, bytes)

    def memread(self, addr, size):
        return self.memread_unpack('%dB' % size)

    def ldb(self, addr):
        return self.memread_unpack(addr, 'B')[0]

    def ldw_le(self, addr):
        return self.memread_unpack(addr, '<H')[0]

    def ldl_le(self, addr):
        return self.memread_unpack(addr, '<I')[0]

    def ldw_be(self, addr):
        return self.memread_unpack(addr, '>H')[0]

    def ldl_be(self, addr):
        return self.memread_unpack(addr, '>I')[0]

    # PIO writes
    def outb(self, port, val):
        self._send('outb 0x%x 0x%x' % (port, val))

    def outw(self, port, val):
        self._send('outw 0x%x 0x%x' % (port, val))

    def outl(self, port, val):
        self._send('outl 0x%x 0x%x' % (port, val))

    # Memory writes
    def memwrite_pack(self, addr, format, *data):
        bytes = struct.pack(format, *data)
        str = 'write 0x%x 0x%x 0x%s' % (
                addr, len(bytes), bytes.encode('hex_codec'))
        self._send(str)

    def memwrite(self, addr, *data):
        self.memwrite_pack('%dB' % len(data), *data)

    def stb(self, addr, datum):
        self.memwrite_pack(addr, 'B', datum)

    def stw_le(self, addr, datum):
        self.memwrite_pack(addr, '<H', datum)

    def stl_le(self, addr, datum):
        self.memwrite_pack(addr, '<I', datum)

    def stw_be(self, addr, datum):
        self.memwrite_pack(addr, '>H', datum)

    def stl_be(self, addr, datum):
        self.memwrite_pack(addr, '>I', datum)
#! /usr/bin/env python
#
# Sample qtest written in Python
#
#    Copyright (C) 2011 Red Hat, Inc.
#    Author: Paolo Bonzini <pbonz...@redhat.com>
#
# This file is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.

import time
import qtest
import gtest_main

class QTestCase(qtest.QTestCase):
    @classmethod
    def supportedArches(self):
        return ['i386', 'x86_64']

    @classmethod
    def extraArgs(self):
        return ('-vga', 'none')

    BASE = 0x70
    RTC_SECONDS = 0
    RTC_MINUTES = 2
    RTC_HOURS = 4

    def read(self, reg):
        self.outb(self.BASE, reg)
        return self.inb(self.BASE + 1)

    def write(self, reg, value):
        self.outb(self.BASE, reg)
        self.outb(self.BASE + 1, value)

    def testRead(self):
        self.write(self.RTC_HOURS, 0x01)
        self.write(self.RTC_MINUTES, 0x15)
        self.write(self.RTC_SECONDS, 0x29)
        assert self.read(self.RTC_MINUTES) == 0x15
        assert self.read(self.RTC_HOURS) == 0x01

    def testUpdate(self):
        self.write(self.RTC_HOURS, 0x01)
        self.write(self.RTC_MINUTES, 0x15)
        self.write(self.RTC_SECONDS, 0x29)
        while self.read(self.RTC_SECONDS) == 0x29:
            time.sleep(0.2)
        assert self.read(self.RTC_SECONDS) == 0x30

if __name__ == '__main__':
    gtest_main.main()

Reply via email to