Add avocado test for vfio subsystem. The test uses pci-testdev as a source
PCI device for vfio, so the test is supposed to be launched inside a QEMU
guest started with '-device pci-testdev' devices.

The test creates a VFIO container and group, launches QEMU and passes the
container/group/device to it.

Signed-off-by: Andrey Ryabinin <a...@yandex-team.com>
---
 tests/avocado/vfio.py | 156 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 156 insertions(+)
 create mode 100644 tests/avocado/vfio.py

diff --git a/tests/avocado/vfio.py b/tests/avocado/vfio.py
new file mode 100644
index 00000000000..fa0d2b65dc7
--- /dev/null
+++ b/tests/avocado/vfio.py
@@ -0,0 +1,156 @@
+# tests for QEMU's vfio subsystem
+#
+# Copyright (c) 2022 Yandex N.V.
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+
+from avocado.utils import wait
+from avocado import skipUnless
+from avocado_qemu import QemuSystemTest
+from avocado_qemu import run_cmd
+import os
+import sys
+import subprocess
+from fcntl import ioctl
+from ctypes import *
+import struct
+
+
+# VFIO ioctl request numbers: (';' << 8) | (100 + n), following the
+# _IO(VFIO_TYPE, VFIO_BASE + n) definitions in <linux/vfio.h>.
+# NOTE(review): this plain "type << 8 | nr" encoding assumes the ioctl
+# direction bits for _IO() are zero; that does not hold on every
+# architecture (e.g. powerpc) -- confirm before relying on the SPAPR
+# IOMMU types below.
+VFIO_CMD_PREFIX = ord(';') << 8
+VFIO_GET_API_VERSION = VFIO_CMD_PREFIX | 100
+VFIO_CHECK_EXTENSION = VFIO_CMD_PREFIX | 101
+VFIO_SET_IOMMU = VFIO_CMD_PREFIX | 102
+VFIO_GROUP_GET_STATUS = VFIO_CMD_PREFIX | 103
+VFIO_GROUP_SET_CONTAINER = VFIO_CMD_PREFIX | 104
+VFIO_GROUP_GET_DEVICE_FD = VFIO_CMD_PREFIX | 106
+
+# IOMMU model identifiers used with VFIO_CHECK_EXTENSION / VFIO_SET_IOMMU.
+VFIO_TYPE1_IOMMU = 1
+VFIO_SPAPR_TCE_IOMMU = 2
+VFIO_TYPE1v2_IOMMU = 3
+VFIO_SPAPR_TCE_v2_IOMMU = 7
+
+# Value VFIO_GET_API_VERSION is expected to return.
+VFIO_API_VERSION = 0
+
+# PCI IDs the test looks for in sysfs (the pci-testdev device).
+PCI_VENDOR_ID = 0x1b36
+PCI_DEV_ID = 0x0005
+
+class vfio_group_status(Structure):
+    """ctypes layout of the argument used with VFIO_GROUP_GET_STATUS."""
+
+    _fields_ = [
+        ("argsz", c_uint32),
+        ("flags", c_uint32),
+    ]
+
+class vfio_container(object):
+    """Wrapper around a VFIO container file descriptor (/dev/vfio/vfio).
+
+    Attributes set by open():
+      container_fd -- file descriptor of the opened container
+      iommu_type   -- IOMMU model later applied by set_iommu()
+    """
+
+    def open(self):
+        """Open the container and pick a supported IOMMU model.
+
+        Raises Exception when the reported API version is not
+        VFIO_API_VERSION or when none of the known IOMMU models is
+        supported.
+        """
+        # Initialise up front so the "nothing supported" check below raises
+        # the intended exception rather than AttributeError.
+        self.iommu_type = None
+        self.container_fd = os.open("/dev/vfio/vfio", os.O_RDWR)
+        if ioctl(self.container_fd, VFIO_GET_API_VERSION) != VFIO_API_VERSION:
+            raise Exception("VFIO_GET_API_VERSION: unexpected vfio api version")
+        iommu_types = [VFIO_TYPE1v2_IOMMU, VFIO_TYPE1_IOMMU,
+                       VFIO_SPAPR_TCE_v2_IOMMU, VFIO_SPAPR_TCE_IOMMU]
+        for iommu_type in iommu_types:
+            if ioctl(self.container_fd, VFIO_CHECK_EXTENSION, iommu_type):
+                self.iommu_type = iommu_type
+                # Stop at the first supported model so that earlier list
+                # entries (the v2 variants) win over later ones.
+                break
+        if not self.iommu_type:
+            raise Exception("No available IOMMU models")
+
+    def set_iommu(self):
+        """Apply the IOMMU model chosen by open() to the container."""
+        ioctl(self.container_fd, VFIO_SET_IOMMU, self.iommu_type)
+
+class vfio_group(object):
+    """A VFIO group node (/dev/vfio/<group_num>) tied to a container."""
+
+    def __init__(self, container, group_num):
+        """Store the owning vfio_container and the numeric group id."""
+        self.ct = container
+        self.group_num = group_num
+
+    def open(self):
+        """Open the group node and attach it to the container.
+
+        Sets self.group_fd. Reads self.ct.container_fd, so the container
+        has to be opened first.
+        """
+        self.group_fd = os.open("/dev/vfio/%d" % self.group_num, os.O_RDWR)
+        # argsz carries the structure size, and the structure itself is
+        # handed to ioctl(): passing a ctypes pointer object would expose
+        # the pointer's own storage as the buffer instead of the struct.
+        status = vfio_group_status(sizeof(vfio_group_status), 0)
+        ioctl(self.group_fd, VFIO_GROUP_GET_STATUS, status)
+        # The returned status flags are not inspected here.
+        ioctl(self.group_fd, VFIO_GROUP_SET_CONTAINER,
+              c_int(self.ct.container_fd))
+
+class vfio_device(object):
+    """Pairs a sysfs PCI device name with the vfio_group it belongs to."""
+
+    def __init__(self, dev, group):
+        self.dev, self.group = dev, group
+
+def pci_testdev_exists():
+    """Return True if sysfs lists a device with the pci-testdev IDs.
+
+    This runs at import time (it feeds the skipUnless decorator), so a
+    missing /sys/bus/pci/devices must yield False instead of an error.
+    """
+    sysfs = '/sys/bus/pci/devices'
+    # os.walk() yields nothing for a missing directory; the default keeps
+    # next() from raising StopIteration in that case.
+    devs = next(os.walk(sysfs), (sysfs, [], []))[1]
+    for dev in devs:
+        with open("%s/%s/vendor" % (sysfs, dev)) as f:
+            if f.read() != '0x%04x\n' % PCI_VENDOR_ID:
+                continue
+        with open("%s/%s/device" % (sysfs, dev)) as f:
+            if f.read() != '0x%04x\n' % PCI_DEV_ID:
+                continue
+        return True
+    return False
+
+class VFIOIOMMUTest(QemuSystemTest):
+    """Pass a VFIO container, its groups and the test devices to QEMU.
+
+    Expected to run inside a guest started with '-device pci-testdev'.
+    """
+
+    timeout = 900
+
+    def add_group(self, dev):
+        """Return the vfio_group of PCI device `dev`, creating it if new."""
+        link = os.readlink("/sys/bus/pci/devices/%s/iommu_group" % dev)
+        group_num = int(link.split('/')[-1])
+
+        for g in self.groups:
+            if g.group_num == group_num:
+                return g
+        group = vfio_group(self.ct, group_num)
+        self.groups.append(group)
+        return group
+
+    def setUp(self):
+        """Register the device ID with vfio-pci and collect the devices."""
+        super().setUp()
+        # Keep state on the instance: class-level lists would be shared by
+        # every test instance and grow on each setUp() call.
+        self.devices = []
+        self.groups = []
+        self.ct = vfio_container()
+
+        run_cmd(('modprobe', 'vfio-pci'))
+        try:
+            # Leaving the with-block flushes the buffered write, so an
+            # error from the write is raised inside this try.
+            with open("/sys/bus/pci/drivers/vfio-pci/new_id", "a") as f:
+                f.write("%x %x" % (PCI_VENDOR_ID, PCI_DEV_ID))
+        except (PermissionError, FileExistsError):
+            # Deliberately best effort; presumably FileExistsError means
+            # the ID is already registered -- TODO confirm.
+            pass
+        for dev in next(os.walk('/sys/bus/pci/devices'))[1]:
+            with open("/sys/bus/pci/devices/%s/vendor" % dev) as f:
+                if f.read() != '0x%04x\n' % PCI_VENDOR_ID:
+                    continue
+            with open("/sys/bus/pci/devices/%s/device" % dev) as f:
+                if f.read() != '0x%04x\n' % PCI_DEV_ID:
+                    continue
+
+            self.devices.append(vfio_device(dev, self.add_group(dev)))
+
+    def open_dev_fds(self):
+        """Open the container and every group, then set the IOMMU model."""
+        self.ct.open()
+        for group in self.groups:
+            group.open()
+        self.ct.set_iommu()
+
+    def close_fds(self):
+        """Close all group fds and the container fd."""
+        for group in self.groups:
+            os.close(group.group_fd)
+        os.close(self.ct.container_fd)
+
+    def hotplug_devices(self, vm):
+        """Send the fds to `vm` over QMP and hot-add the devices."""
+        vm._qmp.send_fd_scm(self.ct.container_fd)
+        vm.command("getfd", fdname="ct")
+        vm.command("object-add", qom_type="vfio-container", id="ct", fd="ct")
+
+        for group in self.groups:
+            vm._qmp.send_fd_scm(group.group_fd)
+            vm.command("getfd", fdname="group_fd")
+            vm.command("object-add", qom_type="vfio-group",
+                       id="group%d" % group.group_num,
+                       fd="group_fd", container="ct")
+
+        for i, device in enumerate(self.devices):
+            vm.command("device_add", driver="vfio-pci",
+                       host=device.dev, id="dev%d" % i,
+                       group="group%d" % device.group.group_num)
+
+    @skipUnless(pci_testdev_exists(), "no pci-testdev found")
+    def test_vfio(self):
+        """Launch QEMU and hotplug the VFIO container, groups and devices."""
+        self.open_dev_fds()
+        try:
+            self.vm.add_args('-nodefaults')
+            self.vm.launch()
+            self.hotplug_devices(self.vm)
+        finally:
+            # Release the fds even when launch or hotplug fails.
+            self.close_fds()
-- 
2.37.3


Reply via email to