From: Thomas Wilks <thomas.wi...@arm.com>

Add a test suite that tests the packet capture framework
through the use of dpdk-dumpcap.

Signed-off-by: Thomas Wilks <thomas.wi...@arm.com>
Signed-off-by: Luca Vizzarro <luca.vizza...@arm.com>
Reviewed-by: Paul Szczepanek <paul.szczepa...@arm.com>
---
 .../dts/tests.TestSuite_packet_capture.rst    |   8 +
 dts/tests/TestSuite_packet_capture.py         | 238 ++++++++++++++++++
 2 files changed, 246 insertions(+)
 create mode 100644 doc/api/dts/tests.TestSuite_packet_capture.rst
 create mode 100644 dts/tests/TestSuite_packet_capture.py

diff --git a/doc/api/dts/tests.TestSuite_packet_capture.rst 
b/doc/api/dts/tests.TestSuite_packet_capture.rst
new file mode 100644
index 0000000000..3d760d3ae4
--- /dev/null
+++ b/doc/api/dts/tests.TestSuite_packet_capture.rst
@@ -0,0 +1,8 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+
+packet_capture Test Suite
+=========================
+
+.. automodule:: tests.TestSuite_packet_capture
+   :members:
+   :show-inheritance:
diff --git a/dts/tests/TestSuite_packet_capture.py 
b/dts/tests/TestSuite_packet_capture.py
new file mode 100644
index 0000000000..67a3049b78
--- /dev/null
+++ b/dts/tests/TestSuite_packet_capture.py
@@ -0,0 +1,238 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2025 Arm Limited
+
+"""Packet capture test suite.
+
+Test the DPDK packet capturing framework through the combined use of testpmd 
and dumpcap.
+"""
+
+from dataclasses import dataclass, field
+from pathlib import Path, PurePath
+
+from scapy.contrib.lldp import (
+    LLDPDUChassisID,
+    LLDPDUEndOfLLDPDU,
+    LLDPDUPortID,
+    LLDPDUSystemCapabilities,
+    LLDPDUSystemDescription,
+    LLDPDUSystemName,
+    LLDPDUTimeToLive,
+)
+from scapy.layers.inet import IP, TCP, UDP
+from scapy.layers.inet6 import IPv6
+from scapy.layers.l2 import Dot1Q, Ether
+from scapy.layers.sctp import SCTP
+from scapy.packet import Packet, Raw, raw
+from scapy.utils import rdpcap
+from typing_extensions import Self
+
+from framework.context import get_ctx
+from framework.params import Params
+from framework.remote_session.dpdk_shell import compute_eal_params
+from framework.remote_session.interactive_shell import InteractiveShell
+from framework.remote_session.testpmd_shell import TestPmdShell
+from framework.settings import SETTINGS
+from framework.test_suite import TestSuite, func_test
+from framework.testbed_model.capability import requires
+from framework.testbed_model.cpu import LogicalCoreList
+from framework.testbed_model.os_session import FilePermissions
+from framework.testbed_model.topology import TopologyType
+from framework.testbed_model.traffic_generator.capturing_traffic_generator 
import (
+    PacketFilteringConfig,
+)
+
+
+@dataclass(kw_only=True)
+class DumpcapParams(Params):
+    """Parameters for the dpdk-dumpcap app.
+
+    Attributes:
+        lcore_list: The list of logical cores to use.
+        file_prefix: The DPDK file prefix that the primary DPDK application is 
using.
+        interface: The PCI address of the interface to capture.
+        output_pcap_path: The path to the pcapng file to dump captured packets 
into.
+        packet_filter: A packet filter in libpcap filter syntax.
+    """
+
+    lcore_list: LogicalCoreList | None = field(default=None, 
metadata=Params.long("lcore"))
+    file_prefix: str | None = None
+    interface: str = field(metadata=Params.short("i"))
+    output_pcap_path: PurePath = field(metadata=Params.convert_value(str) | 
Params.short("w"))
+    packet_filter: str | None = field(default=None, metadata=Params.short("f"))
+
+
+class Dumpcap(InteractiveShell):
+    """Class to spawn and manage a dpdk-dumpcap process.
+
+    The dpdk-dumpcap is a DPDK app but instead of providing a regular DPDK EAL 
interface to the
+    user, it replicates the Wireshark dumpcap app.
+    """
+
+    _app_params: DumpcapParams
+
+    def __init__(self, params: DumpcapParams) -> None:
+        """Extends :meth:`~.interactive_shell.InteractiveShell.__init__`."""
+        self.ctx = get_ctx()
+        eal_params = compute_eal_params()
+        params.lcore_list = eal_params.lcore_list
+        params.file_prefix = eal_params.prefix
+
+        super().__init__(self.ctx.sut_node, name=None, privileged=True, 
app_params=params)
+
+    @property
+    def path(self) -> PurePath:
+        """Path to the shell executable."""
+        return 
PurePath(self.ctx.dpdk_build.remote_dpdk_build_dir).joinpath("app/dpdk-dumpcap")
+
+    def wait_until_ready(self) -> Self:
+        """Start app and wait until ready."""
+        self.start_application(f"Capturing on '{self._app_params.interface}'")
+        return self
+
+    def close(self) -> None:
+        """Close the application.
+
+        Sends a SIGINT to close the application.
+        """
+        self.send_command("\x03")
+        super().close()
+
+
+@requires(topology_type=TopologyType.two_links)
+class TestPacketCapture(TestSuite):
+    """Packet Capture TestSuite.
+
+    Attributes:
+        packets: List of packets to send for testing pdump.
+        rx_pcap_path: The remote path where to create the RX packets pcap with 
pdump.
+        tx_pcap_path: The remote path where to create the TX packets pcap with 
pdump.
+    """
+
+    packets: list[Packet]
+    rx_pcap_path: PurePath
+    tx_pcap_path: PurePath
+
+    def set_up_suite(self) -> None:
+        """Test suite setup.
+
+        Prepare the packets, file paths and queue range to be used in the test 
suite.
+        """
+        self.packets = [
+            Ether() / IP() / Raw(b"\0" * 60),
+            Ether() / IP() / TCP() / Raw(b"\0" * 60),
+            Ether() / IP() / UDP() / Raw(b"\0" * 60),
+            Ether() / IP() / SCTP() / Raw(b"\0" * 40),
+            Ether() / IPv6() / TCP() / Raw(b"\0" * 60),
+            Ether() / IPv6() / UDP() / Raw(b"\0" * 60),
+            Ether() / IP() / IPv6() / SCTP() / Raw(b"\0" * 40),
+            Ether() / Dot1Q() / IP() / UDP() / Raw(b"\0" * 40),
+            Ether(dst="FF:FF:FF:FF:FF:FF", type=0x88F7) / Raw(b"\0" * 60),
+            Ether(type=0x88CC)
+            / LLDPDUChassisID(subtype=4, 
id=self.topology.tg_port_egress.mac_address)
+            / LLDPDUPortID(subtype=5, id="Test Id")
+            / LLDPDUTimeToLive(ttl=180)
+            / LLDPDUSystemName(system_name="DTS Test sys")
+            / LLDPDUSystemDescription(description="DTS Test Packet")
+            / LLDPDUSystemCapabilities()
+            / LLDPDUEndOfLLDPDU(),
+        ]
+        self.tx_pcap_path = self._ctx.sut_node.tmp_dir.joinpath("tx.pcapng")
+        self.rx_pcap_path = self._ctx.sut_node.tmp_dir.joinpath("rx.pcapng")
+
+    def _load_pcap_packets(self, remote_pcap_path: PurePath) -> list[Packet]:
+        local_pcap_path = 
Path(SETTINGS.output_dir).joinpath(remote_pcap_path.name)
+        self._ctx.sut_node.main_session.copy_from(remote_pcap_path, 
local_pcap_path)
+        return list(rdpcap(str(local_pcap_path)))
+
+    def _send_and_dump(
+        self, packet_filter: str | None = None, rx_only: bool = False
+    ) -> list[Packet]:
+        dumpcap_rx = Dumpcap(
+            DumpcapParams(
+                interface=self.topology.sut_port_ingress.pci,
+                output_pcap_path=self.rx_pcap_path,
+                packet_filter=packet_filter,
+            )
+        ).wait_until_ready()
+        if not rx_only:
+            dumpcap_tx = Dumpcap(
+                DumpcapParams(
+                    interface=self.topology.sut_port_egress.pci,
+                    output_pcap_path=self.tx_pcap_path,
+                    packet_filter=packet_filter,
+                )
+            ).wait_until_ready()
+
+        received_packets = self.send_packets_and_capture(
+            self.packets, PacketFilteringConfig(no_lldp=False)
+        )
+
+        dumpcap_rx.close()
+        self._ctx.sut_node.main_session.change_permissions(
+            self.rx_pcap_path, FilePermissions(0o644)
+        )
+        if not rx_only:
+            dumpcap_tx.close()
+            self._ctx.sut_node.main_session.change_permissions(
+                self.tx_pcap_path, FilePermissions(0o644)
+            )
+
+        return received_packets
+
+    @func_test
+    def test_dumpcap(self) -> None:
+        """Test dumpcap on RX and TX interfaces.
+
+        Steps:
+            * Start up testpmd shell.
+            * Start up dpdk-dumpcap with the default values.
+            * Send packets.
+
+        Verify:
+            * The expected packets are the same as the RX packets.
+            * The TX packets are the same as the packets received from Scapy.
+        """
+        with TestPmdShell() as testpmd:
+            testpmd.start()
+            received_packets = self._send_and_dump()
+
+            expected_packets = self.get_expected_packets(self.packets, 
sent_from_tg=True)
+            rx_pcap_packets = self._load_pcap_packets(self.rx_pcap_path)
+            self.verify(
+                self.match_all_packets(expected_packets, rx_pcap_packets, 
verify=False),
+                "RX packets from dumpcap weren't the same as the expected 
packets.",
+            )
+
+            tx_pcap_packets = self._load_pcap_packets(self.tx_pcap_path)
+            self.verify(
+                self.match_all_packets(tx_pcap_packets, received_packets, 
verify=False),
+                "TX packets from dumpcap weren't the same as the packets 
received by Scapy.",
+            )
+
+    @func_test
+    def test_dumpcap_filter(self) -> None:
+        """Test the dumpcap filtering feature.
+
+        Steps:
+            * Start up testpmd shell.
+            * Start up dpdk-dumpcap listening for TCP packets on the RX 
interface.
+            * Send packets.
+
+        Verify:
+            * The dumped packets did not contain any of the packets meant for 
filtering.
+        """
+        with TestPmdShell() as testpmd:
+            testpmd.start()
+            self._send_and_dump("tcp", rx_only=True)
+            filtered_packets = [
+                raw(p)
+                for p in self.get_expected_packets(self.packets, 
sent_from_tg=True)
+                if not p.haslayer(TCP)
+            ]
+
+            rx_pcap_packets = [raw(p) for p in 
self._load_pcap_packets(self.rx_pcap_path)]
+            for filtered_packet in filtered_packets:
+                self.verify(
+                    filtered_packet not in rx_pcap_packets,
+                    "Found a packet in the pcap that was meant to be filtered 
out.",
+                )
-- 
2.43.0

Reply via email to