diff --git a/dts/framework/remote_session/scapy_shell.py 
b/dts/framework/remote_session/scapy_shell.py
new file mode 100644
index 0000000000..fa647dc870
--- /dev/null
+++ b/dts/framework/remote_session/scapy_shell.py
@@ -0,0 +1,175 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2024 University of New Hampshire
+
+"""Scapy interactive shell."""
+
+import re
+import time
+from typing import Callable, ClassVar
+
+from scapy.compat import base64_bytes  # type: ignore[import]
+from scapy.layers.l2 import Ether  # type: ignore[import]
+from scapy.packet import Packet  # type: ignore[import]
+
+from framework.testbed_model.port import Port
+from framework.utils import REGEX_FOR_BASE64_ENCODING
+
+from .python_shell import PythonShell
+
+
+class ScapyShell(PythonShell):
+    """Scapy interactive shell.
+
+    The scapy shell is implemented using a :class:`~.python_shell.PythonShell` 
and importing
+    everything from the "scapy.all" library. This is done due to formatting 
issues that occur from
+    the scapy interactive shell attempting to use iPython, which is not 
compatible with the
+    pseudo-terminal that paramiko creates to manage its channels.
+
+    This class is used as an underlying session for the scapy traffic 
generator and shouldn't be
+    used directly inside of test suites. If there isn't a method in
+    
:class:`framework.testbed_model.traffic_generator.scapy.ScapyTrafficGenerator` 
to fulfill a
+    need, one should be added there and implemented here.
+    """
+
+    #: Name of sniffer to ensure the same is used in all places
+    _sniffer_name: ClassVar[str] = "sniffer"
+    #: Name of variable that points to the list of packets inside the scapy 
shell.
+    _send_packet_list_name: ClassVar[str] = "packets"
+    #: Padding to add to the start of a line for python syntax compliance.
+    _padding: ClassVar[str] = " " * 4
+
+    def _start_application(self, get_privileged_command: Callable[[str], str] | 
None) -> None:
+        """Overrides :meth:`~.interactive_shell._start_application`.

This extends the method; in that case, we should mention what the extension is.

+
+        Adds a command that imports everything from the scapy library 
immediately after starting
+        the shell for usage in later calls to the methods of this class.
+
+        Args:
+            get_privileged_command: A function (but could be any callable) 
that produces
+                the version of the command with elevated privileges.
+        """
+        super()._start_application(get_privileged_command)
+        self.send_command("from scapy.all import *")
+
+    def _build_send_packet_list(self, packets: list[Packet]) -> None:

The "send" in the name suggests that the method itself sends the packets.

The description in the Args section says "packets to recreate in the shell" and I like that so I'd put that in the name: _create_packet_list()

+        """Build a list of packets to send later.
+
+        Gets the string that represents the Python command that was used to 
create each packet in

"Gets the string" sounds like that's what the method returns, as a getter method would.

+        `packets` and sends these commands into the underlying Python session. 
The purpose behind
+        doing this is to create a list that is identical to `packets` inside 
the shell. This method
+        should only be called by methods for sending packets immediately prior 
to sending. The list
+        of packets will continue to exist in the scope of the shell until 
subsequent calls to this
+        method, so failure to rebuild the list prior to sending packets could 
lead to undesired
+        "stale" packets to be sent.
+
+        Args:
+            packets: The list of packets to recreate in the shell.
+        """
+        self._logger.info("Building a list of packets to send...")

