Hi Jose, On Tue, 28 Oct 2025 at 20:59, Jose E. Marchesi <[email protected]> wrote: > > > Hello Piyush. > > Sorry for taking so long to review this. > > I think the tool is great and will help us a lot, not only to be able to > run BPF tests, but also to hack the compiler. Having an easy and stable > way of just running some particular built BPF program in some particular > kernel version is priceless. > > So, FWIW, I would say it is OK to install this tool under contrib > provided the points raised by David and by Hans Peter get addressed. > > It would be good to have it installed before Stage 0 ends, November 17.
Thanks for the feedback :) I’ll try to send the patch by next week so we have enough time for review before November 17. > > contrib/ChangeLog: > > > > * bpf-vmtest-tool/README: New file. > > * bpf-vmtest-tool/bpf.py: New file. > > * bpf-vmtest-tool/config.py: New file. > > * bpf-vmtest-tool/kernel.py: New file. > > * bpf-vmtest-tool/main.py: New file. > > * bpf-vmtest-tool/pyproject.toml: New file. > > * bpf-vmtest-tool/tests/test_cli.py: New file. > > * bpf-vmtest-tool/utils.py: New file. > > * bpf-vmtest-tool/vm.py: New file. > > > > Signed-off-by: Piyush Raj <[email protected]> > > --- > > contrib/bpf-vmtest-tool/README | 78 ++++++++ > > contrib/bpf-vmtest-tool/bpf.py | 199 ++++++++++++++++++++ > > contrib/bpf-vmtest-tool/config.py | 18 ++ > > contrib/bpf-vmtest-tool/kernel.py | 209 ++++++++++++++++++++++ > > contrib/bpf-vmtest-tool/main.py | 103 +++++++++++ > > contrib/bpf-vmtest-tool/pyproject.toml | 36 ++++ > > contrib/bpf-vmtest-tool/tests/test_cli.py | 167 +++++++++++++++++ > > contrib/bpf-vmtest-tool/utils.py | 27 +++ > > contrib/bpf-vmtest-tool/vm.py | 154 ++++++++++++++++ > > 9 files changed, 991 insertions(+) > > create mode 100644 contrib/bpf-vmtest-tool/README > > create mode 100644 contrib/bpf-vmtest-tool/bpf.py > > create mode 100644 contrib/bpf-vmtest-tool/config.py > > create mode 100644 contrib/bpf-vmtest-tool/kernel.py > > create mode 100644 contrib/bpf-vmtest-tool/main.py > > create mode 100644 contrib/bpf-vmtest-tool/pyproject.toml > > create mode 100644 contrib/bpf-vmtest-tool/tests/test_cli.py > > create mode 100644 contrib/bpf-vmtest-tool/utils.py > > create mode 100644 contrib/bpf-vmtest-tool/vm.py > > > > diff --git a/contrib/bpf-vmtest-tool/README b/contrib/bpf-vmtest-tool/README > > new file mode 100644 > > index 00000000000..599e3529aa8 > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/README > > @@ -0,0 +1,78 @@ > > +This directory contains a Python script to run BPF programs or shell > > commands > > +under a live Linux kernel 
using QEMU virtual machines. > > + > > +USAGE > > +===== > > + > > +To run a shell command inside a live kernel VM: > > + > > + python main.py -k 6.15 -r / -c "uname -a" > > + > > +To run a BPF source file in the VM: > > + > > + python main.py -k 6.15 --bpf-src fail.c > > + > > +To run a precompiled BPF object file: > > + > > + python main.py -k 6.15 --bpf-obj fail.bpf.o > > + > > +The tool will download and build the specified kernel version from: > > + > > + https://www.kernel.org/pub/linux/kernel > > + > > +A prebuilt `bzImage` can be supplied using the `--kernel-image` flag. > > + > > +NOTE > > +==== > > +- Only x86_64 is supported > > +- Only "/" (the root filesystem) is currently supported as the VM rootfs > > when > > +running or testing BPF programs using `--bpf-src` or `--bpf-obj`. > > + > > +DEPENDENCIES > > +============ > > + > > +- Python >= 3.9 > > +- vmtest >= v0.18.0 (https://github.com/danobi/vmtest) > > + - QEMU > > + - qemu-guest-agent > > + > > +For compiling kernel > > +- https://docs.kernel.org/process/changes.html#current-minimal-requirements > > +For compiling and loading BPF programs: > > + > > +- libbpf > > +- bpftool > > +- gcc-bpf-unknown-none > > + (https://gcc.gnu.org/wiki/BPFBackEnd#Where_to_find_GCC_BPF) > > +- vmlinux.h > > + Can be generated using: > > + > > + bpftool btf dump file /sys/kernel/btf/vmlinux format c > \ > > + /usr/local/include/vmlinux.h > > + > > + Or downloaded from https://github.com/libbpf/vmlinux.h/tree/main > > + > > +BUILD FLAGS > > +=========== > > +You can customize compiler settings using environment variables: > > + > > +- BPF_CC: Compiler for the BPF program (default: > > bpf-unknown-none-gcc) > > +- BPF_CFLAGS: Extra flags for BPF program compilation (default: "-O2") > > +- BPF_INCLUDES: Include paths for BPF (default: -I/usr/local/include > > -I/usr/include) > > +- VMTEST_CC: Compiler for the user-space loader (default: gcc) > > +- VMTEST_CFLAGS: Flags for compiling the loader (default: -g -Wall) > > +- 
VMTEST_LDFLAGS: Linker flags for the loader (default: -lelf -lz -lbpf) > > + > > +Example usage: > > + > > + BPF_CFLAGS="-O3 -g" CFLAGS="-O2" python main.py -k 6.15 --bpf-src > > fail.c > > + > > +DEVELOPMENT > > +=========== > > + > > +Development dependencies are specified in `pyproject.toml`, which can be > > used > > +with any suitable Python virtual environment manager. > > + > > +To run the test suite: > > + > > + python3 -m pytest > > diff --git a/contrib/bpf-vmtest-tool/bpf.py b/contrib/bpf-vmtest-tool/bpf.py > > new file mode 100644 > > index 00000000000..6f8ec062974 > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/bpf.py > > @@ -0,0 +1,199 @@ > > +import re > > +import subprocess > > +import logging > > +from pathlib import Path > > +import tempfile > > +from typing import Optional > > +import utils > > +from config import ( > > + BPF_CC, > > + BPF_CFLAGS, > > + BPF_INCLUDES, > > + VMTEST_LDFLAGS, > > + VMTEST_CC, > > + VMTEST_CFLAGS, > > + ARCH, > > +) > > +import os > > + > > +# Based on the compilation process described in: > > +# https://git.sr.ht/~brianwitte/gcc-bpf-example/tree/master/item/Makefile > > + > > +logger = logging.getLogger(__name__) > > + > > + > > +def generate_sanitized_name(path: Path): > > + """generate sanitized c variable name""" > > + name = re.sub(r"\W", "_", path.stem) > > + if name and name[0].isdigit(): > > + name = "_" + name > > + return name > > + > > + > > +class BPFProgram: > > + tmp_base_dir = tempfile.TemporaryDirectory(prefix="vmtest") > > + tmp_base_dir_path = Path(tmp_base_dir.name) > > + > > + def __init__( > > + self, > > + source_path: Optional[Path] = None, > > + bpf_bytecode_path: Optional[Path] = None, > > + use_temp_dir: bool = False, > > + ): > > + path = source_path or bpf_bytecode_path > > + self.name = generate_sanitized_name(path) > > + self.build_dir = self.__class__.tmp_base_dir_path / > > "ebpf_programs" / self.name > > + > > + if source_path: > > + self.bpf_src = source_path > > + self.bpf_obj = 
self.build_dir / f"{self.name}.bpf.o" > > + else: > > + self.bpf_obj = bpf_bytecode_path > > + self.build_dir.mkdir(parents=True, exist_ok=True) > > + self.bpf_skel = self.build_dir / f"{self.name}.skel.h" > > + self.loader_src = self.build_dir / f"{self.name}-loader.c" > > + self.output = self.build_dir / f"{self.name}.o" > > + > > + @classmethod > > + def from_source(cls, source_path: Path): > > + self = cls(source_path=source_path) > > + self._compile_bpf() > > + self._compile_from_bpf_bytecode() > > + return self.output > > + > > + @classmethod > > + def from_bpf_obj(cls, obj_path: Path): > > + self = cls(bpf_bytecode_path=obj_path) > > + self._compile_from_bpf_bytecode() > > + return self.output > > + > > + def _compile_from_bpf_bytecode(self): > > + self._generate_skeleton() > > + self._compile_loader() > > + > > + def _compile_bpf(self): > > + """Compile the eBPF program using gcc""" > > + logger.info(f"Compiling eBPF source: {self.bpf_src}") > > + cmd = [ > > + BPF_CC, > > + f"-D__TARGET_ARCH_{ARCH}", > > + "-gbtf", > > + "-std=gnu17"] > > + if BPF_CFLAGS != "": > > + cmd.extend(BPF_CFLAGS.split(" ")) > > + cmd.extend(BPF_INCLUDES.split()) > > + cmd.extend( > > + [ > > + "-c", > > + str(self.bpf_src), > > + "-o", > > + str(self.bpf_obj), > > + ] > > + ) > > + logger.debug("".join(cmd)) > > + utils.run_command(cmd) > > + logger.info(f"eBPF compiled: {self.bpf_obj}") > > + > > + def _generate_skeleton(self): > > + """Generate the BPF skeleton header using bpftool""" > > + logger.info(f"Generating skeleton: {self.bpf_skel}") > > + cmd = ["bpftool", "gen", "skeleton", str(self.bpf_obj), "name", > > self.name] > > + try: > > + result = utils.run_command(cmd) > > + with open(self.bpf_skel, "w") as f: > > + f.write(result.stdout) > > + logger.info("Skeleton generated.") > > + except subprocess.CalledProcessError: > > + logger.error("Failed to generate skeleton.") > > + raise > > + > > + def _compile_loader(self): > > + """Compile the C loader program""" > > + 
self.generate_loader() > > + logger.info(f"Compiling loader: {self.loader_src}") > > + cmd = [ > > + VMTEST_CC, > > + *VMTEST_CFLAGS.split(" "), > > + "-I", > > + str(self.build_dir), > > + str(self.loader_src), > > + *VMTEST_LDFLAGS.split(" "), > > + "-o", > > + str(self.output), > > + ] > > + # remove variables that conflict with host compiler > > + clean_env = os.environ.copy() > > + clean_env.pop("GCC_EXEC_PREFIX", None) > > + utils.run_command(cmd, env=clean_env) > > + logger.info("Compilation complete") > > + > > + def generate_loader(self): > > + """ > > + Generate a loader C file for the given BPF skeleton. > > + > > + Args: > > + bpf_name (str): Name of the BPF program (e.g. "prog"). > > + output_path (str): Path to write loader.c. > > + """ > > + skeleton_header = f"{self.name}.skel.h" > > + loader_code = f"""\ > > + #include <stdio.h> > > + #include <stdlib.h> > > + #include <signal.h> > > + #include <unistd.h> > > + #include <bpf/libbpf.h> > > + #include "{skeleton_header}" > > + > > + #define LOG_BUF_SIZE 1024 * 1024 > > + > > + static volatile sig_atomic_t stop; > > + static char log_buf[LOG_BUF_SIZE]; > > + > > + void handle_sigint(int sig) {{ > > + stop = 1; > > + }} > > + > > + int main() {{ > > + struct {self.name} *skel; > > + struct bpf_program *prog; > > + int err; > > + > > + signal(SIGINT, handle_sigint); > > + > > + skel = {self.name}__open(); // STEP 1: open only > > + if (!skel) {{ > > + fprintf(stderr, "Failed to open BPF skeleton\\n"); > > + return 1; > > + }} > > + > > + // STEP 2: Get the bpf_program object for the main program > > + bpf_object__for_each_program(prog, skel->obj) {{ > > + bpf_program__set_log_buf(prog, log_buf, sizeof(log_buf)); > > + bpf_program__set_log_level(prog, 1); // optional: verbose > > logs > > + }} > > + > > + // STEP 3: Load the program (this will trigger verifier log > > output) > > + err = {self.name}__load(skel); > > + fprintf( > > + stderr, > > + "--- Verifier log start ---\\n" > > + "%s\\n" > > + "--- 
Verifier log end ---\\n", > > + log_buf > > + ); > > + if (err) {{ > > + fprintf(stderr, "Failed to load BPF skeleton: %d\\n", err); > > + {self.name}__destroy(skel); > > + return 1; > > + }} > > + > > + printf("BPF program loaded successfully.\\n"); > > + > > + {self.name}__destroy(skel); > > + return 0; > > + }} > > + > > + """ > > + with open(self.loader_src, "w") as f: > > + f.write(loader_code) > > + logger.info(f"Generated loader at {self.loader_src}") > > diff --git a/contrib/bpf-vmtest-tool/config.py > > b/contrib/bpf-vmtest-tool/config.py > > new file mode 100644 > > index 00000000000..002af8a0a6e > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/config.py > > @@ -0,0 +1,18 @@ > > +import platform > > +from pathlib import Path > > +import os > > + > > +KERNEL_TARBALL_PREFIX_URL = "https://cdn.kernel.org/pub/linux/kernel/" > > +BASE_DIR = Path.home() / ".bpf-vmtest-tool" > > +ARCH = platform.machine() > > +KCONFIG_REL_PATHS = [ > > + "tools/testing/selftests/bpf/config", > > + "tools/testing/selftests/bpf/config.vm", > > + f"tools/testing/selftests/bpf/config.{ARCH}", > > +] > > +VMTEST_CC = os.getenv("VMTEST_CC", "gcc") > > +VMTEST_CFLAGS = os.getenv("VMTEST_CFLAGS", "-g -Wall") > > +VMTEST_LDFLAGS = os.getenv("VMTEST_LDFLAGS", "-lelf -lz -lbpf") > > +BPF_CC = os.getenv("BPF_CC", "bpf-unknown-none-gcc") > > +BPF_CFLAGS = os.getenv("BPF_CFLAGS", "-O2") > > +BPF_INCLUDES = os.getenv("BPF_INCLUDES", "-I/usr/local/include > > -I/usr/include") > > diff --git a/contrib/bpf-vmtest-tool/kernel.py > > b/contrib/bpf-vmtest-tool/kernel.py > > new file mode 100644 > > index 00000000000..2974948130e > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/kernel.py > > @@ -0,0 +1,209 @@ > > +import logging > > +import os > > +import shutil > > +import subprocess > > +from pathlib import Path > > +import re > > +from urllib.parse import urljoin > > +from urllib.request import urlretrieve > > +from typing import Optional, List > > +from dataclasses import dataclass > > + > 
> +from config import ARCH, BASE_DIR, KCONFIG_REL_PATHS, > > KERNEL_TARBALL_PREFIX_URL > > +import utils > > + > > +logger = logging.getLogger(__name__) > > +KERNELS_DIR = BASE_DIR / "kernels" > > + > > + > > +@dataclass > > +class KernelSpec: > > + """Immutable kernel specification""" > > + > > + version: str > > + arch: str = ARCH > > + > > + def __post_init__(self): > > + self.major = self.version.split(".")[0] > > + > > + def __str__(self): > > + return f"{self.version}-{self.arch}" > > + > > + @property > > + def bzimage_path(self) -> Path: > > + return KERNELS_DIR / f"bzImage-{self}" > > + > > + @property > > + def tarball_path(self) -> Path: > > + return KERNELS_DIR / f"linux-{self.version}.tar.xz" > > + > > + @property > > + def kernel_dir(self) -> Path: > > + return KERNELS_DIR / f"linux-{self.version}" > > + > > + > > +class KernelImage: > > + """Represents a compiled kernel image""" > > + > > + def __init__(self, path: Path): > > + if not isinstance(path, Path): > > + path = Path(path) > > + > > + if not path.exists(): > > + raise FileNotFoundError(f"Kernel image not found: {path}") > > + > > + self.path = path > > + > > + def __str__(self): > > + return str(self.path) > > + > > + > > +class KernelCompiler: > > + """Handles complete kernel compilation process including download and > > build""" > > + > > + def compile_from_version(self, spec: KernelSpec) -> KernelImage: > > + """Complete compilation process from kernel version""" > > + if spec.bzimage_path.exists(): > > + logger.info(f"Kernel {spec} already exists, skipping > > compilation") > > + return KernelImage(spec.bzimage_path) > > + > > + try: > > + self._download_source(spec) > > + self._extract_source(spec) > > + self._configure_kernel(spec) > > + self._compile_kernel(spec) > > + self._copy_bzimage(spec) > > + > > + logger.info(f"Successfully compiled kernel {spec}") > > + return KernelImage(spec.bzimage_path) > > + > > + except Exception as e: > > + logger.error(f"Failed to compile kernel 
{spec}: {e}") > > + raise > > + finally: > > + # Always cleanup temporary files > > + self._cleanup(spec) > > + > > + def _download_source(self, spec: KernelSpec) -> None: > > + """Download kernel source tarball""" > > + if spec.tarball_path.exists(): > > + logger.info(f"Tarball already exists: {spec.tarball_path}") > > + return > > + > > + url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz" > > + url = urljoin(KERNEL_TARBALL_PREFIX_URL, url_suffix) > > + > > + logger.info(f"Downloading kernel from {url}") > > + spec.tarball_path.parent.mkdir(parents=True, exist_ok=True) > > + urlretrieve(url, spec.tarball_path) > > + logger.info("Kernel source downloaded") > > + > > + def _extract_source(self, spec: KernelSpec) -> None: > > + """Extract kernel source tarball""" > > + logger.info(f"Extracting kernel source to {spec.kernel_dir}") > > + spec.kernel_dir.mkdir(parents=True, exist_ok=True) > > + > > + utils.run_command( > > + [ > > + "tar", > > + "-xf", > > + str(spec.tarball_path), > > + "-C", > > + str(spec.kernel_dir), > > + "--strip-components=1", > > + ] > > + ) > > + > > + def _configure_kernel(self, spec: KernelSpec) -> None: > > + """Configure kernel with provided config files""" > > + config_path = spec.kernel_dir / ".config" > > + > > + with open(config_path, "wb") as kconfig: > > + for config_rel_path in KCONFIG_REL_PATHS: > > + config_abs_path = spec.kernel_dir / config_rel_path > > + if config_abs_path.exists(): > > + with open(config_abs_path, "rb") as conf: > > + kconfig.write(conf.read()) > > + > > + logger.info("Updated kernel configuration") > > + > > + def _compile_kernel(self, spec: KernelSpec) -> None: > > + """Compile the kernel""" > > + logger.info(f"Compiling kernel in {spec.kernel_dir}") > > + old_cwd = os.getcwd() > > + > > + try: > > + os.chdir(spec.kernel_dir) > > + utils.run_command(["make", "olddefconfig"]) > > + utils.run_command(["make", f"-j{os.cpu_count()}", "bzImage"]) > > + except subprocess.CalledProcessError as e: > > + 
logger.error(f"Kernel compilation failed: {e}") > > + raise > > + finally: > > + os.chdir(old_cwd) > > + > > + def _copy_bzimage(self, spec: KernelSpec) -> None: > > + """Copy compiled bzImage to final location""" > > + src = spec.kernel_dir / "arch/x86/boot/bzImage" > > + dest = spec.bzimage_path > > + dest.parent.mkdir(parents=True, exist_ok=True) > > + > > + shutil.copy2(src, dest) > > + logger.info(f"Stored bzImage at {dest}") > > + > > + def _cleanup(self, spec: KernelSpec) -> None: > > + """Clean up temporary files""" > > + if spec.tarball_path.exists(): > > + spec.tarball_path.unlink() > > + logger.info("Removed tarball") > > + > > + if spec.kernel_dir.exists(): > > + shutil.rmtree(spec.kernel_dir) > > + logger.info("Removed kernel source directory") > > + > > + > > +class KernelManager: > > + """Main interface for kernel management""" > > + > > + def __init__(self): > > + self.compiler = KernelCompiler() > > + > > + def get_kernel_image( > > + self, > > + version: Optional[str] = None, > > + kernel_image_path: Optional[str] = None, > > + arch: str = ARCH, > > + ) -> KernelImage: > > + """Get kernel image from version or existing file""" > > + > > + # Validate inputs > > + if not version and not kernel_image_path: > > + raise ValueError("Must provide either 'version' or > > 'kernel_image_path'") > > + > > + if version and kernel_image_path: > > + raise ValueError("Provide only one of 'version' or > > 'kernel_image_path'") > > + > > + # Handle existing kernel image > > + if kernel_image_path: > > + path = Path(kernel_image_path) > > + if not path.exists(): > > + raise FileNotFoundError(f"Kernel image not found: > > {kernel_image_path}") > > + return KernelImage(path) > > + > > + # Handle version-based compilation > > + if version: > > + spec = KernelSpec(version=version, arch=arch) > > + return self.compiler.compile_from_version(spec) > > + > > + def list_available_kernels(self) -> List[str]: > > + """List all available compiled kernels""" > > + if not 
KERNELS_DIR.exists(): > > + return [] > > + > > + kernels = [] > > + for file in KERNELS_DIR.glob("bzImage-*"): > > + match = re.match(r"bzImage-(.*)", file.name) > > + if match: > > + kernels.append(match.group(1)) > > + > > + return sorted(kernels) > > diff --git a/contrib/bpf-vmtest-tool/main.py > > b/contrib/bpf-vmtest-tool/main.py > > new file mode 100644 > > index 00000000000..3125bba95c4 > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/main.py > > @@ -0,0 +1,103 @@ > > +import argparse > > +import logging > > +from pathlib import Path > > +import textwrap > > + > > +import bpf > > +import kernel > > +import vm > > + > > + > > +def main(): > > + parser = argparse.ArgumentParser() > > + kernel_group = parser.add_mutually_exclusive_group(required=True) > > + kernel_group.add_argument( > > + "-k", > > + "--kernel", > > + help="Kernel version to boot in the vm", > > + metavar="VERSION", > > + type=str, > > + ) > > + kernel_group.add_argument( > > + "--kernel-image", > > + help="Kernel image to boot in the vm", > > + metavar="PATH", > > + type=str, > > + ) > > + parser.add_argument( > > + "-r", "--rootfs", help="rootfs to mount in the vm", default="/", > > metavar="PATH" > > + ) > > + parser.add_argument( > > + "-v", > > + "--log-level", > > + help="Log level", > > + metavar="DEBUG|INFO|WARNING|ERROR", > > + choices=["DEBUG", "INFO", "WARNING", "ERROR"], > > + default="ERROR", > > + ) > > + command_group = parser.add_mutually_exclusive_group(required=True) > > + command_group.add_argument( > > + "--bpf-src", > > + help="Path to BPF C source file", > > + metavar="PATH", > > + type=str, > > + ) > > + command_group.add_argument( > > + "--bpf-obj", > > + help="Path to bpf bytecode object", > > + metavar="PATH", > > + type=str, > > + ) > > + command_group.add_argument( > > + "-c", "--command", help="command to run in the vm", > > metavar="COMMAND" > > + ) > > + command_group.add_argument( > > + "-s", "--shell", help="open interactive shell in the vm", > > 
action="store_true" > > + ) > > + > > + args = parser.parse_args() > > + > > + logging.basicConfig(level=args.log_level) > > + logger = logging.getLogger(__name__) > > + kmanager = kernel.KernelManager() > > + > > + if args.kernel: > > + kernel_image = kmanager.get_kernel_image(version=args.kernel) > > + elif args.kernel_image: > > + kernel_image = > > kmanager.get_kernel_image(kernel_image_path=args.kernel_image) > > + > > + try: > > + if args.bpf_src: > > + command = bpf.BPFProgram.from_source(Path(args.bpf_src)) > > + elif args.bpf_obj: > > + command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj)) > > + elif args.command: > > + command = args.command > > + elif args.shell: > > + raise NotImplementedError("Shell mode is not yet implemented") > > + except Exception as e: > > + print(f"Failed to prepare command for vmtest: {e}") > > + exit(1) > > + > > + virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs, > > str(command)) > > + try: > > + result = virtual_machine.execute() > > + except vm.BootFailedError as e: > > + logger.error("VM boot failure: execution aborted. 
See logs for > > details.") > > + print(e) > > + exit(e.returncode) > > + if args.bpf_src or args.bpf_obj: > > + if result.returncode == 0: > > + print("BPF programs succesfully loaded") > > + else: > > + if "Failed to load BPF skeleton" in result.stdout: > > + print("BPF program failed to load") > > + print("Verifier logs:") > > + print(textwrap.indent(vm.bpf_verifier_logs(result.stdout), > > "\t")) > > + elif args.command: > > + print(result.stdout) > > + exit(result.returncode) > > + > > + > > +if __name__ == "__main__": > > + main() > > diff --git a/contrib/bpf-vmtest-tool/pyproject.toml > > b/contrib/bpf-vmtest-tool/pyproject.toml > > new file mode 100644 > > index 00000000000..1977612cfd6 > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/pyproject.toml > > @@ -0,0 +1,36 @@ > > +[project] > > +name = "bpf-vmtest-tool" > > +version = "0.1.0" > > +description = "Test BPF code against live kernels" > > +readme = "README.md" > > +requires-python = ">=3.9" > > +dependencies = [] > > + > > +[dependency-groups] > > +dev = [ > > + "pre-commit>=4.2.0", > > + "pytest>=8.4.0", > > + "pytest-sugar>=1.0.0", > > + "ruff>=0.11.13", > > + "tox>=4.26.0", > > +] > > + > > +[tool.pytest.ini_options] > > +addopts = [ > > + "--import-mode=importlib", > > +] > > +testpaths = ["tests"] > > +pythonpath = ["."] > > + > > +[tool.ruff.lint] > > +select = [ > > + # pycodestyle > > + "E", > > + # Pyflakes > > + "F", > > +] > > +# Allow fix for all enabled rules (when `--fix`) is provided. > > +fixable = ["ALL"] > > + > > +# Allow unused variables when underscore-prefixed. 
> > +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" > > \ No newline at end of file > > diff --git a/contrib/bpf-vmtest-tool/tests/test_cli.py > > b/contrib/bpf-vmtest-tool/tests/test_cli.py > > new file mode 100644 > > index 00000000000..53df3ca3663 > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/tests/test_cli.py > > @@ -0,0 +1,167 @@ > > +import sys > > +from unittest import mock > > +import pytest > > +from bpf import BPFProgram > > +import kernel > > +import main > > +import logging > > + > > +logger = logging.getLogger(__name__) > > + > > + > > [email protected] > > +def openat_bpf_source(tmp_path): > > + openat_bpf = tmp_path / "openat_bpf.c" > > + openat_bpf.write_text(r""" > > + #include "vmlinux.h" > > + #include <bpf/bpf_helpers.h> > > + #include <bpf/bpf_tracing.h> > > + #include <bpf/bpf_core_read.h> > > + > > + char LICENSE[] SEC("license") = "GPL"; > > + > > + int example_pid = 0; > > + > > + SEC("tracepoint/syscalls/sys_enter_openat") > > + int handle_openat(struct trace_event_raw_sys_enter *ctx) > > + { > > + int pid = bpf_get_current_pid_tgid() >> 32; > > + char filename[256]; // filename buffer > > + bpf_probe_read_user(&filename, sizeof(filename), (void > > *)ctx->args[1]); > > + bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", > > pid, > > + filename); > > + > > + return 0; > > + } > > + > > + """) > > + return openat_bpf > > + > > + > > [email protected] > > +def openat_bpf_obj(openat_bpf_source): > > + bpf_program = BPFProgram(source_path=openat_bpf_source) > > + bpf_program._compile_bpf() > > + return bpf_program.bpf_obj > > + > > + > > [email protected] > > +def invalid_memory_access_bpf_source(tmp_path): > > + invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c" > > + invalid_memory_access_bpf.write_text(r""" > > + #include "vmlinux.h" > > + #include <bpf/bpf_helpers.h> > > + #include <bpf/bpf_tracing.h> > > + > > + char LICENSE[] SEC("license") = "GPL"; > > + > > + 
SEC("tracepoint/syscalls/sys_enter_openat") > > + int bpf_prog(struct trace_event_raw_sys_enter *ctx) { > > + int arr[4] = {1, 2, 3, 4}; > > + > > + // Invalid memory access: out-of-bounds > > + int val = arr[5]; // This causes the verifier to fail > > + > > + return val; > > + } > > + """) > > + return invalid_memory_access_bpf > > + > > + > > [email protected] > > +def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source): > > + bpf_program = BPFProgram(source_path=invalid_memory_access_bpf_source) > > + bpf_program._compile_bpf() > > + return bpf_program.bpf_obj > > + > > + > > +def run_main_with_args_and_capture_output(args, capsys): > > + with mock.patch.object(sys, "argv", args): > > + try: > > + main.main() > > + except SystemExit as e: > > + result = capsys.readouterr() > > + output = result.out.rstrip() > > + error = result.err.rstrip() > > + logger.debug("STDOUT:\n%s", output) > > + logger.debug("STDERR:\n%s", error) > > + return e.code, output, error > > + > > + > > +kernel_image_path = kernel.KernelManager().get_kernel_image(version="6.15") > > +kernel_cli_flags = [["--kernel", "6.15"], ["--kernel-image", > > f"{kernel_image_path}"]] > > + > > + > > [email protected]("kernel_args", kernel_cli_flags) > > +class TestCLI: > > + def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, > > capsys): > > + args = [ > > + "main.py", > > + *kernel_args, > > + "--rootfs", > > + "/", > > + "--bpf-src", > > + str(openat_bpf_source), > > + ] > > + code, output, _ = run_main_with_args_and_capture_output(args, > > capsys) > > + assert code == 0 > > + assert "BPF programs succesfully loaded" == output > > + > > + def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj, > > capsys): > > + args = [ > > + "main.py", > > + *kernel_args, > > + "--rootfs", > > + "/", > > + "--bpf-obj", > > + str(openat_bpf_obj), > > + ] > > + code, output, _ = run_main_with_args_and_capture_output(args, > > capsys) > > + assert code == 0 > > + assert "BPF 
programs succesfully loaded" == output > > + > > + def test_main_with_invalid_bpf( > > + self, kernel_args, invalid_memory_access_bpf_source, capsys > > + ): > > + args = [ > > + "main.py", > > + *kernel_args, > > + "--rootfs", > > + "/", > > + "--bpf-src", > > + str(invalid_memory_access_bpf_source), > > + ] > > + code, output, _ = run_main_with_args_and_capture_output(args, > > capsys) > > + output_lines = output.splitlines() > > + assert code == 1 > > + assert "BPF program failed to load" == output_lines[0] > > + assert "Verifier logs:" == output_lines[1] > > + > > + def test_main_with_invalid_bpf_obj( > > + self, kernel_args, invalid_memory_access_bpf_obj, capsys > > + ): > > + args = [ > > + "main.py", > > + *kernel_args, > > + "--rootfs", > > + "/", > > + "--bpf-obj", > > + str(invalid_memory_access_bpf_obj), > > + ] > > + code, output, _ = run_main_with_args_and_capture_output(args, > > capsys) > > + output_lines = output.splitlines() > > + assert code == 1 > > + assert "BPF program failed to load" == output_lines[0] > > + assert "Verifier logs:" == output_lines[1] > > + > > + def test_main_with_valid_command(self, kernel_args, capsys): > > + args = ["main.py", *kernel_args, "--rootfs", "/", "-c", "uname -r"] > > + code, output, error = run_main_with_args_and_capture_output(args, > > capsys) > > + assert code == 0 > > + assert "6.15.0" == output > > + > > + def test_main_with_invalid_command(self, kernel_args, capsys): > > + args = ["main.py", *kernel_args, "--rootfs", "/", "-c", > > "NotImplemented"] > > + code, output, error = run_main_with_args_and_capture_output(args, > > capsys) > > + assert code != 0 > > + assert f"Command failed with exit code: {code}" in output > > diff --git a/contrib/bpf-vmtest-tool/utils.py > > b/contrib/bpf-vmtest-tool/utils.py > > new file mode 100644 > > index 00000000000..8866c9559f4 > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/utils.py > > @@ -0,0 +1,27 @@ > > +import subprocess > > +import logging > > +from typing 
import Any > > + > > +logger = logging.getLogger(__name__) > > + > > + > > +def run_command(cmd: list[str], **kwargs: Any): > > + try: > > + logger.debug(f"running command: {cmd}") > > + result = subprocess.run( > > + cmd, > > + text=True, > > + check=True, > > + capture_output=True, > > + shell=False, > > + **kwargs, > > + ) > > + logger.debug("Command stdout:\n" + result.stdout.strip()) > > + if result.stderr: > > + logger.debug("Command stderr:\n" + result.stderr.strip()) > > + return result > > + except subprocess.CalledProcessError as e: > > + logger.error(e) > > + logger.error("Command failed with stdout:\n" + e.stdout.strip()) > > + logger.error("Command failed with stderr:\n" + e.stderr.strip()) > > + raise > > diff --git a/contrib/bpf-vmtest-tool/vm.py b/contrib/bpf-vmtest-tool/vm.py > > new file mode 100644 > > index 00000000000..5d4a5747f0b > > --- /dev/null > > +++ b/contrib/bpf-vmtest-tool/vm.py > > @@ -0,0 +1,154 @@ > > +import logging > > +import subprocess > > +from typing import List > > + > > +from kernel import KernelImage > > + > > +logger = logging.getLogger(__name__) > > + > > + > > +class VMConfig: > > + """Configuration container for VM settings""" > > + > > + def __init__( > > + self, kernel_image: KernelImage, rootfs_path: str, command: str, > > **kwargs > > + ): > > + self.kernel = kernel_image > > + self.kernel_path = str(kernel_image.path) > > + self.rootfs_path = rootfs_path > > + self.command = command > > + self.memory_mb = kwargs.get("memory_mb", 512) > > + self.cpu_count = kwargs.get("cpu_count", 1) > > + self.extra_args = kwargs.get("extra_args", {}) > > + > > + > > +def bpf_verifier_logs(output: str) -> str: > > + start_tag = "--- Verifier log start ---" > > + end_tag = "--- Verifier log end ---" > > + > > + start_idx = output.find(start_tag) > > + end_idx = output.find(end_tag) > > + > > + if start_idx != -1 and end_idx != -1: > > + # Extract between the tags (excluding the markers themselves) > > + log_body = output[start_idx + 
len(start_tag) : end_idx].strip() > > + return log_body > > + else: > > + return "No verifier log found in the output." > > + > > + > > +class Vmtest: > > + """vmtest backend implementation""" > > + > > + def __init__(self): > > + pass > > + > > + def _boot_command(self, vm_config: VMConfig): > > + vmtest_command = ["vmtest"] > > + vmtest_command.extend(["-k", vm_config.kernel_path]) > > + vmtest_command.extend(["-r", vm_config.rootfs_path]) > > + vmtest_command.append(vm_config.command) > > + return vmtest_command > > + > > + def _remove_boot_log(self, full_output: str) -> str: > > + """ > > + Filters QEMU and kernel boot logs, returning only the output after > > the > > + `===> Running command` marker. > > + """ > > + marker = "===> Running command" > > + lines = full_output.splitlines() > > + > > + try: > > + start_index = next(i for i, line in enumerate(lines) if marker > > in line) > > + # Return everything after that marker (excluding the marker > > itself) > > + return "\n".join(lines[start_index + 1 :]).strip() > > + except StopIteration: > > + return full_output.strip() > > + > > + def run_command(self, vm_config): > > + vm = None > > + try: > > + logger.info(f"Booting VM with kernel: {vm_config.kernel_path}") > > + logger.info(f"Using rootfs: {vm_config.rootfs_path}") > > + vm = subprocess.run( > > + self._boot_command(vm_config), > > + check=True, > > + text=True, > > + capture_output=True, > > + shell=False, > > + ) > > + vm_stdout = vm.stdout > > + logger.debug(vm_stdout) > > + return VMCommandResult( > > + vm.returncode, self._remove_boot_log(vm_stdout), None > > + ) > > + except subprocess.CalledProcessError as e: > > + out = e.stdout > > + err = e.stderr > > + # when the command in the vm fails we consider it as a > > succesfull boot > > + if "===> Running command" not in out: > > + raise BootFailedError("Boot failed", out, err, > > e.returncode) > > + logger.debug("STDOUT: \n%s", out) > > + logger.debug("STDERR: \n%s", err) > > + return 
VMCommandResult(e.returncode, > > self._remove_boot_log(out), err) > > + > > + > > +class VMCommandResult: > > + def __init__(self, returncode, stdout, stderr) -> None: > > + self.returncode = returncode > > + self.stdout = stdout > > + self.stderr = stderr > > + > > + > > +class VirtualMachine: > > + """Main VM class - simple interface for end users""" > > + > > + # Registry of available hypervisors > > + _hypervisors = { > > + "vmtest": Vmtest, > > + } > > + > > + def __init__( > > + self, > > + kernel_image: KernelImage, > > + rootfs_path: str, > > + command: str, > > + hypervisor_type: str = "vmtest", > > + **kwargs, > > + ): > > + self.config = VMConfig(kernel_image, rootfs_path, command, > > **kwargs) > > + > > + if hypervisor_type not in self._hypervisors: > > + raise ValueError(f"Unsupported hypervisor: {hypervisor_type}") > > + > > + self.hypervisor = self._hypervisors[hypervisor_type]() > > + > > + @classmethod > > + def list_hypervisors(cls) -> List[str]: > > + """List available hypervisors""" > > + return list(cls._hypervisors.keys()) > > + > > + def execute(self): > > + """Execute command in VM""" > > + return self.hypervisor.run_command(self.config) > > + > > + > > +class BootFailedError(Exception): > > + """Raised when VM fails to boot properly (before command execution).""" > > + > > + def __init__( > > + self, message: str, stdout: str = "", stderr: str = "", > > returncode: int = -1 > > + ): > > + super().__init__(message) > > + self.stdout = stdout > > + self.stderr = stderr > > + self.returncode = returncode > > + > > + def __str__(self): > > + base = super().__str__() > > + return ( > > + f"{base}\n" > > + f"Return code: {self.returncode}\n" > > + f"--- STDOUT ---\n{self.stdout}\n" > > + f"--- STDERR ---\n{self.stderr}" > > + )
