From: Jeremy Spewock <>

Multiple test suites from the old DTS framework rely on being able to
consume and interpret the verbose output of testpmd. The new framework
doesn't have an elegant way of handling the verbose output, but test
suites are starting to be written that rely on it. This patch creates a
TextParser class that can be used to extract the verbose information
from any testpmd output and also adjusts the `stop` method of the shell
to return all output that it collected.

Signed-off-by: Jeremy Spewock <>

One thing to note here is I don't love the regex in
extract_verbose_output(). It works great when there is a bunch of
verbose output in a row, but any chunk that isn't followed by another
piece of verbose output will contain everything that comes after it in
the match group. This could be solved by changing the regex to look
ahead only for the next port X/queue Y line instead of also including
the end of the string, and then having another alternate route which is
solely dedicated to the last block of verbose output which greedily
consumes everything until the end of ol_flags, but I didn't want to
overcomplicate the regex since the text parser will extract the specific
information it needs anyway. For reference, I was thinking it could be
something like this:

r"(port \d+/queue \d+:.*?(?=port \d+/queue \d+)|port \d+/queue \d+:.*ol_flags: 
[\w ]+)"

but this has a lot of repetition (some of which could be ripped out
with a simple variable) and it is a little more confusing to read, I think.

 dts/framework/                       |  30 ++++
 dts/framework/remote_session/ | 146 +++++++++++++++++-
 dts/framework/                        |   1 +
 3 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/dts/framework/ b/dts/framework/
index 741dfff821..0b39025a48 100644
--- a/dts/framework/
+++ b/dts/framework/
@@ -160,6 +160,36 @@ def _find(text: str) -> Any:
         return ParserFn(TextParser_fn=_find)
