Add a new TestPmdPort data structure to represent the output
returned by `show port info`, which is implemented as part of
TestPmdShell.

The TestPmdPort data structure and its derived classes are modelled
based on the relevant testpmd source code.

This implementation makes extensive use of regular expressions, which
all parse individually. The rationale behind this is to lower the risk
of the testpmd output changing as part of development. Therefore
minimising breakage.

Bugzilla ID: 1407

Signed-off-by: Luca Vizzarro <luca.vizza...@arm.com>
Reviewed-by: Paul Szczepanek <paul.szczepa...@arm.com>
---
 dts/framework/remote_session/testpmd_shell.py | 473 +++++++++++++++++-
 1 file changed, 472 insertions(+), 1 deletion(-)

diff --git a/dts/framework/remote_session/testpmd_shell.py 
b/dts/framework/remote_session/testpmd_shell.py
index cb2ab6bd00..3cf123ff57 100644
--- a/dts/framework/remote_session/testpmd_shell.py
+++ b/dts/framework/remote_session/testpmd_shell.py
@@ -1,6 +1,7 @@
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2023 University of New Hampshire
 # Copyright(c) 2023 PANTHEON.tech s.r.o.
+# Copyright(c) 2024 Arm Limited
 
 """Testpmd interactive shell.
 
@@ -15,14 +16,19 @@
     testpmd_shell.close()
 """
 
+from dataclasses import dataclass, field
+import re
 import time
-from enum import auto
+from enum import Flag, auto
 from pathlib import PurePath
 from typing import Callable, ClassVar
+from typing_extensions import Self
 
 from framework.exception import InteractiveCommandExecutionError
 from framework.settings import SETTINGS
 from framework.utils import StrEnum
+from framework import parser
+from framework.parser import TextParser
 
 from .interactive_shell import InteractiveShell
 
@@ -80,6 +86,439 @@ class TestPmdForwardingModes(StrEnum):
     recycle_mbufs = auto()
 
 
