================
@@ -188,267 +241,326 @@ def validate_response(cls, command, response):
             )
 
     def _read_packet_thread(self):
-        done = False
         try:
-            while not done:
+            while True:
                 packet = read_packet(self.recv, trace_file=self.trace_file)
                 # `packet` will be `None` on EOF. We want to pass it down to
                 # handle_recv_packet anyway so the main thread can handle unexpected
                 # termination of lldb-dap and stop waiting for new packets.
-                done = not self._handle_recv_packet(packet)
+                if not self._handle_recv_packet(packet):
+                    break
         finally:
             dump_dap_log(self.log_file)
 
-    def get_modules(self, startModule: int = 0, moduleCount: int = 0):
-        module_list = self.request_modules(startModule, moduleCount)["body"]["modules"]
+    def get_modules(
+        self, start_module: Optional[int] = None, module_count: Optional[int] = None
+    ) -> Dict:
+        resp = self.request_modules(start_module, module_count)
+        if not resp["success"]:
+            raise ValueError(f"request_modules failed: {resp!r}")
         modules = {}
+        module_list = resp["body"]["modules"]
         for module in module_list:
             modules[module["name"]] = module
         return modules
 
-    def get_output(self, category, timeout=0.0, clear=True):
-        self.output_condition.acquire()
-        output = None
+    def get_output(self, category: str, clear=True) -> str:
+        output = ""
         if category in self.output:
-            output = self.output[category]
+            output = self.output.get(category, "")
             if clear:
                 del self.output[category]
-        elif timeout != 0.0:
-            self.output_condition.wait(timeout)
-            if category in self.output:
-                output = self.output[category]
-                if clear:
-                    del self.output[category]
-        self.output_condition.release()
         return output
 
-    def collect_output(self, category, timeout_secs, pattern, clear=True):
-        end_time = time.time() + timeout_secs
-        collected_output = ""
-        while end_time > time.time():
-            output = self.get_output(category, timeout=0.25, clear=clear)
-            if output:
-                collected_output += output
-                if pattern is not None and pattern in output:
-                    break
-        return collected_output if collected_output else None
+    def collect_output(
+        self,
+        category: str,
+        timeout: float,
+        pattern: Optional[str] = None,
+        clear=True,
+    ) -> str:
+        """Collect output from 'output' events.
+        Args:
+            category: The category to collect.
+            timeout: The max duration for collecting output.
+            pattern:
+                Optional, if set, return once this pattern is detected in the
+                collected output.
+        Returns:
+            The collected output.
+        """
+        deadline = time.monotonic() + timeout
+        output = self.get_output(category, clear)
+        while deadline >= time.monotonic() and (
+            pattern is None or pattern not in output
+        ):
+            event = self.wait_for_event(["output"], timeout=deadline - time.monotonic())
+            if not event:  # Timeout or EOF
+                break
+            output += self.get_output(category, clear=clear)
+        return output
 
     def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
-        self.recv_condition.acquire()
-        self.recv_packets.append(packet)
-        self.recv_condition.notify()
-        self.recv_condition.release()
+        with self.recv_condition:
+            self.recv_packets.append(packet)
+            self.recv_condition.notify()
 
     def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