+    @staticmethod
+    def find_all(
+        pattern: str | re.Pattern[str],
+        flags: re.RegexFlag = re.RegexFlag(0),
+    ) -> ParserFn:
+        """Makes a parser function that finds all of the regular expression 
matches in the text.
+        If there are no matches found in the text then None will be returned, 
otherwise a list
+        containing all matches will be returned. Patterns that contain 
multiple groups will pack
+        the matches for each group into a tuple.
+        Args:
+            pattern: The regular expression pattern.
+            flags: The regular expression flags. Ignored if the given pattern 
is already compiled.
+        Returns:
+            A :class:`ParserFn` that can be used as metadata for a dataclass 
+        """
+        if isinstance(pattern, str):
+            pattern = re.compile(pattern, flags)
+        def _find_all(text: str) -> list[str] | None:
+            m = pattern.findall(text)
+            if len(m) == 0:
+                return None
+            return m
+        return ParserFn(TextParser_fn=_find_all)
     def find_int(
         pattern: str | re.Pattern[str],
diff --git a/dts/framework/remote_session/ 
index 43e9f56517..9f09a98490 100644
--- a/dts/framework/remote_session/
+++ b/dts/framework/remote_session/
@@ -31,7 +31,7 @@
 from framework.settings import SETTINGS
 from framework.testbed_model.cpu import LogicalCoreCount, LogicalCoreList
 from framework.testbed_model.sut_node import SutNode
-from framework.utils import StrEnum
+from framework.utils import REGEX_FOR_MAC_ADDRESS, StrEnum
 class TestPmdDevice:
@@ -577,6 +577,128 @@ class TestPmdPortStats(TextParser):
     tx_bps: int = field(metadata=TextParser.find_int(r"Tx-bps:\s+(\d+)"))
+class VerboseOLFlag(Flag):
+    """Flag representing the OL flags of a packet from Testpmd verbose 
+    #:
+    RTE_MBUF_F_RX_RSS_HASH = auto()
+    #:
+    RTE_MBUF_F_RX_L4_CKSUM_GOOD = auto()
+    #:
+    RTE_MBUF_F_RX_L4_CKSUM_BAD = auto()
+    #:
+    #:
+    #:
+    RTE_MBUF_F_RX_IP_CKSUM_BAD = auto()
+    #:
+    #:
+    #:
+    #:
+    @classmethod
+    def from_str_list(cls, arr: list[str]) -> Self:
+        """Makes an instance from a list containing the flag members.
+        Args:
+            arr: A list of strings containing ol_flag values.
+        Returns:
+            A new instance of the flag.
+        """
+        flag = cls(0)
+        for name in cls.__members__:
+            if name in arr:
+                flag |= cls[name]
+        return flag
+    @classmethod
+    def make_parser(cls) -> ParserFn:
+        """Makes a parser function.
+        Returns:
+            ParserFn: A dictionary for the `dataclasses.field` metadata 
argument containing a
+                parser function that makes an instance of this flag from text.
+        """
+        return TextParser.wrap(
+            TextParser.wrap(TextParser.find(r"ol_flags: ([^\n]+)"), str.split),
+            cls.from_str_list,
+        )
+class TestPmdVerbosePacket(TextParser):
+    """Packet information provided by verbose output in Testpmd.
+    The "receive/sent queue" information is not included in this dataclass 
because this value is
+    captured on the outer layer of input found in 
+    """
+    #:
+    src_mac: str = 
+    #:
+    dst_mac: str = 
+    #: Memory pool the packet was handled on.
+    pool: str = field(metadata=TextParser.find(r"pool=(\S+)"))
+    #: Packet type in hex.
+    p_type: int = field(metadata=TextParser.find_int(r"type=(0x[a-fA-F\d]+)"))
+    #:
+    length: int = field(metadata=TextParser.find_int(r"length=(\d+)"))
+    #: Number of segments in the packet.
+    nb_segs: int = field(metadata=TextParser.find_int(r"nb_segs=(\d+)"))
+    #: Hardware packet type. Collected as a string delimited by whitespace.
+    hw_ptype: str = field(metadata=TextParser.find(r"hw ptype: ([^-]+)"))
+    #: Software packet type. Collected as a string delimited by whitespace.
+    sw_ptype: str = field(metadata=TextParser.find(r"sw ptype: ([^-]+)"))
+    #:
+    l2_len: int = field(metadata=TextParser.find_int(r"l2_len=(\d+)"))
+    #:
+    ol_flags: VerboseOLFlag = field(metadata=VerboseOLFlag.make_parser())
+    #: RSS hash of the packet in hex.
+    rss_hash: int | None = field(
+        default=None, metadata=TextParser.find_int(r"RSS hash=(0x[a-fA-F\d]+)")
+    )
+    #: RSS queue that handled the packet in hex.
+    rss_queue: int | None = field(
+        default=None, metadata=TextParser.find_int(r"RSS 
+    )
+    #:
+    l3_len: int | None = field(default=None, 
+    #:
+    l4_len: int | None = field(default=None, 
+class TestPmdVerboseOutput(TextParser):
+    """Verbose output generated by Testpmd.
+    This class is the top level of the output, containing verbose output 
delimited by
+    "port X/queue Y: sent/received Z packets".
+    """
+    #: ID of the port that handled the packet.
+    port_id: int = field(metadata=TextParser.find(r"port (\d+)/queue \d+"))
+    #: ID of the queue that handled the packet.
+    queue_id: int = field(metadata=TextParser.find(r"port \d+/queue (\d+)"))
+    #: Whether the packet was received or sent by the queue/port.
+    was_received: bool = field(metadata=TextParser.find(r"received \d+ 
+    #: List of packets handled by the port/queue in this section.
+    packets: list[TestPmdVerbosePacket] = field(
+        metadata=TextParser.wrap(
+            TextParser.find_all(r"(src=[\w\s=:-]+ol_flags: [\w ]+)"),
+            lambda matches_arr: list(map(TestPmdVerbosePacket.parse, 
+        )
+    )
 class TestPmdShell(DPDKShell):
     """Testpmd interactive shell.
@@ -645,7 +767,7 @@ def start(self, verify: bool = True) -> None:
                         "Not all ports came up after starting packet 
forwarding in testpmd."
-    def stop(self, verify: bool = True) -> None:
+    def stop(self, verify: bool = True) -> str:
         """Stop packet forwarding.
@@ -656,6 +778,9 @@ def stop(self, verify: bool = True) -> None:
             InteractiveCommandExecutionError: If `verify` is :data:`True` and 
the command to stop
                 forwarding results in an error.
+        Returns:
+            Output gathered from sending the stop command.
         stop_cmd_output = self.send_command("stop")
         if verify:
@@ -665,6 +790,7 @@ def stop(self, verify: bool = True) -> None:
                 self._logger.debug(f"Failed to stop packet forwarding: 
                 raise InteractiveCommandExecutionError("Testpmd failed to stop 
packet forwarding.")
+        return stop_cmd_output
     def get_devices(self) -> list[TestPmdDevice]:
         """Get a list of device names that are known to testpmd.
@@ -806,6 +932,22 @@ def show_port_stats(self, port_id: int) -> 
         return TestPmdPortStats.parse(output)
+    @staticmethod
+    def extract_verbose_output(output: str) -> list[TestPmdVerboseOutput]:
+        """Extract the verbose information present in given testpmd output.
+        This method extracts sections of verbose output that begin with the 
+        "port X/queue Y: sent/received Z packets" and end with the ol_flags of 
a packet.
+        Args:
+            output: Testpmd output that contains verbose information
+        Returns:
+            List of parsed packet information gathered from verbose 
information in `output`.
+        """
+        iter = re.finditer(r"(port \d+/queue \d+:.*?(?=port \d+/queue 
\d+|$))", output, re.S)
+        return [TestPmdVerboseOutput.parse( for s in iter]
     def _close(self) -> None:
         """Overrides :meth:`~.interactive_shell.close`."""
diff --git a/dts/framework/ b/dts/framework/
index 6b5d5a805f..9c64cf497f 100644
--- a/dts/framework/
+++ b/dts/framework/
@@ -27,6 +27,7 @@
 from .exception import ConfigurationError
+REGEX_FOR_MAC_ADDRESS: str = r"(?:[\da-fA-F]{2}:){5}[\da-fA-F]{2}"
 def expand_range(range_str: str) -> list[int]:

Reply via email to