This could be just a regular period instead of the ellipsis (I don't like random ellipses, as they read as if I'm supposed to expect something, and we don't provide a subsequent log message that would continue this ellipsis).

+        self.send_command(
+            f"{self._send_packet_list_name} = [{', '.join(map(Packet.command, 
packets))}]"
+        )
+
+    def send_packets(self, packets: list[Packet], send_port: Port) -> None:
+        """Send packets without capturing any received traffic.
+
+        Provides a "fire and forget" method for sending packets for situations 
when there is no
+        need to collected any received traffic.

Typo: "collected" should be "collect".

+
+        Args:
+            packets: The packets to send.
+            send_port: The port to send the packets from.
+        """
+        self._build_send_packet_list(packets)
+        send_command = [
+            "sendp(",
+            f"{self._send_packet_list_name},",
+            f"iface='{send_port.logical_name}',",
+            "realtime=True,",
+            "verbose=True",
+            ")",
+        ]
+        self.send_command(f"\n{self._padding}".join(send_command))
+
+    def _create_sniffer(
+        self, packets_to_send: list[Packet], send_port: Port, recv_port: Port, 
filter_config: str
+    ) -> None:
+        """Create an asynchronous sniffer in the shell.
+
+        A list of packets to send is added to the sniffer inside of a callback 
function so that
+        they are immediately sent at the time sniffing is started.
+
+        Args:
+            packets_to_send: A list of packets to send when sniffing is 
started.
+            send_port: The port to send the packets on when sniffing is 
started.
+            recv_port: The port to collect the traffic from.
+            filter_config: An optional BPF format filter to use when sniffing 
for packets. Omitted
+                when set to an empty string.
+        """
+        self._build_send_packet_list(packets_to_send)
+        sniffer_commands = [
+            f"{self._sniffer_name} = AsyncSniffer(",
+            f"iface='{recv_port.logical_name}',",
+            "store=True,",
+            "started_callback=lambda *args: sendp(",
+            f"{self._padding}{self._send_packet_list_name}, 
iface='{send_port.logical_name}'),",
+            ")",
+        ]
+        if filter_config:
+            sniffer_commands.insert(-1, f"filter='{filter_config}'")
+
+        self.send_command(f"\n{self._padding}".join(sniffer_commands))
+
+    def _start_and_stop_sniffing(self, duration: float) -> list[Packet]:
+        """Starts asynchronous sniffer, runs for a set `duration`, then 
collects received packets.

This should be in the imperative mood to align with the rest of the docstrings.

+
+        This method expects that you have first created an
asynchronous sniffer inside the shell
+        and will fail if you haven't. Received packets are collected by 
printing the base64
+        encoding of each packet in the shell and then harvesting these 
encodings using regex to
+        convert back into packet objects.
+
+        Args:
+            duration: The amount of time in seconds to sniff for received 
packets.
+
+        Returns:
+            A list of all packets that were received while the sniffer was 
running.
+        """
+        sniffed_packets_name = "gathered_packets"
+        self.send_command(f"{self._sniffer_name}.start()")
+        time.sleep(duration)
+        self.send_command(f"{sniffed_packets_name} = 
{self._sniffer_name}.stop(join=True)")
+        # An extra newline is required here due to the nature of interactive 
Python shells
+        packet_objects = self.send_command(

These are strings, which are objects, but I'd like to be more explicit, so maybe packet_strs?

+            f"for pakt in {sniffed_packets_name}: 
print(bytes_base64(pakt.build()))\n"
+        )
+        # In the string of bytes "b'XXXX'", we only want the contents ("XXXX")
+        list_of_packets_base64 = re.findall(
+            f"^b'({REGEX_FOR_BASE64_ENCODING})'", packet_objects, re.MULTILINE
+        )
+        return [Ether(base64_bytes(pakt)) for pakt in list_of_packets_base64]
+
+    def send_packets_and_capture(
+        self,
+        packets: list[Packet],
+        send_port: Port,
+        recv_port: Port,
+        filter_config: str,
+        duration: float,
+    ) -> list[Packet]:
+        """Send packets and capture any received traffic.
+
+        The steps required to collect these packets are creating a sniffer 
that holds the packets to
+        send then starting and stopping the sniffer.
+
+        Args:
+            packets: The packets to send.
+            send_port: The port to send the packets from.
+            recv_port: The port to collect received packets from.
+            filter_config: The filter to use while sniffing for packets.
+            duration: The amount of time in seconds to sniff for received 
packets.
+
+        Returns:
+            A list of packets received after sending `packets`.
+        """
+        self._create_sniffer(packets, send_port, recv_port, filter_config)
+        return self._start_and_stop_sniffing(duration)

Reply via email to