This patch adds the bpf-vmtest-tool subdirectory under contrib which tests
BPF programs under a live kernel using a QEMU VM. It can build the
specified kernel version with eBPF support enabled
and stores it under $VMTEST_DIR
It can also compile BPF C source files or BPF bytecode objects and
test them against the kernel verifier for errors. When a BPF program
is rejected by the kernel verifier, the verifier logs are displayed.
$ python3 main.py -k 6.15 --bpf-src assets/ebpf-programs/fail.c
BPF program failed to load
Verifier logs:
btf_vmlinux is malformed
0: R1=ctx() R10=fp0
0: (81) r0 = *(s32 *)(r10 +4)
invalid read from stack R10 off=4 size=4
processed 1 insns (limit 1000000) max_states_per_insn 0 total_states 0
peak_states 0 mark_read 0
See the README for more examples.
The script uses vmtest (https://github.com/danobi/vmtest) to boot
the VM and run the program. By default, it uses the host's root
("/") as the VM rootfs via the 9p filesystem, so only the kernel is
replaced during testing.
Tested with Python 3.9 and above.
contrib/ChangeLog:
* bpf-vmtest-tool/README: New file.
* bpf-vmtest-tool/bpf.py: New file.
* bpf-vmtest-tool/config.py: New file.
* bpf-vmtest-tool/kernel.py: New file.
* bpf-vmtest-tool/main.py: New file.
* bpf-vmtest-tool/pyproject.toml: New file.
* bpf-vmtest-tool/tests/test_cli.py: New file.
* bpf-vmtest-tool/utils.py: New file.
* bpf-vmtest-tool/vm.py: New file.
Signed-off-by: Piyush Raj <[email protected]>
---
contrib/bpf-vmtest-tool/README | 157 ++++++++++++
contrib/bpf-vmtest-tool/bpf.py | 221 +++++++++++++++++
contrib/bpf-vmtest-tool/config.py | 50 ++++
contrib/bpf-vmtest-tool/kernel.py | 290 ++++++++++++++++++++++
contrib/bpf-vmtest-tool/main.py | 285 +++++++++++++++++++++
contrib/bpf-vmtest-tool/pyproject.toml | 36 +++
contrib/bpf-vmtest-tool/tests/test_cli.py | 219 ++++++++++++++++
contrib/bpf-vmtest-tool/utils.py | 31 +++
contrib/bpf-vmtest-tool/vm.py | 169 +++++++++++++
9 files changed, 1458 insertions(+)
create mode 100644 contrib/bpf-vmtest-tool/README
create mode 100644 contrib/bpf-vmtest-tool/bpf.py
create mode 100644 contrib/bpf-vmtest-tool/config.py
create mode 100644 contrib/bpf-vmtest-tool/kernel.py
create mode 100644 contrib/bpf-vmtest-tool/main.py
create mode 100644 contrib/bpf-vmtest-tool/pyproject.toml
create mode 100644 contrib/bpf-vmtest-tool/tests/test_cli.py
create mode 100644 contrib/bpf-vmtest-tool/utils.py
create mode 100644 contrib/bpf-vmtest-tool/vm.py
diff --git a/contrib/bpf-vmtest-tool/README b/contrib/bpf-vmtest-tool/README
new file mode 100644
index 00000000000..552b2a3e1c8
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/README
@@ -0,0 +1,157 @@
+BPF vmtest Tool
+===============
+https://gcc.gnu.org/wiki/BPFRunTimeTests
+
+This directory contains a Python script to run BPF programs or shell commands
+under a live Linux kernel using QEMU virtual machines.
+
+USAGE
+=====
+
+Initial Setup
+-------------
+
+Before using the tool, you must set the directory where vmtest will look for
+kernels and store kernel artifacts. You can do this in two ways:
+
+1. Set the VMTEST_DIR environment variable
+2. Use the --vmtest-dir flag with each command
+
+Note: This is required to use the tool.
+
+Available Options
+-----------------
+
+View all supported commands using the --help option:
+
+ usage: main.py [-h] [-v DEBUG|INFO|WARNING|ERROR] [--vmtest-dir DIR]
{bpf,vmtest,kernel} ...
+
+ BPF vmtest tool
+
+ positional arguments:
+ {bpf,vmtest,kernel} Available commands
+ bpf BPF program management
+ vmtest Run VM tests
+ kernel Kernel management
+
+ options:
+ -h, --help show this help message and exit
+ -v DEBUG|INFO|WARNING|ERROR, --log-level DEBUG|INFO|WARNING|ERROR
+ Log level
+ --vmtest-dir DIR Directory for vmtest artifacts (or set VMTEST_DIR
env variable)
+
+ Examples:
+ # Compile BPF source to bytecode
+ main.py bpf compile my_prog.bpf.c -o my_prog.bpf.o
+
+ # Run BPF program in VM
+ main.py vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c
+
+ # List available kernels
+ main.py kernel list
+
+
+COMMANDS
+========
+
+kernel subcommand
+-----------------
+
+You must build a kernel before using it.
+
+Build a kernel:
+
+ python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel build 6.16
+
+The tool will download and build the specified kernel version from
+https://www.kernel.org/pub/linux/kernel and store the build artifacts in
+$VMTEST_DIR/kernels/linux-6.15-x86_64. Specifically, it stores bpftool,
+bzImage-6.15-x86_64, and vmlinux.h, which are used when compiling BPF programs
+instead of relying on the host system.
+
+List available kernels:
+
+ python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel list
+
+Remove kernels:
+
+ python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel remove
6.15-x86_64
+
+Note: If the architecture is omitted, the host architecture is assumed. For
+example, "6.15" will be treated as "6.15-x86_64" on an x86_64 system.
+
+vmtest subcommand
+-----------------
+
+Run a shell command inside a live kernel VM:
+
+ python main.py vmtest -k 6.15 -r / -c "uname -a"
+
+Run a BPF source file in the VM:
+
+ python main.py vmtest -k 6.15 --bpf-src fail.c
+
+Run a precompiled BPF object file:
+
+ python main.py vmtest -k 6.15 --bpf-obj fail.bpf.o
+
+bpf subcommand
+--------------
+
+You can compile BPF source to bytecode using the kernel-specific bpftool and
+vmlinux.h stored in $VMTEST_DIR:
+
+ python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" bpf compile
invalid-memory-access.c -k 6.15 -o /tmp/invalid-memory-access.bpf.o
+
+
+LIMITATIONS
+===========
+
+- Only x86_64 architecture is currently supported
+- Only "/" (the root filesystem) is currently supported as the VM rootfs when
+ running or testing BPF programs using --bpf-src or --bpf-obj
+
+
+DEPENDENCIES
+============
+
+- Python >= 3.9
+- vmtest >= v0.18.0 (https://github.com/danobi/vmtest)
+ - QEMU
+ - qemu-guest-agent
+
+For compiling kernels:
+- pahole
+- https://docs.kernel.org/process/changes.html#current-minimal-requirements
+
+For compiling and loading BPF programs:
+- libbpf
+- gcc-bpf-unknown-none
(https://gcc.gnu.org/wiki/BPFBackEnd#Where_to_find_GCC_BPF)
+
+
+BUILD FLAGS
+===========
+
+You can customize compiler settings using environment variables:
+
+- BPF_CC: Compiler for the BPF program (default: bpf-unknown-none-gcc)
+- BPF_CFLAGS: Extra flags for BPF program compilation (default: "-O2")
+- BPF_INCLUDES: Include paths for BPF (default: "-I/usr/local/include
-I/usr/include")
+- VMTEST_CC: Compiler for the user-space loader (default: gcc)
+- VMTEST_CFLAGS: Flags for compiling the loader (default: "-g -Wall")
+- VMTEST_LDFLAGS: Linker flags for the loader (default: "-lelf -lz -lbpf")
+
+Example usage:
+
+ BPF_CFLAGS="-O3 -g" BPF_CC="/bpf-gcc-build/gcc/xgcc" python main.py vmtest
-k 6.15 --bpf-src fail.c
+
+
+DEVELOPMENT
+===========
+
+Development dependencies are specified in pyproject.toml, which can be used
+with any suitable Python virtual environment manager.
+
+To run the test suite:
+
+ python3 -m pytest
\ No newline at end of file
diff --git a/contrib/bpf-vmtest-tool/bpf.py b/contrib/bpf-vmtest-tool/bpf.py
new file mode 100644
index 00000000000..714cf49abb8
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/bpf.py
@@ -0,0 +1,221 @@
+import re
+import subprocess
+import logging
+from pathlib import Path
+import sys
+import tempfile
+from typing import Optional
+import utils
+import config
+import os
+
+# Based on the compilation process described in:
+# https://git.sr.ht/~brianwitte/gcc-bpf-example/tree/master/item/Makefile
+
+logger = logging.getLogger(__name__)
+
+
+def generate_sanitized_name(path: Path):
+ """generate sanitized c variable name"""
+ name = re.sub(r"\W", "_", path.stem)
+ if name and name[0].isdigit():
+ name = "_" + name
+ return name
+
+
+class BPFProgram:
+ tmp_base_dir = tempfile.TemporaryDirectory(prefix="vmtest")
+ tmp_base_dir_path = Path(tmp_base_dir.name)
+
+ def __init__(
+ self,
+ source_path: Optional[Path] = None,
+ bpf_bytecode_path: Optional[Path] = None,
+ use_temp_dir: bool = False,
+ ):
+ path = source_path or bpf_bytecode_path
+ self.name = generate_sanitized_name(path)
+ self.build_dir = self.__class__.tmp_base_dir_path / "ebpf_programs" /
self.name
+
+ if source_path:
+ self.bpf_src = source_path
+ self.bpf_obj = self.build_dir / f"{self.name}.bpf.o"
+ else:
+ self.bpf_obj = bpf_bytecode_path
+ self.build_dir.mkdir(parents=True, exist_ok=True)
+ self.bpf_skel = self.build_dir / f"{self.name}.skel.h"
+ self.loader_src = self.build_dir / f"{self.name}-loader.c"
+ self.output = self.build_dir / f"{self.name}.o"
+
+ @classmethod
+ def from_source(cls, source_path: Path, kernel_spec):
+ self = cls(source_path=source_path)
+ self._compile_bpf(kernel_spec)
+ self._compile_from_bpf_bytecode(kernel_spec)
+ return self.output
+
+ @classmethod
+ def from_bpf_obj(cls, obj_path: Path, kernel_spec):
+ self = cls(bpf_bytecode_path=obj_path)
+ self._compile_from_bpf_bytecode(kernel_spec)
+ return self.output
+
+ def compile_bpf(self, kernel_spec) -> Path:
+ if self.bpf_src is None:
+ raise ValueError(
+ "Cannot compile BPF source: instance was created with "
+ "bpf_bytecode_path instead of source_path"
+ )
+ self._compile_bpf(kernel_spec)
+ return self.bpf_obj
+
+ def _compile_from_bpf_bytecode(self, kernel_spec):
+ self._generate_skeleton(kernel_spec)
+ self._compile_loader()
+
+ def _compile_bpf(self, kernel_spec):
+ """Compile the eBPF program using gcc"""
+ logger.info(f"Compiling eBPF source: {self.bpf_src}")
+ cmd = [
+ config.config.bpf_cc,
+ f"-D__TARGET_ARCH_{config.config.arch}",
+ "-gbtf",
+ "-std=gnu17",
+ ]
+ cmd.append(f"-I{kernel_spec.vmlinux_path.parent}")
+ cmd.extend(config.config.bpf_cflags.split(" "))
+ cmd.extend(config.config.bpf_includes.split(" "))
+ cmd.extend(
+ [
+ "-c",
+ str(self.bpf_src),
+ "-o",
+ str(self.bpf_obj),
+ ]
+ )
+ # remove variables that conflict with host compiler
+ clean_env = os.environ.copy()
+ clean_env.pop("GCC_EXEC_PREFIX", None)
+ logger.debug("".join(cmd))
+ try:
+ utils.run_command(cmd, env=clean_env)
+ except subprocess.CalledProcessError as e:
+ logger.error(f"bpf compilation failed: {e}")
+ sys.exit(1)
+ logger.info(f"eBPF compiled: {self.bpf_obj}")
+
+ def _generate_skeleton(self, kernel_spec):
+ """Generate the BPF skeleton header using bpftool"""
+ logger.info(f"Generating skeleton: {self.bpf_skel}")
+ cmd = [
+ kernel_spec.bpftool_path,
+ "gen",
+ "skeleton",
+ str(self.bpf_obj),
+ "name",
+ self.name,
+ ]
+ try:
+ result = utils.run_command(cmd)
+ with open(self.bpf_skel, "w") as f:
+ f.write(result.stdout)
+ logger.info("Skeleton generated.")
+ except subprocess.CalledProcessError:
+ logger.error("Failed to generate skeleton.")
+ sys.exit(1)
+
+ def _compile_loader(self):
+ """Compile the C loader program"""
+ self.generate_loader()
+ logger.info(f"Compiling loader: {self.loader_src}")
+ cmd = [
+ config.config.vmtest_cc,
+ *config.config.vmtest_cflags.split(" "),
+ "-I",
+ str(self.build_dir),
+ str(self.loader_src),
+ *config.config.vmtest_ldflags.split(" "),
+ "-o",
+ str(self.output),
+ ]
+ # remove variables that conflict with host compiler
+ clean_env = os.environ.copy()
+ clean_env.pop("GCC_EXEC_PREFIX", None)
+ try:
+ utils.run_command(cmd, env=clean_env)
+ except subprocess.CalledProcessError as e:
+ logger.error(f"bpf loader compilation failed: {e}")
+ sys.exit(1)
+
+ logger.info("Compilation complete")
+
+ def generate_loader(self):
+ """
+ Generate a loader C file for the given BPF skeleton.
+
+ Args:
+ bpf_name (str): Name of the BPF program (e.g. "prog").
+ output_path (str): Path to write loader.c.
+ """
+ skeleton_header = f"{self.name}.skel.h"
+ loader_code = f"""\
+ #include <stdio.h>
+ #include <stdlib.h>
+ #include <signal.h>
+ #include <unistd.h>
+ #include <bpf/libbpf.h>
+ #include "{skeleton_header}"
+
+ #define LOG_BUF_SIZE 1024 * 1024
+
+ static volatile sig_atomic_t stop;
+ static char log_buf[LOG_BUF_SIZE];
+
+ void handle_sigint(int sig) {{
+ stop = 1;
+ }}
+
+ int main() {{
+ struct {self.name} *skel;
+ struct bpf_program *prog;
+ int err;
+
+ signal(SIGINT, handle_sigint);
+
+ skel = {self.name}__open(); // STEP 1: open only
+ if (!skel) {{
+ fprintf(stderr, "Failed to open BPF skeleton\\n");
+ return 1;
+ }}
+
+ // STEP 2: Get the bpf_program object for the main program
+ bpf_object__for_each_program(prog, skel->obj) {{
+ bpf_program__set_log_buf(prog, log_buf, sizeof(log_buf));
+ bpf_program__set_log_level(prog, 1); // optional: verbose logs
+ }}
+
+ // STEP 3: Load the program (this will trigger verifier log output)
+ err = {self.name}__load(skel);
+ fprintf(
+ stderr,
+ "--- Verifier log start ---\\n"
+ "%s\\n"
+ "--- Verifier log end ---\\n",
+ log_buf
+ );
+ if (err) {{
+ fprintf(stderr, "Failed to load BPF skeleton: %d\\n", err);
+ {self.name}__destroy(skel);
+ return 1;
+ }}
+
+ printf("BPF program loaded successfully.\\n");
+
+ {self.name}__destroy(skel);
+ return 0;
+ }}
+
+ """
+ with open(self.loader_src, "w") as f:
+ f.write(loader_code)
+ logger.info(f"Generated loader at {self.loader_src}")
diff --git a/contrib/bpf-vmtest-tool/config.py
b/contrib/bpf-vmtest-tool/config.py
new file mode 100644
index 00000000000..5f94f8f69bf
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/config.py
@@ -0,0 +1,50 @@
+import platform
+from pathlib import Path
+import os
+from dataclasses import dataclass
+
+
+@dataclass
+class VMTestConfig:
+ """Configuration for BPF vmtest tool"""
+
+ vmtest_dir: Path
+ kernel_tarball_url: str = "https://cdn.kernel.org/pub/linux/kernel/"
+ arch: str = platform.machine()
+ vmtest_cc: str = os.getenv("VMTEST_CC", "gcc")
+ vmtest_cflags: str = os.getenv("VMTEST_CFLAGS", "-g -Wall -Werror ")
+ vmtest_ldflags: str = os.getenv("VMTEST_LDFLAGS", "-lelf -lz -lbpf")
+ bpf_cc: str = os.getenv("BPF_CC", "bpf-unknown-none-gcc")
+ bpf_cflags: str = os.getenv("BPF_CFLAGS", "-O2 -Wall -Werror")
+ bpf_includes: str = os.getenv("BPF_INCLUDES", "-I/usr/local/include
-I/usr/include")
+
+ @property
+ def kconfig_rel_paths(self) -> list:
+ """Kernel config paths relative to kernel directory"""
+ return [
+ "tools/testing/selftests/bpf/config",
+ "tools/testing/selftests/bpf/config.vm",
+ f"tools/testing/selftests/bpf/config.{self.arch}",
+ ]
+
+ @property
+ def kernels_dir(self) -> Path:
+ """Get kernels directory"""
+ return self.vmtest_dir / "kernels"
+
+ def __post_init__(self):
+ """Validate vmtest_dir exists"""
+ if not self.vmtest_dir.exists():
+ raise ValueError(f"VMTEST_DIR does not exist: {self.vmtest_dir}")
+ if not self.vmtest_dir.is_dir():
+ raise ValueError(f"VMTEST_DIR is not a directory:
{self.vmtest_dir}")
+
+
+# Global config instance
+config = None
+
+
+def init_config(vmtest_dir: str):
+ """Initialize global config"""
+ global config
+ config = VMTestConfig(vmtest_dir=Path(vmtest_dir))
diff --git a/contrib/bpf-vmtest-tool/kernel.py
b/contrib/bpf-vmtest-tool/kernel.py
new file mode 100644
index 00000000000..f60bc859206
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/kernel.py
@@ -0,0 +1,290 @@
+import logging
+import os
+import shutil
+import subprocess
+from pathlib import Path
+import re
+import sys
+from urllib.parse import urljoin
+from urllib.request import urlretrieve
+from typing import Optional, List
+from dataclasses import dataclass
+
+import config
+import utils
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class KernelSpec:
+ """Immutable kernel specification"""
+
+ version: str
+ arch: str | None = None
+
+ def __post_init__(self):
+ if self.arch is None or self.arch == "":
+ self.arch = config.config.arch
+ self.major = self.version.split(".")[0]
+
+ def __str__(self):
+ return f"{self.version}-{self.arch}"
+
+ @property
+ def kernel_build_dir(self) -> Path:
+ return config.config.kernels_dir / f"linux-{self}-build"
+
+ @property
+ def kernel_dir(self) -> Path:
+ return config.config.kernels_dir / f"linux-{self}"
+
+ @property
+ def bzimage_path(self) -> Path:
+ return self.kernel_dir / f"bzImage-{self}"
+
+ @property
+ def bpftool_path(self) -> Path:
+ return self.kernel_dir / "bpftool"
+
+ @property
+ def vmlinux_path(self) -> Path:
+ return self.kernel_dir / "vmlinux.h"
+
+ @property
+ def tarball_path(self) -> Path:
+ return config.config.kernels_dir / f"linux-{self}.tar.xz"
+
+
+class KernelImage:
+ """Represents a compiled kernel image"""
+
+ def __init__(self, path: Path):
+ if not isinstance(path, Path):
+ path = Path(path)
+
+ if not path.exists():
+ raise FileNotFoundError(f"Kernel image not found: {path}")
+
+ self.path = path
+
+ def __str__(self):
+ return str(self.path)
+
+
+class KernelCompiler:
+ """Handles complete kernel compilation process including download and
build"""
+
+ @staticmethod
+ def _progress_hook(block_num: int, block_size: int, total_size: int) ->
None:
+ """Progress hook for urlretrieve to display download progress"""
+ if total_size <= 0:
+ return
+
+ downloaded = block_num * block_size
+ percent = min(downloaded * 100 // total_size, 100)
+ bar_length = 10
+ filled = int(bar_length * downloaded // total_size)
+ bar = "#" * filled + "-" * (bar_length - filled)
+
+ if logger.getEffectiveLevel() <= logging.INFO:
+ downloaded_mb = downloaded / (1024 * 1024)
+ total_mb = total_size / (1024 * 1024)
+
+ if sys.stdout.isatty():
+ sys.stdout.write(
+ f"\rDownloading: |{bar}| {percent}% ({downloaded_mb:.2f}MB
/ {total_mb:.2f}MB)"
+ )
+ sys.stdout.flush()
+ if downloaded >= total_size:
+ sys.stdout.write("\n")
+ sys.stdout.flush()
+ else:
+ if downloaded >= total_size or downloaded % (block_size * 100)
== 0:
+ logger.info(
+ f"Downloading: {percent}% ({downloaded_mb:.2f}MB /
{total_mb:.2f}MB)"
+ )
+
+ def compile_from_version(self, spec: KernelSpec) -> KernelImage:
+ """Complete compilation process from kernel version"""
+ if spec.bzimage_path.exists():
+ logger.info(f"Kernel {spec} already exists, skipping compilation")
+ return KernelImage(spec.bzimage_path)
+
+ try:
+ self._download_source(spec)
+ self._extract_source(spec)
+ self._configure_kernel(spec)
+ self._compile_kernel(spec)
+ self._copy_bzimage(spec)
+ # generate vmlinux.h from the kernel by starign the vm from the
build kernel
+
+ logger.info(f"Successfully compiled kernel {spec}")
+ return KernelImage(spec.bzimage_path)
+
+ except Exception as e:
+ logger.error(f"Failed to compile kernel {spec}: {e}")
+ sys.exit(1)
+ finally:
+ # Always cleanup temporary files
+ self._cleanup(spec)
+
+ def _download_source(self, spec: KernelSpec) -> None:
+ """Download kernel source tarball"""
+ if spec.tarball_path.exists():
+ logger.info(f"Tarball already exists: {spec.tarball_path}")
+ return
+
+ url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz"
+ url = urljoin(config.config.kernel_tarball_url, url_suffix)
+
+ logger.info(f"Downloading kernel from {url}")
+ spec.tarball_path.parent.mkdir(parents=True, exist_ok=True)
+ urlretrieve(url, spec.tarball_path, reporthook=self._progress_hook)
+ logger.info("Kernel source downloaded")
+
+ def _extract_source(self, spec: KernelSpec) -> None:
+ """Extract kernel source tarball"""
+ logger.info(f"Extracting kernel source to {spec.kernel_build_dir}")
+ spec.kernel_build_dir.mkdir(parents=True, exist_ok=True)
+
+ utils.run_command(
+ [
+ "tar",
+ "-xf",
+ str(spec.tarball_path),
+ "-C",
+ str(spec.kernel_build_dir),
+ "--strip-components=1",
+ ]
+ )
+
+ def _configure_kernel(self, spec: KernelSpec) -> None:
+ """Configure kernel with provided config files"""
+ config_path = spec.kernel_build_dir / ".config"
+
+ with open(config_path, "wb") as kconfig:
+ for config_rel_path in config.config.kconfig_rel_paths:
+ config_abs_path = spec.kernel_build_dir / config_rel_path
+ if config_abs_path.exists():
+ with open(config_abs_path, "rb") as conf:
+ kconfig.write(conf.read())
+
+ logger.info("Updated kernel configuration")
+
+ def _compile_kernel(self, spec: KernelSpec) -> None:
+ """Compile the kernel"""
+ logger.info(f"Compiling kernel in {spec.kernel_build_dir}")
+ old_cwd = os.getcwd()
+
+ try:
+ os.chdir(spec.kernel_build_dir)
+ # pahole is required for the DEBUG_INFO_BTF kernel configuration
option.
+ pahole_path = shutil.which("pahole")
+ if pahole_path is None:
+ logger.error(
+ "pahole not found in PATH. BTF generation requires pahole
v1.16 or later."
+ )
+ sys.exit(1)
+ friendly_cores = os.cpu_count() - 2
+ utils.run_command(["make", "olddefconfig"], stream_output=True)
+ logger.info("Starting kernel compilation")
+ utils.run_command(
+ ["make", f"-j{friendly_cores}", "bzImage"], stream_output=True
+ )
+ logger.info("Compiling bpftool")
+ utils.run_command(
+ ["make", "-C", "tools/bpf", f"-j{friendly_cores}", "bpftool"],
+ stream_output=True,
+ )
+ except subprocess.CalledProcessError as e:
+ logger.error(f"Kernel compilation failed: {e}")
+ sys.exit(1)
+ finally:
+ os.chdir(old_cwd)
+
+ def _copy_bzimage(self, spec: KernelSpec) -> None:
+ """Copy compiled bzImage to final location"""
+ # compile the bpftool as well
+ src = spec.kernel_build_dir / "arch/x86/boot/bzImage"
+ dest = spec.bzimage_path
+ dest.parent.mkdir(parents=True, exist_ok=True)
+
+ shutil.copy2(src, dest)
+ logger.info(f"Stored bzImage at {dest}")
+
+ shutil.copy2(
+ spec.kernel_build_dir / "tools/bpf/bpftool/vmlinux.h",
spec.vmlinux_path
+ )
+ logger.info(f"Stored vmlinux at {spec.vmlinux_path}")
+
+ shutil.copy2(
+ spec.kernel_build_dir / "tools/bpf/bpftool/bpftool",
spec.bpftool_path
+ )
+ logger.info(f"Stored bpftool at {spec.bpftool_path}")
+
+ def _cleanup(self, spec: KernelSpec) -> None:
+ """Clean up temporary files"""
+ if spec.tarball_path.exists():
+ spec.tarball_path.unlink()
+ logger.info("Removed tarball")
+
+ if spec.kernel_build_dir.exists():
+ shutil.rmtree(spec.kernel_build_dir)
+ logger.info("Removed kernel source directory")
+
+
+class KernelManager:
+ """Main interface for kernel management"""
+
+ def __init__(self):
+ self.compiler = KernelCompiler()
+
+ def remove_kernel(self, name: str) -> None:
+ """Remove compiled kernel by version"""
+ version, _, arch = name.partition("-")
+ spec = KernelSpec(version=version, arch=arch)
+ if spec.kernel_dir.exists():
+ shutil.rmtree(spec.kernel_dir)
+ logger.info(f"Removed kernel {spec}")
+ else:
+ logger.error(
+ f"Kernel {spec} does not exist, path {spec.kernel_dir} not
found"
+ )
+ raise SystemExit(1)
+
+ def build_kernel(self, version: str, arch=None) -> None:
+ """Build kernel from version"""
+
+ spec = KernelSpec(version=version, arch=arch)
+ self.compiler.compile_from_version(spec)
+
+ @staticmethod
+ def get_kernel_from_version(
+ version: str,
+ ):
+ """Get kernel image from version"""
+ version, _, arch = version.partition("-")
+ spec = KernelSpec(version=version, arch=arch)
+ if spec.bzimage_path.exists():
+ return spec, KernelImage(spec.bzimage_path)
+ else:
+ raise FileNotFoundError(
+ f"Kernel {spec} not found. Use 'main.py kernel build' to
create it."
+ )
+
+ def list_kernels(self) -> List[str]:
+ """List all available compiled kernels"""
+ if not config.config.kernels_dir.exists():
+ raise FileNotFoundError(
+ f"Kernels directory not found: {config.config.kernels_dir}"
+ )
+
+ kernels = []
+ for file in config.config.kernels_dir.glob("linux-*"):
+ if file.is_dir():
+ match = re.match(r"linux-(.*)", file.name)
+ if match:
+ kernels.append(match.group(1))
+
+ return sorted(kernels)
diff --git a/contrib/bpf-vmtest-tool/main.py b/contrib/bpf-vmtest-tool/main.py
new file mode 100644
index 00000000000..43b6036c615
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/main.py
@@ -0,0 +1,285 @@
+import argparse
+import logging
+from pathlib import Path
+import sys
+import textwrap
+import os
+
+import bpf
+import kernel
+import vm
+import config
+
+logger = logging.getLogger(__name__)
+
+
+def cmd_kernel_list(args):
+ """List all available kernels"""
+ kmanager = kernel.KernelManager()
+ kernels = kmanager.list_kernels()
+ if kernels:
+ for k in kernels:
+ print(k)
+ else:
+ logger.info("No kernels available")
+
+
+def cmd_kernel_remove(args):
+ """Remove a kernel"""
+ kmanager = kernel.KernelManager()
+ if not args.kernel:
+ logger.error("kernel version required for remove action")
+ sys.exit(1)
+ kmanager.remove_kernel(args.kernel)
+ logger.info(f"Kernel {args.kernel} removed")
+ print(f"Kernel {args.kernel} removed")
+
+
+def cmd_kernel_build(args):
+ """Build a kernel"""
+ kmanager = kernel.KernelManager()
+ if not args.kernel:
+ logger.error("kernel version required for build action")
+ sys.exit(1)
+ kmanager.build_kernel(version=args.kernel)
+
+
+def cmd_bpf_compile(args):
+ """Compile BPF source to bytecode only"""
+ kmanager = kernel.KernelManager()
+
+ try:
+ kernel_spec, _ = kmanager.get_kernel_from_version(version=args.kernel)
+ except Exception as e:
+ logger.error(f"Failed to get kernel: {e}")
+ sys.exit(1)
+
+ try:
+ bpf_program = bpf.BPFProgram(source_path=Path(args.bpf_src))
+ output_path = bpf_program.compile_bpf(kernel_spec)
+
+ if args.output:
+ import shutil
+
+ output_dest = Path(args.output)
+ shutil.copy2(output_path, output_dest)
+ logger.info(f"Copied to: {output_dest}")
+
+ except Exception as e:
+ logger.error(f"Failed to compile BPF source: {e}")
+ sys.exit(1)
+
+
+def cmd_vmtest(args):
+ """Handle vmtest subcommand"""
+ kmanager = kernel.KernelManager()
+
+ try:
+ kernel_spec, kernel_image = kmanager.get_kernel_from_version(
+ version=args.kernel
+ )
+ except Exception as e:
+ logger.error(f"Failed to get kernel: {e}")
+ sys.exit(1)
+
+ try:
+ if args.bpf_src:
+ command = bpf.BPFProgram.from_source(Path(args.bpf_src),
kernel_spec)
+ elif args.bpf_obj:
+ command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj),
kernel_spec)
+ elif args.command:
+ command = args.command
+ except Exception as e:
+ logger.error(f"Failed to prepare command for vmtest: {e}")
+ sys.exit(1)
+
+ virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs,
str(command))
+ try:
+ result = virtual_machine.execute()
+ except vm.BootFailedError as e:
+ logger.error(f"VM boot failure: {e}")
+ sys.exit(e.returncode)
+
+ if args.bpf_src or args.bpf_obj:
+ if result.returncode == 0:
+ print("BPF programs successfully loaded")
+ else:
+ if "Failed to load BPF skeleton" in result.stdout:
+ print("BPF program failed to load")
+ print("Verifier logs:")
+ print(textwrap.indent(vm.bpf_verifier_logs(result.stdout),
"\t"))
+ elif args.command:
+ print(result.stdout)
+
+ sys.exit(result.returncode)
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="BPF vmtest tool",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog=textwrap.dedent("""
+ Examples:
+ # Compile BPF source to bytecode
+ %(prog)s bpf compile my_prog.bpf.c -o my_prog.bpf.o
+
+ # Run BPF program in VM
+ %(prog)s vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c
+
+ # List available kernels
+ %(prog)s kernel list
+ """),
+ )
+
+ parser.add_argument(
+ "-v",
+ "--log-level",
+ help="Log level",
+ metavar="DEBUG|INFO|WARNING|ERROR",
+ choices=["DEBUG", "INFO", "WARNING", "ERROR"],
+ default="INFO",
+ )
+
+ parser.add_argument(
+ "--vmtest-dir",
+ help="Directory for vmtest artifacts (or set VMTEST_DIR env variable)",
+ metavar="DIR",
+ type=str,
+ default=os.getenv("VMTEST_DIR"),
+ )
+
+ subparsers = parser.add_subparsers(dest="subcommand", help="Available
commands")
+
+ # BPF subcommand
+ bpf_subparser = subparsers.add_parser("bpf", help="BPF program management")
+ bpf_subparsers = bpf_subparser.add_subparsers(dest="bpf_action", help="BPF
actions")
+
+ # bpf compile subcommand
+ compile_parser = bpf_subparsers.add_parser(
+ "compile", help="Compile BPF source to bytecode (.bpf.o)"
+ )
+ compile_parser.add_argument(
+ "bpf_src",
+ help="Path to BPF C source file",
+ type=str,
+ )
+ compile_parser.add_argument(
+ "-o",
+ "--output",
+ help="Output path for compiled bytecode (optional, defaults to temp
dir)",
+ metavar="PATH",
+ type=str,
+ required=True,
+ )
+ compile_parser.add_argument(
+ "-k",
+ "--kernel",
+ help="Kernel version to use for compilation",
+ metavar="VERSION",
+ type=str,
+ required=True,
+ )
+
+ compile_parser.set_defaults(func=cmd_bpf_compile)
+
+ # VMtest subcommand
+ vmtest_parser = subparsers.add_parser("vmtest", help="Run VM tests")
+ vmtest_parser.set_defaults(func=cmd_vmtest)
+
+ vmtest_parser.add_argument(
+ "-k",
+ "--kernel",
+ help="Kernel version to boot in the vm",
+ metavar="VERSION",
+ type=str,
+ required=True,
+ )
+ vmtest_parser.add_argument(
+ "-r", "--rootfs", help="rootfs to mount in the vm", default="/",
metavar="PATH"
+ )
+ command_group = vmtest_parser.add_mutually_exclusive_group(required=True)
+ command_group.add_argument(
+ "--bpf-src",
+ help="Path to BPF C source file",
+ metavar="PATH",
+ type=str,
+ )
+ command_group.add_argument(
+ "--bpf-obj",
+ help="Path to bpf bytecode object",
+ metavar="PATH",
+ type=str,
+ )
+ command_group.add_argument(
+ "-c", "--command", help="command to run in the vm", metavar="COMMAND"
+ )
+
+ # Kernel subcommand with nested subcommands
+ kernel_subparser = subparsers.add_parser("kernel", help="Kernel
management")
+ kernel_subparsers = kernel_subparser.add_subparsers(
+ dest="kernel_action", help="Kernel actions"
+ )
+
+ # kernel list subcommand
+ list_parser = kernel_subparsers.add_parser(
+ "list", help="List all available kernels"
+ )
+ list_parser.set_defaults(func=cmd_kernel_list)
+
+ # kernel remove subcommand
+ remove_parser = kernel_subparsers.add_parser("remove", help="Remove a
kernel")
+ remove_parser.add_argument(
+ "kernel", help="Kernel version to remove (e.g., 6.15-x86_64)"
+ )
+ remove_parser.set_defaults(func=cmd_kernel_remove)
+
+ # kernel build subcommand
+ build_parser = kernel_subparsers.add_parser("build", help="Build a kernel")
+ build_parser.add_argument(
+ "kernel", help="Kernel version to build (e.g. 6.15-x86_64)"
+ )
+ build_parser.set_defaults(func=cmd_kernel_build)
+
+ args = parser.parse_args()
+ logging.basicConfig(level=args.log_level, format="%(levelname)s:
%(message)s")
+
+ if not args.vmtest_dir:
+ logger.error(
+ "VMTEST_DIR not specified. Use --vmtest-dir=DIR or set VMTEST_DIR
environment variable"
+ )
+ sys.exit(1)
+
+ vmtest_path = Path(args.vmtest_dir)
+ if not vmtest_path.exists():
+ logger.error(f"VMTEST_DIR does not exist: {vmtest_path}")
+ sys.exit(1)
+
+ if not vmtest_path.is_dir():
+ logger.error(f"VMTEST_DIR is not a directory: {vmtest_path}")
+ sys.exit(1)
+
+ try:
+ config.init_config(vmtest_dir=args.vmtest_dir)
+ except ValueError as e:
+ logger.error(str(e))
+ sys.exit(1)
+
+ logger.debug(f"VMTEST_DIR set to: {args.vmtest_dir}")
+
+ if hasattr(args, "func"):
+ args.func(args)
+ sys.exit(0)
+ else:
+ parser.print_help()
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ try:
+ main()
+ except KeyboardInterrupt:
+ logger.error("Operation cancelled by user")
+ sys.exit(1)
+ except Exception as e:
+ logger.error(f"Unknown error: {e}")
+ sys.exit(1)
diff --git a/contrib/bpf-vmtest-tool/pyproject.toml
b/contrib/bpf-vmtest-tool/pyproject.toml
new file mode 100644
index 00000000000..1977612cfd6
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/pyproject.toml
@@ -0,0 +1,36 @@
+[project]
+name = "bpf-vmtest-tool"
+version = "0.1.0"
+description = "Test BPF code against live kernels"
+readme = "README.md"
+requires-python = ">=3.9"
+dependencies = []
+
+[dependency-groups]
+dev = [
+ "pre-commit>=4.2.0",
+ "pytest>=8.4.0",
+ "pytest-sugar>=1.0.0",
+ "ruff>=0.11.13",
+ "tox>=4.26.0",
+]
+
+[tool.pytest.ini_options]
+addopts = [
+ "--import-mode=importlib",
+]
+testpaths = ["tests"]
+pythonpath = ["."]
+
+[tool.ruff.lint]
+select = [
+ # pycodestyle
+ "E",
+ # Pyflakes
+ "F",
+]
+# Allow fix for all enabled rules (when `--fix`) is provided.
+fixable = ["ALL"]
+
+# Allow unused variables when underscore-prefixed.
+dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
\ No newline at end of file
diff --git a/contrib/bpf-vmtest-tool/tests/test_cli.py
b/contrib/bpf-vmtest-tool/tests/test_cli.py
new file mode 100644
index 00000000000..d9a01328bf2
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/tests/test_cli.py
@@ -0,0 +1,219 @@
+import sys
+from unittest import mock
+import pytest
+from bpf import BPFProgram
+import kernel
+import main
+import logging
+import config
+import os
+
+logger = logging.getLogger(__name__)
+
+
[email protected]
+def tmp_config():
+ VMTEST_DIR = "/home/d3bug/.bpf-vmtest-tool"
+ assert VMTEST_DIR is not None, "Specify VMTEST_DIR environment varible"
+ config.init_config(vmtest_dir=VMTEST_DIR)
+
+
+# reset config for every test
[email protected](autouse=True)
+def reset_config():
+ config.config = None
+
+
[email protected]
+def openat_bpf_source(tmp_path):
+ openat_bpf = tmp_path / "openat_bpf.c"
+ openat_bpf.write_text(r"""
+ #include "vmlinux.h"
+ #include <bpf/bpf_helpers.h>
+ #include <bpf/bpf_tracing.h>
+ #include <bpf/bpf_core_read.h>
+
+ char LICENSE[] SEC("license") = "GPL";
+
+ int example_pid = 0;
+
+ SEC("tracepoint/syscalls/sys_enter_openat")
+ int handle_openat(struct trace_event_raw_sys_enter *ctx)
+ {
+ int pid = bpf_get_current_pid_tgid() >> 32;
+ char filename[256]; // filename buffer
+ bpf_probe_read_user(&filename, sizeof(filename), (void *)ctx->args[1]);
+ bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", pid,
+ filename);
+
+ return 0;
+ }
+
+ """)
+ return openat_bpf
+
+
[email protected]
+def openat_bpf_obj(openat_bpf_source, tmp_config):
+ def _create_openat_bpf_obj(kernel_spec):
+ bpf_program = BPFProgram(source_path=openat_bpf_source)
+ bpf_program._compile_bpf(kernel_spec)
+ return bpf_program.bpf_obj
+
+ return _create_openat_bpf_obj
+
+
[email protected]
+def invalid_memory_access_bpf_source(tmp_path):
+ invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c"
+ invalid_memory_access_bpf.write_text(r"""
+ #include "vmlinux.h"
+ #include <bpf/bpf_helpers.h>
+ #include <bpf/bpf_tracing.h>
+
+ char LICENSE[] SEC("license") = "GPL";
+
+ SEC("tracepoint/syscalls/sys_enter_openat")
+ int bpf_prog(struct trace_event_raw_sys_enter *ctx) {
+ int arr[4] = {1, 2, 3, 4};
+
+ // Invalid memory access: out-of-bounds
+ int val = arr[5]; // This causes the verifier to fail
+
+ return val;
+ }
+ """)
+ return invalid_memory_access_bpf
+
+
[email protected]
+def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source,
tmp_config):
+ def _create_invalid_memory_access_bpf_obj(kernel_spec):
+ bpf_program = BPFProgram(source_path=invalid_memory_access_bpf_source)
+ bpf_program._compile_bpf(kernel_spec)
+ return bpf_program.bpf_obj
+
+ return _create_invalid_memory_access_bpf_obj
+
+
+def run_main_with_args_and_capture_output(args, capsys):
+ with mock.patch.object(sys, "argv", args):
+ try:
+ main.main()
+ except SystemExit as e:
+ result = capsys.readouterr()
+ output = result.out.rstrip()
+ error = result.err.rstrip()
+ logger.debug("STDOUT:\n%s", output)
+ logger.debug("STDERR:\n%s", error)
+ return (e.code, output, error)
+ except Exception as e:
+ pytest.fail(f"Unknown error happend: {e}")
+ else:
+ pytest.fail("Expected main to raise SystemExit")
+
+
+KERNEL_VERSION = "6.16"
+kernel_cli_flags = [["--kernel", KERNEL_VERSION]]
+
+
[email protected]("kernel_args", kernel_cli_flags)
+class TestCLI:
+ def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, capsys):
+ args = [
+ "main.py",
+ "vmtest",
+ *kernel_args,
+ "--rootfs",
+ "/",
+ "--bpf-src",
+ str(openat_bpf_source),
+ ]
+ code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+ assert code == 0
+ assert "BPF programs successfully loaded" == output
+
+ def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj,
capsys):
+ args = [
+ "main.py",
+ "vmtest",
+ *kernel_args,
+ "--rootfs",
+ "/",
+ "--bpf-obj",
+ str(openat_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
+ ]
+ code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+ assert code == 0
+ assert "BPF programs successfully loaded" == output
+
+ def test_main_with_invalid_bpf(
+ self, kernel_args, invalid_memory_access_bpf_source, capsys
+ ):
+ args = [
+ "main.py",
+ "vmtest",
+ *kernel_args,
+ "--rootfs",
+ "/",
+ "--bpf-src",
+ str(invalid_memory_access_bpf_source),
+ ]
+ code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+ output_lines = output.splitlines()
+ assert code == 1
+ assert "BPF program failed to load" == output_lines[0]
+ assert "Verifier logs:" == output_lines[1]
+
+ def test_main_with_invalid_bpf_obj(
+ self, kernel_args, invalid_memory_access_bpf_obj, capsys
+ ):
+ args = [
+ "main.py",
+ "vmtest",
+ *kernel_args,
+ "--rootfs",
+ "/",
+ "--bpf-obj",
+
str(invalid_memory_access_bpf_obj(kernel.KernelSpec(kernel_args[1]))),
+ ]
+ code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+ output_lines = output.splitlines()
+ assert code == 1
+ assert "BPF program failed to load" == output_lines[0]
+ assert "Verifier logs:" == output_lines[1]
+
+ def test_main_with_valid_command(self, kernel_args, capsys):
+ args = ["main.py", "vmtest", *kernel_args, "--rootfs", "/", "-c",
"uname -r"]
+ code, output, _ = run_main_with_args_and_capture_output(args, capsys)
+ assert code == 0
+ assert f"{kernel_args[1]}.0" in output
+
+ def test_main_with_invalid_command(self, kernel_args, capsys):
+ args = [
+ "main.py",
+ "vmtest",
+ *kernel_args,
+ "--rootfs",
+ "/",
+ "-c",
+ "NotImplemented",
+ ]
+ code, output, error = run_main_with_args_and_capture_output(args,
capsys)
+ assert code != 0
+ assert f"Command failed with exit code: {code}" in output
+
+ def test_bpf_compile_subcommand(
+ self, kernel_args, openat_bpf_source, tmp_path, capsys
+ ):
+ args = [
+ "main.py",
+ "bpf",
+ "compile",
+ *kernel_args,
+ "-o",
+ "",
+ str(openat_bpf_source),
+ ]
+ code, _, _ = run_main_with_args_and_capture_output(args, capsys)
+ assert code == 0
diff --git a/contrib/bpf-vmtest-tool/utils.py b/contrib/bpf-vmtest-tool/utils.py
new file mode 100644
index 00000000000..682840556f1
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/utils.py
@@ -0,0 +1,31 @@
+import subprocess
+import logging
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+
+def run_command(cmd: list[str], stream_output: bool = False, **kwargs: Any):
+ cleaned_cmd = [str(item) for item in cmd if str(item).strip()]
+ capture_cmd_output = not stream_output
+ try:
+ logger.debug(f"running command: {cleaned_cmd}")
+ result = subprocess.run(
+ cleaned_cmd,
+ text=True,
+ check=True,
+ capture_output=capture_cmd_output,
+ shell=False,
+ **kwargs,
+ )
+ if capture_cmd_output:
+ logger.debug("Command stdout:\n" + result.stdout.strip())
+ if result.stderr:
+ logger.debug("Command stderr:\n" + result.stderr.strip())
+ return result
+ except subprocess.CalledProcessError as e:
+ logger.error(e)
+ if capture_cmd_output:
+ logger.error("Command failed with stdout:\n" + e.stdout.strip())
+ logger.error("Command failed with stderr:\n" + e.stderr.strip())
+ raise
diff --git a/contrib/bpf-vmtest-tool/vm.py b/contrib/bpf-vmtest-tool/vm.py
new file mode 100644
index 00000000000..ba3bdecf94b
--- /dev/null
+++ b/contrib/bpf-vmtest-tool/vm.py
@@ -0,0 +1,169 @@
+import logging
+import subprocess
+from typing import List
+
+from kernel import KernelImage
+
+logger = logging.getLogger(__name__)
+
+
+class VMConfig:
+ """Configuration container for VM settings"""
+
+ def __init__(
+ self, kernel_image: KernelImage, rootfs_path: str, command: str,
**kwargs
+ ):
+ self.kernel = kernel_image
+ self.kernel_path = str(kernel_image.path)
+ self.rootfs_path = rootfs_path
+ self.command = command
+ self.memory_mb = kwargs.get("memory_mb", 512)
+ self.cpu_count = kwargs.get("cpu_count", 1)
+ self.extra_args = kwargs.get("extra_args", {})
+
+
+def bpf_verifier_logs(output: str) -> str:
+ start_tag = "--- Verifier log start ---"
+ end_tag = "--- Verifier log end ---"
+
+ start_idx = output.find(start_tag)
+ end_idx = output.find(end_tag)
+
+ if start_idx != -1 and end_idx != -1:
+ # Extract between the tags (excluding the markers themselves)
+ log_body = output[start_idx + len(start_tag) : end_idx].strip()
+ return log_body
+ else:
+ return "No verifier log found in the output."
+
+
+class Vmtest:
+ """vmtest backend implementation"""
+
+ def __init__(self):
+ pass
+
+ def _boot_command(self, vm_config: VMConfig):
+ vmtest_command = ["vmtest"]
+ vmtest_command.extend(["-k", vm_config.kernel_path])
+ vmtest_command.extend(["-r", vm_config.rootfs_path])
+ vmtest_command.append(vm_config.command)
+ return vmtest_command
+
+ def _remove_boot_log(self, full_output: str) -> str:
+ """
+ Filters QEMU and kernel boot logs, returning only the output after the
+ `===> Running command` marker.
+ """
+ marker = "===> Running command"
+ lines = full_output.splitlines()
+
+ try:
+ start_index = next(i for i, line in enumerate(lines) if marker in
line)
+ # Return everything after that marker (excluding the marker itself)
+ return "\n".join(lines[start_index + 1 :]).strip()
+ except StopIteration:
+ return full_output.strip()
+
+ def run_command(self, vm_config):
+ vm = None
+ try:
+ logger.info(f"Booting VM with kernel: {vm_config.kernel_path}")
+ logger.info(f"Using rootfs: {vm_config.rootfs_path}")
+ vm = subprocess.run(
+ self._boot_command(vm_config),
+ check=True,
+ text=True,
+ capture_output=True,
+ shell=False,
+ )
+ vm_stdout = vm.stdout
+ logger.debug(vm_stdout)
+ return VMCommandResult(
+ vm.returncode, self._remove_boot_log(vm_stdout), None
+ )
+ except FileNotFoundError:
+ raise BootFailedError(
+ "vmtest command not found in PATH. Please ensure vmtest is
installed and available in your system PATH."
+ )
+ except subprocess.CalledProcessError as e:
+ out = e.stdout
+ err = e.stderr
+ # when the command in the vm fails we consider it as a successful
boot
+ if "===> Running command" not in out:
+ raise BootFailedError("Boot failed", out, err, e.returncode)
+ logger.debug("STDOUT: \n%s", out)
+ logger.debug("STDERR: \n%s", err)
+ return VMCommandResult(e.returncode, self._remove_boot_log(out),
err)
+
+
+class VMCommandResult:
+ def __init__(self, returncode, stdout, stderr) -> None:
+ self.returncode = returncode
+ self.stdout = stdout
+ self.stderr = stderr
+
+
+class VirtualMachine:
+ """Main VM class - simple interface for end users"""
+
+ # Registry of available hypervisors
+ _hypervisors = {
+ "vmtest": Vmtest,
+ }
+
+ def __init__(
+ self,
+ kernel_image: KernelImage,
+ rootfs_path: str,
+ command: str,
+ hypervisor_type: str = "vmtest",
+ **kwargs,
+ ):
+ self.config = VMConfig(kernel_image, rootfs_path, command, **kwargs)
+
+ if hypervisor_type not in self._hypervisors:
+ raise ValueError(f"Unsupported hypervisor: {hypervisor_type}")
+
+ self.hypervisor = self._hypervisors[hypervisor_type]()
+
+ @classmethod
+ def list_hypervisors(cls) -> List[str]:
+ """List available hypervisors"""
+ return list(cls._hypervisors.keys())
+
+ def execute(self):
+ """Execute command in VM"""
+ return self.hypervisor.run_command(self.config)
+
+
+class BootFailedError(Exception):
+ """Raised when VM fails to boot properly (before command execution)."""
+
+ def __init__(
+ self, message: str, stdout: str = "", stderr: str = "", returncode:
int = -1
+ ):
+ super().__init__(message)
+ self.stdout = stdout
+ self.stderr = stderr
+ self.returncode = returncode
+
+ def __str__(self):
+ base = super().__str__()
+
+ output_parts = [
+ base,
+ f"Return code: {self.returncode}",
+ ]
+
+ optional_sections = [
+ ("STDOUT", self.stdout),
+ ("STDERR", self.stderr),
+ ]
+
+ for header, content in optional_sections:
+ if content:
+ output_parts.append(f"--- {header} ---")
+ output_parts.append(content)
+
+ return "\n".join(output_parts)
--
2.52.0