-        """Called by the read thread that is waiting for all incoming packets
-        to store the incoming packet in "self.recv_packets" in a thread safe
-        way. This function will then signal the "self.recv_condition" to
-        indicate a new packet is available. Returns True if the caller
-        should keep calling this function for more packets.
+        """Handles an incoming packet.
+
+        Called by the read thread that is waiting for all incoming packets
+        to store the incoming packet in "self._recv_packets" in a thread safe
+        way. This function will then signal the "self._recv_condition" to
+        indicate a new packet is available.
+
+        Args:
+            packet: A new packet to store.
+
+        Returns:
+            True if the caller should keep calling this function for more
+            packets.
         """
-        # If EOF, notify the read thread by enqueuing a None.
-        if not packet:
-            self._enqueue_recv_packet(None)
-            return False
-
-        # Check the packet to see if is an event packet
-        keepGoing = True
-        packet_type = packet["type"]
-        if packet_type == "event":
-            event = packet["event"]
-            body = None
-            if "body" in packet:
-                body = packet["body"]
-            # Handle the event packet and cache information from these packets
-            # as they come in
-            if event == "output":
-                # Store any output we receive so clients can retrieve it later.
-                category = body["category"]
-                output = body["output"]
-                self.output_condition.acquire()
-                if category in self.output:
-                    self.output[category] += output
-                else:
-                    self.output[category] = output
-                self.output_condition.notify()
-                self.output_condition.release()
-                # no need to add 'output' event packets to our packets list
-                return keepGoing
-            elif event == "initialized":
-                self.initialized = True
-            elif event == "process":
-                # When a new process is attached or launched, remember the
-                # details that are available in the body of the event
-                self.process_event_body = body
-            elif event == "exited":
-                # Process exited, mark the status to indicate the process is not
-                # alive.
-                self.exit_status = body["exitCode"]
-            elif event == "continued":
-                # When the process continues, clear the known threads and
-                # thread_stop_reasons.
-                all_threads_continued = body.get("allThreadsContinued", True)
-                tid = body["threadId"]
-                if tid in self.thread_stop_reasons:
-                    del self.thread_stop_reasons[tid]
-                self._process_continued(all_threads_continued)
-            elif event == "stopped":
-                # Each thread that stops with a reason will send a
-                # 'stopped' event. We need to remember the thread stop
-                # reasons since the 'threads' command doesn't return
-                # that information.
-                self._process_stopped()
-                tid = body["threadId"]
-                self.thread_stop_reasons[tid] = body
-            elif event.startswith("progress"):
-                # Progress events come in as 'progressStart', 'progressUpdate',
-                # and 'progressEnd' events. Keep these around in case test
-                # cases want to verify them.
-                self.progress_events.append(packet)
-            elif event == "breakpoint":
-                # Breakpoint events are sent when a breakpoint is resolved
-                self._update_verified_breakpoints([body["breakpoint"]])
-            elif event == "capabilities":
-                # Update the capabilities with new ones from the event.
-                self.capabilities.update(body["capabilities"])
-
-        elif packet_type == "response":
-            if packet["command"] == "disconnect":
-                keepGoing = False
-        self._enqueue_recv_packet(packet)
-        return keepGoing
+        with self._recv_condition:
+            self._recv_packets.append(packet)
+            self._recv_condition.notify()
+            # packet is None on EOF
+            return packet is not None and not (
+                packet["type"] == "response" and packet["command"] == 
"disconnect"
+            )
+
+    def _recv_packet(
+        self,
+        *,
+        predicate: Optional[Callable[[ProtocolMessage], bool]] = None,
+        timeout: Optional[float] = None,
+    ) -> Optional[ProtocolMessage]:
+        """Processes received packets from the adapter.
+        Updates the DebugCommunication stateful properties based on the received
+        packets in the order they are received.
+        NOTE: The only time the session state properties should be updated is
+        during this call to ensure consistency during tests.
+        Args:
+            predicate:
+                Optional, if specified, returns the first packet that matches
+                the given predicate.
+            timeout:
+                Optional, if specified, processes packets until either the
+                timeout occurs or the predicate matches a packet, whichever
+                occurs first.
+        Returns:
+            The first matching packet for the given predicate, if specified,
+            otherwise None.
+        """
+        assert (
+            threading.current_thread() != self._recv_thread
+        ), "Must not be called from the _recv_thread"
+
+        def process_until_match():
+            self._process_recv_packets()
+            for i, packet in enumerate(self._pending_packets):
+                if packet is None:
+                    # We need to return a truthy value to break out of the
+                    # wait_for, use `EOFError` as an indicator of EOF.
+                    return EOFError()
+                if predicate and predicate(packet):
+                    self._pending_packets.pop(i)
+                    return packet
+
+        with self._recv_condition:
+            packet = self._recv_condition.wait_for(process_until_match, timeout)
+            return None if isinstance(packet, EOFError) else packet
+
+    def _process_recv_packets(self) -> None:
+        """Process received packets, updating the session state."""
+        with self._recv_condition:
+            for packet in self._recv_packets:
+                # Handle events that may modify any stateful properties of
+                # the DAP session.
+                if packet and packet["type"] == "event":
+                    self._handle_event(packet)
+                elif packet and packet["type"] == "request":
+                    # Handle reverse requests and keep processing.
+                    self._handle_reverse_request(packet)
+                # Move the packet to the pending queue.
+                self._pending_packets.append(packet)
+            self._recv_packets.clear()
+
+    def _handle_event(self, packet: Event) -> None:
+        """Handle any events that modify debug session state we track."""
+        event = packet["event"]
+        body: Optional[Dict] = packet.get("body", None)
+
+        if event == "output" and body:
+            # Store any output we receive so clients can retrieve it later.
+            category = body["category"]
+            output = body["output"]
+            if category in self.output:
+                self.output[category] += output
+            else:
+                self.output[category] = output
+        elif event == "initialized":
+            self.initialized = True
+        elif event == "process":
+            # When a new process is attached or launched, remember the
+            # details that are available in the body of the event
+            self.process_event_body = body
+        elif event == "exited" and body:
+            # Process exited, mark the status to indicate the process is not
+            # alive.
+            self.exit_status = body["exitCode"]
+        elif event == "continued" and body:
+            # When the process continues, clear the known threads and
+            # thread_stop_reasons.
+            all_threads_continued = body.get("allThreadsContinued", True)
+            tid = body["threadId"]
+            if tid in self.thread_stop_reasons:
+                del self.thread_stop_reasons[tid]
+            self._process_continued(all_threads_continued)
+        elif event == "stopped" and body:
+            # Each thread that stops with a reason will send a
+            # 'stopped' event. We need to remember the thread stop
+            # reasons since the 'threads' command doesn't return
+            # that information.
+            self._process_stopped()
+            tid = body["threadId"]
+            self.thread_stop_reasons[tid] = body
+        elif event.startswith("progress"):
+            # Progress events come in as 'progressStart', 'progressUpdate',
+            # and 'progressEnd' events. Keep these around in case test
+            # cases want to verify them.
+            self.progress_events.append(packet)
+        elif event == "breakpoint" and body:
+            # Breakpoint events are sent when a breakpoint is resolved
+            self._update_verified_breakpoints([body["breakpoint"]])
+        elif event == "capabilities" and body:
+            if self.capabilities is None:
+                self.capabilities = {}
+            # Update the capabilities with new ones from the event.
+            self.capabilities.update(body["capabilities"])
+
+    def _handle_reverse_request(self, request: Request) -> None:
+        if request in self.reverse_requests:
+            return
+        self.reverse_requests.append(request)
+        arguments = request.get("arguments")
+        if request["command"] == "runInTerminal" and arguments is not None:
+            in_shell = arguments.get("argsCanBeInterpretedByShell", False)
+            proc = subprocess.Popen(
+                arguments["args"],
+                env=arguments.get("env", {}),
+                cwd=arguments.get("cwd", None),
+                stdin=subprocess.DEVNULL,
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+                shell=in_shell,
+            )
+            body = {}
+            if in_shell:
+                body["shellProcessId"] = proc.pid
+            else:
+                body["processId"] = proc.pid
+            self.send_packet(
+                {
+                    "type": "response",
+                    "seq": 0,
+                    "request_seq": request["seq"],
+                    "success": True,
+                    "command": "runInTerminal",
+                    "message": None,
+                    "body": body,
+                }
+            )
+        elif request["command"] == "startDebugging":
+            self.send_packet(
+                {
+                    "type": "response",
+                    "seq": 0,
+                    "request_seq": request["seq"],
+                    "success": True,
+                    "message": None,
+                    "command": "startDebugging",
+                    "body": {},
+                }
+            )
+        else:
+            desc = 'unknown reverse request "%s"' % (request["command"])
+            raise ValueError(desc)
 
     def _process_continued(self, all_threads_continued: bool):
         self.frame_scopes = {}
         if all_threads_continued:
             self.thread_stop_reasons = {}
 
-    def _update_verified_breakpoints(self, breakpoints: list[Event]):
-        for breakpoint in breakpoints:
-            if "id" in breakpoint:
-                self.resolved_breakpoints[str(breakpoint["id"])] = breakpoint.get(
-                    "verified", False
-                )
+    def _update_verified_breakpoints(self, breakpoints: list[Breakpoint]):
+        for bp in breakpoints:
+            # If no id is set, we cannot correlate the given breakpoint across
+            # requests, ignore it.
+            if "id" not in bp:
+                continue
+
+            self.resolved_breakpoints[str(bp["id"])] = bp.get("verified", False)
+
+    def send_packet(self, packet: ProtocolMessage) -> int:
+        """Takes a dictionary representation of a DAP request and send the 
request to the debug adapter.
 
