This is an automated email from the ASF dual-hosted git repository.

ppkarwasz pushed a commit to branch feat/vdr-generation
in repository https://gitbox.apache.org/repos/asf/logging-site.git

commit dceabfa9d13521373c702ea9862583cf8e8bf90e
Author: Piotr P. Karwasz <[email protected]>
AuthorDate: Fri Apr 24 14:04:22 2026 +0200

    feat: add `vdr_aggregate` script
    
    The `vdr_aggregate` script performs the reverse operation of `vdr_split`:
    
    - It merges all the CycloneDX documents in `src/vulnerabilities`,
    - If the result differs from the committed one, it bumps the version.
    
    The comparison ignores comments, namespace differences, and whitespace
    between elements; whitespace inside text content remains significant.
---
 scripts/vdr_aggregate.py | 241 +++++++++++++++++++++++++++++++++++++++++++++++
 scripts/vdr_common.py    | 188 ++++++++++++++++++++++++++++++++++++
 scripts/vdr_split.py     | 189 ++++++++++++-------------------------
 3 files changed, 488 insertions(+), 130 deletions(-)

diff --git a/scripts/vdr_aggregate.py b/scripts/vdr_aggregate.py
new file mode 100755
index 00000000..00a2a381
--- /dev/null
+++ b/scripts/vdr_aggregate.py
@@ -0,0 +1,241 @@
+#!/usr/bin/env -S uv run --script
+# /// script
+# requires-python = ">=3.11"
+# dependencies = ["lxml>=5"]
+# ///
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Aggregate per-CVE VDR files back into the monolithic ``vdr.xml``.
+
+Reads ``src/vulnerabilities/<CVE-id>/<component>.cdx.xml`` (CycloneDX 1.7)
+and writes ``src/site/static/cyclonedx/vdr.xml`` (CycloneDX 1.7), preserving
+the existing ``serialNumber`` and incrementing ``version`` by one.
+"""
+
+from __future__ import annotations
+
+import re
+import sys
+from pathlib import Path
+
+from lxml import etree
+
+from vdr_common import (
+    NS,
+    NS_XSI,
+    OUT_DIR,
+    ROOT,
+    SCHEMA_LOCATION,
+    SRC_VDR,
+    clone_into_namespace,
+    qn,
+    serialize,
+    write_bom_if_changed,
+)
+
+CVE_RE = re.compile(r"^CVE-(\d{4})-(\d+)$")
+
+DUMMY_COMPONENTS_COMMENT = """We add *dummy* components to refer to in 
`affects` blocks.
+    This is necessary, since not all Log4j components have SBOMs associated 
with them.
+"""
+
+GENERATED_HEADER = """<!-- This file is a Vulnerability Disclosure Report 
(VDR) covering all Apache Logging Services[1] projects.
+     This file adheres to the CycloneDX SBOM specification[2].
+
+     The latest version of this file can be found at 
https://logging.apache.org/cyclonedx/vdr.xml
+
+     All Apache Logging Services projects (e.g., Log4j) generate SBOMs 
containing `vulnerability-assertion` entries with links to this file.
+
+     If you need help in addressing these vulnerabilities, 
suggestions/corrections on the content, and/or reporting new vulnerabilities, 
please refer to the Log4j support page[3].
+
+     This file is maintained in version control[4].
+
+     GENERATED FILE. Do not edit by hand. To update the VDR, edit the per-CVE
+     files under `src/vulnerabilities/` and regenerate this file with:
+
+         uv run scripts/vdr_aggregate.py
+
+     [1] https://logging.apache.org
+     [2] https://cyclonedx.org
+     [3] https://logging.apache.org/log4j/2.x/support.html
+     [4] https://github.com/apache/logging-site/tree/cyclonedx
+     -->"""
+
+
+def parse_cve(cve_id: str) -> tuple[int, int]:
+    """Parse ``CVE-YYYY-NNNN`` into ``(year, number)`` for sorting."""
+    m = CVE_RE.match(cve_id)
+    if not m:
+        raise ValueError(f"unrecognized CVE id: {cve_id!r}")
+    return int(m.group(1)), int(m.group(2))
+
+
+def discover_inputs() -> list[tuple[int, int, str, str, Path]]:
+    """Return a list of ``(-year, -number, slug, cve_id, path)`` tuples.
+
+    Negated year/number give descending sort order via the default ascending 
sort.
+    """
+    inputs: list[tuple[int, int, str, str, Path]] = []
+    for path in OUT_DIR.glob("CVE-*/*.cdx.xml"):
+        cve_id = path.parent.name
+        slug = path.name.removesuffix(".cdx.xml")
+        year, number = parse_cve(cve_id)
+        inputs.append((-year, -number, slug, cve_id, path))
+    inputs.sort()
+    return inputs
+
+
+def collect(inputs: list[tuple[int, int, str, str, Path]]) -> tuple[
+    list[etree._Element], list[etree._Element], str
+]:
+    """Collect deduplicated components and vulnerabilities, plus the max 
timestamp.
+
+    Components are gathered from both ``metadata/component`` (subject) and any
+    top-level ``components/component`` (extras like log4cxx-conan), deduped by
+    ``bom-ref`` and sorted alphabetically. Vulnerabilities are deduped by CVE
+    id, preserving input sort order. The returned timestamp is the 
lexicographic
+    max of every input's ``metadata/timestamp`` (ISO 8601 UTC strings sort
+    correctly as text).
+    """
+    parser = etree.XMLParser(remove_blank_text=True, strip_cdata=False)
+
+    components_by_ref: dict[str, etree._Element] = {}
+    vulns_by_cve: dict[str, etree._Element] = {}
+    max_timestamp = ""
+
+    for _, _, _, cve_id, path in inputs:
+        tree = etree.parse(str(path), parser)
+        root = tree.getroot()
+        # Per-CVE files may be at any CycloneDX namespace; query and migrate
+        # using the actual root namespace, then write the aggregate at NS.
+        src_ns = etree.QName(root).namespace
+
+        ts = root.findtext(f"{qn('metadata', src_ns)}/{qn('timestamp', 
src_ns)}") or ""
+        if ts > max_timestamp:
+            max_timestamp = ts
+
+        subject = root.find(f"{qn('metadata', src_ns)}/{qn('component', 
src_ns)}")
+        if subject is not None and subject.get("bom-ref") not in 
components_by_ref:
+            components_by_ref[subject.get("bom-ref")] = 
clone_into_namespace(subject, NS)
+
+        extras_root = root.find(qn("components", src_ns))
+        if extras_root is not None:
+            for c in extras_root.findall(qn("component", src_ns)):
+                ref = c.get("bom-ref")
+                if ref not in components_by_ref:
+                    components_by_ref[ref] = clone_into_namespace(c, NS)
+
+        if cve_id not in vulns_by_cve:
+            vuln = root.find(f"{qn('vulnerabilities', 
src_ns)}/{qn('vulnerability', src_ns)}")
+            if vuln is not None:
+                vulns_by_cve[cve_id] = clone_into_namespace(vuln, NS)
+
+    components = [components_by_ref[ref] for ref in sorted(components_by_ref)]
+    seen: set[str] = set()
+    ordered_vulns: list[etree._Element] = []
+    for _, _, _, cve, _ in inputs:
+        if cve in seen or cve not in vulns_by_cve:
+            continue
+        seen.add(cve)
+        ordered_vulns.append(vulns_by_cve[cve])
+    return components, ordered_vulns, max_timestamp
+
+
+def build_bom(
+    serial_number: str,
+    version: int,
+    timestamp: str,
+    components: list[etree._Element],
+    vulnerabilities: list[etree._Element],
+) -> etree._Element:
+    """Build the aggregated CycloneDX 1.7 ``<bom>`` element."""
+    bom = etree.Element(qn("bom"), nsmap={None: NS, "xsi": NS_XSI})
+    bom.set(f"{{{NS_XSI}}}schemaLocation", SCHEMA_LOCATION)
+    bom.set("serialNumber", serial_number)
+    bom.set("version", str(version))
+
+    metadata = etree.SubElement(bom, qn("metadata"))
+    etree.SubElement(metadata, qn("timestamp")).text = timestamp
+    manufacturer = etree.SubElement(metadata, qn("manufacturer"))
+    etree.SubElement(manufacturer, qn("name")).text = "Apache Logging Services"
+    etree.SubElement(manufacturer, qn("url")).text = 
"https://logging.apache.org";
+
+    bom.append(etree.Comment(DUMMY_COMPONENTS_COMMENT))
+    components_elem = etree.SubElement(bom, qn("components"))
+    for c in components:
+        components_elem.append(c)
+
+    vulns_elem = etree.SubElement(bom, qn("vulnerabilities"))
+    for v in vulnerabilities:
+        vulns_elem.append(v)
+
+    return bom
+
+
+def apply_blank_lines(bom: etree._Element) -> None:
+    """Insert blank lines around children of 
``<components>``/``<vulnerabilities>``,
+    and after their closing tags.
+
+    Mutates ``.text`` of each parent and ``.tail`` of every child (and of the
+    parent itself) by prepending a single ``\\n`` to the indentation strings
+    that ``etree.indent()`` already set. This produces a blank line wherever
+    those whitespace nodes are emitted.
+    """
+    for parent_tag in ("components", "vulnerabilities"):
+        parent = bom.find(qn(parent_tag))
+        if parent is None:
+            continue
+        parent.text = "\n" + (parent.text or "")
+        for child in parent:
+            child.tail = "\n" + (child.tail or "")
+        parent.tail = "\n" + (parent.tail or "")
+
+
+def main() -> int:
+    inputs = discover_inputs()
+    if not inputs:
+        print(f"no inputs found under {OUT_DIR.relative_to(ROOT)}", 
file=sys.stderr)
+        return 1
+
+    components, vulnerabilities, timestamp = collect(inputs)
+
+    def build_fn(serial: str, version: int) -> etree._Element:
+        return build_bom(
+            serial_number=serial,
+            version=version,
+            timestamp=timestamp,
+            components=components,
+            vulnerabilities=vulnerabilities,
+        )
+
+    def serialize_fn(bom: etree._Element) -> bytes:
+        return serialize(
+            bom,
+            extra_header=GENERATED_HEADER.encode("utf-8") + b"\n",
+            after_indent=apply_blank_lines,
+        )
+
+    did_write, final_version = write_bom_if_changed(SRC_VDR, build_fn, 
serialize_fn)
+    rel = SRC_VDR.relative_to(ROOT)
+    verb = "wrote" if did_write else "unchanged"
+    print(
+        f"{verb} {rel} "
+        f"({len(components)} components, {len(vulnerabilities)} 
vulnerabilities, version {final_version})"
+    )
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/scripts/vdr_common.py b/scripts/vdr_common.py
new file mode 100644
index 00000000..b0cc602e
--- /dev/null
+++ b/scripts/vdr_common.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Shared constants and helpers for the VDR split/aggregate scripts.
+
+This is a plain importable module (no inline ``# /// script`` block); the
+calling script's ``uv run --script`` venv must provide ``lxml``.
+"""
+
+from __future__ import annotations
+
+import copy
+import re
+import uuid
+from collections.abc import Callable
+from pathlib import Path
+
+from lxml import etree
+
+ROOT = Path(__file__).resolve().parent.parent
+SRC_VDR = ROOT / "src" / "site" / "static" / "cyclonedx" / "vdr.xml"
+OUT_DIR = ROOT / "src" / "vulnerabilities"
+
+NS = "http://cyclonedx.org/schema/bom/1.7";
+NS_XSI = "http://www.w3.org/2001/XMLSchema-instance";
+SCHEMA_LOCATION = f"{NS} https://cyclonedx.org/schema/bom-1.7.xsd";
+
+LICENSE_HEADER = """<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to you under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->"""
+
+
+def qn(tag: str, ns: str = NS) -> str:
+    """Return the Clark-notation qualified name ``{ns}tag`` for lxml 
lookups."""
+    return f"{{{ns}}}{tag}"
+
+
+def clone(elem: etree._Element) -> etree._Element:
+    """Deep-copy ``elem`` so it can be appended to another tree.
+
+    lxml preserves CDATA across ``deepcopy``; redundant inherited xmlns
+    declarations are stripped later by ``cleanup_namespaces`` in ``serialize``.
+    """
+    return copy.deepcopy(elem)
+
+
+def clone_into_namespace(elem: etree._Element, dst_ns: str) -> etree._Element:
+    """Deep-copy ``elem`` into ``dst_ns``, rewriting the default xmlns.
+
+    Used when migrating elements between CycloneDX schema versions (e.g.
+    reading a 1.6 source vdr.xml while writing 1.7 outputs). When the source
+    is already in ``dst_ns`` this is a no-op transform. Preserves CDATA via
+    the explicit ``strip_cdata=False`` parser.
+    """
+    src_ns = etree.QName(elem).namespace
+    inner = etree.tostring(elem, encoding="unicode")
+    if src_ns:
+        inner = inner.replace(f' xmlns="{src_ns}"', "")
+    wrapped = f'<wrap xmlns="{dst_ns}">{inner}</wrap>'
+    parser = etree.XMLParser(strip_cdata=False)
+    return etree.fromstring(wrapped.encode("utf-8"), parser)[0]
+
+
+def fold_bom_attributes(body: bytes) -> bytes:
+    """Hack: fold the ``<bom ...>`` start tag so each attribute past the
+    first sits on its own line, indented to align under the first. lxml's
+    serializer offers no per-attribute wrap option, so we post-process it.
+    """
+    match = re.match(rb'<bom ([^>]*?)(/?>)', body)
+    if not match:
+        return body
+    attrs = re.findall(rb'[\w:-]+="[^"]*"', match.group(1))
+    if len(attrs) <= 1:
+        return body
+    indent = b"\n" + b" " * len(b"<bom ")
+    return b"<bom " + indent.join(attrs) + match.group(2) + body[match.end():]
+
+
+def serialize(
+    bom: etree._Element,
+    extra_header: bytes = b"",
+    after_indent: Callable[[etree._Element], None] | None = None,
+) -> bytes:
+    """Serialize ``bom`` as a pretty-printed UTF-8 file with the ASF header.
+
+    Drops unused namespace declarations inherited from the source tree,
+    applies 2-space indentation, and prepends the XML declaration and
+    Apache License comment block. ``extra_header`` (if provided) is inserted
+    between the ASF license block and the ``<bom>`` tree. ``after_indent``
+    (if provided) runs after ``etree.indent()`` so callers can adjust
+    ``.text``/``.tail`` whitespace before serialization.
+    """
+    etree.cleanup_namespaces(bom, top_nsmap={None: NS})
+    etree.indent(bom, space="  ")
+    if after_indent is not None:
+        after_indent(bom)
+    body = fold_bom_attributes(etree.tostring(bom, xml_declaration=False, 
encoding="UTF-8"))
+    decl = b'<?xml version="1.0" encoding="UTF-8"?>\n'
+    header = LICENSE_HEADER.encode("utf-8") + b"\n"
+    return decl + header + extra_header + body + b"\n"
+
+
+def _local_attrs(elem: etree._Element) -> dict[str, str]:
+    """Attribute dict keyed by local name, dropping namespace prefixes."""
+    return {etree.QName(k).localname: v for k, v in elem.attrib.items()}
+
+
+def equivalent(a: etree._Element, b: etree._Element) -> bool:
+    """Recursively compare two elements, ignoring comments, namespaces, and 
outer whitespace.
+
+    Matches local tag name and attribute dict by local name (so namespace
+    differences -- e.g. CycloneDX 1.7 vs 1.8 -- don't trigger inequality on
+    structure alone). Compares ``.text`` after ``strip()``; ``.tail``
+    (inter-element whitespace) and comment children are ignored. Internal
+    whitespace inside text content is significant.
+    """
+    if etree.QName(a).localname != etree.QName(b).localname:
+        return False
+    if _local_attrs(a) != _local_attrs(b):
+        return False
+    if (a.text or "").strip() != (b.text or "").strip():
+        return False
+    a_kids = [c for c in a if not isinstance(c, etree._Comment)]
+    b_kids = [c for c in b if not isinstance(c, etree._Comment)]
+    if len(a_kids) != len(b_kids):
+        return False
+    return all(equivalent(ac, bc) for ac, bc in zip(a_kids, b_kids))
+
+
+def write_bom_if_changed(
+    path: Path,
+    build_fn: Callable[[str, int], etree._Element],
+    serialize_fn: Callable[[etree._Element], bytes],
+) -> tuple[bool, int]:
+    """Build a BOM and write it to ``path`` only if it differs from the 
existing file.
+
+    On a missing file: mints a new ``urn:uuid:`` serial and writes at version 
1.
+    On an existing file: reuses ``serialNumber``, builds a candidate at the
+    existing version, and compares via ``equivalent``. If equivalent, returns
+    ``(False, version)`` without touching the file. Otherwise bumps the version
+    by one, updates the candidate's ``version`` attribute, writes, and returns
+    ``(True, version + 1)``. Parsing errors on an existing file propagate.
+    """
+    if path.is_file():
+        compare_parser = etree.XMLParser(remove_blank_text=True, 
strip_cdata=False)
+        existing_root = etree.parse(str(path), compare_parser).getroot()
+        serial = existing_root.get("serialNumber")
+        version_str = existing_root.get("version")
+        if serial is None or version_str is None:
+            raise ValueError(f"{path}: missing serialNumber or version on root 
element")
+        version = int(version_str)
+        candidate = build_fn(serial, version)
+        if equivalent(candidate, existing_root):
+            return False, version
+        version += 1
+        candidate.set("version", str(version))
+    else:
+        serial = f"urn:uuid:{uuid.uuid4()}"
+        version = 1
+        candidate = build_fn(serial, version)
+
+    path.parent.mkdir(parents=True, exist_ok=True)
+    path.write_bytes(serialize_fn(candidate))
+    return True, version
diff --git a/scripts/vdr_split.py b/scripts/vdr_split.py
index eade77f0..7552fabf 100755
--- a/scripts/vdr_split.py
+++ b/scripts/vdr_split.py
@@ -19,84 +19,32 @@
 # limitations under the License.
 """Split the monolithic ``vdr.xml`` into per-(CVE, component) files.
 
-Reads ``src/site/static/cyclonedx/vdr.xml`` (CycloneDX 1.6) and writes
-``src/vulnerabilities/<CVE-id>/<component>.cdx.xml`` (CycloneDX 1.7).
+Reads ``src/site/static/cyclonedx/vdr.xml`` and writes 
``src/vulnerabilities/<CVE-id>/<component>.cdx.xml``.
 
-One output file is produced per affected component, except that
-``log4cxx-conan`` never gets its own file; its vulnerabilities are always
-one-to-one with ``log4cxx`` and are reflected in the log4cxx file via a
-``<components>`` entry plus a ``<dependencies>`` edge.
+One output file is produced per affected component.
+
+The special ``log4cxx-conan`` component never gets its own file; its 
vulnerabilities are always one-to-one with
+ ``log4cxx`` and are reflected in the log4cxx file via a ``<components>`` 
entry plus a ``<dependencies>`` edge.
 """
 
 from __future__ import annotations
 
-import re
 import sys
-import uuid
-from pathlib import Path
 
 from lxml import etree
 
-ROOT = Path(__file__).resolve().parent.parent
-SRC = ROOT / "src" / "site" / "static" / "cyclonedx" / "vdr.xml"
-OUT_DIR = ROOT / "src" / "vulnerabilities"
-
-NS_OLD = "http://cyclonedx.org/schema/bom/1.6";
-NS_NEW = "http://cyclonedx.org/schema/bom/1.7";
-NS_XSI = "http://www.w3.org/2001/XMLSchema-instance";
-SCHEMA_LOCATION = f"{NS_NEW} https://cyclonedx.org/schema/bom-1.7.xsd";
-
-LICENSE_HEADER = """<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to you under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->"""
-
-
-def qn(tag: str, ns: str = NS_NEW) -> str:
-    """Return the Clark-notation qualified name ``{ns}tag`` for lxml 
lookups."""
-    return f"{{{ns}}}{tag}"
-
-
-def copy_into_new_ns(source_elem: etree._Element) -> etree._Element:
-    """Deep-copy ``source_elem`` from NS_OLD into NS_NEW, preserving CDATA.
-
-    Serializes, strips the inherited default xmlns, and re-parses under a
-    wrapper declaring NS_NEW as default so the returned element carries no
-    redundant xmlns attribute and can be appended cleanly to the new BOM.
-    """
-    inner = etree.tostring(source_elem, encoding="unicode")
-    inner = inner.replace(f' xmlns="{NS_OLD}"', "")
-    wrapped = f'<wrap xmlns="{NS_NEW}">{inner}</wrap>'
-    parser = etree.XMLParser(strip_cdata=False)
-    wrap = etree.fromstring(wrapped.encode("utf-8"), parser)
-    return wrap[0]
-
-
-def read_existing_serial(path: Path) -> str | None:
-    """Return the ``serialNumber`` of the BOM at ``path`` if it parses, else 
None.
-
-    Used to keep ``urn:uuid:`` identifiers stable across re-runs so the script
-    is idempotent and re-generation produces no spurious diffs.
-    """
-    if not path.is_file():
-        return None
-    try:
-        existing = etree.parse(str(path)).getroot()
-    except etree.XMLSyntaxError:
-        return None
-    return existing.get("serialNumber")
+from vdr_common import (
+    NS,
+    NS_XSI,
+    OUT_DIR,
+    ROOT,
+    SCHEMA_LOCATION,
+    SRC_VDR,
+    clone_into_namespace,
+    qn,
+    serialize,
+    write_bom_if_changed,
+)
 
 
 def build_bom(
@@ -104,6 +52,7 @@ def build_bom(
     vuln_elem: etree._Element,
     timestamp: str,
     serial_number: str,
+    version: int,
     extra_components: list[etree._Element] | None = None,
     dependencies: list[tuple[str, list[str]]] | None = None,
 ) -> etree._Element:
@@ -113,14 +62,14 @@ def build_bom(
     and ``dependencies`` (optional) populate the top-level ``<components>`` and
     ``<dependencies>`` sections used by log4cxx files to link log4cxx-conan.
     """
-    bom = etree.Element(qn("bom"), nsmap={None: NS_NEW, "xsi": NS_XSI})
+    bom = etree.Element(qn("bom"), nsmap={None: NS, "xsi": NS_XSI})
     bom.set(f"{{{NS_XSI}}}schemaLocation", SCHEMA_LOCATION)
     bom.set("serialNumber", serial_number)
-    bom.set("version", "1")
+    bom.set("version", str(version))
 
     metadata = etree.SubElement(bom, qn("metadata"))
     etree.SubElement(metadata, qn("timestamp")).text = timestamp
-    metadata.append(copy_into_new_ns(subject_component))
+    metadata.append(clone_into_namespace(subject_component, NS))
     manufacturer = etree.SubElement(metadata, qn("manufacturer"))
     etree.SubElement(manufacturer, qn("name")).text = "Apache Logging Services"
     etree.SubElement(manufacturer, qn("url")).text = 
"https://logging.apache.org";
@@ -133,7 +82,7 @@ def build_bom(
     if extra_components:
         components_elem = etree.SubElement(bom, qn("components"))
         for c in extra_components:
-            components_elem.append(copy_into_new_ns(c))
+            components_elem.append(clone_into_namespace(c, NS))
 
     if dependencies:
         deps_elem = etree.SubElement(bom, qn("dependencies"))
@@ -143,41 +92,11 @@ def build_bom(
                 etree.SubElement(d, qn("dependency"), ref=s)
 
     vulns_elem = etree.SubElement(bom, qn("vulnerabilities"))
-    vulns_elem.append(copy_into_new_ns(vuln_elem))
+    vulns_elem.append(clone_into_namespace(vuln_elem, NS))
 
     return bom
 
 
-def fold_bom_attributes(body: bytes) -> bytes:
-    """Hack: fold the ``<bom ...>`` start tag so each attribute past the
-    first sits on its own line, indented to align under the first. lxml's
-    serializer offers no per-attribute wrap option, so we post-process it.
-    """
-    match = re.match(rb'<bom ([^>]*?)(/?>)', body)
-    if not match:
-        return body
-    attrs = re.findall(rb'[\w:-]+="[^"]*"', match.group(1))
-    if len(attrs) <= 1:
-        return body
-    indent = b"\n" + b" " * len(b"<bom ")
-    return b"<bom " + indent.join(attrs) + match.group(2) + body[match.end():]
-
-
-def serialize(bom: etree._Element) -> bytes:
-    """Serialize ``bom`` as a pretty-printed UTF-8 file with the ASF header.
-
-    Drops unused namespace declarations inherited from the source tree,
-    applies 2-space indentation, and prepends the XML declaration and
-    Apache License comment block.
-    """
-    etree.cleanup_namespaces(bom, top_nsmap={None: NS_NEW})
-    etree.indent(bom, space="  ")
-    body = fold_bom_attributes(etree.tostring(bom, xml_declaration=False, 
encoding="UTF-8"))
-    decl = b'<?xml version="1.0" encoding="UTF-8"?>\n'
-    header = LICENSE_HEADER.encode("utf-8") + b"\n"
-    return decl + header + body + b"\n"
-
-
 def main() -> int:
     """Parse the source VDR and write one output file per (CVE, component) 
pair.
 
@@ -186,30 +105,33 @@ def main() -> int:
     log4cxx output file gains the conan component plus a dependency edge.
     """
     parser = etree.XMLParser(remove_blank_text=True, strip_cdata=False)
-    tree = etree.parse(str(SRC), parser)
+    tree = etree.parse(str(SRC_VDR), parser)
     root = tree.getroot()
 
-    # Parses the components
-    components_root = root.find(qn("components", NS_OLD))
+    # The source vdr.xml may be at any CycloneDX namespace (1.6 or 1.7); query
+    # children using the actual namespace of the root, not the output NS.
+    src_ns = etree.QName(root).namespace
+
+    components_root = root.find(qn("components", src_ns))
     components_by_ref = {
         c.get("bom-ref"): c
-        for c in components_root.findall(qn("component", NS_OLD))
+        for c in components_root.findall(qn("component", src_ns))
     }
 
-    # Parses the vulnerabilities and write one file per (CVE, component) pair.
-    vulns_root = root.find(qn("vulnerabilities", NS_OLD))
-    count = 0
-    for vuln in vulns_root.findall(qn("vulnerability", NS_OLD)):
-        cve_id = vuln.findtext(qn("id", NS_OLD))
+    vulns_root = root.find(qn("vulnerabilities", src_ns))
+    wrote = 0
+    unchanged = 0
+    for vuln in vulns_root.findall(qn("vulnerability", src_ns)):
+        cve_id = vuln.findtext(qn("id", src_ns))
         target_refs = [
-            t.findtext(qn("ref", NS_OLD))
-            for t in vuln.findall(f".//{qn('target', NS_OLD)}")
+            t.findtext(qn("ref", src_ns))
+            for t in vuln.findall(f".//{qn('target', src_ns)}")
         ]
         subject_refs = [r for r in target_refs if r != "log4cxx-conan"]
         if not subject_refs:
             print(f"warning: {cve_id} has no non-conan subject; skipping", 
file=sys.stderr)
             continue
-        updated = vuln.findtext(qn("updated", NS_OLD))
+        updated = vuln.findtext(qn("updated", src_ns))
         for subject in subject_refs:
             extras: list[etree._Element] = []
             deps: list[tuple[str, list[str]]] = []
@@ -218,20 +140,27 @@ def main() -> int:
                 extras = [components_by_ref["log4cxx-conan"]]
                 deps = [("log4cxx-conan", ["log4cxx"])]
             out_path = OUT_DIR / cve_id / f"{subject}.cdx.xml"
-            serial_number = read_existing_serial(out_path) or 
f"urn:uuid:{uuid.uuid4()}"
-            bom = build_bom(
-                subject_component=components_by_ref[subject],
-                vuln_elem=vuln,
-                timestamp=updated,
-                serial_number=serial_number,
-                extra_components=extras,
-                dependencies=deps,
-            )
-            out_path.parent.mkdir(parents=True, exist_ok=True)
-            out_path.write_bytes(serialize(bom))
-            print(f"wrote {out_path.relative_to(ROOT)}")
-            count += 1
-    print(f"generated {count} files")
+
+            def build_fn(serial: str, version: int) -> etree._Element:
+                return build_bom(
+                    subject_component=components_by_ref[subject],
+                    vuln_elem=vuln,
+                    timestamp=updated,
+                    serial_number=serial,
+                    version=version,
+                    extra_components=extras,
+                    dependencies=deps,
+                )
+
+            did_write, final_version = write_bom_if_changed(out_path, 
build_fn, serialize)
+            rel = out_path.relative_to(ROOT)
+            if did_write:
+                print(f"wrote {rel} (v{final_version})")
+                wrote += 1
+            else:
+                print(f"unchanged {rel} (v{final_version})")
+                unchanged += 1
+    print(f"summary: wrote {wrote}, unchanged {unchanged}")
     return 0
 
 

Reply via email to