On Tue, Jun 11, 2024 at 7:12 AM Juraj Linkeš <juraj.lin...@pantheon.tech> wrote: > > > > diff --git a/dts/framework/remote_session/scapy_shell.py > > b/dts/framework/remote_session/scapy_shell.py > > new file mode 100644 > > index 0000000000..fa647dc870 > > --- /dev/null > > +++ b/dts/framework/remote_session/scapy_shell.py > > @@ -0,0 +1,175 @@ > > +# SPDX-License-Identifier: BSD-3-Clause > > +# Copyright(c) 2024 University of New Hampshire > > + > > +"""Scapy interactive shell.""" > > + > > +import re > > +import time > > +from typing import Callable, ClassVar > > + > > +from scapy.compat import base64_bytes # type: ignore[import] > > +from scapy.layers.l2 import Ether # type: ignore[import] > > +from scapy.packet import Packet # type: ignore[import] > > + > > +from framework.testbed_model.port import Port > > +from framework.utils import REGEX_FOR_BASE64_ENCODING > > + > > +from .python_shell import PythonShell > > + > > + > > +class ScapyShell(PythonShell): > > + """Scapy interactive shell. > > + > > + The scapy shell is implemented using a > > :class:`~.python_shell.PythonShell` and importing > > + everything from the "scapy.all" library. This is done due to > > formatting issues that occur from > > + the scapy interactive shell attempting to use iPython, which is not > > compatible with the > > + pseudo-terminal that paramiko creates to manage its channels. > > + > > + This class is used as an underlying session for the scapy traffic > > generator and shouldn't be > > + used directly inside of test suites. If there isn't a method in > > + > > :class:`framework.testbed_model.traffic_generator.scapy.ScapyTrafficGenerator` > > to fulfill a > > + need, one should be added there and implemented here. > > + """ > > + > > + #: Name of sniffer to ensure the same is used in all places > > + _sniffer_name: ClassVar[str] = "sniffer" > > + #: Name of variable that points to the list of packets inside the > > scapy shell. > > + _send_packet_list_name: ClassVar[str] = "packets" > > + #: Padding to add to the start of a line for python syntax compliance. > > + _padding: ClassVar[str] = " " * 4 > > + > > + def _start_application(self, get_privileged_command: Callable[[str], > > str] | None) -> None: > > + """Overrides :meth:`~.interactive_shell._start_application`. > > This extends the method and in that case we should mention what the > extension is.
Ack. > > > + > > + Adds a command that imports everything from the scapy library > > immediately after starting > > + the shell for usage in later calls to the methods of this class. > > + > > + Args: > > + get_privileged_command: A function (but could be any callable) > > that produces > > + the version of the command with elevated privileges. > > + """ > > + super()._start_application(get_privileged_command) > > + self.send_command("from scapy.all import *") > > + > > + def _build_send_packet_list(self, packets: list[Packet]) -> None: > > The send in the name evokes that the method sends the packets. > > The description in the Args section says "packets to recreate in the > shell" and I like that so I'd put that in the name: _create_packet_list() > Great point. I was trying too hard to force the idea of building the "list of packets that will be sent" but on its own this just builds a list of packets. > > + """Build a list of packets to send later. > > + > > + Gets the string that represents the Python command that was used > > to create each packet in > > Gets the string sounds like that's what the methods returns, as a getter > method would. Fair enough, I'll edit this. > > > + `packets` and sends these commands into the underlying Python > > session. The purpose behind > > + doing this is to create a list that is identical to `packets` > > inside the shell. This method > > + should only be called by methods for sending packets immediately > > prior to sending. The list > > + of packets will continue to exist in the scope of the shell until > > subsequent calls to this > > + method, so failure to rebuild the list prior to sending packets > > could lead to undesired > > + "stale" packets to be sent. > > + > > + Args: > > + packets: The list of packets to recreate in the shell. > > + """ > > + self._logger.info("Building a list of packets to send...") > > The could be just a regular dot instead of the ellipsis (I don't like > random ellipses as those read as if I was supposed to expect something > and we don't provide a subsequent log that would continue this ellipsis). Ack. > > > + self.send_command( > > + f"{self._send_packet_list_name} = [{', > > '.join(map(Packet.command, packets))}]" > > + ) > > + > > + def send_packets(self, packets: list[Packet], send_port: Port) -> None: > > + """Send packets without capturing any received traffic. > > + > > + Provides a "fire and forget" method for sending packets for > > situations when there is no > > + need to collected any received traffic. > > Typo: collected Good catch! > > > + > > + Args: > > + packets: The packets to send. > > + send_port: The port to send the packets from. > > + """ > > + self._build_send_packet_list(packets) > > + send_command = [ > > + "sendp(", > > + f"{self._send_packet_list_name},", > > + f"iface='{send_port.logical_name}',", > > + "realtime=True,", > > + "verbose=True", > > + ")", > > + ] > > + self.send_command(f"\n{self._padding}".join(send_command)) > > + > > + def _create_sniffer( > > + self, packets_to_send: list[Packet], send_port: Port, recv_port: > > Port, filter_config: str > > + ) -> None: > > + """Create an asynchronous sniffer in the shell. > > + > > + A list of packets to send is added to the sniffer inside of a > > callback function so that > > + they are immediately sent at the time sniffing is started. > > + > > + Args: > > + packets_to_send: A list of packets to send when sniffing is > > started. > > + send_port: The port to send the packets on when sniffing is > > started. > > + recv_port: The port to collect the traffic from. > > + filter_config: An optional BPF format filter to use when > > sniffing for packets. Omitted > > + when set to an empty string. > > + """ > > + self._build_send_packet_list(packets_to_send) > > + sniffer_commands = [ > > + f"{self._sniffer_name} = AsyncSniffer(", > > + f"iface='{recv_port.logical_name}',", > > + "store=True,", > > + "started_callback=lambda *args: sendp(", > > + f"{self._padding}{self._send_packet_list_name}, > > iface='{send_port.logical_name}'),", > > + ")", > > + ] > > + if filter_config: > > + sniffer_commands.insert(-1, f"filter='{filter_config}'") > > + > > + self.send_command(f"\n{self._padding}".join(sniffer_commands)) > > + > > + def _start_and_stop_sniffing(self, duration: float) -> list[Packet]: > > + """Starts asynchronous sniffer, runs for a set `duration`, then > > collects received packets. > > This should be in imperative to align with the rest of the docstrings. Ack. > > > + > + This method expects that you have first created an > asynchronous sniffer inside the shell > > + and will fail if you haven't. Received packets are collected by > > printing the base64 > > + encoding of each packet in the shell and then harvesting these > > encodings using regex to > > + convert back into packet objects. > > + > > + Args: > > + duration: The amount of time in seconds to sniff for received > > packets. > > + > > + Returns: > > + A list of all packets that were received while the sniffer was > > running. > > + """ > > + sniffed_packets_name = "gathered_packets" > > + self.send_command(f"{self._sniffer_name}.start()") > > + time.sleep(duration) > > + self.send_command(f"{sniffed_packets_name} = > > {self._sniffer_name}.stop(join=True)") > > + # An extra newline is required here due to the nature of > > interactive Python shells > > + packet_objects = self.send_command( > > These are strings, which are objects, but I'd like to be more explicit, > so maybe packet_strs? Good point, that would be more clear. > > > + f"for pakt in {sniffed_packets_name}: > > print(bytes_base64(pakt.build()))\n" > > + ) > > + # In the string of bytes "b'XXXX'", we only want the contents > > ("XXXX") > > + list_of_packets_base64 = re.findall( > > + f"^b'({REGEX_FOR_BASE64_ENCODING})'", packet_objects, > > re.MULTILINE > > + ) > > + return [Ether(base64_bytes(pakt)) for pakt in > > list_of_packets_base64] > > + > > + def send_packets_and_capture( > > + self, > > + packets: list[Packet], > > + send_port: Port, > > + recv_port: Port, > > + filter_config: str, > > + duration: float, > > + ) -> list[Packet]: > > + """Send packets and capture any received traffic. > > + > > + The steps required to collect these packets are creating a sniffer > > that holds the packets to > > + send then starting and stopping the sniffer. > > + > > + Args: > > + packets: The packets to send. > > + send_port: The port to send the packets from. > > + recv_port: The port to collect received packets from. > > + filter_config: The filter to use while sniffing for packets. > > + duration: The amount of time in seconds to sniff for received > > packets. > > + > > + Returns: > > + A list of packets received after sending `packets`. > > + """ > > + self._create_sniffer(packets, send_port, recv_port, filter_config) > > + return self._start_and_stop_sniffing(duration)