+class VLANOffloadFlag(Flag):
+    #:
+    STRIP = auto()
+    #:
+    FILTER = auto()
+    #:
+    EXTEND = auto()
+    #:
+    QINQ_STRIP = auto()
+
+    @classmethod
+    def from_str_dict(cls, d):
+        """Makes an instance from a dictionary containing the flag member 
names with an "on" value."""
+        flag = cls(0)
+        for name in cls.__members__:
+            if d.get(name) == "on":
+                flag |= cls[name]
+        return flag
+
+
+class RSSOffloadTypesFlag(Flag):
+    #:
+    ipv4 = auto()
+    #:
+    ipv4_frag = auto()
+    #:
+    ipv4_tcp = auto()
+    #:
+    ipv4_udp = auto()
+    #:
+    ipv4_sctp = auto()
+    #:
+    ipv4_other = auto()
+    #:
+    ipv6 = auto()
+    #:
+    ipv6_frag = auto()
+    #:
+    ipv6_tcp = auto()
+    #:
+    ipv6_udp = auto()
+    #:
+    ipv6_sctp = auto()
+    #:
+    ipv6_other = auto()
+    #:
+    l2_payload = auto()
+    #:
+    ipv6_ex = auto()
+    #:
+    ipv6_tcp_ex = auto()
+    #:
+    ipv6_udp_ex = auto()
+    #:
+    port = auto()
+    #:
+    vxlan = auto()
+    #:
+    geneve = auto()
+    #:
+    nvgre = auto()
+    #:
+    user_defined_22 = auto()
+    #:
+    gtpu = auto()
+    #:
+    eth = auto()
+    #:
+    s_vlan = auto()
+    #:
+    c_vlan = auto()
+    #:
+    esp = auto()
+    #:
+    ah = auto()
+    #:
+    l2tpv3 = auto()
+    #:
+    pfcp = auto()
+    #:
+    pppoe = auto()
+    #:
+    ecpri = auto()
+    #:
+    mpls = auto()
+    #:
+    ipv4_chksum = auto()
+    #:
+    l4_chksum = auto()
+    #:
+    l2tpv2 = auto()
+    #:
+    ipv6_flow_label = auto()
+    #:
+    user_defined_38 = auto()
+    #:
+    user_defined_39 = auto()
+    #:
+    user_defined_40 = auto()
+    #:
+    user_defined_41 = auto()
+    #:
+    user_defined_42 = auto()
+    #:
+    user_defined_43 = auto()
+    #:
+    user_defined_44 = auto()
+    #:
+    user_defined_45 = auto()
+    #:
+    user_defined_46 = auto()
+    #:
+    user_defined_47 = auto()
+    #:
+    user_defined_48 = auto()
+    #:
+    user_defined_49 = auto()
+    #:
+    user_defined_50 = auto()
+    #:
+    user_defined_51 = auto()
+    #:
+    l3_pre96 = auto()
+    #:
+    l3_pre64 = auto()
+    #:
+    l3_pre56 = auto()
+    #:
+    l3_pre48 = auto()
+    #:
+    l3_pre40 = auto()
+    #:
+    l3_pre32 = auto()
+    #:
+    l2_dst_only = auto()
+    #:
+    l2_src_only = auto()
+    #:
+    l4_dst_only = auto()
+    #:
+    l4_src_only = auto()
+    #:
+    l3_dst_only = auto()
+    #:
+    l3_src_only = auto()
+
+    #:
+    ip = ipv4 | ipv4_frag | ipv4_other | ipv6 | ipv6_frag | ipv6_other | 
ipv6_ex
+    #:
+    udp = ipv4_udp | ipv6_udp | ipv6_udp_ex
+    #:
+    tcp = ipv4_tcp | ipv6_tcp | ipv6_tcp_ex
+    #:
+    sctp = ipv4_sctp | ipv6_sctp
+    #:
+    tunnel = vxlan | geneve | nvgre
+    #:
+    vlan = s_vlan | c_vlan
+    #:
+    all = (
+        eth
+        | vlan
+        | ip
+        | tcp
+        | udp
+        | sctp
+        | l2_payload
+        | l2tpv3
+        | esp
+        | ah
+        | pfcp
+        | gtpu
+        | ecpri
+        | mpls
+        | l2tpv2
+    )
+
+    @classmethod
+    def from_list_string(cls, names: str) -> Self:
+        flag = cls(0)
+        for name in names.split():
+            flag |= cls.from_str(name)
+        return flag
+
+    @classmethod
+    def from_str(cls, name: str) -> Self:
+        member_name = name.strip().replace("-", "_")
+        return cls[member_name]
+
+    def __str__(self):
+        return self.name.replace("_", "-")
+
+
+class DeviceCapabilitiesFlag(Flag):
+    RUNTIME_RX_QUEUE_SETUP = auto()
+    """Device supports Rx queue setup after device started."""
+    RUNTIME_TX_QUEUE_SETUP = auto()
+    """Device supports Tx queue setup after device started."""
+    RXQ_SHARE = auto()
+    """Device supports shared Rx queue among ports within Rx domain and switch 
domain."""
+    FLOW_RULE_KEEP = auto()
+    """Device supports keeping flow rules across restart."""
+    FLOW_SHARED_OBJECT_KEEP = auto()
+    """Device supports keeping shared flow objects across restart."""
+
+
+class DeviceErrorHandlingMode(StrEnum):
+    #:
+    none = auto()
+    #:
+    passive = auto()
+    #:
+    proactive = auto()
+    #:
+    unknown = auto()
+
+
+def _validate_device_private_info(info: str) -> str | None:
+    """Ensure that we are not parsing invalid device private info output."""
+    info = info.strip()
+    if info == "none" or info.startswith("Invalid file") or 
info.startswith("Failed to dump"):
+        return None
+    return info
+
+
+@dataclass
+class TestPmdPort(TextParser):
+    #:
+    id: int = field(metadata=parser.to_int(parser.regex(r"Infos for port 
(\d+)\b")))
+    #:
+    device_name: str = field(metadata=parser.regex(r"Device name: ([^\r\n]+)"))
+    #:
+    driver_name: str = field(metadata=parser.regex(r"Driver name: ([^\r\n]+)"))
+    #:
+    socket_id: int = field(metadata=parser.to_int(parser.regex(r"Connect to 
socket: (\d+)")))
+    #:
+    is_link_up: bool = field(metadata=parser.eq("up", parser.regex(r"Link 
status: (up|down)")))
+    #:
+    link_speed: str = field(metadata=parser.regex(r"Link speed: ([^\r\n]+)"))
+    #:
+    is_link_full_duplex: bool = field(
+        metadata=parser.eq("full", parser.regex(r"Link duplex: 
(full|half)-duplex"))
+    )
+    #:
+    is_link_autonegotiated: bool = field(
+        metadata=parser.to_bool(parser.regex(r"Autoneg status: (On|Off)"))
+    )
+    #:
+    is_promiscuous_mode_enabled: bool = field(
+        metadata=parser.to_bool(parser.regex(r"Promiscuous mode: 
(enabled|disabled)"))
+    )
+    #:
+    is_allmulticast_mode_enabled: bool = field(
+        metadata=parser.to_bool(parser.regex(r"Allmulticast mode: 
(enabled|disabled)"))
+    )
+    #: Maximum number of MAC addresses
+    max_mac_addresses_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Maximum number of MAC addresses: 
(\d+)"))
+    )
+    #: Maximum configurable length of RX packet
+    max_hash_mac_addresses_num: int = field(
+        metadata=parser.to_int(
+            parser.regex(r"Maximum number of MAC addresses of hash filtering: 
(\d+)")
+        )
+    )
+    #: Minimum size of RX buffer
+    min_rx_bufsize: int = field(
+        metadata=parser.to_int(parser.regex(r"Minimum size of RX buffer: 
(\d+)"))
+    )
+    #: Maximum configurable length of RX packet
+    max_rx_packet_length: int = field(
+        metadata=parser.to_int(parser.regex(r"Maximum configurable length of 
RX packet: (\d+)"))
+    )
+    #: Maximum configurable size of LRO aggregated packet
+    max_lro_packet_size: int = field(
+        metadata=parser.to_int(
+            parser.regex(r"Maximum configurable size of LRO aggregated packet: 
(\d+)")
+        )
+    )
+
+    #: Current number of RX queues
+    rx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Current number of RX queues: 
(\d+)"))
+    )
+    #: Max possible RX queues
+    max_rx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible RX queues: (\d+)"))
+    )
+    #: Max possible number of RXDs per queue
+    max_queue_rxd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible number of RXDs per 
queue: (\d+)"))
+    )
+    #: Min possible number of RXDs per queue
+    min_queue_rxd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Min possible number of RXDs per 
queue: (\d+)"))
+    )
+    #: RXDs number alignment
+    rxd_alignment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"RXDs number alignment: (\d+)"))
+    )
+
+    #: Current number of TX queues
+    tx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Current number of TX queues: 
(\d+)"))
+    )
+    #: Max possible TX queues
+    max_tx_queues_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible TX queues: (\d+)"))
+    )
+    #: Max possible number of TXDs per queue
+    max_queue_txd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max possible number of TXDs per 
queue: (\d+)"))
+    )
+    #: Min possible number of TXDs per queue
+    min_queue_txd_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Min possible number of TXDs per 
queue: (\d+)"))
+    )
+    #: TXDs number alignment
+    txd_alignment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"TXDs number alignment: (\d+)"))
+    )
+    #: Max segment number per packet
+    max_packet_segment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max segment number per packet: 
(\d+)"))
+    )
+    #: Max segment number per MTU/TSO
+    max_mtu_segment_num: int = field(
+        metadata=parser.to_int(parser.regex(r"Max segment number per MTU\/TSO: 
(\d+)"))
+    )
+
+    #:
+    device_capabilities: DeviceCapabilitiesFlag = field(
+        metadata=parser.chain(
+            DeviceCapabilitiesFlag,
+            parser.to_int(parser.regex(r"Device capabilities: 
(0x[A-Fa-f\d]+)")),
+        )
+    )
+    #:
+    device_error_handling_mode: DeviceErrorHandlingMode = field(
+        metadata=parser.chain(
+            DeviceErrorHandlingMode, parser.regex(r"Device error handling 
mode: (\w+)")
+        )
+    )
+    #:
+    device_private_info: str | None = field(
+        default=None,
+        metadata=parser.chain(
+            _validate_device_private_info,
+            parser.regex(r"Device private info:\s+([\s\S]+)", re.MULTILINE),
+        ),
+    )
+
+    #:
+    hash_key_size: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Hash key size in 
bytes: (\d+)"))
+    )
+    #:
+    redirection_table_size: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Redirection table 
size: (\d+)"))
+    )
+    #:
+    supported_rss_offload_flow_types: RSSOffloadTypesFlag = field(
+        default=RSSOffloadTypesFlag(0),
+        metadata=parser.chain(
+            RSSOffloadTypesFlag.from_list_string,
+            parser.regex(r"Supported RSS offload flow types:((?:\r?\n?  
\S+)+)", re.MULTILINE),
+        ),
+    )
+
+    #:
+    mac_address: str | None = field(
+        default=None, metadata=parser.regex(r"MAC address: ([A-Fa-f0-9:]+)")
+    )
+    #:
+    fw_version: str | None = field(
+        default=None, metadata=parser.regex(r"Firmware-version: ([^\r\n]+)")
+    )
+    #:
+    dev_args: str | None = field(default=None, 
metadata=parser.regex(r"Devargs: ([^\r\n]+)"))
+    #: Socket id of the memory allocation
+    mem_alloc_socket_id: int | None = field(
+        default=None,
+        metadata=parser.to_int(parser.regex(r"memory allocation on the socket: 
(\d+)")),
+    )
+    #:
+    mtu: int | None = field(default=None, 
metadata=parser.to_int(parser.regex(r"MTU: (\d+)")))
+
+    #:
+    vlan_offload: VLANOffloadFlag | None = field(
+        default=None,
+        metadata=parser.chain(
+            VLANOffloadFlag.from_str_dict,
+            parser.regex(
+                r"VLAN offload:\s+"
+                r"strip (?P<STRIP>on|off), "
+                r"filter (?P<FILTER>on|off), "
+                r"extend (?P<EXTEND>on|off), "
+                r"qinq strip (?P<QINQ_STRIP>on|off)$",
+                re.MULTILINE,
+                named=True,
+            ),
+        ),
+    )
+
+    #: Maximum size of RX buffer
+    max_rx_bufsize: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Maximum size of RX 
buffer: (\d+)"))
+    )
+    #: Maximum number of VFs
+    max_vfs_num: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Maximum number of 
VFs: (\d+)"))
+    )
+    #: Maximum number of VMDq pools
+    max_vmdq_pools_num: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Maximum number of 
VMDq pools: (\d+)"))
+    )
+
+    #:
+    switch_name: str | None = field(default=None, 
metadata=parser.regex(r"Switch name: ([\r\n]+)"))
+    #:
+    switch_domain_id: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Switch domain Id: 
(\d+)"))
+    )
+    #:
+    switch_port_id: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Switch Port Id: 
(\d+)"))
+    )
+    #:
+    switch_rx_domain: int | None = field(
+        default=None, metadata=parser.to_int(parser.regex(r"Switch Rx domain: 
(\d+)"))
+    )
+
+
 class TestPmdShell(InteractiveShell):
     """Testpmd interactive shell.
 
@@ -225,6 +664,38 @@ def set_forward_mode(self, mode: TestPmdForwardingModes, 
verify: bool = True):
                 f"Test pmd failed to set fwd mode to {mode.value}"
             )
 
+    def show_port_info_all(self) -> list[TestPmdPort]:
+        """Returns the information of all the ports."""
+        output = self.send_command("show port info all")
+
+        ports = []
+        iter = re.finditer(r"\*+.+\*+", output)
+        if next(iter, False):  # we are slicing retrospectively, skip first 
block
+            start_pos = 0
+            for block in iter:
+                end_pos = block.start()
+                ports.append(TestPmdPort.parse(output[start_pos:end_pos]))
+                start_pos = end_pos
+
+            ports.append(TestPmdPort.parse(output[start_pos:]))
+
+        return ports
+
+    def show_port_info(self, port_id: int) -> TestPmdPort:
+        """Returns the given port information.
+
+        Args:
+            port_id: The port ID to gather information for.
+
+        Raises:
+            InteractiveCommandExecutionError: If `port_id` is invalid.
+        """
+        output = self.send_command(f"show port info {port_id}", 
skip_first_line=True)
+        if output.startswith("Invalid port"):
+            raise InteractiveCommandExecutionError("invalid port given")
+
+        return TestPmdPort.parse(output)
+
     def close(self) -> None:
         """Overrides :meth:`~.interactive_shell.close`."""
         self.send_command("quit", "")
-- 
2.34.1

Reply via email to