Hi Jose,

On Tue, 28 Oct 2025 at 20:59, Jose E. Marchesi <[email protected]> wrote:
>
>
> Hello Piyush.
>
> Sorry for taking so long to review this.
>
> I think the tool is great and will help us a lot, not only to be able to
> run BPF tests, but also to hack the compiler.  Having an easy and stable
> way of just running some particular built BPF program in some particular
> kernel version is priceless.
>
> So, FWIW, I would say it is OK to install this tool under contrib
> provided the points raised by David and by Hans Peter get addressed.
>
> It would be good to have it installed before Stage 0 ends, November 17.

Thanks for the feedback :)
I’ll try to send the patch by next week so we have enough time for
review before November 17.


> > contrib/ChangeLog:
> >
> >       * bpf-vmtest-tool/README: New file.
> >       * bpf-vmtest-tool/bpf.py: New file.
> >       * bpf-vmtest-tool/config.py: New file.
> >       * bpf-vmtest-tool/kernel.py: New file.
> >       * bpf-vmtest-tool/main.py: New file.
> >       * bpf-vmtest-tool/pyproject.toml: New file.
> >       * bpf-vmtest-tool/tests/test_cli.py: New file.
> >       * bpf-vmtest-tool/utils.py: New file.
> >       * bpf-vmtest-tool/vm.py: New file.
> >
> > Signed-off-by: Piyush Raj <[email protected]>
> > ---
> >  contrib/bpf-vmtest-tool/README            |  78 ++++++++
> >  contrib/bpf-vmtest-tool/bpf.py            | 199 ++++++++++++++++++++
> >  contrib/bpf-vmtest-tool/config.py         |  18 ++
> >  contrib/bpf-vmtest-tool/kernel.py         | 209 ++++++++++++++++++++++
> >  contrib/bpf-vmtest-tool/main.py           | 103 +++++++++++
> >  contrib/bpf-vmtest-tool/pyproject.toml    |  36 ++++
> >  contrib/bpf-vmtest-tool/tests/test_cli.py | 167 +++++++++++++++++
> >  contrib/bpf-vmtest-tool/utils.py          |  27 +++
> >  contrib/bpf-vmtest-tool/vm.py             | 154 ++++++++++++++++
> >  9 files changed, 991 insertions(+)
> >  create mode 100644 contrib/bpf-vmtest-tool/README
> >  create mode 100644 contrib/bpf-vmtest-tool/bpf.py
> >  create mode 100644 contrib/bpf-vmtest-tool/config.py
> >  create mode 100644 contrib/bpf-vmtest-tool/kernel.py
> >  create mode 100644 contrib/bpf-vmtest-tool/main.py
> >  create mode 100644 contrib/bpf-vmtest-tool/pyproject.toml
> >  create mode 100644 contrib/bpf-vmtest-tool/tests/test_cli.py
> >  create mode 100644 contrib/bpf-vmtest-tool/utils.py
> >  create mode 100644 contrib/bpf-vmtest-tool/vm.py
> >
> > diff --git a/contrib/bpf-vmtest-tool/README b/contrib/bpf-vmtest-tool/README
> > new file mode 100644
> > index 00000000000..599e3529aa8
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/README
> > @@ -0,0 +1,78 @@
> > +This directory contains a Python script to run BPF programs or shell commands
> > +under a live Linux kernel using QEMU virtual machines.
> > +
> > +USAGE
> > +=====
> > +
> > +To run a shell command inside a live kernel VM:
> > +
> > +    python main.py -k 6.15 -r / -c "uname -a"
> > +
> > +To run a BPF source file in the VM:
> > +
> > +    python main.py -k 6.15  --bpf-src fail.c
> > +
> > +To run a precompiled BPF object file:
> > +
> > +    python main.py -k 6.15 --bpf-obj fail.bpf.o
> > +
> > +The tool will download and build the specified kernel version from:
> > +
> > +    https://www.kernel.org/pub/linux/kernel
> > +
> > +A prebuilt `bzImage` can be supplied using the `--kernel-image` flag.
> > +
> > +NOTE
> > +====
> > +- Only x86_64 is supported
> > +- Only "/" (the root filesystem) is currently supported as the VM rootfs when
> > +running or testing BPF programs using `--bpf-src` or `--bpf-obj`.
> > +
> > +DEPENDENCIES
> > +============
> > +
> > +- Python >= 3.9
> > +- vmtest >= v0.18.0 (https://github.com/danobi/vmtest)
> > +    - QEMU
> > +    - qemu-guest-agent
> > +
> > +For compiling the kernel:
> > +- https://docs.kernel.org/process/changes.html#current-minimal-requirements
> > +For compiling and loading BPF programs:
> > +
> > +- libbpf
> > +- bpftool
> > +- gcc-bpf-unknown-none
> > +     (https://gcc.gnu.org/wiki/BPFBackEnd#Where_to_find_GCC_BPF)
> > +- vmlinux.h
> > +    Can be generated using:
> > +
> > +    bpftool btf dump file /sys/kernel/btf/vmlinux format c > \
> > +    /usr/local/include/vmlinux.h
> > +
> > +    Or downloaded from https://github.com/libbpf/vmlinux.h/tree/main
> > +
> > +BUILD FLAGS
> > +===========
> > +You can customize compiler settings using environment variables:
> > +
> > +- BPF_CC:          Compiler for the BPF program (default: bpf-unknown-none-gcc)
> > +- BPF_CFLAGS:      Extra flags for BPF program compilation (default: "-O2")
> > +- BPF_INCLUDES:    Include paths for BPF (default: -I/usr/local/include -I/usr/include)
> > +- VMTEST_CC:       Compiler for the user-space loader (default: gcc)
> > +- VMTEST_CFLAGS:   Flags for compiling the loader (default: -g -Wall)
> > +- VMTEST_LDFLAGS:  Linker flags for the loader (default: -lelf -lz -lbpf)
> > +
> > +Example usage:
> > +
> > +    BPF_CFLAGS="-O3 -g" VMTEST_CFLAGS="-O2" python main.py -k 6.15 --bpf-src fail.c
> > +
> > +DEVELOPMENT
> > +===========
> > +
> > +Development dependencies are specified in `pyproject.toml`, which can be used
> > +with any suitable Python virtual environment manager.
> > +
> > +To run the test suite:
> > +
> > +    python3 -m pytest
> > diff --git a/contrib/bpf-vmtest-tool/bpf.py b/contrib/bpf-vmtest-tool/bpf.py
> > new file mode 100644
> > index 00000000000..6f8ec062974
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/bpf.py
> > @@ -0,0 +1,199 @@
> > +import re
> > +import subprocess
> > +import logging
> > +from pathlib import Path
> > +import tempfile
> > +from typing import Optional
> > +import utils
> > +from config import (
> > +    BPF_CC,
> > +    BPF_CFLAGS,
> > +    BPF_INCLUDES,
> > +    VMTEST_LDFLAGS,
> > +    VMTEST_CC,
> > +    VMTEST_CFLAGS,
> > +    ARCH,
> > +)
> > +import os
> > +
> > +# Based on the compilation process described in:
> > +# https://git.sr.ht/~brianwitte/gcc-bpf-example/tree/master/item/Makefile
> > +
> > +logger = logging.getLogger(__name__)
> > +
> > +
> > +def generate_sanitized_name(path: Path):
> > +    """generate sanitized c variable name"""
> > +    name = re.sub(r"\W", "_", path.stem)
> > +    if name and name[0].isdigit():
> > +        name = "_" + name
> > +    return name
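
For reference, here is a small self-contained sketch of what this helper produces. The function body is copied from the patch; the two sample paths are made up purely for illustration.

```python
import re
from pathlib import Path


def generate_sanitized_name(path: Path) -> str:
    """Turn a file stem into a valid C identifier (same logic as the patch)."""
    name = re.sub(r"\W", "_", path.stem)
    if name and name[0].isdigit():
        name = "_" + name
    return name


# Hypothetical inputs, chosen only to exercise both branches.
print(generate_sanitized_name(Path("fail.bpf.c")))   # fail_bpf
print(generate_sanitized_name(Path("6.15-test.c")))  # _6_15_test
```

Only the last suffix is dropped by `Path.stem`, so "fail.bpf.c" keeps its "bpf" part and the dot becomes an underscore.
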
> > +
> > +
> > +class BPFProgram:
> > +    tmp_base_dir = tempfile.TemporaryDirectory(prefix="vmtest")
> > +    tmp_base_dir_path = Path(tmp_base_dir.name)
> > +
> > +    def __init__(
> > +        self,
> > +        source_path: Optional[Path] = None,
> > +        bpf_bytecode_path: Optional[Path] = None,
> > +        use_temp_dir: bool = False,
> > +    ):
> > +        path = source_path or bpf_bytecode_path
> > +        self.name = generate_sanitized_name(path)
> > +        self.build_dir = self.__class__.tmp_base_dir_path / "ebpf_programs" / self.name
> > +
> > +        if source_path:
> > +            self.bpf_src = source_path
> > +            self.bpf_obj = self.build_dir / f"{self.name}.bpf.o"
> > +        else:
> > +            self.bpf_obj = bpf_bytecode_path
> > +        self.build_dir.mkdir(parents=True, exist_ok=True)
> > +        self.bpf_skel = self.build_dir / f"{self.name}.skel.h"
> > +        self.loader_src = self.build_dir / f"{self.name}-loader.c"
> > +        self.output = self.build_dir / f"{self.name}.o"
> > +
> > +    @classmethod
> > +    def from_source(cls, source_path: Path):
> > +        self = cls(source_path=source_path)
> > +        self._compile_bpf()
> > +        self._compile_from_bpf_bytecode()
> > +        return self.output
> > +
> > +    @classmethod
> > +    def from_bpf_obj(cls, obj_path: Path):
> > +        self = cls(bpf_bytecode_path=obj_path)
> > +        self._compile_from_bpf_bytecode()
> > +        return self.output
> > +
> > +    def _compile_from_bpf_bytecode(self):
> > +        self._generate_skeleton()
> > +        self._compile_loader()
> > +
> > +    def _compile_bpf(self):
> > +        """Compile the eBPF program using gcc"""
> > +        logger.info(f"Compiling eBPF source: {self.bpf_src}")
> > +        cmd = [
> > +            BPF_CC,
> > +            f"-D__TARGET_ARCH_{ARCH}",
> > +            "-gbtf",
> > +            "-std=gnu17"]
> > +        if BPF_CFLAGS != "":
> > +            cmd.extend(BPF_CFLAGS.split())
> > +        cmd.extend(BPF_INCLUDES.split())
> > +        cmd.extend(
> > +            [
> > +                "-c",
> > +                str(self.bpf_src),
> > +                "-o",
> > +                str(self.bpf_obj),
> > +            ]
> > +        )
> > +        logger.debug(" ".join(cmd))
> > +        utils.run_command(cmd)
> > +        logger.info(f"eBPF compiled: {self.bpf_obj}")
> > +
> > +    def _generate_skeleton(self):
> > +        """Generate the BPF skeleton header using bpftool"""
> > +        logger.info(f"Generating skeleton: {self.bpf_skel}")
> > +        cmd = ["bpftool", "gen", "skeleton", str(self.bpf_obj), "name", self.name]
> > +        try:
> > +            result = utils.run_command(cmd)
> > +            with open(self.bpf_skel, "w") as f:
> > +                f.write(result.stdout)
> > +            logger.info("Skeleton generated.")
> > +        except subprocess.CalledProcessError:
> > +            logger.error("Failed to generate skeleton.")
> > +            raise
> > +
> > +    def _compile_loader(self):
> > +        """Compile the C loader program"""
> > +        self.generate_loader()
> > +        logger.info(f"Compiling loader: {self.loader_src}")
> > +        cmd = [
> > +            VMTEST_CC,
> > +            *VMTEST_CFLAGS.split(),
> > +            "-I",
> > +            str(self.build_dir),
> > +            str(self.loader_src),
> > +            *VMTEST_LDFLAGS.split(),
> > +            "-o",
> > +            str(self.output),
> > +        ]
> > +        # remove variables that conflict with host compiler
> > +        clean_env = os.environ.copy()
> > +        clean_env.pop("GCC_EXEC_PREFIX", None)
> > +        utils.run_command(cmd, env=clean_env)
> > +        logger.info("Compilation complete")
> > +
> > +    def generate_loader(self):
> > +        """
> > +        Generate a loader C file for this program's BPF skeleton.
> > +
> > +        The loader is written to self.loader_src and includes the
> > +        skeleton header named after self.name. It opens and loads
> > +        the skeleton, then prints the verifier log to stderr.
> > +        """
> > +        skeleton_header = f"{self.name}.skel.h"
> > +        loader_code = f"""\
> > +        #include <stdio.h>
> > +        #include <stdlib.h>
> > +        #include <signal.h>
> > +        #include <unistd.h>
> > +        #include <bpf/libbpf.h>
> > +        #include "{skeleton_header}"
> > +
> > +        #define LOG_BUF_SIZE (1024 * 1024)
> > +
> > +        static volatile sig_atomic_t stop;
> > +        static char log_buf[LOG_BUF_SIZE];
> > +
> > +        void handle_sigint(int sig) {{
> > +            stop = 1;
> > +        }}
> > +
> > +        int main() {{
> > +            struct {self.name} *skel;
> > +            struct bpf_program *prog;
> > +            int err;
> > +
> > +            signal(SIGINT, handle_sigint);
> > +
> > +            skel = {self.name}__open();  // STEP 1: open only
> > +            if (!skel) {{
> > +                fprintf(stderr, "Failed to open BPF skeleton\\n");
> > +                return 1;
> > +            }}
> > +
> > +            // STEP 2: Attach the verifier log buffer to every program
> > +            bpf_object__for_each_program(prog, skel->obj) {{
> > +                bpf_program__set_log_buf(prog, log_buf, sizeof(log_buf));
> > +                bpf_program__set_log_level(prog, 1); // optional: verbose logs
> > +            }}
> > +
> > +            // STEP 3: Load the program (this will trigger verifier log output)
> > +            err = {self.name}__load(skel);
> > +            fprintf(
> > +             stderr,
> > +             "--- Verifier log start ---\\n"
> > +             "%s\\n"
> > +             "--- Verifier log end ---\\n",
> > +             log_buf
> > +             );
> > +            if (err) {{
> > +                fprintf(stderr, "Failed to load BPF skeleton: %d\\n", err);
> > +                {self.name}__destroy(skel);
> > +                return 1;
> > +            }}
> > +
> > +            printf("BPF program loaded successfully.\\n");
> > +
> > +            {self.name}__destroy(skel);
> > +            return 0;
> > +        }}
> > +
> > +        """
> > +        with open(self.loader_src, "w") as f:
> > +            f.write(loader_code)
> > +        logger.info(f"Generated loader at {self.loader_src}")
> > diff --git a/contrib/bpf-vmtest-tool/config.py b/contrib/bpf-vmtest-tool/config.py
> > new file mode 100644
> > index 00000000000..002af8a0a6e
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/config.py
> > @@ -0,0 +1,18 @@
> > +import platform
> > +from pathlib import Path
> > +import os
> > +
> > +KERNEL_TARBALL_PREFIX_URL = "https://cdn.kernel.org/pub/linux/kernel/"
> > +BASE_DIR = Path.home() / ".bpf-vmtest-tool"
> > +ARCH = platform.machine()
> > +KCONFIG_REL_PATHS = [
> > +    "tools/testing/selftests/bpf/config",
> > +    "tools/testing/selftests/bpf/config.vm",
> > +    f"tools/testing/selftests/bpf/config.{ARCH}",
> > +]
> > +VMTEST_CC = os.getenv("VMTEST_CC", "gcc")
> > +VMTEST_CFLAGS = os.getenv("VMTEST_CFLAGS", "-g -Wall")
> > +VMTEST_LDFLAGS = os.getenv("VMTEST_LDFLAGS", "-lelf -lz -lbpf")
> > +BPF_CC = os.getenv("BPF_CC", "bpf-unknown-none-gcc")
> > +BPF_CFLAGS = os.getenv("BPF_CFLAGS", "-O2")
> > +BPF_INCLUDES = os.getenv("BPF_INCLUDES", "-I/usr/local/include -I/usr/include")
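
Since these values are later split into argv tokens, here is a minimal sketch of one way to do the splitting, assuming we want shell-style quoting to be honoured. `flags_from_env` is a hypothetical helper, not part of the patch, and the sample value is invented.

```python
import os
import shlex


def flags_from_env(var: str, default: str) -> list:
    """Split a flag string from the environment into argv-style tokens.

    shlex.split honours shell quoting and collapses repeated whitespace,
    whereas str.split(" ") yields empty strings for doubled spaces.
    """
    return shlex.split(os.getenv(var, default))


# Hypothetical value with a doubled space and a quoted define.
os.environ["BPF_CFLAGS"] = '-O2  -g -DNAME="a b"'
print(flags_from_env("BPF_CFLAGS", "-O2"))
# ['-O2', '-g', '-DNAME=a b']
```

This would keep a quoted argument as a single token, which plain whitespace splitting cannot do.
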
> > diff --git a/contrib/bpf-vmtest-tool/kernel.py b/contrib/bpf-vmtest-tool/kernel.py
> > new file mode 100644
> > index 00000000000..2974948130e
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/kernel.py
> > @@ -0,0 +1,209 @@
> > +import logging
> > +import os
> > +import shutil
> > +import subprocess
> > +from pathlib import Path
> > +import re
> > +from urllib.parse import urljoin
> > +from urllib.request import urlretrieve
> > +from typing import Optional, List
> > +from dataclasses import dataclass
> > +
> > +from config import ARCH, BASE_DIR, KCONFIG_REL_PATHS, KERNEL_TARBALL_PREFIX_URL
> > +import utils
> > +
> > +logger = logging.getLogger(__name__)
> > +KERNELS_DIR = BASE_DIR / "kernels"
> > +
> > +
> > +@dataclass
> > +class KernelSpec:
> > +    """Immutable kernel specification"""
> > +
> > +    version: str
> > +    arch: str = ARCH
> > +
> > +    def __post_init__(self):
> > +        self.major = self.version.split(".")[0]
> > +
> > +    def __str__(self):
> > +        return f"{self.version}-{self.arch}"
> > +
> > +    @property
> > +    def bzimage_path(self) -> Path:
> > +        return KERNELS_DIR / f"bzImage-{self}"
> > +
> > +    @property
> > +    def tarball_path(self) -> Path:
> > +        return KERNELS_DIR / f"linux-{self.version}.tar.xz"
> > +
> > +    @property
> > +    def kernel_dir(self) -> Path:
> > +        return KERNELS_DIR / f"linux-{self.version}"
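
The docstring calls KernelSpec immutable, but a plain `@dataclass` still allows attribute assignment. A minimal sketch of how that could be enforced, assuming it is wanted: `FrozenKernelSpec` is a hypothetical name, and the hard-coded default arch is only there to keep the example self-contained.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class FrozenKernelSpec:
    """Hypothetical frozen variant of KernelSpec."""

    version: str
    arch: str = "x86_64"  # assumption: stands in for config.ARCH

    @property
    def major(self) -> str:
        # Derived on access, so __post_init__ need not assign to self.
        return self.version.split(".")[0]

    def __str__(self) -> str:
        return f"{self.version}-{self.arch}"


spec = FrozenKernelSpec("6.15")
print(spec, spec.major)  # 6.15-x86_64 6
try:
    spec.version = "6.16"
except FrozenInstanceError:
    print("assignment rejected")
```

With `frozen=True` the `__post_init__` assignment in the patch would raise, which is why `major` becomes a property here.
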
> > +
> > +
> > +class KernelImage:
> > +    """Represents a compiled kernel image"""
> > +
> > +    def __init__(self, path: Path):
> > +        if not isinstance(path, Path):
> > +            path = Path(path)
> > +
> > +        if not path.exists():
> > +            raise FileNotFoundError(f"Kernel image not found: {path}")
> > +
> > +        self.path = path
> > +
> > +    def __str__(self):
> > +        return str(self.path)
> > +
> > +
> > +class KernelCompiler:
> > +    """Handles complete kernel compilation process including download and build"""
> > +
> > +    def compile_from_version(self, spec: KernelSpec) -> KernelImage:
> > +        """Complete compilation process from kernel version"""
> > +        if spec.bzimage_path.exists():
> > +            logger.info(f"Kernel {spec} already exists, skipping compilation")
> > +            return KernelImage(spec.bzimage_path)
> > +
> > +        try:
> > +            self._download_source(spec)
> > +            self._extract_source(spec)
> > +            self._configure_kernel(spec)
> > +            self._compile_kernel(spec)
> > +            self._copy_bzimage(spec)
> > +
> > +            logger.info(f"Successfully compiled kernel {spec}")
> > +            return KernelImage(spec.bzimage_path)
> > +
> > +        except Exception as e:
> > +            logger.error(f"Failed to compile kernel {spec}: {e}")
> > +            raise
> > +        finally:
> > +            # Always cleanup temporary files
> > +            self._cleanup(spec)
> > +
> > +    def _download_source(self, spec: KernelSpec) -> None:
> > +        """Download kernel source tarball"""
> > +        if spec.tarball_path.exists():
> > +            logger.info(f"Tarball already exists: {spec.tarball_path}")
> > +            return
> > +
> > +        url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz"
> > +        url = urljoin(KERNEL_TARBALL_PREFIX_URL, url_suffix)
> > +
> > +        logger.info(f"Downloading kernel from {url}")
> > +        spec.tarball_path.parent.mkdir(parents=True, exist_ok=True)
> > +        urlretrieve(url, spec.tarball_path)
> > +        logger.info("Kernel source downloaded")
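
To make the URL construction concrete, here is a short sketch of what `urljoin` yields for a given version. `tarball_url` is a hypothetical helper that mirrors the two lines above; the prefix is the one from config.py.

```python
from urllib.parse import urljoin

# Same prefix as config.py. The trailing slash matters: without it,
# urljoin replaces the last path component instead of appending to it.
PREFIX = "https://cdn.kernel.org/pub/linux/kernel/"


def tarball_url(version: str) -> str:
    """Build the tarball URL for a version string such as "6.15"."""
    major = version.split(".")[0]
    return urljoin(PREFIX, f"v{major}.x/linux-{version}.tar.xz")


print(tarball_url("6.15"))
# https://cdn.kernel.org/pub/linux/kernel/v6.x/linux-6.15.tar.xz
```
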
> > +
> > +    def _extract_source(self, spec: KernelSpec) -> None:
> > +        """Extract kernel source tarball"""
> > +        logger.info(f"Extracting kernel source to {spec.kernel_dir}")
> > +        spec.kernel_dir.mkdir(parents=True, exist_ok=True)
> > +
> > +        utils.run_command(
> > +            [
> > +                "tar",
> > +                "-xf",
> > +                str(spec.tarball_path),
> > +                "-C",
> > +                str(spec.kernel_dir),
> > +                "--strip-components=1",
> > +            ]
> > +        )
> > +
> > +    def _configure_kernel(self, spec: KernelSpec) -> None:
> > +        """Configure kernel with provided config files"""
> > +        config_path = spec.kernel_dir / ".config"
> > +
> > +        with open(config_path, "wb") as kconfig:
> > +            for config_rel_path in KCONFIG_REL_PATHS:
> > +                config_abs_path = spec.kernel_dir / config_rel_path
> > +                if config_abs_path.exists():
> > +                    with open(config_abs_path, "rb") as conf:
> > +                        kconfig.write(conf.read())
> > +
> > +        logger.info("Updated kernel configuration")
> > +
> > +    def _compile_kernel(self, spec: KernelSpec) -> None:
> > +        """Compile the kernel"""
> > +        logger.info(f"Compiling kernel in {spec.kernel_dir}")
> > +        old_cwd = os.getcwd()
> > +
> > +        try:
> > +            os.chdir(spec.kernel_dir)
> > +            utils.run_command(["make", "olddefconfig"])
> > +            utils.run_command(["make", f"-j{os.cpu_count()}", "bzImage"])
> > +        except subprocess.CalledProcessError as e:
> > +            logger.error(f"Kernel compilation failed: {e}")
> > +            raise
> > +        finally:
> > +            os.chdir(old_cwd)
> > +
> > +    def _copy_bzimage(self, spec: KernelSpec) -> None:
> > +        """Copy compiled bzImage to final location"""
> > +        src = spec.kernel_dir / "arch/x86/boot/bzImage"
> > +        dest = spec.bzimage_path
> > +        dest.parent.mkdir(parents=True, exist_ok=True)
> > +
> > +        shutil.copy2(src, dest)
> > +        logger.info(f"Stored bzImage at {dest}")
> > +
> > +    def _cleanup(self, spec: KernelSpec) -> None:
> > +        """Clean up temporary files"""
> > +        if spec.tarball_path.exists():
> > +            spec.tarball_path.unlink()
> > +            logger.info("Removed tarball")
> > +
> > +        if spec.kernel_dir.exists():
> > +            shutil.rmtree(spec.kernel_dir)
> > +            logger.info("Removed kernel source directory")
> > +
> > +
> > +class KernelManager:
> > +    """Main interface for kernel management"""
> > +
> > +    def __init__(self):
> > +        self.compiler = KernelCompiler()
> > +
> > +    def get_kernel_image(
> > +        self,
> > +        version: Optional[str] = None,
> > +        kernel_image_path: Optional[str] = None,
> > +        arch: str = ARCH,
> > +    ) -> KernelImage:
> > +        """Get kernel image from version or existing file"""
> > +
> > +        # Validate inputs
> > +        if not version and not kernel_image_path:
> > +            raise ValueError("Must provide either 'version' or 'kernel_image_path'")
> > +
> > +        if version and kernel_image_path:
> > +            raise ValueError("Provide only one of 'version' or 'kernel_image_path'")
> > +
> > +        # Handle existing kernel image
> > +        if kernel_image_path:
> > +            path = Path(kernel_image_path)
> > +            if not path.exists():
> > +                raise FileNotFoundError(f"Kernel image not found: {kernel_image_path}")
> > +            return KernelImage(path)
> > +
> > +        # Handle version-based compilation
> > +        if version:
> > +            spec = KernelSpec(version=version, arch=arch)
> > +            return self.compiler.compile_from_version(spec)
> > +
> > +    def list_available_kernels(self) -> List[str]:
> > +        """List all available compiled kernels"""
> > +        if not KERNELS_DIR.exists():
> > +            return []
> > +
> > +        kernels = []
> > +        for file in KERNELS_DIR.glob("bzImage-*"):
> > +            match = re.match(r"bzImage-(.*)", file.name)
> > +            if match:
> > +                kernels.append(match.group(1))
> > +
> > +        return sorted(kernels)
> > diff --git a/contrib/bpf-vmtest-tool/main.py b/contrib/bpf-vmtest-tool/main.py
> > new file mode 100644
> > index 00000000000..3125bba95c4
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/main.py
> > @@ -0,0 +1,103 @@
> > +import argparse
> > +import logging
> > +from pathlib import Path
> > +import textwrap
> > +
> > +import bpf
> > +import kernel
> > +import vm
> > +
> > +
> > +def main():
> > +    parser = argparse.ArgumentParser()
> > +    kernel_group = parser.add_mutually_exclusive_group(required=True)
> > +    kernel_group.add_argument(
> > +        "-k",
> > +        "--kernel",
> > +        help="Kernel version to boot in the vm",
> > +        metavar="VERSION",
> > +        type=str,
> > +    )
> > +    kernel_group.add_argument(
> > +        "--kernel-image",
> > +        help="Kernel image to boot in the vm",
> > +        metavar="PATH",
> > +        type=str,
> > +    )
> > +    parser.add_argument(
> > +        "-r", "--rootfs", help="rootfs to mount in the vm", default="/", metavar="PATH"
> > +    )
> > +    parser.add_argument(
> > +        "-v",
> > +        "--log-level",
> > +        help="Log level",
> > +        metavar="DEBUG|INFO|WARNING|ERROR",
> > +        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
> > +        default="ERROR",
> > +    )
> > +    command_group = parser.add_mutually_exclusive_group(required=True)
> > +    command_group.add_argument(
> > +        "--bpf-src",
> > +        help="Path to BPF C source file",
> > +        metavar="PATH",
> > +        type=str,
> > +    )
> > +    command_group.add_argument(
> > +        "--bpf-obj",
> > +        help="Path to bpf bytecode object",
> > +        metavar="PATH",
> > +        type=str,
> > +    )
> > +    command_group.add_argument(
> > +        "-c", "--command", help="command to run in the vm", metavar="COMMAND"
> > +    )
> > +    command_group.add_argument(
> > +        "-s", "--shell", help="open interactive shell in the vm", action="store_true"
> > +    )
> > +
> > +    args = parser.parse_args()
> > +
> > +    logging.basicConfig(level=args.log_level)
> > +    logger = logging.getLogger(__name__)
> > +    kmanager = kernel.KernelManager()
> > +
> > +    if args.kernel:
> > +        kernel_image = kmanager.get_kernel_image(version=args.kernel)
> > +    elif args.kernel_image:
> > +        kernel_image = kmanager.get_kernel_image(kernel_image_path=args.kernel_image)
> > +
> > +    try:
> > +        if args.bpf_src:
> > +            command = bpf.BPFProgram.from_source(Path(args.bpf_src))
> > +        elif args.bpf_obj:
> > +            command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj))
> > +        elif args.command:
> > +            command = args.command
> > +        elif args.shell:
> > +            raise NotImplementedError("Shell mode is not yet implemented")
> > +    except Exception as e:
> > +        print(f"Failed to prepare command for vmtest: {e}")
> > +        exit(1)
> > +
> > +    virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs, str(command))
> > +    try:
> > +        result = virtual_machine.execute()
> > +    except vm.BootFailedError as e:
> > +        logger.error("VM boot failure: execution aborted. See logs for details.")
> > +        print(e)
> > +        exit(e.returncode)
> > +    if args.bpf_src or args.bpf_obj:
> > +        if result.returncode == 0:
> > +            print("BPF programs succesfully loaded")
> > +        else:
> > +            if "Failed to load BPF skeleton" in result.stdout:
> > +                print("BPF program failed to load")
> > +                print("Verifier logs:")
> > +                print(textwrap.indent(vm.bpf_verifier_logs(result.stdout), "\t"))
> > +    elif args.command:
> > +        print(result.stdout)
> > +    exit(result.returncode)
> > +
> > +
> > +if __name__ == "__main__":
> > +    main()
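
As a quick illustration of how the two mutually exclusive groups behave, here is a reduced sketch of the parser. `build_parser` is a hypothetical helper that keeps only a subset of the options above.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Reduced parser with the two mutually exclusive groups only."""
    parser = argparse.ArgumentParser(prog="main.py")
    kernel_group = parser.add_mutually_exclusive_group(required=True)
    kernel_group.add_argument("-k", "--kernel", metavar="VERSION")
    kernel_group.add_argument("--kernel-image", metavar="PATH")
    command_group = parser.add_mutually_exclusive_group(required=True)
    command_group.add_argument("--bpf-src", metavar="PATH")
    command_group.add_argument("--bpf-obj", metavar="PATH")
    command_group.add_argument("-c", "--command", metavar="COMMAND")
    return parser


parser = build_parser()
args = parser.parse_args(["-k", "6.15", "-c", "uname -a"])
print(args.kernel, args.command)  # 6.15 uname -a

try:
    # Two options from the same group: argparse reports an error and exits.
    parser.parse_args(["-k", "6.15", "--bpf-src", "a.c", "--bpf-obj", "a.o"])
except SystemExit as exc:
    print("rejected with exit status", exc.code)
```

Because both groups are `required=True`, omitting the kernel or the command is rejected in the same way.
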
> > diff --git a/contrib/bpf-vmtest-tool/pyproject.toml b/contrib/bpf-vmtest-tool/pyproject.toml
> > new file mode 100644
> > index 00000000000..1977612cfd6
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/pyproject.toml
> > @@ -0,0 +1,36 @@
> > +[project]
> > +name = "bpf-vmtest-tool"
> > +version = "0.1.0"
> > +description = "Test BPF code against live kernels"
> > +readme = {file = "README", content-type = "text/plain"}
> > +requires-python = ">=3.9"
> > +dependencies = []
> > +
> > +[dependency-groups]
> > +dev = [
> > +    "pre-commit>=4.2.0",
> > +    "pytest>=8.4.0",
> > +    "pytest-sugar>=1.0.0",
> > +    "ruff>=0.11.13",
> > +    "tox>=4.26.0",
> > +]
> > +
> > +[tool.pytest.ini_options]
> > +addopts = [
> > +    "--import-mode=importlib",
> > +]
> > +testpaths = ["tests"]
> > +pythonpath = ["."]
> > +
> > +[tool.ruff.lint]
> > +select = [
> > +    # pycodestyle
> > +    "E",
> > +    # Pyflakes
> > +    "F",
> > +]
> > +# Allow fix for all enabled rules (when `--fix`) is provided.
> > +fixable = ["ALL"]
> > +
> > +# Allow unused variables when underscore-prefixed.
> > +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
> > \ No newline at end of file
> > diff --git a/contrib/bpf-vmtest-tool/tests/test_cli.py b/contrib/bpf-vmtest-tool/tests/test_cli.py
> > new file mode 100644
> > index 00000000000..53df3ca3663
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/tests/test_cli.py
> > @@ -0,0 +1,167 @@
> > +import sys
> > +from unittest import mock
> > +import pytest
> > +from bpf import BPFProgram
> > +import kernel
> > +import main
> > +import logging
> > +
> > +logger = logging.getLogger(__name__)
> > +
> > +
> > [email protected]
> > +def openat_bpf_source(tmp_path):
> > +    openat_bpf = tmp_path / "openat_bpf.c"
> > +    openat_bpf.write_text(r"""
> > +    #include "vmlinux.h"
> > +    #include <bpf/bpf_helpers.h>
> > +    #include <bpf/bpf_tracing.h>
> > +    #include <bpf/bpf_core_read.h>
> > +
> > +    char LICENSE[] SEC("license") = "GPL";
> > +
> > +    int example_pid = 0;
> > +
> > +    SEC("tracepoint/syscalls/sys_enter_openat")
> > +    int handle_openat(struct trace_event_raw_sys_enter *ctx)
> > +    {
> > +        int pid = bpf_get_current_pid_tgid() >> 32;
> > +        char filename[256]; // filename buffer
> > +        bpf_probe_read_user(&filename, sizeof(filename), (void *)ctx->args[1]);
> > +        bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", pid,
> > +            filename);
> > +
> > +        return 0;
> > +    }
> > +
> > +    """)
> > +    return openat_bpf
> > +
> > +
> > [email protected]
> > +def openat_bpf_obj(openat_bpf_source):
> > +    bpf_program = BPFProgram(source_path=openat_bpf_source)
> > +    bpf_program._compile_bpf()
> > +    return bpf_program.bpf_obj
> > +
> > +
> > [email protected]
> > +def invalid_memory_access_bpf_source(tmp_path):
> > +    invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c"
> > +    invalid_memory_access_bpf.write_text(r"""
> > +    #include "vmlinux.h"
> > +    #include <bpf/bpf_helpers.h>
> > +    #include <bpf/bpf_tracing.h>
> > +
> > +    char LICENSE[] SEC("license") = "GPL";
> > +
> > +    SEC("tracepoint/syscalls/sys_enter_openat")
> > +    int bpf_prog(struct trace_event_raw_sys_enter *ctx) {
> > +        int arr[4] = {1, 2, 3, 4};
> > +
> > +        // Invalid memory access: out-of-bounds
> > +        int val = arr[5];  // This causes the verifier to fail
> > +
> > +        return val;
> > +    }
> > +    """)
> > +    return invalid_memory_access_bpf
> > +
> > +
> > [email protected]
> > +def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source):
> > +    bpf_program = BPFProgram(source_path=invalid_memory_access_bpf_source)
> > +    bpf_program._compile_bpf()
> > +    return bpf_program.bpf_obj
> > +
> > +
> > +def run_main_with_args_and_capture_output(args, capsys):
> > +    with mock.patch.object(sys, "argv", args):
> > +        try:
> > +            main.main()
> > +        except SystemExit as e:
> > +            result = capsys.readouterr()
> > +            output = result.out.rstrip()
> > +            error = result.err.rstrip()
> > +            logger.debug("STDOUT:\n%s", output)
> > +            logger.debug("STDERR:\n%s", error)
> > +            return e.code, output, error
> > +
> > +
> > +kernel_image_path = kernel.KernelManager().get_kernel_image(version="6.15")
> > +kernel_cli_flags = [["--kernel", "6.15"], ["--kernel-image", f"{kernel_image_path}"]]
> > +
> > +
> > [email protected]("kernel_args", kernel_cli_flags)
> > +class TestCLI:
> > +    def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, capsys):
> > +        args = [
> > +            "main.py",
> > +            *kernel_args,
> > +            "--rootfs",
> > +            "/",
> > +            "--bpf-src",
> > +            str(openat_bpf_source),
> > +        ]
> > +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> > +        assert code == 0
> > +        assert "BPF programs succesfully loaded" == output
> > +
> > +    def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj, capsys):
> > +        args = [
> > +            "main.py",
> > +            *kernel_args,
> > +            "--rootfs",
> > +            "/",
> > +            "--bpf-obj",
> > +            str(openat_bpf_obj),
> > +        ]
> > +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> > +        assert code == 0
> > +        assert "BPF programs succesfully loaded" == output
> > +
> > +    def test_main_with_invalid_bpf(
> > +        self, kernel_args, invalid_memory_access_bpf_source, capsys
> > +    ):
> > +        args = [
> > +            "main.py",
> > +            *kernel_args,
> > +            "--rootfs",
> > +            "/",
> > +            "--bpf-src",
> > +            str(invalid_memory_access_bpf_source),
> > +        ]
> > +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> > +        output_lines = output.splitlines()
> > +        assert code == 1
> > +        assert "BPF program failed to load" == output_lines[0]
> > +        assert "Verifier logs:" == output_lines[1]
> > +
> > +    def test_main_with_invalid_bpf_obj(
> > +        self, kernel_args, invalid_memory_access_bpf_obj, capsys
> > +    ):
> > +        args = [
> > +            "main.py",
> > +            *kernel_args,
> > +            "--rootfs",
> > +            "/",
> > +            "--bpf-obj",
> > +            str(invalid_memory_access_bpf_obj),
> > +        ]
> > +        code, output, _ = run_main_with_args_and_capture_output(args, capsys)
> > +        output_lines = output.splitlines()
> > +        assert code == 1
> > +        assert "BPF program failed to load" == output_lines[0]
> > +        assert "Verifier logs:" == output_lines[1]
> > +
> > +    def test_main_with_valid_command(self, kernel_args, capsys):
> > +        args = ["main.py", *kernel_args, "--rootfs", "/", "-c", "uname -r"]
> > +        code, output, error = run_main_with_args_and_capture_output(args, capsys)
> > +        assert code == 0
> > +        assert "6.15.0" == output
> > +
> > +    def test_main_with_invalid_command(self, kernel_args, capsys):
> > +        args = ["main.py", *kernel_args, "--rootfs", "/", "-c", "NotImplemented"]
> > +        code, output, error = run_main_with_args_and_capture_output(args, capsys)
> > +        assert code != 0
> > +        assert f"Command failed with exit code: {code}" in output
> > diff --git a/contrib/bpf-vmtest-tool/utils.py b/contrib/bpf-vmtest-tool/utils.py
> > new file mode 100644
> > index 00000000000..8866c9559f4
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/utils.py
> > @@ -0,0 +1,27 @@
> > +import subprocess
> > +import logging
> > +from typing import Any
> > +
> > +logger = logging.getLogger(__name__)
> > +
> > +
> > +def run_command(cmd: list[str], **kwargs: Any):
> > +    try:
> > +        logger.debug(f"running command: {cmd}")
> > +        result = subprocess.run(
> > +            cmd,
> > +            text=True,
> > +            check=True,
> > +            capture_output=True,
> > +            shell=False,
> > +            **kwargs,
> > +        )
> > +        logger.debug("Command stdout:\n" + result.stdout.strip())
> > +        if result.stderr:
> > +            logger.debug("Command stderr:\n" + result.stderr.strip())
> > +        return result
> > +    except subprocess.CalledProcessError as e:
> > +        logger.error(e)
> > +        logger.error("Command failed with stdout:\n" + e.stdout.strip())
> > +        logger.error("Command failed with stderr:\n" + e.stderr.strip())
> > +        raise
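For reference, the error path of run_command() above relies on subprocess.run() raising CalledProcessError when check=True, with the captured streams attached to the exception. A minimal standalone sketch of that behaviour follows; run_checked is a hypothetical name used only here, and the child commands are made up for illustration.

```python
import subprocess
import sys


def run_checked(cmd: list[str]) -> subprocess.CompletedProcess:
    """Hypothetical stand-in for the helper above: run cmd, capture text output."""
    # check=True turns a non-zero exit status into CalledProcessError.
    return subprocess.run(cmd, text=True, check=True, capture_output=True)


# Successful command: stdout is available on the result object.
ok = run_checked([sys.executable, "-c", "print('hello')"])
print(ok.stdout.strip())

# Failing command: the exception carries returncode, stdout and stderr.
failure = None
try:
    run_checked(
        [sys.executable, "-c", "import sys; sys.stderr.write('boom'); sys.exit(3)"]
    )
except subprocess.CalledProcessError as e:
    failure = e
    print(e.returncode, e.stderr)
```

Because the streams are captured, callers that catch the re-raised exception can still inspect e.stdout and e.stderr themselves.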
> > diff --git a/contrib/bpf-vmtest-tool/vm.py b/contrib/bpf-vmtest-tool/vm.py
> > new file mode 100644
> > index 00000000000..5d4a5747f0b
> > --- /dev/null
> > +++ b/contrib/bpf-vmtest-tool/vm.py
> > @@ -0,0 +1,154 @@
> > +import logging
> > +import subprocess
> > +from typing import List
> > +
> > +from kernel import KernelImage
> > +
> > +logger = logging.getLogger(__name__)
> > +
> > +
> > +class VMConfig:
> > +    """Configuration container for VM settings"""
> > +
> > +    def __init__(
> > +        self, kernel_image: KernelImage, rootfs_path: str, command: str, **kwargs
> > +    ):
> > +        self.kernel = kernel_image
> > +        self.kernel_path = str(kernel_image.path)
> > +        self.rootfs_path = rootfs_path
> > +        self.command = command
> > +        self.memory_mb = kwargs.get("memory_mb", 512)
> > +        self.cpu_count = kwargs.get("cpu_count", 1)
> > +        self.extra_args = kwargs.get("extra_args", {})
> > +
> > +
> > +def bpf_verifier_logs(output: str) -> str:
> > +    start_tag = "--- Verifier log start ---"
> > +    end_tag = "--- Verifier log end ---"
> > +
> > +    start_idx = output.find(start_tag)
> > +    end_idx = output.find(end_tag)
> > +
> > +    if start_idx != -1 and end_idx != -1:
> > +        # Extract between the tags (excluding the markers themselves)
> > +        log_body = output[start_idx + len(start_tag) : end_idx].strip()
> > +        return log_body
> > +    else:
> > +        return "No verifier log found in the output."
> > +
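The marker-based extraction in bpf_verifier_logs() can be tried standalone. Below is a minimal sketch, assuming the loader prints the two marker lines exactly as in the patch. The function name extract_verifier_log and the sample output are made up for illustration. One deliberate difference from the quoted code: the end tag is searched for only after the start tag, so a stray end marker earlier in the output does not produce an empty slice.

```python
START_TAG = "--- Verifier log start ---"
END_TAG = "--- Verifier log end ---"


def extract_verifier_log(output: str) -> str:
    """Return the text between the two markers, or a fallback message."""
    start_idx = output.find(START_TAG)
    if start_idx == -1:
        return "No verifier log found in the output."
    body_start = start_idx + len(START_TAG)
    # Search for the end tag only past the start tag (differs from the patch).
    end_idx = output.find(END_TAG, body_start)
    if end_idx == -1:
        return "No verifier log found in the output."
    return output[body_start:end_idx].strip()


# Made-up sample output, for illustration only.
sample = (
    "some boot noise\n"
    "--- Verifier log start ---\n"
    "line one of the log\n"
    "line two of the log\n"
    "--- Verifier log end ---\n"
)
print(extract_verifier_log(sample))
```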
> > +
> > +class Vmtest:
> > +    """vmtest backend implementation"""
> > +
> > +    def __init__(self):
> > +        pass
> > +
> > +    def _boot_command(self, vm_config: VMConfig):
> > +        vmtest_command = ["vmtest"]
> > +        vmtest_command.extend(["-k", vm_config.kernel_path])
> > +        vmtest_command.extend(["-r", vm_config.rootfs_path])
> > +        vmtest_command.append(vm_config.command)
> > +        return vmtest_command
> > +
> > +    def _remove_boot_log(self, full_output: str) -> str:
> > +        """
> > +        Filters QEMU and kernel boot logs, returning only the output after the
> > +        `===> Running command` marker.
> > +        """
> > +        marker = "===> Running command"
> > +        lines = full_output.splitlines()
> > +
> > +        try:
> > +            start_index = next(i for i, line in enumerate(lines) if marker in line)
> > +            # Return everything after that marker (excluding the marker itself)
> > +            return "\n".join(lines[start_index + 1 :]).strip()
> > +        except StopIteration:
> > +            return full_output.strip()
> > +
> > +    def run_command(self, vm_config):
> > +        vm = None
> > +        try:
> > +            logger.info(f"Booting VM with kernel: {vm_config.kernel_path}")
> > +            logger.info(f"Using rootfs: {vm_config.rootfs_path}")
> > +            vm = subprocess.run(
> > +                self._boot_command(vm_config),
> > +                check=True,
> > +                text=True,
> > +                capture_output=True,
> > +                shell=False,
> > +            )
> > +            vm_stdout = vm.stdout
> > +            logger.debug(vm_stdout)
> > +            return VMCommandResult(
> > +                vm.returncode, self._remove_boot_log(vm_stdout), None
> > +            )
> > +        except subprocess.CalledProcessError as e:
> > +            out = e.stdout
> > +            err = e.stderr
> > +            # When the command in the VM fails, we still consider it a successful boot
> > +            if "===> Running command" not in out:
> > +                raise BootFailedError("Boot failed", out, err, e.returncode)
> > +            logger.debug("STDOUT: \n%s", out)
> > +            logger.debug("STDERR: \n%s", err)
> > +            return VMCommandResult(e.returncode, self._remove_boot_log(out), err)
> > +
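The boot-log filtering in _remove_boot_log() and the boot-failure check in run_command() both depend on the `===> Running command` marker appearing in the captured output. Below is a minimal standalone sketch of the filtering step. strip_boot_log is a hypothetical free-function version of the method, and the sample output is made up for illustration.

```python
MARKER = "===> Running command"


def strip_boot_log(full_output: str) -> str:
    """Return only the lines after the first line containing MARKER."""
    lines = full_output.splitlines()
    for i, line in enumerate(lines):
        if MARKER in line:
            # Drop the boot log and the marker line itself.
            return "\n".join(lines[i + 1:]).strip()
    # No marker found: return the whole output, as the patch does.
    return full_output.strip()


# Made-up sample output, for illustration only.
sample = "boot line 1\nboot line 2\n===> Running command\n6.15.0\n"
print(strip_boot_log(sample))
```

When the marker is absent the output comes back unfiltered, which is why run_command() checks for the marker separately before treating a non-zero exit as a command failure rather than a boot failure.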
> > +
> > +class VMCommandResult:
> > +    def __init__(self, returncode, stdout, stderr) -> None:
> > +        self.returncode = returncode
> > +        self.stdout = stdout
> > +        self.stderr = stderr
> > +
> > +
> > +class VirtualMachine:
> > +    """Main VM class - simple interface for end users"""
> > +
> > +    # Registry of available hypervisors
> > +    _hypervisors = {
> > +        "vmtest": Vmtest,
> > +    }
> > +
> > +    def __init__(
> > +        self,
> > +        kernel_image: KernelImage,
> > +        rootfs_path: str,
> > +        command: str,
> > +        hypervisor_type: str = "vmtest",
> > +        **kwargs,
> > +    ):
> > +        self.config = VMConfig(kernel_image, rootfs_path, command, **kwargs)
> > +
> > +        if hypervisor_type not in self._hypervisors:
> > +            raise ValueError(f"Unsupported hypervisor: {hypervisor_type}")
> > +
> > +        self.hypervisor = self._hypervisors[hypervisor_type]()
> > +
> > +    @classmethod
> > +    def list_hypervisors(cls) -> List[str]:
> > +        """List available hypervisors"""
> > +        return list(cls._hypervisors.keys())
> > +
> > +    def execute(self):
> > +        """Execute command in VM"""
> > +        return self.hypervisor.run_command(self.config)
> > +
> > +
> > +class BootFailedError(Exception):
> > +    """Raised when VM fails to boot properly (before command execution)."""
> > +
> > +    def __init__(
> > +        self, message: str, stdout: str = "", stderr: str = "", returncode: int = -1
> > +    ):
> > +        super().__init__(message)
> > +        self.stdout = stdout
> > +        self.stderr = stderr
> > +        self.returncode = returncode
> > +
> > +    def __str__(self):
> > +        base = super().__str__()
> > +        return (
> > +            f"{base}\n"
> > +            f"Return code: {self.returncode}\n"
> > +            f"--- STDOUT ---\n{self.stdout}\n"
> > +            f"--- STDERR ---\n{self.stderr}"
> > +        )
