Hi Julien,

On Mon, Jan 13, 2025 at 07:00:01PM +0100, Julien Plissonneau Duquène wrote:
> Let's start with this then. I implemented a PoC prototype [1] as a shell
> script that is currently fairly linux-specific and doesn't account for
> cgroup limits (yet?). Feedback is welcome (everything is open for discussion
> there, including the name) and if there is enough interest I may end up
> packaging it or submitting it to an existing collection (I am thinking about
> devscripts).

I'm sorry for not having come back earlier and thus having caused duplication of
work. I had started a Python-based implementation last year and then
dropped the ball over other stuff. It also implements the --require-mem
flag in the way you suggested. It parses DEB_BUILD_OPTIONS,
RPM_BUILD_NCPUS and CMAKE_BUILD_PARALLEL_LEVEL and also considers cgroup
memory limits. I hope this captures all of the feedback I got during
discussions and research.

I'm attaching my proof of concept. Would you join forces and turn either
of these PoCs into a proper Debian package that could be used during
package builds? Once accepted, we may send patches to individual Debian
packages making use of it and call OOM FTBFS a packaging bug eventually.

Helmut
#!/usr/bin/python3
# Copyright 2024 Helmut Grohne <hel...@subdivi.de>
# SPDX-License-Identifier: GPL-3

import argparse
import itertools
import os
import pathlib
import subprocess
import sys
import typing


def positive_integer(decstr: str) -> int:
    """Convert a decimal representation into a strictly positive integer.

    The conversion itself is delegated to int(), so whatever int() rejects
    results in a ValueError, as do all values below one.

    >>> positive_integer(1)
    1
    >>> positive_integer(0)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    >>> positive_integer(-1)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    """
    number = int(decstr)  # int() raises ValueError on malformed input
    if number >= 1:
        return number
    raise ValueError("integer must be positive")


def parse_size(expression: str) -> int:
    """Parse an expression representing a data size with an optional unit
    suffix into an integer. Raises a ValueError on failure.

    The expression is matched case-insensitively. A trailing "b" is dropped
    and an optional binary unit suffix (k, m, g, t, p or e) scales the number
    by the matching power of 1024. The numeric part is parsed as a float and
    the scaled result is truncated towards zero. Results below one as well as
    non-finite numbers are rejected with a ValueError.

    >>> parse_size("5")
    5
    >>> parse_size("4KB")
    4096
    >>> parse_size("0.9g")
    966367641
    >>> parse_size("1p")
    1125899906842624
    >>> parse_size("1e")
    1152921504606846976
    >>> parse_size("-1")
    Traceback (most recent call last):
        ...
    ValueError: number must be positive
    >>> parse_size("inf")
    Traceback (most recent call last):
        ...
    ValueError: number must be finite
    """
    expression = expression.lower()
    if expression.endswith("b"):
        expression = expression[:-1]
    suffixes = {
        "k": 2**10,
        "m": 2**20,
        "g": 2**30,
        "t": 2**40,
        "p": 2**50,
        "e": 2**60,
    }
    factor = 1
    # Slicing (rather than indexing) keeps an empty expression from raising
    # an IndexError here; float() rejects it with a ValueError below.
    if expression[-1:] in suffixes:
        factor = suffixes[expression[-1]]
        expression = expression[:-1]
    fval = float(expression)  # propagate ValueError
    try:
        value = int(fval * factor)  # raises ValueError for nan
    except OverflowError as err:
        # int() signals infinities with OverflowError; callers (including
        # argparse type= handling) only expect ValueError.
        raise ValueError("number must be finite") from err
    if value < 1:
        raise ValueError("number must be positive")
    return value


def guess_python() -> int | None:
    """Estimate the number of processors using Python's os.cpu_count().

    >>> guess_python() > 0
    True
    """
    return os.cpu_count()


def guess_nproc() -> int:
    """Determine the processor count by running coreutils' nproc and parsing
    its output as a positive integer.

    >>> guess_nproc() > 0
    True
    """
    output = subprocess.check_output(["nproc"], encoding="ascii")
    return positive_integer(output)


