https://github.com/ashgti updated 
https://github.com/llvm/llvm-project/pull/143818

>From ab4b987aec591491d3805af41c7127ff6698fe0e Mon Sep 17 00:00:00 2001
From: John Harrison <harj...@google.com>
Date: Wed, 11 Jun 2025 15:57:16 -0700
Subject: [PATCH 1/2] [lldb-dap] Refactoring DebugCommunication to improve test
 consistency.

In DebugCommunication, we currently are using 2 threads to drive lldb-dap. At 
the moment, they make an attempt at only synchronizing the `recv_packets` 
between the reader thread and the main test thread. Other stateful properties 
of the debug session are not guarded by any mutexes.

To mitigate this, I am moving any state updates to the main thread inside the 
`_recv_packet` method to ensure that between calls to `_recv_packet` the state 
does not change out from under us in a test.

This does mean the precise timing of events has changed slightly as a result 
and I've updated the existing tests that fail for me locally with this new 
behavior.

I think this should result in overall more predictable behavior, even if the 
test is slow due to the host workload or architecture differences.
---
 .../test/tools/lldb-dap/dap_server.py         | 839 +++++++++++-------
 .../test/tools/lldb-dap/lldbdap_testcase.py   |  79 +-
 .../breakpoint/TestDAP_setBreakpoints.py      |   5 +-
 .../tools/lldb-dap/cancel/TestDAP_cancel.py   |  10 +-
 .../tools/lldb-dap/launch/TestDAP_launch.py   |  12 +-
 .../tools/lldb-dap/module/TestDAP_module.py   |   2 +-
 .../tools/lldb-dap/output/TestDAP_output.py   |   4 +-
 7 files changed, 568 insertions(+), 383 deletions(-)

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index 9786678aa53f9..20a1b4480df6d 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -10,17 +10,117 @@
 import subprocess
 import signal
 import sys
+from dataclasses import dataclass
 import threading
 import time
-from typing import Any, Optional, Union, BinaryIO, TextIO
+from typing import (
+    IO,
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Tuple,
+    TypeGuard,
+    TypeVar,
+    TypedDict,
+    Union,
+    BinaryIO,
+    TextIO,
+    Literal,
+    cast,
+)
 
 ## DAP type references
-Event = dict[str, Any]
-Request = dict[str, Any]
-Response = dict[str, Any]
+
+T = TypeVar("T")
+
+
+class Event(TypedDict):
+    type: Literal["event"]
+    seq: Literal[0]
+    event: str
+    body: Optional[dict]
+
+
+class Request(TypedDict):
+    type: Literal["request"]
+    seq: int
+    command: str
+    arguments: Optional[dict]
+
+
+class Response(TypedDict):
+    type: Literal["response"]
+    seq: Literal[0]
+    request_seq: int
+    success: bool
+    command: str
+    message: Optional[str]
+    body: Optional[dict]
+
+
 ProtocolMessage = Union[Event, Request, Response]
 
 
+class AttachOrLaunchArguments(TypedDict, total=False):
+    stopOnEntry: bool
+    disableASLR: bool
+    disableSTDIO: bool
+    enableAutoVariableSummaries: bool
+    displayExtendedBacktrace: bool
+    enableSyntheticChildDebugging: bool
+    initCommands: List[str]
+    preRunCommands: List[str]
+    postRunCommands: List[str]
+    stopCommands: List[str]
+    exitCommands: List[str]
+    terminateCommands: List[str]
+    sourceMap: Union[List[Tuple[str, str]], Dict[str, str]]
+    sourcePath: str
+    debuggerRoot: str
+    commandEscapePrefix: str
+    customFrameFormat: str
+    customThreadFormat: str
+
+
+class LaunchArguments(AttachOrLaunchArguments, total=False):
+    program: str
+    args: List[str]
+    cwd: str
+    env: Dict[str, str]
+    shellExpandArguments: bool
+    runInTerminal: bool
+    launchCommands: List[str]
+
+
+class AttachArguments(AttachOrLaunchArguments, total=False):
+    program: str
+    pid: int
+    waitFor: bool
+    attachCommands: List[str]
+    coreFile: str
+    gdbRemotePort: int
+    gdbRemoteHostname: str
+
+
+class BreakpiontData(TypedDict, total=False):
+    column: int
+    condition: str
+    hitCondition: str
+    logMessage: str
+    mode: str
+
+
+class SourceBreakpoint(BreakpiontData):
+    line: int
+
+
+class Breakpoint(TypedDict, total=False):
+    id: int
+    verified: bool
+
+
 def dump_memory(base_addr, data, num_per_line, outfile):
     data_len = len(data)
     hex_string = binascii.hexlify(data)
@@ -58,7 +158,9 @@ def dump_memory(base_addr, data, num_per_line, outfile):
         outfile.write("\n")
 
 
-def read_packet(f, verbose=False, trace_file=None):
+def read_packet(
+    f: IO[bytes], verbose: bool = False, trace_file: Optional[IO[str]] = None
+) -> Optional[ProtocolMessage]:
     """Decode a JSON packet that starts with the content length and is
     followed by the JSON bytes from a file 'f'. Returns None on EOF.
     """
@@ -76,26 +178,22 @@ def read_packet(f, verbose=False, trace_file=None):
         if verbose:
             print('length: "%u"' % (length))
         # Skip empty line
-        line = f.readline()
+        line = f.readline().decode()
         if verbose:
             print('empty: "%s"' % (line))
         # Read JSON bytes
         json_str = f.read(length)
         if verbose:
-            print('json: "%s"' % (json_str))
+            print('json: "%r"' % (json_str))
         if trace_file:
-            trace_file.write("from adapter:\n%s\n" % (json_str))
+            trace_file.write("from adapter:\n%r\n" % (json_str))
         # Decode the JSON bytes into a python dictionary
         return json.loads(json_str)
 
     raise Exception("unexpected malformed message from lldb-dap: " + line)
 
 
-def packet_type_is(packet, packet_type):
-    return "type" in packet and packet["type"] == packet_type
-
-
-def dump_dap_log(log_file):
+def dump_dap_log(log_file: Optional[str]) -> None:
     print("========= DEBUG ADAPTER PROTOCOL LOGS =========", file=sys.stderr)
     if log_file is None:
         print("no log file available", file=sys.stderr)
@@ -105,34 +203,30 @@ def dump_dap_log(log_file):
     print("========= END =========", file=sys.stderr)
 
 
-class Source(object):
+@dataclass
+class Source:
+    path: Optional[str]
+    source_reference: Optional[int]
+
+    @property
+    def name(self) -> Optional[str]:
+        if not self.path:
+            return None
+        return os.path.basename(self.path)
+
     def __init__(
         self, path: Optional[str] = None, source_reference: Optional[int] = 
None
     ):
