Add a basic utility that can create random L3 and L4 packets with random payloads and port numbers (if L4).
Signed-off-by: Luca Vizzarro <luca.vizza...@arm.com> Reviewed-by: Paul Szczepanek <paul.szczepa...@arm.com> Reviewed-by: Alex Chapman <alex.chap...@arm.com> --- dts/framework/utils.py | 79 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/dts/framework/utils.py b/dts/framework/utils.py index 6b5d5a805f..c768dd0c99 100644 --- a/dts/framework/utils.py +++ b/dts/framework/utils.py @@ -17,14 +17,16 @@ import atexit import json import os +import random import subprocess -from enum import Enum +from enum import Enum, Flag from pathlib import Path from subprocess import SubprocessError +from scapy.layers.inet import IP, TCP, UDP, Ether # type: ignore[import-untyped] from scapy.packet import Packet # type: ignore[import-untyped] -from .exception import ConfigurationError +from .exception import ConfigurationError, InternalError REGEX_FOR_PCI_ADDRESS: str = "/[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}.[0-9]{1}/" @@ -244,3 +246,76 @@ def _delete_tarball(self) -> None: def __fspath__(self) -> str: """The os.PathLike protocol implementation.""" return str(self._tarball_path) + + +class PacketProtocols(Flag): + """Flag specifying which protocols to use for packet generation.""" + + #: + IP = 1 + #: + TCP = 2 | IP + #: + UDP = 4 | IP + #: + ALL = TCP | UDP + + +def generate_random_packets( + number_of: int, + payload_size: int = 1500, + protocols: PacketProtocols = PacketProtocols.ALL, + ports_range: range = range(1024, 49152), + mtu: int = 1500, +) -> list[Packet]: + """Generate a number of random packets. + + The payload of the packets will consist of random bytes. If `payload_size` is too big, then the + maximum payload size allowed for the specific packet type is used. The size is calculated based + on the specified `mtu`, therefore it is essential that `mtu` is set correctly to match the MTU + of the port that will send out the generated packets. + + If `protocols` has any L4 protocol enabled then all the packets are generated with any of + the specified L4 protocols chosen at random. If only :attr:`~PacketProtocols.IP` is set, then + only L3 packets are generated. + + If L4 packets will be generated, then the TCP/UDP ports to be used will be chosen at random from + `ports_range`. + + Args: + number_of: The number of packets to generate. + payload_size: The packet payload size to generate, capped based on `mtu`. + protocols: The protocols to use for the generated packets. + ports_range: The range of L4 port numbers to use. Used only if `protocols` has L4 protocols. + mtu: The MTU of the NIC port that will send out the generated packets. + + Raises: + InternalError: If the `payload_size` is invalid. + + Returns: + A list containing the randomly generated packets. + """ + if payload_size < 0: + raise InternalError(f"An invalid payload_size of {payload_size} was given.") + + l4_factories = [] + if protocols & PacketProtocols.TCP: + l4_factories.append(TCP) + if protocols & PacketProtocols.UDP: + l4_factories.append(UDP) + + def _make_packet() -> Packet: + packet = Ether() + + if protocols & PacketProtocols.IP: + packet /= IP() + + if len(l4_factories) > 0: + src_port, dst_port = random.choices(ports_range, k=2) + packet /= random.choice(l4_factories)(sport=src_port, dport=dst_port) + + max_payload_size = mtu - len(packet) + usable_payload_size = payload_size if payload_size < max_payload_size else max_payload_size + return packet / random.randbytes(usable_payload_size) + + return [_make_packet() for _ in range(number_of)] -- 2.34.1