-    def send_packet(self, command_dict: Request, set_sequence=True):
-        """Take the "command_dict" python dictionary and encode it as a JSON
-        string and send the contents as a packet to the VSCode debug
-        adapter"""
-        # Set the sequence ID for this command automatically
-        if set_sequence:
-            command_dict["seq"] = self.sequence
+        Returns the seq number of the request.
+        """
+        # Set the seq for requests.
+        if packet["type"] == "request":
+            packet["seq"] = self.sequence
             self.sequence += 1
+        else:
+            packet["seq"] = 0
+
         # Encode our command dictionary as a JSON string
-        json_str = json.dumps(command_dict, separators=(",", ":"))
+        json_str = json.dumps(packet, separators=(",", ":"))
+
         if self.trace_file:
             self.trace_file.write("to adapter:\n%s\n" % (json_str))
+
         length = len(json_str)
         if length > 0:
             # Send the encoded JSON packet and flush the 'send' file
             self.send.write(self.encode_content(json_str))
             self.send.flush()
 
-    def recv_packet(
-        self,
-        filter_type: Optional[str] = None,
-        filter_event: Optional[Union[str, list[str]]] = None,
-        timeout: Optional[float] = None,
-    ) -> Optional[ProtocolMessage]:
-        """Get a JSON packet from the VSCode debug adapter. This function
-        assumes a thread that reads packets is running and will deliver
-        any received packets by calling handle_recv_packet(...). This
-        function will wait for the packet to arrive and return it when
-        it does."""
-        while True:
-            try:
-                self.recv_condition.acquire()
-                packet = None
-                while True:
-                    for i, curr_packet in enumerate(self.recv_packets):
-                        if not curr_packet:
-                            raise EOFError
-                        packet_type = curr_packet["type"]
-                        if filter_type is None or packet_type in filter_type:
-                            if filter_event is None or (
-                                packet_type == "event"
-                                and curr_packet["event"] in filter_event
-                            ):
-                                packet = self.recv_packets.pop(i)
-                                break
-                    if packet:
-                        break
-                    # Sleep until packet is received
-                    len_before = len(self.recv_packets)
-                    self.recv_condition.wait(timeout)
-                    len_after = len(self.recv_packets)
-                    if len_before == len_after:
-                        return None  # Timed out
-                return packet
-            except EOFError:
-                return None
-            finally:
-                self.recv_condition.release()
-
-    def send_recv(self, command):
+        return packet["seq"]
+
+    def _send_recv(self, request: Request) -> Optional[Response]:
         """Send a command python dictionary as JSON and receive the JSON
         response. Validates that the response is the correct sequence and
         command in the reply. Any events that are received are added to the
         events list in this object"""
-        self.send_packet(command)
-        done = False
-        while not done:
-            response_or_request = self.recv_packet(filter_type=["response", "request"])
-            if response_or_request is None:
-                desc = 'no response for "%s"' % (command["command"])
-                raise ValueError(desc)
-            if response_or_request["type"] == "response":
-                self.validate_response(command, response_or_request)
-                return response_or_request
-            else:
-                self.reverse_requests.append(response_or_request)
-                if response_or_request["command"] == "runInTerminal":
-                    subprocess.Popen(
-                        response_or_request["arguments"].get("args"),
-                        env=response_or_request["arguments"].get("env", {}),
-                    )
-                    self.send_packet(
-                        {
-                            "type": "response",
-                            "request_seq": response_or_request["seq"],
-                            "success": True,
-                            "command": "runInTerminal",
-                            "body": {},
-                        },
-                    )
-                elif response_or_request["command"] == "startDebugging":
-                    self.send_packet(
-                        {
-                            "type": "response",
-                            "request_seq": response_or_request["seq"],
-                            "success": True,
-                            "command": "startDebugging",
-                            "body": {},
-                        },
-                    )
-                else:
-                    desc = 'unknown reverse request "%s"' % (
-                        response_or_request["command"]
-                    )
-                    raise ValueError(desc)
+        seq = self.send_packet(request)
+        response = self.receive_response(seq)
+        if response is None:
+            raise ValueError(f"no response for {request!r}")
+        self.validate_response(request, response)
+        return response
 
-        return None
+    def receive_response(self, seq: int) -> Optional[Response]:
+        """Waits for the a response with the associated request_sec."""
----------------
da-viper wrote:

```suggestion
        """Waits for a response with the associated request_sec."""
```

https://github.com/llvm/llvm-project/pull/147787