-        self._name = None
-        self._path = None
-        self._source_reference = None
-
-        if path is not None:
-            self._name = os.path.basename(path)
-            self._path = path
-        elif source_reference is not None:
-            self._source_reference = source_reference
-        else:
-            raise ValueError("Either path or source_reference must be 
provided")
+        self.path = path
+        self.source_reference = source_reference
 
-    def __str__(self):
-        return f"Source(name={self.name}, path={self.path}), 
source_reference={self.source_reference})"
+        if path is None and source_reference is None:
+            raise ValueError("Either path or source_reference must be 
provided")
 
-    def as_dict(self):
-        source_dict = {}
-        if self._name is not None:
-            source_dict["name"] = self._name
-        if self._path is not None:
-            source_dict["path"] = self._path
-        if self._source_reference is not None:
-            source_dict["sourceReference"] = self._source_reference
-        return source_dict
+    def to_DAP(self) -> dict:
+        if self.path:
+            return {"path": self.path, "name": self.name}
+        return {"sourceReference": self.source_reference}
 
 
 class NotSupportedError(KeyError):
@@ -152,35 +246,50 @@ def __init__(
         self.log_file = log_file
         self.send = send
         self.recv = recv
-        self.recv_packets: list[Optional[ProtocolMessage]] = []
-        self.recv_condition = threading.Condition()
-        self.recv_thread = threading.Thread(target=self._read_packet_thread)
-        self.process_event_body = None
-        self.exit_status: Optional[int] = None
-        self.capabilities: dict[str, Any] = {}
-        self.progress_events: list[Event] = []
-        self.reverse_requests = []
-        self.sequence = 1
-        self.threads = None
-        self.thread_stop_reasons = {}
-        self.recv_thread.start()
-        self.output_condition = threading.Condition()
-        self.output: dict[str, list[str]] = {}
-        self.configuration_done_sent = False
-        self.initialized = False
-        self.frame_scopes = {}
+        # Packets that have been received and processed but have not yet been
+        # requested by a test case.
+        self._pending_packets: List[Optional[ProtocolMessage]] = []
+        # Received packets that have not yet been processed.
+        self._recv_packets: List[Optional[ProtocolMessage]] = []
+        # Used as a mutex for _recv_packets and for notify when _recv_packets
+        # changes.
+        self._recv_condition = threading.Condition()
+        self._recv_thread = threading.Thread(target=self._read_packet_thread)
+
+        # session state
         self.init_commands = init_commands
-        self.resolved_breakpoints = {}
+        self.exit_status: Optional[int] = None
+        self.capabilities: Optional[Dict] = None
+        self.initialized: bool = False
+        self.configuration_done_sent: bool = False
+        self.process_event_body: Optional[Dict] = None
+        self.terminated: bool = False
+        self.events: List[Event] = []
+        self.progress_events: List[Event] = []
+        self.reverse_requests: List[Request] = []
+        self.module_events: List[Dict] = []
+        self.sequence: int = 1
+        self.output: Dict[str, str] = {}
+
+        # debuggee state
+        self.threads: Optional[dict] = None
+        self.thread_stop_reasons: Dict[str, Any] = {}
+        self.frame_scopes: Dict[str, Any] = {}
+        # keyed by breakpoint id
+        self.resolved_breakpoints: Dict[int, bool] = {}
+
+        # trigger enqueue thread
+        self._recv_thread.start()
 
     @classmethod
     def encode_content(cls, s: str) -> bytes:
         return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
 
     @classmethod
-    def validate_response(cls, command, response):
-        if command["command"] != response["command"]:
+    def validate_response(cls, request: Request, response: Response) -> None:
+        if request["command"] != response["command"]:
             raise ValueError("command mismatch in response")
-        if command["seq"] != response["request_seq"]:
+        if request["seq"] != response["request_seq"]:
             raise ValueError("seq mismatch in response")
 
     def _read_packet_thread(self):
@@ -189,262 +298,322 @@ def _read_packet_thread(self):
             while not done:
                 packet = read_packet(self.recv, trace_file=self.trace_file)
                 # `packet` will be `None` on EOF. We want to pass it down to
-                # handle_recv_packet anyway so the main thread can handle 
unexpected
-                # termination of lldb-dap and stop waiting for new packets.
+                # handle_recv_packet anyway so the main thread can handle
+                # unexpected termination of lldb-dap and stop waiting for new
+                # packets.
                 done = not self._handle_recv_packet(packet)
         finally:
             dump_dap_log(self.log_file)
 
-    def get_modules(self):
-        module_list = self.request_modules()["body"]["modules"]
-        modules = {}
-        for module in module_list:
-            modules[module["name"]] = module
-        return modules
+    def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
+        """Handles an incoming packet.
 
-    def get_output(self, category, timeout=0.0, clear=True):
-        self.output_condition.acquire()
-        output = None
-        if category in self.output:
-            output = self.output[category]
-            if clear:
-                del self.output[category]
-        elif timeout != 0.0:
-            self.output_condition.wait(timeout)
-            if category in self.output:
-                output = self.output[category]
-                if clear:
-                    del self.output[category]
-        self.output_condition.release()
-        return output
+        Called by the read thread that is waiting for all incoming packets
+        to store the incoming packet in "self._recv_packets" in a thread safe
+        way. This function will then signal the "self._recv_condition" to
+        indicate a new packet is available.
 
-    def collect_output(self, category, timeout_secs, pattern, clear=True):
-        end_time = time.time() + timeout_secs
-        collected_output = ""
-        while end_time > time.time():
-            output = self.get_output(category, timeout=0.25, clear=clear)
-            if output:
-                collected_output += output
-                if pattern is not None and pattern in output:
-                    break
-        return collected_output if collected_output else None
-
-    def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
-        self.recv_condition.acquire()
-        self.recv_packets.append(packet)
-        self.recv_condition.notify()
-        self.recv_condition.release()
+        Args:
+            packet: A new packet to store.
 
-    def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
-        """Called by the read thread that is waiting for all incoming packets
-        to store the incoming packet in "self.recv_packets" in a thread safe
-        way. This function will then signal the "self.recv_condition" to
-        indicate a new packet is available. Returns True if the caller
-        should keep calling this function for more packets.
+        Returns:
+            True if the caller should keep calling this function for more
+            packets.
         """
-        # If EOF, notify the read thread by enqueuing a None.
-        if not packet:
-            self._enqueue_recv_packet(None)
-            return False
-
-        # Check the packet to see if is an event packet
-        keepGoing = True
-        packet_type = packet["type"]
-        if packet_type == "event":
-            event = packet["event"]
-            body = None
-            if "body" in packet:
-                body = packet["body"]
-            # Handle the event packet and cache information from these packets
-            # as they come in
-            if event == "output":
-                # Store any output we receive so clients can retrieve it later.
-                category = body["category"]
-                output = body["output"]
-                self.output_condition.acquire()
-                if category in self.output:
-                    self.output[category] += output
-                else:
-                    self.output[category] = output
-                self.output_condition.notify()
-                self.output_condition.release()
-                # no need to add 'output' event packets to our packets list
-                return keepGoing
-            elif event == "initialized":
-                self.initialized = True
-            elif event == "process":
-                # When a new process is attached or launched, remember the
-                # details that are available in the body of the event
-                self.process_event_body = body
-            elif event == "exited":
-                # Process exited, mark the status to indicate the process is 
not
-                # alive.
-                self.exit_status = body["exitCode"]
-            elif event == "continued":
-                # When the process continues, clear the known threads and
-                # thread_stop_reasons.
-                all_threads_continued = body.get("allThreadsContinued", True)
-                tid = body["threadId"]
-                if tid in self.thread_stop_reasons:
-                    del self.thread_stop_reasons[tid]
-                self._process_continued(all_threads_continued)
-            elif event == "stopped":
-                # Each thread that stops with a reason will send a
-                # 'stopped' event. We need to remember the thread stop
-                # reasons since the 'threads' command doesn't return
-                # that information.
-                self._process_stopped()
-                tid = body["threadId"]
-                self.thread_stop_reasons[tid] = body
-            elif event.startswith("progress"):
-                # Progress events come in as 'progressStart', 'progressUpdate',
-                # and 'progressEnd' events. Keep these around in case test
-                # cases want to verify them.
-                self.progress_events.append(packet)
-            elif event == "breakpoint":
-                # Breakpoint events are sent when a breakpoint is resolved
-                self._update_verified_breakpoints([body["breakpoint"]])
-            elif event == "capabilities":
-                # Update the capabilities with new ones from the event.
-                self.capabilities.update(body["capabilities"])
-
-        elif packet_type == "response":
-            if packet["command"] == "disconnect":
-                keepGoing = False
-        self._enqueue_recv_packet(packet)
-        return keepGoing
+        with self._recv_condition:
+            self._recv_packets.append(packet)
+            self._recv_condition.notify()
+            # packet is None on EOF
+            return packet is not None and not (
+                packet["type"] == "response" and packet["command"] == 
"disconnect"
+            )
+
+    def _recv_packet(
+        self,
+        *,
+        predicate: Optional[Callable[[ProtocolMessage], bool]] = None,
+        timeout: Optional[float] = None,
+    ) -> Optional[ProtocolMessage]:
+        """Processes received packets from the adapter.
+
+        Updates the DebugCommunication stateful properties based on the 
received
+        packets in the order they are received.
+
+        NOTE: The only time the session state properties should be updated is
+        during this call to ensure consistency during tests.
+
+        Args:
+            predicate:
+                Optional, if specified, returns the first packet that matches
+                the given predicate.
+            timeout:
+                Optional, if specified, processes packets until either the
+                timeout occurs or the predicate matches a packet, whichever
+                occurs first.
+
+        Returns:
+            The first matching packet for the given predicate, if specified,
+            otherwise None.
+        """
+        assert (
+            threading.current_thread != self._recv_thread
+        ), "Must not be called from the _recv_thread"
+
+        def process_until_match():
+            self._process_recv_packets()
+            for i, packet in enumerate(self._pending_packets):
+                if packet is None:
+                    # We need to return a truthy value to break out of the
+                    # wait_for, use `EOFError` as an indicator of EOF.
+                    return EOFError()
+                if predicate and predicate(packet):
+                    self._pending_packets.pop(i)
+                    return packet
+
+        with self._recv_condition:
+            packet = self._recv_condition.wait_for(process_until_match, 
timeout)
+            return None if isinstance(packet, EOFError) else packet
+
+    def _process_recv_packets(self) -> None:
+        """Process received packets, updating the session state."""
+        with self._recv_condition:
+            for packet in self._recv_packets:
+                # Handle events that may modify any stateful properties of
+                # the DAP session.
+                if packet and packet["type"] == "event":
+                    self._handle_event(packet)
+                elif packet and packet["type"] == "request":
+                    # Handle reverse requests and keep processing.
+                    self._handle_reverse_request(packet)
+                # Move the packet to the pending queue.
+                self._pending_packets.append(packet)
+            self._recv_packets.clear()
+
+    def _handle_event(self, packet: Event) -> None:
+        """Handle any events that modify debug session state we track."""
+        event = packet["event"]
+        body: Optional[Dict] = packet.get("body", None)
+
+        if event == "output":
+            # Store any output we receive so clients can retrieve it later.
+            category = body["category"]
+            output = body["output"]
+            if category in self.output:
+                self.output[category] += output
+            else:
+                self.output[category] = output
+        elif event == "initialized":
+            self.initialized = True
+        elif event == "process":
+            # When a new process is attached or launched, remember the
+            # details that are available in the body of the event
+            self.process_event_body = body
+        elif event == "exited":
+            # Process exited, mark the status to indicate the process is not
+            # alive.
+            self.exit_status = body["exitCode"]
+        elif event == "continued":
+            # When the process continues, clear the known threads and
+            # thread_stop_reasons.
+            all_threads_continued = (
+                body.get("allThreadsContinued", True) if body else True
+            )
+            tid = body["threadId"]
+            if tid in self.thread_stop_reasons:
+                del self.thread_stop_reasons[tid]
+            self._process_continued(all_threads_continued)
+        elif event == "stopped":
+            # Each thread that stops with a reason will send a
+            # 'stopped' event. We need to remember the thread stop
+            # reasons since the 'threads' command doesn't return
+            # that information.
+            self._process_stopped()
+            tid = body["threadId"]
+            self.thread_stop_reasons[tid] = body
+        elif event.startswith("progress"):
+            # Progress events come in as 'progressStart', 'progressUpdate',
+            # and 'progressEnd' events. Keep these around in case test
+            # cases want to verify them.
+            self.progress_events.append(packet)
+        elif event == "breakpoint":
+            # Breakpoint events are sent when a breakpoint is resolved
+            self._update_verified_breakpoints([body["breakpoint"]])
+        elif event == "capabilities":
+            # Update the capabilities with new ones from the event.
+            self.capabilities.update(body["capabilities"])
+
+    def _handle_reverse_request(self, request: Request) -> None:
+        if request in self.reverse_requests:
+            return
+        self.reverse_requests.append(request)
+        arguments = request.get("arguments")
+        if request["command"] == "runInTerminal" and arguments is not None:
+            in_shell = arguments.get("argsCanBeInterpretedByShell", False)
+            proc = subprocess.Popen(
+                arguments["args"],
+                env=arguments.get("env", {}),
+                cwd=arguments["cwd"],
+                stdin=subprocess.DEVNULL,
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+                shell=in_shell,
+            )
+            body = {}
+            if in_shell:
+                body["shellProcessId"] = proc.pid
+            else:
+                body["processId"] = proc.pid
+            self.send_packet(
+                {
+                    "type": "response",
+                    "seq": 0,
+                    "request_seq": request["seq"],
+                    "success": True,
+                    "command": "runInTerminal",
+                    "message": None,
+                    "body": body,
+                }
+            )
+        elif request["command"] == "startDebugging":
+            self.send_packet(
+                {
+                    "type": "response",
+                    "seq": 0,
+                    "request_seq": request["seq"],
+                    "success": True,
+                    "message": None,
+                    "command": "startDebugging",
+                    "body": {},
+                }
+            )
+        else:
+            desc = 'unknown reverse request "%s"' % (request["command"])
+            raise ValueError(desc)
 
     def _process_continued(self, all_threads_continued: bool):
         self.frame_scopes = {}
         if all_threads_continued:
             self.thread_stop_reasons = {}
 
-    def _update_verified_breakpoints(self, breakpoints: list[Event]):
+    def _update_verified_breakpoints(self, breakpoints: list[Breakpoint]):
         for breakpoint in breakpoints:
-            if "id" in breakpoint:
-                self.resolved_breakpoints[str(breakpoint["id"])] = 
breakpoint.get(
-                    "verified", False
-                )
+            # If no id is set, we cannot correlate the given breakpoint across
+            # requests, ignore it.
+            if "id" not in breakpoint:
+                continue
+
+            self.resolved_breakpoints[str(breakpoint["id"])] = breakpoint.get(
+                "verified", False
+            )
 
-    def send_packet(self, command_dict: Request, set_sequence=True):
+    def _send_recv(self, request: Request) -> Optional[Response]:
+        """Send a command python dictionary as JSON and receive the JSON
+        response. Validates that the response is the correct sequence and
+        command in the reply. Any events that are received are added to the
+        events list in this object"""
+        seq = self.send_packet(request)
+        response = self.receive_response(seq)
+        if response is None:
+            desc = 'no response for "%s"' % (request["command"])
+            raise ValueError(desc)
+        self.validate_response(request, response)
+        return response
+
+    def send_packet(self, packet: ProtocolMessage) -> int:
         """Take the "command_dict" python dictionary and encode it as a JSON
         string and send the contents as a packet to the VSCode debug
-        adapter"""
-        # Set the sequence ID for this command automatically
-        if set_sequence:
-            command_dict["seq"] = self.sequence
+        adapter.
+
+        Returns the seq of the packet."""
+        # Set the seq for requests.
+        if packet["type"] == "request":
+            packet["seq"] = self.sequence
             self.sequence += 1
+        else:
+            packet["seq"] = 0
+
         # Encode our command dictionary as a JSON string
-        json_str = json.dumps(command_dict, separators=(",", ":"))
+        json_str = json.dumps(packet, separators=(",", ":"))
+
         if self.trace_file:
             self.trace_file.write("to adapter:\n%s\n" % (json_str))
+
         length = len(json_str)
         if length > 0:
             # Send the encoded JSON packet and flush the 'send' file
             self.send.write(self.encode_content(json_str))
             self.send.flush()
 
-    def recv_packet(
-        self,
-        filter_type: Optional[str] = None,
-        filter_event: Optional[Union[str, list[str]]] = None,
-        timeout: Optional[float] = None,
-    ) -> Optional[ProtocolMessage]:
-        """Get a JSON packet from the VSCode debug adapter. This function
-        assumes a thread that reads packets is running and will deliver
-        any received packets by calling handle_recv_packet(...). This
-        function will wait for the packet to arrive and return it when
-        it does."""
-        while True:
-            try:
-                self.recv_condition.acquire()
-                packet = None
-                while True:
-                    for i, curr_packet in enumerate(self.recv_packets):
-                        if not curr_packet:
-                            raise EOFError
-                        packet_type = curr_packet["type"]
-                        if filter_type is None or packet_type in filter_type:
-                            if filter_event is None or (
-                                packet_type == "event"
-                                and curr_packet["event"] in filter_event
-                            ):
-                                packet = self.recv_packets.pop(i)
-                                break
-                    if packet:
-                        break
-                    # Sleep until packet is received
-                    len_before = len(self.recv_packets)
-                    self.recv_condition.wait(timeout)
-                    len_after = len(self.recv_packets)
-                    if len_before == len_after:
-                        return None  # Timed out
-                return packet
-            except EOFError:
-                return None
-            finally:
-                self.recv_condition.release()
-
-    def send_recv(self, command):
-        """Send a command python dictionary as JSON and receive the JSON
-        response. Validates that the response is the correct sequence and
-        command in the reply. Any events that are received are added to the
-        events list in this object"""
-        self.send_packet(command)
-        done = False
-        while not done:
-            response_or_request = self.recv_packet(filter_type=["response", 
"request"])
-            if response_or_request is None:
-                desc = 'no response for "%s"' % (command["command"])
-                raise ValueError(desc)
-            if response_or_request["type"] == "response":
-                self.validate_response(command, response_or_request)
-                return response_or_request
-            else:
-                self.reverse_requests.append(response_or_request)
-                if response_or_request["command"] == "runInTerminal":
-                    subprocess.Popen(
-                        response_or_request["arguments"]["args"],
-                        env=response_or_request["arguments"]["env"],
-                    )
-                    self.send_packet(
-                        {
-                            "type": "response",
-                            "request_seq": response_or_request["seq"],
-                            "success": True,
-                            "command": "runInTerminal",
-                            "body": {},
-                        },
-                    )
-                elif response_or_request["command"] == "startDebugging":
-                    self.send_packet(
-                        {
-                            "type": "response",
-                            "request_seq": response_or_request["seq"],
-                            "success": True,
-                            "command": "startDebugging",
-                            "body": {},
-                        },
-                    )
-                else:
-                    desc = 'unknown reverse request "%s"' % (
-                        response_or_request["command"]
-                    )
-                    raise ValueError(desc)
+        return packet["seq"]
 
-        return None
+    def receive_response(self, seq: int) -> Optional[Response]:
+        """Waits for a response with the associated request_seq."""
+
+        def predicate(p: ProtocolMessage):
+            return p["type"] == "response" and p["request_seq"] == seq
+
+        return cast(Optional[Response], self._recv_packet(predicate=predicate))
+
+    def get_modules(self):
+        modules = {}
+        resp = self.request_modules()
+        if resp["success"]:
+            module_list = resp["body"]["modules"]
+            for module in module_list:
+                modules[module["name"]] = module
+        return modules
+
+    def get_output(self, category: str, clear=True) -> str:
+        output = ""
+        if category in self.output:
+            output = self.output.get(category, "")
+            if clear:
+                del self.output[category]
+        return output
+
+    def collect_output(
+        self,
+        category: str,
+        timeout_secs: float,
+        pattern: Optional[str] = None,
+        clear=True,
+    ) -> str:
+        """Collect output from 'output' events.
+
+        Args:
+            category: The category to collect.
+            timeout_secs: The max duration for collecting output.
+            pattern:
+                Optional, if set, return once this pattern is detected in the
+                collected output.
+
+        Returns:
+            The collected output.
+        """
+        deadline = time.monotonic() + timeout_secs
+        output = self.get_output(category, clear)
+        while deadline >= time.monotonic() and pattern is None or pattern not 
in output:
+            event = self.wait_for_event(["output"], timeout=deadline - 
time.monotonic())
+            if not event:  # Timeout or EOF
+                break
+            output += self.get_output(category, clear=clear)
+        return output
 
     def wait_for_event(
-        self, filter: Union[str, list[str]], timeout: Optional[float] = None
+        self, filter: List[str] = [], timeout: Optional[float] = None
     ) -> Optional[Event]:
         """Wait for the first event that matches the filter."""
-        return self.recv_packet(
-            filter_type="event", filter_event=filter, timeout=timeout
+
+        def predicate(p: ProtocolMessage):
+            return p["type"] == "event" and p["event"] in filter
+
+        return cast(
+            Optional[Event], self._recv_packet(predicate=predicate, 
timeout=timeout)
         )
 
     def wait_for_stopped(
         self, timeout: Optional[float] = None
-    ) -> Optional[list[Event]]:
+    ) -> Optional[List[Event]]:
         stopped_events = []
         stopped_event = self.wait_for_event(
             filter=["stopped", "exited"], timeout=timeout
@@ -463,9 +632,9 @@ def wait_for_stopped(
         return stopped_events
 
     def wait_for_breakpoint_events(self, timeout: Optional[float] = None):
-        breakpoint_events: list[Event] = []
+        breakpoint_events: List[Event] = []
         while True:
-            event = self.wait_for_event("breakpoint", timeout=timeout)
+            event = self.wait_for_event(["breakpoint"], timeout=timeout)
             if not event:
                 break
             breakpoint_events.append(event)
@@ -476,20 +645,26 @@ def wait_for_breakpoints_to_be_verified(
     ):
         """Wait for all breakpoints to be verified. Return all unverified 
breakpoints."""
         while any(id not in self.resolved_breakpoints for id in 
breakpoint_ids):
-            breakpoint_event = self.wait_for_event("breakpoint", 
timeout=timeout)
+            breakpoint_event = self.wait_for_event(["breakpoint"], 
timeout=timeout)
             if breakpoint_event is None:
                 break
 
-        return [id for id in breakpoint_ids if id not in 
self.resolved_breakpoints]
+        return [
+            id
+            for id in breakpoint_ids
+            if id not in self.resolved_breakpoints or not self.resolved_breakpoints[id]
+        ]
 
     def wait_for_exited(self, timeout: Optional[float] = None):
-        event_dict = self.wait_for_event("exited", timeout=timeout)
+        event_dict = self.wait_for_event(["exited"], timeout=timeout)
         if event_dict is None:
             raise ValueError("didn't get exited event")
         return event_dict
 
     def wait_for_terminated(self, timeout: Optional[float] = None):
-        event_dict = self.wait_for_event("terminated", timeout)
+        if self.terminated:
+            raise ValueError("already terminated")
+        event_dict = self.wait_for_event(["terminated"], timeout)
         if event_dict is None:
             raise ValueError("didn't get terminated event")
         return event_dict
@@ -667,7 +842,7 @@ def request_attach(
         gdbRemotePort: Optional[int] = None,
         gdbRemoteHostname: Optional[str] = None,
     ):
-        args_dict = {}
+        args_dict: AttachArguments = {}
         if pid is not None:
             args_dict["pid"] = pid
         if program is not None:
@@ -700,7 +875,7 @@ def request_attach(
         if gdbRemoteHostname is not None:
             args_dict["gdb-remote-hostname"] = gdbRemoteHostname
         command_dict = {"command": "attach", "type": "request", "arguments": 
args_dict}
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_breakpointLocations(
         self, file_path, line, end_line=None, column=None, end_column=None
@@ -722,7 +897,7 @@ def request_breakpointLocations(
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_configurationDone(self):
         command_dict = {
@@ -730,7 +905,7 @@ def request_configurationDone(self):
             "type": "request",
             "arguments": {},
         }
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         if response:
             self.configuration_done_sent = True
             self.request_threads()
@@ -759,7 +934,7 @@ def request_continue(self, threadId=None, 
singleThread=False):
             "type": "request",
             "arguments": args_dict,
         }
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         if response["success"]:
             self._process_continued(response["body"]["allThreadsContinued"])
         # Caller must still call wait_for_stopped.
@@ -776,7 +951,7 @@ def request_restart(self, restartArguments=None):
         if restartArguments:
             command_dict["arguments"] = restartArguments
 
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         # Caller must still call wait_for_stopped.
         return response
 
@@ -792,7 +967,7 @@ def request_disconnect(self, terminateDebuggee=None):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_disassemble(
         self,
@@ -812,7 +987,7 @@ def request_disassemble(
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)["body"]["instructions"]
+        return self._send_recv(command_dict)["body"]["instructions"]
 
     def request_readMemory(self, memoryReference, offset, count):
         args_dict = {
@@ -825,7 +1000,7 @@ def request_readMemory(self, memoryReference, offset, 
count):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_evaluate(self, expression, frameIndex=0, threadId=None, 
context=None):
         stackFrame = self.get_stackFrame(frameIndex=frameIndex, 
threadId=threadId)
@@ -841,7 +1016,7 @@ def request_evaluate(self, expression, frameIndex=0, 
threadId=None, context=None
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_exceptionInfo(self, threadId=None):
         if threadId is None:
@@ -852,7 +1027,7 @@ def request_exceptionInfo(self, threadId=None):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_initialize(self, sourceInitFile=False):
         command_dict = {
@@ -873,7 +1048,7 @@ def request_initialize(self, sourceInitFile=False):
                 "$__lldb_sourceInitFile": sourceInitFile,
             },
         }
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         if response:
             if "body" in response:
                 self.capabilities = response["body"]
@@ -908,7 +1083,7 @@ def request_launch(
         customFrameFormat: Optional[str] = None,
         customThreadFormat: Optional[str] = None,
     ):
-        args_dict = {"program": program}
+        args_dict: LaunchArguments = {"program": program}
         if args:
             args_dict["args"] = args
         if cwd:
@@ -956,14 +1131,14 @@ def request_launch(
         if commandEscapePrefix is not None:
             args_dict["commandEscapePrefix"] = commandEscapePrefix
         command_dict = {"command": "launch", "type": "request", "arguments": 
args_dict}
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_next(self, threadId, granularity="statement"):
         if self.exit_status is not None:
             raise ValueError("request_continue called after process exited")
         args_dict = {"threadId": threadId, "granularity": granularity}
         command_dict = {"command": "next", "type": "request", "arguments": 
args_dict}
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_stepIn(self, threadId, targetId, granularity="statement"):
         if self.exit_status is not None:
@@ -976,7 +1151,7 @@ def request_stepIn(self, threadId, targetId, 
granularity="statement"):
             "granularity": granularity,
         }
         command_dict = {"command": "stepIn", "type": "request", "arguments": 
args_dict}
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_stepInTargets(self, frameId):
         if self.exit_status is not None:
@@ -988,14 +1163,14 @@ def request_stepInTargets(self, frameId):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_stepOut(self, threadId):
         if self.exit_status is not None:
             raise ValueError("request_stepOut called after process exited")
         args_dict = {"threadId": threadId}
         command_dict = {"command": "stepOut", "type": "request", "arguments": 
args_dict}
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_pause(self, threadId=None):
         if self.exit_status is not None:
@@ -1004,39 +1179,35 @@ def request_pause(self, threadId=None):
             threadId = self.get_thread_id()
         args_dict = {"threadId": threadId}
         command_dict = {"command": "pause", "type": "request", "arguments": 
args_dict}
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_scopes(self, frameId):
         args_dict = {"frameId": frameId}
         command_dict = {"command": "scopes", "type": "request", "arguments": 
args_dict}
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
-    def request_setBreakpoints(self, source: Source, line_array, data=None):
+    def request_setBreakpoints(
+        self,
+        source: Union[Source, str],
+        line_array: Optional[List[int]],
+        data: Optional[List[BreakpiontData]] = None,
+    ):
         """data is array of parameters for breakpoints in line_array.
         Each parameter object is 1:1 mapping with entries in line_entry.
         It contains optional location/hitCondition/logMessage parameters.
         """
         args_dict = {
-            "source": source.as_dict(),
+            "source": source.to_DAP(),
             "sourceModified": False,
         }
-        if line_array is not None:
+        if line_array:
             args_dict["lines"] = line_array
             breakpoints = []
             for i, line in enumerate(line_array):
-                breakpoint_data = None
+                breakpoint_data: BreakpiontData = {}
                 if data is not None and i < len(data):
                     breakpoint_data = data[i]
-                bp = {"line": line}
-                if breakpoint_data is not None:
-                    if breakpoint_data.get("condition"):
-                        bp["condition"] = breakpoint_data["condition"]
-                    if breakpoint_data.get("hitCondition"):
-                        bp["hitCondition"] = breakpoint_data["hitCondition"]
-                    if breakpoint_data.get("logMessage"):
-                        bp["logMessage"] = breakpoint_data["logMessage"]
-                    if breakpoint_data.get("column"):
-                        bp["column"] = breakpoint_data["column"]
+                bp: SourceBreakpoint = {"line": line, **breakpoint_data}
                 breakpoints.append(bp)
             args_dict["breakpoints"] = breakpoints
 
@@ -1045,7 +1216,7 @@ def request_setBreakpoints(self, source: Source, 
line_array, data=None):
             "type": "request",
             "arguments": args_dict,
         }
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         if response["success"]:
             self._update_verified_breakpoints(response["body"]["breakpoints"])
         return response
@@ -1057,7 +1228,7 @@ def request_setExceptionBreakpoints(self, filters):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_setFunctionBreakpoints(self, names, condition=None, 
hitCondition=None):
         breakpoints = []
@@ -1074,7 +1245,7 @@ def request_setFunctionBreakpoints(self, names, 
condition=None, hitCondition=Non
             "type": "request",
             "arguments": args_dict,
         }
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         if response["success"]:
             self._update_verified_breakpoints(response["body"]["breakpoints"])
         return response
@@ -1095,7 +1266,7 @@ def request_dataBreakpointInfo(
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_setDataBreakpoint(self, dataBreakpoints):
         """dataBreakpoints is a list of dictionary with following fields:
@@ -1112,7 +1283,7 @@ def request_setDataBreakpoint(self, dataBreakpoints):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_compileUnits(self, moduleId):
         args_dict = {"moduleId": moduleId}
@@ -1121,7 +1292,7 @@ def request_compileUnits(self, moduleId):
             "type": "request",
             "arguments": args_dict,
         }
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         return response
 
     def request_completions(self, text, frameId=None):
@@ -1133,10 +1304,10 @@ def request_completions(self, text, frameId=None):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_modules(self):
-        return self.send_recv({"command": "modules", "type": "request"})
+        return self._send_recv({"command": "modules", "type": "request"})
 
     def request_stackTrace(
         self, threadId=None, startFrame=None, levels=None, format=None, 
dump=False
@@ -1155,7 +1326,7 @@ def request_stackTrace(
             "type": "request",
             "arguments": args_dict,
         }
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         if dump:
             for idx, frame in enumerate(response["body"]["stackFrames"]):
                 name = frame["name"]
@@ -1181,7 +1352,7 @@ def request_source(self, sourceReference):
                 "sourceReference": sourceReference,
             },
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_threads(self):
         """Request a list of all threads and combine any information from any
@@ -1189,7 +1360,7 @@ def request_threads(self):
         thread actually stopped. Returns an array of thread dictionaries
         with information about all threads"""
         command_dict = {"command": "threads", "type": "request", "arguments": 
{}}
-        response = self.send_recv(command_dict)
+        response = self._send_recv(command_dict)
         if not response["success"]:
             self.threads = None
             return response
@@ -1229,7 +1400,7 @@ def request_variables(
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_setVariable(self, containingVarRef, name, value, id=None):
         args_dict = {
@@ -1244,7 +1415,7 @@ def request_setVariable(self, containingVarRef, name, 
value, id=None):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_locations(self, locationReference):
         args_dict = {
@@ -1255,7 +1426,7 @@ def request_locations(self, locationReference):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def request_testGetTargetBreakpoints(self):
         """A request packet used in the LLDB test suite to get all currently
@@ -1267,12 +1438,12 @@ def request_testGetTargetBreakpoints(self):
             "type": "request",
             "arguments": {},
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
     def terminate(self):
         self.send.close()
-        if self.recv_thread.is_alive():
-            self.recv_thread.join()
+        if self._recv_thread.is_alive():
+            self._recv_thread.join()
 
     def request_setInstructionBreakpoints(self, memory_reference=[]):
         breakpoints = []
@@ -1287,7 +1458,7 @@ def request_setInstructionBreakpoints(self, 
memory_reference=[]):
             "type": "request",
             "arguments": args_dict,
         }
-        return self.send_recv(command_dict)
+        return self._send_recv(command_dict)
 
 
 class DebugAdapterServer(DebugCommunication):
diff --git 
a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
index 3b54d598c3509..8778b51e7c360 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
@@ -1,6 +1,6 @@
 import os
 import time
-from typing import Optional
+from typing import Optional, Callable
 import uuid
 
 import dap_server
@@ -121,11 +121,19 @@ def wait_for_breakpoints_to_resolve(
             f"Expected to resolve all breakpoints. Unresolved breakpoint ids: 
{unresolved_breakpoints}",
         )
 
-    def waitUntil(self, condition_callback):
-        for _ in range(20):
-            if condition_callback():
+    def wait_until(
+        self,
+        predicate: Callable[[], bool],
+        delay: float = 0.5,
+        timeout: float = DEFAULT_TIMEOUT,
+    ) -> bool:
+        """Repeatedly run the predicate until either the predicate returns True
+        or a timeout has occurred."""
+        deadline = time.monotonic() + timeout
+        while deadline > time.monotonic():
+            if predicate():
                 return True
-            time.sleep(0.5)
+            time.sleep(delay)
         return False
 
     def assertCapabilityIsSet(self, key: str, msg: Optional[str] = None) -> 
None:
@@ -144,6 +152,7 @@ def verify_breakpoint_hit(self, breakpoint_ids, 
timeout=DEFAULT_TIMEOUT):
         "breakpoint_ids" should be a list of breakpoint ID strings
         (["1", "2"]). The return value from self.set_source_breakpoints()
         or self.set_function_breakpoints() can be passed to this function"""
+        breakpoint_ids = [str(i) for i in breakpoint_ids]
         stopped_events = self.dap_server.wait_for_stopped(timeout)
         for stopped_event in stopped_events:
             if "body" in stopped_event:
@@ -155,22 +164,16 @@ def verify_breakpoint_hit(self, breakpoint_ids, 
timeout=DEFAULT_TIMEOUT):
                     and body["reason"] != "instruction breakpoint"
                 ):
                     continue
-                if "description" not in body:
+                if "hitBreakpointIds" not in body:
                     continue
-                # Descriptions for breakpoints will be in the form
-                # "breakpoint 1.1", so look for any description that matches
-                # ("breakpoint 1.") in the description field as verification
-                # that one of the breakpoint locations was hit. DAP doesn't
-                # allow breakpoints to have multiple locations, but LLDB does.
-                # So when looking at the description we just want to make sure
-                # the right breakpoint matches and not worry about the actual
-                # location.
-                description = body["description"]
-                for breakpoint_id in breakpoint_ids:
-                    match_desc = f"breakpoint {breakpoint_id}."
-                    if match_desc in description:
+                hit_breakpoint_ids = body["hitBreakpointIds"]
+                for bp in hit_breakpoint_ids:
+                    if str(bp) in breakpoint_ids:
                         return
-        self.assertTrue(False, f"breakpoint not hit, 
stopped_events={stopped_events}")
+        self.assertTrue(
+            False,
+            f"breakpoint not hit, wanted breakpoint_ids={breakpoint_ids} 
stopped_events={stopped_events}",
+        )
 
     def verify_stop_exception_info(self, expected_description, 
timeout=DEFAULT_TIMEOUT):
         """Wait for the process we are debugging to stop, and verify the stop
@@ -205,7 +208,9 @@ def verify_commands(self, flavor, output, commands):
                     found = True
                     break
             self.assertTrue(
-                found, "verify '%s' found in console output for '%s'" % (cmd, 
flavor)
+                found,
+                "verify '%s' found in console output for '%s' in %s"
+                % (cmd, flavor, output),
             )
 
     def get_dict_value(self, d, key_path):
@@ -277,26 +282,30 @@ def get_source_and_line(self, threadId=None, 
frameIndex=0):
                         return (source["path"], stackFrame["line"])
         return ("", 0)
 
-    def get_stdout(self, timeout=0.0):
-        return self.dap_server.get_output("stdout", timeout=timeout)
+    def get_stdout(self):
+        return self.dap_server.get_output("stdout")
 
-    def get_console(self, timeout=0.0):
-        return self.dap_server.get_output("console", timeout=timeout)
+    def get_console(self):
+        return self.dap_server.get_output("console")
 
-    def get_important(self, timeout=0.0):
-        return self.dap_server.get_output("important", timeout=timeout)
+    def get_important(self):
+        return self.dap_server.get_output("important")
 
-    def collect_stdout(self, timeout_secs, pattern=None):
+    def collect_stdout(self, timeout_secs: float, pattern: Optional[str] = 
None) -> str:
         return self.dap_server.collect_output(
             "stdout", timeout_secs=timeout_secs, pattern=pattern
         )
 
-    def collect_console(self, timeout_secs, pattern=None):
+    def collect_console(
+        self, timeout_secs: float, pattern: Optional[str] = None
+    ) -> str:
         return self.dap_server.collect_output(
             "console", timeout_secs=timeout_secs, pattern=pattern
         )
 
-    def collect_important(self, timeout_secs, pattern=None):
+    def collect_important(
+        self, timeout_secs: float, pattern: Optional[str] = None
+    ) -> str:
         return self.dap_server.collect_output(
             "important", timeout_secs=timeout_secs, pattern=pattern
         )
@@ -355,7 +364,7 @@ def stepOut(self, threadId=None, waitForStop=True, 
timeout=DEFAULT_TIMEOUT):
             return self.dap_server.wait_for_stopped(timeout)
         return None
 
-    def do_continue(self):  # `continue` is a keyword.
+    def do_continue(self) -> None:  # `continue` is a keyword.
         resp = self.dap_server.request_continue()
         self.assertTrue(resp["success"], f"continue request failed: {resp}")
 
@@ -363,10 +372,14 @@ def continue_to_next_stop(self, timeout=DEFAULT_TIMEOUT):
         self.do_continue()
         return self.dap_server.wait_for_stopped(timeout)
 
-    def continue_to_breakpoint(self, breakpoint_id: str, 
timeout=DEFAULT_TIMEOUT):
-        self.continue_to_breakpoints((breakpoint_id), timeout)
+    def continue_to_breakpoint(
+        self, breakpoint_id: int, timeout: Optional[float] = DEFAULT_TIMEOUT
+    ) -> None:
+        self.continue_to_breakpoints([breakpoint_id], timeout)
 
-    def continue_to_breakpoints(self, breakpoint_ids, timeout=DEFAULT_TIMEOUT):
+    def continue_to_breakpoints(
+        self, breakpoint_ids: list[int], timeout: Optional[float] = 
DEFAULT_TIMEOUT
+    ) -> None:
         self.do_continue()
         self.verify_breakpoint_hit(breakpoint_ids, timeout)
 
diff --git a/lldb/test/API/tools/lldb-dap/breakpoint/TestDAP_setBreakpoints.py 
b/lldb/test/API/tools/lldb-dap/breakpoint/TestDAP_setBreakpoints.py
index 831edd6494c1e..a6eeee3a02543 100644
--- a/lldb/test/API/tools/lldb-dap/breakpoint/TestDAP_setBreakpoints.py
+++ b/lldb/test/API/tools/lldb-dap/breakpoint/TestDAP_setBreakpoints.py
@@ -78,7 +78,7 @@ def test_source_map(self):
         self.assertFalse(breakpoint["verified"])
         self.assertEqual(other_basename, breakpoint["source"]["name"])
         self.assertEqual(new_other_path, breakpoint["source"]["path"])
-        other_breakpoint_id = breakpoint["id"]
+        other_breakpoint_id = str(breakpoint["id"])
 
         self.dap_server.request_continue()
         self.verify_breakpoint_hit([other_breakpoint_id])
@@ -379,7 +379,8 @@ def test_column_breakpoints(self):
             self.assertEqual(breakpoint["line"], loop_line)
             self.assertEqual(breakpoint["column"], columns[index])
             self.assertTrue(breakpoint["verified"], "expect breakpoint 
verified")
-            breakpoint_ids.append(breakpoint["id"])
+            self.assertIn("id", breakpoint, "expected breakpoint id")
+            breakpoint_ids.append(str(breakpoint["id"]))
 
         # Continue to the first breakpoint,
         self.continue_to_breakpoints([breakpoint_ids[0]])
diff --git a/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py 
b/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py
index 824ed8fe3bb97..c750cff071a80 100644
--- a/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py
+++ b/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py
@@ -54,18 +54,18 @@ def test_pending_request(self):
         pending_seq = 
self.async_blocking_request(duration=self.DEFAULT_TIMEOUT / 2)
         cancel_seq = self.async_cancel(requestId=pending_seq)
 
-        blocking_resp = self.dap_server.recv_packet(filter_type=["response"])
+        blocking_resp = self.dap_server.receive_response(blocking_seq)
         self.assertEqual(blocking_resp["request_seq"], blocking_seq)
         self.assertEqual(blocking_resp["command"], "evaluate")
         self.assertEqual(blocking_resp["success"], True)
 
-        pending_resp = self.dap_server.recv_packet(filter_type=["response"])
+        pending_resp = self.dap_server.receive_response(pending_seq)
         self.assertEqual(pending_resp["request_seq"], pending_seq)
         self.assertEqual(pending_resp["command"], "evaluate")
         self.assertEqual(pending_resp["success"], False)
         self.assertEqual(pending_resp["message"], "cancelled")
 
-        cancel_resp = self.dap_server.recv_packet(filter_type=["response"])
+        cancel_resp = self.dap_server.receive_response(cancel_seq)
         self.assertEqual(cancel_resp["request_seq"], cancel_seq)
         self.assertEqual(cancel_resp["command"], "cancel")
         self.assertEqual(cancel_resp["success"], True)
@@ -86,13 +86,13 @@ def test_inflight_request(self):
         )
         cancel_seq = self.async_cancel(requestId=blocking_seq)
 
-        blocking_resp = self.dap_server.recv_packet(filter_type=["response"])
+        blocking_resp = self.dap_server.receive_response(blocking_seq)
         self.assertEqual(blocking_resp["request_seq"], blocking_seq)
         self.assertEqual(blocking_resp["command"], "evaluate")
         self.assertEqual(blocking_resp["success"], False)
         self.assertEqual(blocking_resp["message"], "cancelled")
 
-        cancel_resp = self.dap_server.recv_packet(filter_type=["response"])
+        cancel_resp = self.dap_server.receive_response(cancel_seq)
         self.assertEqual(cancel_resp["request_seq"], cancel_seq)
         self.assertEqual(cancel_resp["command"], "cancel")
         self.assertEqual(cancel_resp["success"], True)
diff --git a/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py 
b/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py
index ae8142ae4f484..c29e0d3fa7b81 100644
--- a/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py
+++ b/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py
@@ -191,7 +191,7 @@ def test_disableSTDIO(self):
         self.continue_to_exit()
         # Now get the STDOUT and verify our program argument is correct
         output = self.get_stdout()
-        self.assertEqual(output, None, "expect no program output")
+        self.assertEqual(output, "", "expect no program output")
 
     @skipIfWindows
     @skipIfLinux  # shell argument expansion doesn't seem to work on Linux
@@ -392,14 +392,14 @@ def test_commands(self):
         # Get output from the console. This should contain both the
         # "stopCommands" that were run after the first breakpoint was hit
         self.continue_to_breakpoints(breakpoint_ids)
-        output = self.get_console(timeout=self.DEFAULT_TIMEOUT)
+        output = self.get_console()
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue again and hit the second breakpoint.
         # Get output from the console. This should contain both the
         # "stopCommands" that were run after the second breakpoint was hit
         self.continue_to_breakpoints(breakpoint_ids)
-        output = self.get_console(timeout=self.DEFAULT_TIMEOUT)
+        output = self.get_console()
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue until the program exits
@@ -461,21 +461,21 @@ def test_extra_launch_commands(self):
         self.verify_commands("launchCommands", output, launchCommands)
         # Verify the "stopCommands" here
         self.continue_to_next_stop()
-        output = self.get_console(timeout=self.DEFAULT_TIMEOUT)
+        output = self.get_console()
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue and hit the second breakpoint.
         # Get output from the console. This should contain both the
         # "stopCommands" that were run after the first breakpoint was hit
         self.continue_to_next_stop()
-        output = self.get_console(timeout=self.DEFAULT_TIMEOUT)
+        output = self.get_console()
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue until the program exits
         self.continue_to_exit()
         # Get output from the console. This should contain both the
         # "exitCommands" that were run after the second breakpoint was hit
-        output = self.get_console(timeout=self.DEFAULT_TIMEOUT)
+        output = self.get_console()
         self.verify_commands("exitCommands", output, exitCommands)
 
     def test_failing_launch_commands(self):
diff --git a/lldb/test/API/tools/lldb-dap/module/TestDAP_module.py 
b/lldb/test/API/tools/lldb-dap/module/TestDAP_module.py
index 4fc221668a8ee..b1823e4c8b1c3 100644
--- a/lldb/test/API/tools/lldb-dap/module/TestDAP_module.py
+++ b/lldb/test/API/tools/lldb-dap/module/TestDAP_module.py
@@ -54,7 +54,7 @@ def checkSymbolsLoadedWithSize():
             return symbol_regex.match(program_module["symbolStatus"])
 
         if expect_debug_info_size:
-            self.waitUntil(checkSymbolsLoadedWithSize)
+            self.wait_until(checkSymbolsLoadedWithSize)
         active_modules = self.dap_server.get_modules()
         program_module = active_modules[program_basename]
         self.assertEqual(program_basename, program_module["name"])
diff --git a/lldb/test/API/tools/lldb-dap/output/TestDAP_output.py 
b/lldb/test/API/tools/lldb-dap/output/TestDAP_output.py
index 0425b55a5e552..4fcde623e3829 100644
--- a/lldb/test/API/tools/lldb-dap/output/TestDAP_output.py
+++ b/lldb/test/API/tools/lldb-dap/output/TestDAP_output.py
@@ -37,14 +37,14 @@ def test_output(self):
         # Disconnecting from the server to ensure any pending IO is flushed.
         self.dap_server.request_disconnect()
 
-        output += self.get_stdout(timeout=self.DEFAULT_TIMEOUT)
+        output += self.get_stdout()
         self.assertTrue(output and len(output) > 0, "expect program stdout")
         self.assertIn(
             "abcdefghi\r\nhello world\r\nfinally\0\0",
             output,
             "full stdout not found in: " + repr(output),
         )
-        console = self.get_console(timeout=self.DEFAULT_TIMEOUT)
+        console = self.get_console()
         self.assertTrue(console and len(console) > 0, "expect dap messages")
         self.assertIn(
             "out\0\0\r\nerr\0\0\r\n", console, f"full console message not 
found"

>From 1ac2ef82a698bba960636b1009de4c2d89ba4863 Mon Sep 17 00:00:00 2001
From: John Harrison <harj...@google.com>
Date: Thu, 12 Jun 2025 16:41:25 -0700
Subject: [PATCH 2/2] Adjusting print to use f-strings.

---
 .../Python/lldbsuite/test/tools/lldb-dap/dap_server.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index 20a1b4480df6d..bb6e06520d408 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -173,20 +173,20 @@ def read_packet(
     if line.startswith(prefix):
         # Decode length of JSON bytes
         if verbose:
-            print('content: "%s"' % (line))
+            print(f"content: {line}")
         length = int(line[len(prefix) :])
         if verbose:
-            print('length: "%u"' % (length))
+            print(f"length: {length}")
         # Skip empty line
         line = f.readline().decode()
         if verbose:
-            print('empty: "%s"' % (line))
+            print(f"empty: {line!r}")
         # Read JSON bytes
         json_str = f.read(length)
         if verbose:
-            print('json: "%r"' % (json_str))
+            print(f"json: {json_str!r}")
         if trace_file:
-            trace_file.write("from adapter:\n%r\n" % (json_str))
+            trace_file.write(f"from adapter:\n{json_str!r}\n")
         # Decode the JSON bytes into a python dictionary
         return json.loads(json_str)
 

_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to