Hi Robin, I've tested your patch and this is all good for me. The errors are better handled.
The most common one is when the socket telemetry is closed and you get: 2024-04-19 15:22:00 ERROR 192.168.122.116 GET /metrics HTTP/1.1: telemetry socket not available Traceback (most recent call last): File "/usr/bin/dpdk-telemetry-exporter.py", line 312, in do_GET with TelemetrySocket(self.server.dpdk_socket_path) as sock: File "/usr/bin/dpdk-telemetry-exporter.py", line 165, in __init__ self.sock.connect(path) FileNotFoundError: [Errno 2] No such file or directory You get the Traceback of Python which is a bit useless for the user but at least you have at the first line the root cause: "telemetry socket not available" which is IMO the most important. Thanks for your patch! Tested-by: Anthony Harivel <ahari...@redhat.com> Regards, Anthony Robin Jarry, Apr 16, 2024 at 15:46: > For now the telemetry socket is local to the machine running a DPDK > application. Also, there is no official "schema" for the exposed > metrics. Add a framework and a script to collect and expose these > metrics to telemetry and observability aggregators such as Prometheus, > Carbon or Influxdb. The exposed data must be done with end-users in > mind, some DPDK terminology or internals may not make sense to everyone. > > The script only serves as an entry point and does not know anything > about any specific metrics nor JSON data structures exposed in the > telemetry socket. > > It uses dynamically loaded endpoint exporters which are basic python > files that must implement two functions: > > def info() -> dict[MetricName, MetricInfo]: > Mapping of metric names to their description and type. > > def metrics(sock: TelemetrySocket) -> list[MetricValue]: > Request data from sock and return it as metric values. A metric > value is a 3-tuple: (name: str, value: any, labels: dict). Each > name must be present in info(). 
> > The sock argument passed to metrics() has a single method: > > def cmd(self, uri: str, arg: any = None) -> dict | list: > Request JSON data to the telemetry socket and parse it to python > values. > > The main script invokes endpoints and exports the data into an output > format. For now, only two formats are implemented: > > * openmetrics/prometheus: text based format exported via a local HTTP > server. > * carbon/graphite: binary (python pickle) format exported to a distant > carbon TCP server. > > As a starting point, 3 built-in endpoints are implemented: > > * counters: ethdev hardware counters > * cpu: lcore usage > * memory: overall memory usage > > The goal is to keep all built-in endpoints in the DPDK repository so > that they can be updated along with the telemetry JSON data structures. > > Example output for the openmetrics:// format: > > ~# dpdk-telemetry-exporter.py -o openmetrics://:9876 & > INFO using endpoint: counters (from .../telemetry-endpoints/counters.py) > INFO using endpoint: cpu (from .../telemetry-endpoints/cpu.py) > INFO using endpoint: memory (from .../telemetry-endpoints/memory.py) > INFO listening on port 9876 > [1] 838829 > > ~$ curl http://127.0.0.1:9876/ > # HELP dpdk_cpu_total_cycles Total number of CPU cycles. > # TYPE dpdk_cpu_total_cycles counter > # HELP dpdk_cpu_busy_cycles Number of busy CPU cycles. > # TYPE dpdk_cpu_busy_cycles counter > dpdk_cpu_total_cycles{cpu="73", numa="0"} 4353385274702980 > dpdk_cpu_busy_cycles{cpu="73", numa="0"} 6215932860 > dpdk_cpu_total_cycles{cpu="9", numa="0"} 4353385274745740 > dpdk_cpu_busy_cycles{cpu="9", numa="0"} 6215932860 > dpdk_cpu_total_cycles{cpu="8", numa="0"} 4353383451895540 > dpdk_cpu_busy_cycles{cpu="8", numa="0"} 6171923160 > dpdk_cpu_total_cycles{cpu="72", numa="0"} 4353385274817320 > dpdk_cpu_busy_cycles{cpu="72", numa="0"} 6215932860 > # HELP dpdk_memory_total_bytes The total size of reserved memory in bytes. 
> # TYPE dpdk_memory_total_bytes gauge > # HELP dpdk_memory_used_bytes The currently used memory in bytes. > # TYPE dpdk_memory_used_bytes gauge > dpdk_memory_total_bytes 1073741824 > dpdk_memory_used_bytes 794197376 > > Link: > https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format > Link: > https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format > Link: > https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol > Link: > https://github.com/influxdata/telegraf/tree/master/plugins/inputs/prometheus > Signed-off-by: Robin Jarry <rja...@redhat.com> > --- > > Notes: > v2: > > * Refuse to run if no endpoints are enabled. > * Handle endpoint errors gracefully without failing the whole query. > > usertools/dpdk-telemetry-exporter.py | 405 ++++++++++++++++++++++ > usertools/meson.build | 6 + > usertools/telemetry-endpoints/counters.py | 47 +++ > usertools/telemetry-endpoints/cpu.py | 29 ++ > usertools/telemetry-endpoints/memory.py | 37 ++ > 5 files changed, 524 insertions(+) > create mode 100755 usertools/dpdk-telemetry-exporter.py > create mode 100644 usertools/telemetry-endpoints/counters.py > create mode 100644 usertools/telemetry-endpoints/cpu.py > create mode 100644 usertools/telemetry-endpoints/memory.py > > diff --git a/usertools/dpdk-telemetry-exporter.py > b/usertools/dpdk-telemetry-exporter.py > new file mode 100755 > index 000000000000..f8d873ad856c > --- /dev/null > +++ b/usertools/dpdk-telemetry-exporter.py > @@ -0,0 +1,405 @@ > +#!/usr/bin/env python3 > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +r''' > +DPDK telemetry exporter. > + > +It uses dynamically loaded endpoint exporters which are basic python files > that > +must implement two functions: > + > + def info() -> dict[MetricName, MetricInfo]: > + """ > + Mapping of metric names to their description and type. 
> + """ > + > + def metrics(sock: TelemetrySocket) -> list[MetricValue]: > + """ > + Request data from sock and return it as metric values. A metric value > + is a 3-tuple: (name: str, value: any, labels: dict). Each name must > be > + present in info(). > + """ > + > +The sock argument passed to metrics() has a single method: > + > + def cmd(self, uri, arg=None) -> dict | list: > + """ > + Request JSON data to the telemetry socket and parse it to python > + values. > + """ > + > +See existing endpoints for examples. > + > +The exporter supports multiple output formats: > + > +prometheus://ADDRESS:PORT > +openmetrics://ADDRESS:PORT > + Expose the enabled endpoints via a local HTTP server listening on the > + specified address and port. GET requests on that server are served with > + text/plain responses in the prometheus/openmetrics format. > + > + More details: > + > https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format > + > +carbon://ADDRESS:PORT > +graphite://ADDRESS:PORT > + Export all enabled endpoints to the specified TCP ADDRESS:PORT in the > pickle > + carbon format. 
> + > + More details: > + > https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol > +''' > + > +import argparse > +import importlib.util > +import json > +import logging > +import os > +import pickle > +import re > +import socket > +import struct > +import sys > +import time > +import typing > +from http import HTTPStatus, server > +from urllib.parse import urlparse > + > +LOG = logging.getLogger(__name__) > +# Use local endpoints path only when running from source > +LOCAL = os.path.join(os.path.dirname(__file__), "telemetry-endpoints") > +DEFAULT_LOAD_PATHS = [] > +if os.path.isdir(LOCAL): > + DEFAULT_LOAD_PATHS.append(LOCAL) > +DEFAULT_LOAD_PATHS += [ > + "/usr/local/share/dpdk/telemetry-endpoints", > + "/usr/share/dpdk/telemetry-endpoints", > +] > +DEFAULT_OUTPUT = "openmetrics://:9876" > + > + > +def main(): > + logging.basicConfig( > + stream=sys.stdout, > + level=logging.INFO, > + format="%(asctime)s %(levelname)s %(message)s", > + datefmt="%Y-%m-%d %H:%M:%S", > + ) > + parser = argparse.ArgumentParser( > + description=__doc__, > + formatter_class=argparse.RawDescriptionHelpFormatter, > + ) > + parser.add_argument( > + "-o", > + "--output", > + metavar="FORMAT://PARAMETERS", > + default=urlparse(DEFAULT_OUTPUT), > + type=urlparse, > + help=f""" > + Output format (default: "{DEFAULT_OUTPUT}"). Depending on the format, > + URL elements have different meanings. By default, the exporter > starts a > + local HTTP server on port 9876 that serves requests in the > + prometheus/openmetrics plain text format. > + """, > + ) > + parser.add_argument( > + "-p", > + "--load-path", > + dest="load_paths", > + type=lambda v: v.split(os.pathsep), > + default=DEFAULT_LOAD_PATHS, > + help=f""" > + The list of paths from which to discover endpoints. > + (default: "{os.pathsep.join(DEFAULT_LOAD_PATHS)}"). 
> + """, > + ) > + parser.add_argument( > + "-e", > + "--endpoint", > + dest="endpoints", > + metavar="ENDPOINT", > + action="append", > + help=""" > + Telemetry endpoint to export (by default, all discovered endpoints > are > + enabled). This option can be specified more than once. > + """, > + ) > + parser.add_argument( > + "-l", > + "--list", > + action="store_true", > + help=""" > + Only list detected endpoints and exit. > + """, > + ) > + parser.add_argument( > + "-s", > + "--socket-path", > + default="/run/dpdk/rte/dpdk_telemetry.v2", > + help=""" > + The DPDK telemetry socket path (default: "%(default)s"). > + """, > + ) > + args = parser.parse_args() > + output = OUTPUT_FORMATS.get(args.output.scheme) > + if output is None: > + parser.error(f"unsupported output format: {args.output.scheme}://") > + > + try: > + endpoints = load_endpoints(args.load_paths, args.endpoints) > + if args.list: > + return > + except Exception as e: > + parser.error(str(e)) > + > + output(args, endpoints) > + > + > +class TelemetrySocket: > + """ > + Abstraction of the DPDK telemetry socket. > + """ > + > + def __init__(self, path: str): > + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) > + self.sock.connect(path) > + data = json.loads(self.sock.recv(1024).decode()) > + self.max_output_len = data["max_output_len"] > + > + def cmd( > + self, uri: str, arg: typing.Any = None > + ) -> typing.Optional[typing.Union[dict, list]]: > + """ > + Request JSON data to the telemetry socket and parse it to python > + values. 
> + """ > + if arg is not None: > + u = f"{uri},{arg}" > + else: > + u = uri > + self.sock.send(u.encode("utf-8")) > + data = self.sock.recv(self.max_output_len) > + return json.loads(data.decode("utf-8"))[uri] > + > + def __enter__(self): > + return self > + > + def __exit__(self, *args, **kwargs): > + self.sock.close() > + > + > +MetricDescription = str > +MetricType = str > +MetricName = str > +MetricLabels = typing.Dict[str, typing.Any] > +MetricInfo = typing.Tuple[MetricDescription, MetricType] > +MetricValue = typing.Tuple[MetricName, typing.Any, MetricLabels] > + > + > +class TelemetryEndpoint: > + """ > + Placeholder class only used for typing annotations. > + """ > + > + @staticmethod > + def info() -> typing.Dict[MetricName, MetricInfo]: > + """ > + Mapping of metric names to their description and type. > + """ > + raise NotImplementedError() > + > + @staticmethod > + def metrics(sock: TelemetrySocket) -> typing.List[MetricValue]: > + """ > + Request data from sock and return it as metric values. Each metric > + name must be present in info(). > + """ > + raise NotImplementedError() > + > + > +def load_endpoints( > + paths: typing.List[str], names: typing.List[str] > +) -> typing.List[TelemetryEndpoint]: > + """ > + Load selected telemetry endpoints from the specified paths. 
> + """ > + > + endpoints = {} > + dwb = sys.dont_write_bytecode > + sys.dont_write_bytecode = True # never generate .pyc files for endpoints > + > + for p in paths: > + if not os.path.isdir(p): > + continue > + for fname in os.listdir(p): > + f = os.path.join(p, fname) > + if os.path.isdir(f): > + continue > + try: > + name, _ = os.path.splitext(fname) > + if names is not None and name not in names: > + # not selected by user > + continue > + if name in endpoints: > + # endpoint with same name already loaded > + continue > + spec = importlib.util.spec_from_file_location(name, f) > + module = importlib.util.module_from_spec(spec) > + spec.loader.exec_module(module) > + endpoints[name] = module > + except Exception: > + LOG.exception("parsing endpoint: %s", f) > + > + if not endpoints: > + raise Exception("no telemetry endpoints detected/selected") > + > + sys.dont_write_bytecode = dwb > + > + modules = [] > + info = {} > + for name, module in sorted(endpoints.items()): > + LOG.info("using endpoint: %s (from %s)", name, module.__file__) > + try: > + for metric, (description, type_) in module.info().items(): > + info[(name, metric)] = (description, type_) > + modules.append(module) > + except Exception: > + LOG.exception("getting endpoint info: %s", name) > + return modules > + > + > +def serve_openmetrics( > + args: argparse.Namespace, endpoints: typing.List[TelemetryEndpoint] > +): > + """ > + Start an HTTP server and serve requests in the openmetrics/prometheus > + format. 
> + listen = (args.output.hostname or "", int(args.output.port or 80)) > + with server.HTTPServer(listen, OpenmetricsHandler) as httpd: > + httpd.dpdk_socket_path = args.socket_path > + httpd.telemetry_endpoints = endpoints > + LOG.info("listening on port %s", httpd.server_port) > + try: > + httpd.serve_forever() > + except KeyboardInterrupt: > + LOG.info("shutting down") > + > + > +class OpenmetricsHandler(server.BaseHTTPRequestHandler): > + """ > + Basic HTTP handler that returns prometheus/openmetrics formatted > responses. > + """ > + > + CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8" > + > + def escape(self, value: typing.Any) -> str: > + """ > + Escape a metric label value. > + """ > + value = str(value) > + value = value.replace('"', '\\"') > + value = value.replace("\\", "\\\\") > + return value.replace("\n", "\\n") > + > + def do_GET(self): > + """ > + Called upon GET requests. > + """ > + try: > + lines = [] > + metrics_names = set() > + with TelemetrySocket(self.server.dpdk_socket_path) as sock: > + for e in self.server.telemetry_endpoints: > + info = e.info() > + metrics_lines = [] > + try: > + metrics = e.metrics(sock) > + except Exception: > + LOG.exception("%s: metrics collection failed", > e.__name__) > + continue > + for name, value, labels in metrics: > + fullname = re.sub(r"\W", "_", > f"dpdk_{e.__name__}_{name}") > + labels = ", ".join( > + f'{k}="{self.escape(v)}"' for k, v in > labels.items() > + ) > + if labels: > + labels = f"{{{labels}}}" > + metrics_lines.append(f"{fullname}{labels} {value}") > + if fullname not in metrics_names: > + metrics_names.add(fullname) > + desc, metric_type = info[name] > + lines += [ > + f"# HELP {fullname} {desc}", > + f"# TYPE {fullname} {metric_type}", > + ] > + lines += metrics_lines > + if not lines: > + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) > + LOG.error( > + "%s %s: no metrics collected", > + self.address_string(), > + self.requestline, > + ) > + body = 
"\n".join(lines).encode("utf-8") + b"\n" > + self.send_response(HTTPStatus.OK) > + self.send_header("Content-Type", self.CONTENT_TYPE) > + self.send_header("Content-Length", str(len(body))) > + self.end_headers() > + self.wfile.write(body) > + LOG.info("%s %s", self.address_string(), self.requestline) > + > + except (FileNotFoundError, ConnectionRefusedError): > + self.send_error(HTTPStatus.SERVICE_UNAVAILABLE) > + LOG.exception( > + "%s %s: telemetry socket not available", > + self.address_string(), > + self.requestline, > + ) > + except Exception: > + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) > + LOG.exception("%s %s", self.address_string(), self.requestline) > + > + def log_message(self, fmt, *args): > + pass # disable built-in logger > + > + > +def export_carbon(args: argparse.Namespace, endpoints: > typing.List[TelemetryEndpoint]): > + """ > + Collect all metrics and export them to a carbon server in the pickle > format. > + """ > + addr = (args.output.hostname or "", int(args.output.port or 80)) > + with TelemetrySocket(args.socket_path) as dpdk: > + with socket.socket() as carbon: > + carbon.connect(addr) > + all_metrics = [] > + for e in endpoints: > + try: > + metrics = e.metrics(dpdk) > + except Exception: > + LOG.exception("%s: metrics collection failed", > e.__name__) > + continue > + for name, value, labels in metrics: > + fullname = re.sub(r"\W", ".", > f"dpdk.{e.__name__}.{name}") > + for key, val in labels.items(): > + val = str(val).replace(";", "") > + fullname += f";{key}={val}" > + all_metrics.append((fullname, (time.time(), value))) > + if not all_metrics: > + raise Exception("no metrics collected") > + payload = pickle.dumps(all_metrics, protocol=2) > + header = struct.pack("!L", len(payload)) > + buf = header + payload > + carbon.sendall(buf) > + > + > +OUTPUT_FORMATS = { > + "openmetrics": serve_openmetrics, > + "prometheus": serve_openmetrics, > + "carbon": export_carbon, > + "graphite": export_carbon, > +} > + > + > +if __name__ == 
"__main__": > + main() > diff --git a/usertools/meson.build b/usertools/meson.build > index 740b4832f36d..eb48e2f4403f 100644 > --- a/usertools/meson.build > +++ b/usertools/meson.build > @@ -11,5 +11,11 @@ install_data([ > 'dpdk-telemetry.py', > 'dpdk-hugepages.py', > 'dpdk-rss-flows.py', > + 'dpdk-telemetry-exporter.py', > ], > install_dir: 'bin') > + > +install_subdir( > + 'telemetry-endpoints', > + install_dir: 'share/dpdk', > + strip_directory: false) > diff --git a/usertools/telemetry-endpoints/counters.py > b/usertools/telemetry-endpoints/counters.py > new file mode 100644 > index 000000000000..e17cffb43b2c > --- /dev/null > +++ b/usertools/telemetry-endpoints/counters.py > @@ -0,0 +1,47 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +RX_PACKETS = "rx_packets" > +RX_BYTES = "rx_bytes" > +RX_MISSED = "rx_missed" > +RX_NOMBUF = "rx_nombuf" > +RX_ERRORS = "rx_errors" > +TX_PACKETS = "tx_packets" > +TX_BYTES = "tx_bytes" > +TX_ERRORS = "tx_errors" > + > + > +def info() -> "dict[Name, tuple[Description, Type]]": > + return { > + RX_PACKETS: ("Number of successfully received packets.", "counter"), > + RX_BYTES: ("Number of successfully received bytes.", "counter"), > + RX_MISSED: ( > + "Number of packets dropped by the HW because Rx queues are > full.", > + "counter", > + ), > + RX_NOMBUF: ("Number of Rx mbuf allocation failures.", "counter"), > + RX_ERRORS: ("Number of erroneous received packets.", "counter"), > + TX_PACKETS: ("Number of successfully transmitted packets.", > "counter"), > + TX_BYTES: ("Number of successfully transmitted bytes.", "counter"), > + TX_ERRORS: ("Number of packet transmission failures.", "counter"), > + } > + > + > +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": > + out = [] > + for port_id in sock.cmd("/ethdev/list"): > + port = sock.cmd("/ethdev/info", port_id) > + stats = sock.cmd("/ethdev/stats", port_id) > + labels = {"port": port["name"]} > + out += [ > + 
(RX_PACKETS, stats["ipackets"], labels), > + (RX_PACKETS, stats["ipackets"], labels), > + (RX_BYTES, stats["ibytes"], labels), > + (RX_MISSED, stats["imissed"], labels), > + (RX_NOMBUF, stats["rx_nombuf"], labels), > + (RX_ERRORS, stats["ierrors"], labels), > + (TX_PACKETS, stats["opackets"], labels), > + (TX_BYTES, stats["obytes"], labels), > + (TX_ERRORS, stats["oerrors"], labels), > + ] > + return out > diff --git a/usertools/telemetry-endpoints/cpu.py > b/usertools/telemetry-endpoints/cpu.py > new file mode 100644 > index 000000000000..d38d8d6e2558 > --- /dev/null > +++ b/usertools/telemetry-endpoints/cpu.py > @@ -0,0 +1,29 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +CPU_TOTAL = "total_cycles" > +CPU_BUSY = "busy_cycles" > + > + > +def info() -> "dict[Name, tuple[Description, Type]]": > + return { > + CPU_TOTAL: ("Total number of CPU cycles.", "counter"), > + CPU_BUSY: ("Number of busy CPU cycles.", "counter"), > + } > + > + > +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": > + out = [] > + for lcore_id in sock.cmd("/eal/lcore/list"): > + lcore = sock.cmd("/eal/lcore/info", lcore_id) > + cpu = ",".join(str(c) for c in lcore.get("cpuset", [])) > + total = lcore.get("total_cycles") > + busy = lcore.get("busy_cycles", 0) > + if not (cpu and total): > + continue > + labels = {"cpu": cpu, "numa": lcore.get("socket", 0)} > + out += [ > + (CPU_TOTAL, total, labels), > + (CPU_BUSY, busy, labels), > + ] > + return out > diff --git a/usertools/telemetry-endpoints/memory.py > b/usertools/telemetry-endpoints/memory.py > new file mode 100644 > index 000000000000..32cce1e59382 > --- /dev/null > +++ b/usertools/telemetry-endpoints/memory.py > @@ -0,0 +1,37 @@ > +# SPDX-License-Identifier: BSD-3-Clause > +# Copyright (c) 2023 Robin Jarry > + > +MEM_TOTAL = "total_bytes" > +MEM_USED = "used_bytes" > + > + > +def info() -> "dict[Name, tuple[Description, Type]]": > + return { > + MEM_TOTAL: ("The total size 
of reserved memory in bytes.", "gauge"), > + MEM_USED: ("The currently used memory in bytes.", "gauge"), > + } > + > + > +def metrics(sock: "TelemetrySocket") -> "list[tuple[Name, Value, Labels]]": > + zones = {} > + used = 0 > + for zone in sock.cmd("/eal/memzone_list") or []: > + z = sock.cmd("/eal/memzone_info", zone) > + start = int(z["Hugepage_base"], 16) > + end = start + (z["Hugepage_size"] * z["Hugepage_used"]) > + used += z["Length"] > + for s, e in list(zones.items()): > + if s < start < e < end: > + zones[s] = end > + break > + if start < s < end < e: > + del zones[s] > + zones[start] = e > + break > + else: > + zones[start] = end > + > + return [ > + (MEM_TOTAL, sum(end - start for (start, end) in zones.items()), {}), > + (MEM_USED, max(0, used), {}), > + ] > -- > 2.44.0