def guess_deb_build_parallel(
    environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Extract the job count of a parallel= option from the DEB_BUILD_OPTIONS
    variable of the given environment.

    The first parallel= option carrying a positive integer wins. None is
    returned when the variable is unset or carries no such option.

    >>> guess_deb_build_parallel({})
    >>> guess_deb_build_parallel({"DEB_BUILD_OPTIONS": "nocheck parallel=3"})
    3
    """
    try:
        words = environ["DEB_BUILD_OPTIONS"].split()
    except KeyError:
        return None
    prefix = "parallel="
    for word in words:
        if not word.startswith(prefix):
            continue
        try:
            return positive_integer(word[len(prefix):])
        except ValueError:
            # Malformed assignment: keep looking at the remaining options.
            continue
    return None


def guess_from_environment(
    variable: str, environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Look up a positive integer stored in the named environment variable.

    None is returned when the variable is missing or when its value is not a
    positive integer.

    >>> guess_from_environment("CPUS", {"CPUS": 4})
    4
    >>> guess_from_environment("CPUS", {"other": 3})
    """
    try:
        raw = environ[variable]
    except KeyError:
        return None
    try:
        return positive_integer(raw)
    except ValueError:
        return None


def guess_memavailable() -> int:
    """Return the MemAvailable figure of /proc/meminfo converted to bytes.

    Raises a RuntimeError when /proc/meminfo has no MemAvailable line and a
    ValueError when the reported amount is not a positive integer.
    """
    key = "MemAvailable:"
    with open("/proc/meminfo", encoding="ascii") as meminfo:
        # Stop reading at the first matching line.
        found = next((row for row in meminfo if row.startswith(key)), None)
    if found is None:
        raise RuntimeError("no MemAvailable line found in /proc/meminfo")
    amount = found.removeprefix(key).strip().removesuffix("kB")
    # The parsed number is multiplied by 1024 to obtain bytes.
    return 1024 * positive_integer(amount)


def guess_cgroup_memory() -> int | None:
    """Return the smallest "memory.high" or "memory.max" limit of the current
    cgroup or any parent of it if any.

    The cgroup is taken from the entry of /proc/self/cgroup that has
    hierarchy ID 0 and an empty controller list. Limits are then read from
    the matching directories below /sys/fs/cgroup. Files that are missing,
    unreadable or do not contain a positive integer (such as "max") are
    skipped.

    Returns None when /proc/self/cgroup cannot be read, when it has no such
    entry, or when no numeric limit is found.
    """
    try:
        content = pathlib.Path("/proc/self/cgroup").read_text(
            encoding=sys.getfilesystemencoding()
        )
    except (OSError, UnicodeDecodeError):
        # No cgroup information available: report "no limit known" rather
        # than aborting.
        return None
    mygroup: pathlib.PurePath | None = None
    # The file may hold one line per hierarchy, so pick the matching line
    # instead of treating the whole file as a single path.
    for line in content.splitlines():
        fields = line.split(":", 2)
        if len(fields) == 3 and fields[0] == "0" and not fields[1]:
            try:
                mygroup = pathlib.PurePath(fields[2]).relative_to("/")
            except ValueError:
                # Path is not below "/": it cannot be mapped below
                # /sys/fs/cgroup.
                return None
            break
    if mygroup is None:
        return None
    sfc = pathlib.Path("/sys/fs/cgroup")
    guess: int | None = None
    for group in itertools.chain((mygroup,), mygroup.parents):
        for entry in ("memory.max", "memory.high"):
            try:
                value = positive_integer(
                    (sfc / group / entry).read_text(encoding="ascii")
                )
            except (OSError, ValueError):
                # Missing/unreadable file or a non-numeric value like "max".
                continue
            guess = value if guess is None else min(guess, value)
    return guess


def parse_required_memory(expression: str) -> list[int]:
    """Turn a comma-separated list of size expressions into a list of
    integers. An empty item repeats the value preceding it; an empty first
    item raises a ValueError.

    >>> parse_required_memory("1k,9,,1")
    [1024, 9, 9, 1]
    """
    sizes: list[int] = []
    for item in expression.split(","):
        if item:
            sizes.append(parse_size(item))
        elif sizes:
            sizes.append(sizes[-1])
        else:
            raise ValueError("initial memory expression cannot be empty")
    return sizes


def guess_memory_concurrency(memory: int, usage: list[int]) -> int:
    """Compute how many cores fit into the given amount of memory.

    Each entry of usage except the last is the consumption of one further
    core. The last entry applies to every core beyond those. The result is
    never smaller than one.

    >>> guess_memory_concurrency(4, [1])
    4
    >>> guess_memory_concurrency(10, [5, 4, 3])
    2
    >>> guess_memory_concurrency(2, [3])
    1
    """
    cores = 0
    remaining = memory
    for need in usage[:-1]:
        if need > remaining:
            # Out of memory before reaching the final entry.
            return max(1, cores)
        remaining -= need
        cores += 1
    # All explicit entries fit: spend the rest on cores of the final size.
    return max(1, cores + remaining // usage[-1])


def clamp(
    value: int, lower: int | None = None, upper: int | None = None
) -> int:
    """Return an adjusted value that does not exceed the lower or upper limits
    if any.

    >>> clamp(5, upper=4)
    4
    >>> clamp(9, 2)
    9
    """
    if upper is not None and upper < value:
        value = upper
    if lower is not None and lower > value:
        value = lower
    return value


def main() -> None:
    """Command line entry point: print a suitable build concurrency.

    The initial guess is either the number given to --detect or the result
    of the selected detection method. It is then overridden, in increasing
    order of precedence, by RPM_BUILD_NCPUS, a parallel= option in
    DEB_BUILD_OPTIONS and CMAKE_BUILD_PARALLEL_LEVEL. If --require-mem is
    given and the guess exceeds one, the guess is reduced to what fits into
    the available memory, itself capped by any cgroup limit. Finally the
    --min and --max limits are applied and the result is printed.

    Exits with status 1 when no processor count can be determined.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--detect",
        action="store",
        default="nproc",
        metavar="METHOD",
        help="supply a processor count or select a detection method "
        "(nproc or python)",
    )
    parser.add_argument(
        "--max",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given maximum",
    )
    parser.add_argument(
        "--min",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given minimum",
    )
    parser.add_argument(
        "--require-mem",
        action="store",
        type=parse_required_memory,
        default=[],
        metavar="MEMLIST",
        help="specify per-core required memory as a comma separated list",
    )
    args = parser.parse_args()
    guess: int | None = None
    try:
        guess = positive_integer(args.detect)
    except ValueError:
        # Not a number, so it has to name a detection method.
        try:
            detector = {
                "nproc": guess_nproc,
                "python": guess_python,
            }[args.detect]
        except KeyError:
            parser.error("invalid argument to --detect")
        try:
            guess = detector()
        except (OSError, subprocess.CalledProcessError, ValueError):
            # A failing detector (e.g. nproc not installed) is treated like
            # an unknown count so that the environment may still supply one.
            guess = None
    # Later assignments take precedence over earlier ones.
    guess = guess_from_environment("RPM_BUILD_NCPUS") or guess
    guess = guess_deb_build_parallel() or guess
    guess = guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL") or guess
    if guess is None:
        print("failed to guess processor count", file=sys.stderr)
        sys.exit(1)
    # A single job cannot be reduced any further, so skip memory detection.
    if args.require_mem and guess > 1:
        memory = clamp(guess_memavailable(), upper=guess_cgroup_memory())
        guess = clamp(
            guess, upper=guess_memory_concurrency(memory, args.require_mem)
        )
    guess = clamp(guess, args.min, args.max)
    print(guess)


# Run the command line interface only when executed as a script, not when
# imported as a module.
if __name__ == "__main__":
    main()

Reply via email to