pkarashchenko commented on a change in pull request #5836:
URL: https://github.com/apache/incubator-nuttx/pull/5836#discussion_r834446920



##########
File path: tools/minidumpserver.py
##########
@@ -0,0 +1,530 @@
+#!/usr/bin/env python3
+# tools/minidumpserver.py
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.  The
+# ASF licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the
+# License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import binascii
+import logging
+import os
+import re
+import socket
+import struct
+import sys
+
+import elftools
+from elftools.elf.elffile import ELFFile
+
+# ELF section flags
+SHF_WRITE = 0x1
+SHF_ALLOC = 0x2
+SHF_EXEC = 0x4
+SHF_WRITE_ALLOC = SHF_WRITE | SHF_ALLOC
+SHF_ALLOC_EXEC = SHF_ALLOC | SHF_EXEC
+
+
+logger = logging.getLogger()
+
+
+class dump_elf_file:
+    """
+    Class to parse ELF file for memory content in various sections.
+    There are read-only sections (e.g. text and rodata) where
+    the memory content does not need to be dumped via coredump
+    and can be retrived from the ELF file.
+    """
+
+    def __init__(self, elffile):
+        self.elffile = elffile
+        self.fd = None
+        self.elf = None
+        self.memories = list()
+
+    def open(self):
+        self.fd = open(self.elffile, "rb")
+        self.elf = ELFFile(self.fd)
+
+    def close(self):
+        self.fd.close()
+
+    def parse(self):
+        if self.fd is None:
+            self.open()
+
+        for section in self.elf.iter_sections():
+            # REALLY NEED to match exact type as all other sections
+            # (debug, text, etc.) are descendants where
+            # isinstance() would match.
+            if (
+                type(section) is not elftools.elf.sections.Section
+            ):  # pylint: disable=unidiomatic-typecheck
+                continue
+
+            size = section["sh_size"]
+            flags = section["sh_flags"]
+            start = section["sh_addr"]
+            end = start + size - 1
+
+            store = False
+            desc = "?"
+
+            if section["sh_type"] == "SHT_PROGBITS":
+                if (flags & SHF_ALLOC_EXEC) == SHF_ALLOC_EXEC:
+                    # Text section
+                    store = True
+                    desc = "text"
+                elif (flags & SHF_WRITE_ALLOC) == SHF_WRITE_ALLOC:
+                    # Data section
+                    #
+                    # Running app changes the content so no need
+                    # to store
+                    pass
+                elif (flags & SHF_ALLOC) == SHF_ALLOC:
+                    # Read only data section
+                    store = True
+                    desc = "read-only data"
+
+            if store:
+                memory = {"start": start, "end": end, "data": section.data()}
+                logger.info(
+                    "ELF Section: 0x%x to 0x%x of size %d (%s)"
+                    % (memory["start"], memory["end"], len(memory["data"]), 
desc)
+                )
+
+                self.memories.append(memory)
+
+        return True
+
+
+reg_table = {
+    "arm": {
+        "R0": 0,
+        "R1": 1,
+        "R2": 2,
+        "R3": 3,
+        "R4": 4,
+        "R5": 5,
+        "R6": 6,
+        "FP": 7,
+        "R8": 8,
+        "SB": 9,
+        "SL": 10,
+        "R11": 11,
+        "IP": 12,
+        "SP": 13,
+        "LR": 14,
+        "SL": 10,
+        "R11": 11,
+        "IP": 12,
+        "SP": 13,
+        "LR": 14,
+        "SL": 10,
+        "R11": 11,
+        "IP": 12,
+        "SP": 13,
+        "LR": 14,
+        "PC": 15,
+        "xPSR": 16,
+    },
+    "riscv": {
+        "ZERO": 0,
+        "RA": 1,
+        "SP": 2,
+        "GP": 3,
+        "TP": 4,
+        "T0": 5,
+        "T1": 6,
+        "T2": 7,
+        "FP": 8,
+        "S1": 9,
+        "A0": 10,
+        "A1": 11,
+        "A2": 12,
+        "A3": 13,
+        "A4": 14,
+        "A5": 15,
+        "A6": 16,
+        "A7": 17,
+        "S2": 18,
+        "S3": 19,
+        "S4": 20,
+        "S5": 21,
+        "S6": 22,
+        "S7": 23,
+        "S8": 24,
+        "S9": 25,
+        "S10": 26,
+        "S11": 27,
+        "T3": 28,
+        "T4": 29,
+        "T5": 30,
+        "T6": 31,
+        "PC": 32,
+    },
+}
+
+
+class dump_log_file:
+    def __init__(self, logfile):
+        self.logfile = logfile
+        self.fd = None
+        self.arch = ""
+        self.registers = []
+        self.memories = list()
+
+    def open(self):
+        self.fd = open(self.logfile, "r")
+
+    def close(self):
+        self.fd.closeself()
+
+    def parse(self):
+        data = bytes()
+        start = 0
+        if self.fd is None:
+            self.open()
+        while 1:
+            line = self.fd.readline()
+            if line == "":
+                break
+
+            tmp = re.search(r"([^ ]*)_registerdump:?", line)
+            if tmp is not None:
+                # find arch
+                self.arch = tmp.group(1)
+                if self.arch not in reg_table:
+                    logger.error("%s not supported" % (self.arch))
+                # init register list
+                if len(self.registers) == 0:
+                    for x in range(len(reg_table[self.arch])):
+                        self.registers.append(b"x")
+
+                # find register value
+                line = line[tmp.span()[1] :]
+                line = line.replace("\n", " ")
+                while 1:
+                    tmp = re.search("([^ ]+):", line)
+                    if tmp is None:
+                        break
+                    register = tmp.group(1)
+                    line = line[tmp.span()[1] :]
+                    tmp = re.search("([0-9a-fA-F]+) ", line)
+                    if tmp is None:
+                        break
+                    if register in reg_table[self.arch].keys():
+                        self.registers[reg_table[self.arch][register]] = int(
+                            "0x" + tmp.group().replace(" ", ""), 16
+                        )
+                    line = line[tmp.span()[1] :]
+                continue
+
+            tmp = re.search("_stackdump:", line)
+            if tmp is not None:
+                # find stackdump
+                line = line[tmp.span()[1] :]
+                tmp = re.search("([0-9a-fA-F]+):", line)
+                if tmp is not None:
+                    line_start = int("0x" + tmp.group()[:-1], 16)
+
+                    if start + len(data) != line_start:
+                        # stack is not contiguous
+                        if len(data) == 0:
+                            start = line_start
+                        else:
+                            memory = {
+                                "start": start,
+                                "end": start + len(data),
+                                "data": data,
+                            }
+                            self.memories.append(memory)
+                            data = b""
+                            start = line_start
+
+                    line = line[tmp.span()[1] :]
+                    line = line.replace("\n", " ")
+
+                    while 1:
+                        # record stack value
+                        tmp = re.search(" ([0-9a-fA-F]+)", line)
+                        if tmp is None:
+                            break
+                        data = data + struct.pack(
+                            "<I", int("0x" + tmp.group().replace(" ", ""), 16)
+                        )
+                        line = line[tmp.span()[1] :]
+
+        if len(data):
+            memory = {"start": start, "end": start + len(data), "data": data}
+            self.memories.append(memory)
+
+
+GDB_SIGNAL_DEFAULT = 7
+
+
+class gdb_stub:
+    def __init__(self, logfile, elffile):
+        self.logfile = logfile
+        self.elffile = elffile
+        self.socket = None
+        self.gdb_signal = GDB_SIGNAL_DEFAULT
+        self.mem_regions = self.elffile.memories + self.logfile.memories
+
+    def get_gdb_packet(self):
+        socket = self.socket
+        if socket is None:
+            return None
+
+        data = b""
+        checksum = 0
+        # Wait for '$'
+        while True:
+            ch = socket.recv(1)
+            if ch == b"$":
+                break
+
+        # Get a full packet
+        while True:
+            ch = socket.recv(1)
+            if ch == b"#":
+                # End of packet
+                break
+
+            checksum += ord(ch)
+            data += ch
+
+        # Get checksum (2-bytes)
+        ch = socket.recv(2)
+        in_chksum = ord(binascii.unhexlify(ch))
+
+        logger.debug(f"Received GDB packet: {data}")
+
+        if (checksum % 256) == in_chksum:
+            # ACK
+            logger.debug("ACK")
+            socket.send(b"+")
+
+            return data
+        else:
+            # NACK
+            logger.debug(f"NACK (checksum {in_chksum} != {checksum}")
+            socket.send(b"-")
+
+            return None
+
+    def put_gdb_packet(self, data):
+        socket = self.socket
+        if socket is None:
+            return
+
+        checksum = 0
+        for d in data:
+            checksum += d
+
+        pkt = b"$" + data + b"#"
+
+        checksum = checksum % 256
+        pkt += format(checksum, "02X").encode()
+
+        logger.debug(f"Sending GDB packet: {pkt}")
+
+        socket.send(pkt)
+
+    def handle_signal_query_packet(self):
+        # the '?' packet
+        pkt = b"S"
+        pkt += format(self.gdb_signal, "02X").encode()
+
+        self.put_gdb_packet(pkt)
+
+    def handle_register_group_read_packet(self):
+        reg_fmt = "<I"
+        pkt = b""
+
+        for reg in self.logfile.registers:
+            if reg != b"x":
+                bval = struct.pack(reg_fmt, reg)
+                pkt += binascii.hexlify(bval)
+            else:
+                # Register not in coredump -> unknown value
+                # Send in "xxxxxxxx"
+                pkt += b"x" * 8
+
+        self.put_gdb_packet(pkt)
+
+    def handle_register_single_read_packet(self, pkt):
+        # Mark registers as "<unavailable>".
+        # 'p' packets are usually used for registers
+        # other than the general ones (e.g. eax, ebx)
+        # so we can safely reply "xxxxxxxx" here.
+        self.put_gdb_packet(b"x" * 8)
+
+    def handle_register_group_write_packet(self):
+        # the 'G' packet for writing to a group of registers
+        #
+        # We don't support writing so return error
+        self.put_gdb_packet(b"E01")
+
+    def handle_register_single_write_packet(self, pkt):
+        # the 'P' packet for writing to registers
+        #
+        # We don't support writing so return error
+        self.put_gdb_packet(b"E01")
+
+    def handle_memory_read_packet(self, pkt):
+        # the 'm' packet for reading memory: m<addr>,<len>
+
+        def get_mem_region(addr):
+            for r in self.mem_regions:
+                if r["start"] <= addr <= r["end"]:
+                    return r
+
+            return None
+
+        # extract address and length from packet
+        # and convert them into usable integer values
+        addr, length = pkt[1:].split(b",")
+        s_addr = int(b"0x" + addr, 16)
+        length = int(b"0x" + length, 16)
+
+        # FIXME: Need more efficient way of extracting memory content
+        remaining = length
+        addr = s_addr
+        barray = b""
+        r = get_mem_region(addr)
+        while remaining > 0:
+            if r is None:
+                barray = None
+                break
+
+            if addr > r["end"]:
+                r = get_mem_region(addr)
+                continue
+
+            offset = addr - r["start"]
+            barray += r["data"][offset : offset + 1]

Review comment:
       is it possible to
   ```suggestion
               barray += r["data"][offset:offset + 1]
   ```
   to get LINT happy?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@nuttx.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to