apolyakov created this revision.
apolyakov added reviewers: jingham, aprantl, labath.
Herald added a project: LLDB.

This commit shows how LLDB python plug-in could look like.

Such an approach allows users to use LLDB for OS kernel debugging, for doing 
that they only need to implement two interfaces: `OperatingSystemInterface` and 
`OSThread`.  As an example, this patch contains `zephyr.py` file with simple 
plug-in implementation for Zephyr OS, OS-specific files could live in OS' 
repository and evolve alongside it.

In the future, arch-specific code, e.g. `arc.py`, can be eliminated since LLDB 
already implements that functionality.

Remaining code can be upstreamed to LLDB with or without significant changes, 
the main idea of the review is to discuss these changes.


Repository:
  rG LLVM Github Monorepo

https://reviews.llvm.org/D60968

Files:
  lldb/examples/python/lldb-os-plugin/arch/__init__.py
  lldb/examples/python/lldb-os-plugin/arch/arc.py
  lldb/examples/python/lldb-os-plugin/arch/common.py
  lldb/examples/python/lldb-os-plugin/core/__init__.py
  lldb/examples/python/lldb-os-plugin/core/cvalue.py
  lldb/examples/python/lldb-os-plugin/core/operating_system.py
  lldb/examples/python/lldb-os-plugin/core/osplugin_common.py
  lldb/examples/python/lldb-os-plugin/core/register_set_common.py
  lldb/examples/python/lldb-os-plugin/core/thread.py
  lldb/examples/python/lldb-os-plugin/logging.yaml
  lldb/examples/python/lldb-os-plugin/logs/.gitkeep
  lldb/examples/python/lldb-os-plugin/utilities/helpers.py
  lldb/examples/python/lldb-os-plugin/utilities/tracing.py
  lldb/examples/python/lldb-os-plugin/zephyr.py

Index: lldb/examples/python/lldb-os-plugin/zephyr.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/zephyr.py
@@ -0,0 +1,156 @@
+import logging
+
+from typing import Iterator
+
+import lldb
+
+from core.operating_system import *
+from core.osplugin_common import *
+from core.cvalue import *
+from arch.common import *
+from utilities.helpers import *
+
+# Common defines.
+ZEPHYR_THREAD_TYPE = 'struct k_thread'
+
+# ARC-related defines.
+ARC_CALLEE_SAVED_TYPE = 'struct _callee_saved_stack'
+ARC_IRQ_STACK_FRAME_TYPE = 'struct _irq_stack_frame'
+ARC_REGISTER_TYPE = 'u32_t'
+ARC_CAUSE_NONE = 0
+ARC_CAUSE_COOP = 1
+ARC_CAUSE_RIRQ = 2
+ARC_CAUSE_FIRQ = 3
+
+logger = logging.getLogger(__name__)
+
+
+def create_k_thread(target: lldb.SBTarget, address: int) -> value:
+    return create_value_from_address(target, ZEPHYR_THREAD_TYPE, address)
+
+
+class ZephyrThread(OSThread):
+
+    def __init__(self, target: lldb.SBTarget, arch: str, address: int, is_active: bool = False):
+        self._k_thread = create_k_thread(target, address)
+
+        # Zephyr thread has 'name' attribute only if CONFIG_THREAD_NAME option is enabled.
+        try:
+            th_name = self._k_thread.name
+        except AttributeError:
+            super().__init__(target, arch, address, address, is_active=is_active)
+            return
+
+        super().__init__(target, arch, address, address, name=str(th_name), is_active=is_active)
+
+    def _arc_register_context_create(self) -> Dict[str, int]:
+        stack_ptr = unsigned(self._callee_saved.sp)
+
+        registers = register_value_to_dict(
+            create_value_from_address(self.target, ARC_CALLEE_SAVED_TYPE, stack_ptr))
+
+        callee_saved_size = self.target.FindFirstType(ARC_CALLEE_SAVED_TYPE).GetByteSize()
+
+        relinquish_cause = self._thread_arch.relinquish_cause
+        if relinquish_cause == ARC_CAUSE_NONE:
+            logger.warning(f'Bad relinquish cause for thread {hex(self.id)}')
+        elif relinquish_cause == ARC_CAUSE_COOP:
+            stack_ptr += callee_saved_size + 4
+            # TODO: check status32' AE bit
+            status32 = register_value_to_dict(create_value_from_address(
+                self.target, ARC_REGISTER_TYPE, stack_ptr, 'status32'))
+            pc = register_value_to_dict(create_value_from_address(
+                self.target, ARC_REGISTER_TYPE, stack_ptr + 4, 'pc'))
+
+            registers = {**registers, **status32, **pc}
+            # In case of returning from coop, we should also copy BLINK to PC, STATUS32 to R3, and restore SP.
+            registers['blink'] = registers['pc']
+            registers['r3'] = registers['status32']
+            registers['sp'] = stack_ptr + 8
+        elif relinquish_cause == ARC_CAUSE_RIRQ or relinquish_cause == ARC_CAUSE_FIRQ:
+            irq_stack_frame_ptr = stack_ptr + callee_saved_size
+            irq_stack_frame = register_value_to_dict(create_value_from_address(
+                self.target, ARC_IRQ_STACK_FRAME_TYPE, irq_stack_frame_ptr))
+            # Restore SP.
+            irq_stack_frame_size = self.target.FindFirstType(ARC_IRQ_STACK_FRAME_TYPE).GetByteSize()
+            irq_stack_frame['sp'] = irq_stack_frame_ptr + irq_stack_frame_size + 4
+
+            registers = {**registers, **irq_stack_frame}
+
+        return registers
+
+    def fetch_registers(self) -> RegisterSetCommon:
+        # At start-up Zephyr has a k_thread object at nullptr, so we should check this to avoid crushing.
+        if not self._k_thread:
+            return self.register_set
+
+        if self.arch == ARC_ARCH:
+            registers = self._arc_register_context_create()
+        else:
+            logger.error(f'Unsupported architecture: {self.arch.value}')
+            return self.register_set
+
+        self.register_set.fill(registers)
+        return self.register_set
+
+    @property
+    def _callee_saved(self) -> value:
+        return self._k_thread.callee_saved
+
+    @property
+    def _thread_arch(self) -> value:
+        return self._k_thread.arch
+
+
+@OperatingSystemInterface.register
+class OperatingSystemPlugIn(OSPlugInCommon):
+
+    def __init__(self, process: lldb.SBProcess):
+        super().__init__(process, self)
+
+        self._threads = dict()
+        self._current_threads = list()
+        self._kernel = find_global_variable(self.target, '_kernel')
+
+    def update(self) -> None:
+        self._threads.clear()
+        self._current_threads.clear()
+        self._build_thread_list()
+
+    def get_threads(self) -> Iterable[ZephyrThread]:
+        return self._threads.values()
+
+    def get_registers(self, tid: int) -> RegisterSetCommon:
+        return self._threads[tid].fetch_registers()
+
+    def _build_thread_list(self) -> None:
+        try:
+            threads = self._kernel.threads
+        except AttributeError:
+            logger.warning('CONFIG_THREAD_MONITOR should be enabled to access kernel threads')
+            return
+
+        # Collect current threads.
+        for cpu in self._kernel.cpus:
+            self._current_threads.append(cpu.current)
+
+        for thread in self._iterate_threads(threads):
+            ptr = unsigned(thread)
+            self._threads[ptr] = ZephyrThread(
+                self.target, self.arch, ptr, is_active=(thread in self._current_threads))
+
+    def _iterate_threads(self, thread_val: value) -> Iterator[value]:
+        visited = set()
+
+        while True:
+            thread_addr = unsigned(thread_val)
+            if thread_addr == 0 or thread_addr in visited:
+                raise StopIteration
+            yield thread_val
+            visited.add(thread_addr)
+            thread_val = thread_val.next_thread
+
+    def create_thread(self, tid: int, context: int) -> ZephyrThread:
+        thread = ZephyrThread(self.target, self.arch, context)
+        self._threads[thread.id] = thread
+        return thread
Index: lldb/examples/python/lldb-os-plugin/utilities/tracing.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/utilities/tracing.py
@@ -0,0 +1,50 @@
+import logging
+from functools import wraps
+
+from typing import Callable
+
+
+def _format_arg_value(arg_val):
+    """
+    Return a string representing a (name, value) pair.
+
+    _format_arg_value(('x', (1, 2, 3)))
+    'x=(1, 2, 3)'
+    """
+    arg, val = arg_val
+    return f'{arg}={val}'
+
+
+def traced(func: Callable) -> Callable:
+    """
+    Trace calls to a function.
+
+    :returns A decorated version of the input function which "traces" calls
+    made to it by writing out the function's name and the arguments it was
+    called with.
+    """
+
+    # Unpack function's arg count, arg names, arg defaults.
+    code = func.__code__
+    arg_count = code.co_argcount
+    arg_names = code.co_varnames[:arg_count]
+    fn_defaults = func.__defaults__ or list()
+    arg_defs = dict(zip(arg_names[-len(fn_defaults):], fn_defaults))
+
+    @wraps(func)
+    def wrapped(*v, **k):
+        # Collect function arguments by chaining together positional,
+        # defaulted, extra positional and keyword arguments.
+        positional = [_format_arg_value(arg_val) for arg_val in zip(arg_names, v)]
+        defaulted = [_format_arg_value((name, arg_defs[name]))
+                     for name in arg_names[len(v):] if name not in k]
+        nameless = [repr(elem) for elem in v[arg_count:]]
+        keyword = [_format_arg_value(arg_val) for arg_val in k.items()]
+        args = positional + defaulted + nameless + keyword
+
+        logger = logging.getLogger(func.__module__)
+        logger.debug(func.__name__ + '( ' + ', '.join(args) + ' )')
+
+        return func(*v, **k)
+
+    return wrapped
Index: lldb/examples/python/lldb-os-plugin/utilities/helpers.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/utilities/helpers.py
@@ -0,0 +1,84 @@
+from typing import Dict, Union
+
+import lldb
+
+from core.cvalue import *
+
+
+_dummy_name = 'dummy_name'
+
+
+def create_value_from_address(target: lldb.SBTarget, val_type: Union[str, lldb.SBType],
+                              address: Union[int, lldb.SBAddress], name: str = None) -> value:
+    """
+    Helper function to create value object from given address and type.
+
+    :param lldb.SBTarget target: SBTarget object representing debug target.
+    :param val_type: Type of object to create.
+    :type val_type: str or lldb.SBType
+    :param address: Address in memory where the object is.
+    :type address: int or lldb.SBAddress
+    :param str name: Name to give to the object.
+
+    :returns value object with wrapped object.
+    """
+    if isinstance(val_type, str):
+        val_type = target.FindFirstType(val_type)
+    if isinstance(address, int):
+        address = lldb.SBAddress(address, target)
+    if name is None:
+        name = _dummy_name
+
+    return value(target.CreateValueFromAddress(name, address, val_type))
+
+
+def find_global_variable(target: lldb.SBTarget, var: str) -> value:
+    """
+    Helper function to find global variable in the debug target.
+
+    :param lldb.SBTarget target: SBTarget object representing debug target.
+    :param str var: Name of variable to find.
+
+    :returns value object with found variable.
+    """
+    return value(target.FindGlobalVariables(var, 1).GetValueAtIndex(0))
+
+
+def register_value_to_dict(val: value) -> Dict[str, int]:
+    """
+    Helper function to create a map (register name => register value) from a value.
+
+    Examples:
+        1)
+            struct callee_saved { # C
+                uint32_t reg1;
+                uint32_t reg2;
+                ...
+            }
+            val = create_value_from_address(target, 'struct callee_saved', 0x1234) # python
+            registers = register_value_to_dict(val) # python
+            /*
+                reg1: 0x11111111
+                reg2: 0x22222222
+                ...
+            */
+
+        2)
+            val = create_value_from_address(target, 'uint32_t', 0x1234, 'regname') # python
+            registers = register_value_to_dict(val) # python
+            /*
+                regname: 0x11111111
+            */
+
+    :param value val: value that contains register(s).
+
+    :returns Dictionary object (register name => register value).
+    """
+    if len(val) == 0:
+        return {val.get_sbvalue().GetName(): unsigned(val)}
+
+    registers = dict()
+    for elem in val:
+        registers[elem.get_sbvalue().GetName()] = unsigned(elem)
+
+    return registers
Index: lldb/examples/python/lldb-os-plugin/logging.yaml
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/logging.yaml
@@ -0,0 +1,24 @@
+version: 1
+disable_existing_loggers: False
+formatters:
+  light:
+    format: "%(levelname)s - %(name)s - %(message)s"
+  debug:
+    format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
+handlers:
+  console:
+    class: logging.StreamHandler
+    level: WARNING
+    formatter: light
+    stream: ext://sys.stdout
+  debug_file_handler:
+    class: logging.handlers.RotatingFileHandler
+    level: DEBUG
+    formatter: debug
+    filename: debug.log
+    maxBytes: 10485760 # 10 MB
+    backupCount: 10
+    encoding: utf8
+root:
+  level: DEBUG
+  handlers: [debug_file_handler, console]
\ No newline at end of file
Index: lldb/examples/python/lldb-os-plugin/core/thread.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/core/thread.py
@@ -0,0 +1,85 @@
+import lldb
+
+from core.register_set_common import *
+from arch.common import *
+from utilities.tracing import traced
+
+
+class OSThread(metaclass=ABCMeta):
+    """
+    Class that defines an interface operating system specific thread should implement.
+
+    :param lldb.SBTarget target: SBTarget object representing debug target.
+    :param Arch arch: Current architecture.
+    :param int address: Address of a thread structure.
+    :param int tid: Thread's ID.
+    :param str name: Name of the thread, will be set to 'thread_at_%address' if not specified.
+    :param bool is_active: Specifies if the thread is active or not. Defaults to False.
+
+    :var lldb.SBTarget target: Debug target the thread belongs to.
+    :var Arch arch: Target architecture.
+    :var RegisterSet register_set: Architecture specific register set.
+    :var int id: Thread's ID.
+    :var int address: Thread's address.
+    :var bool is_active: Shows is thread active or not.
+    """
+
+    @abstractmethod
+    def __init__(self, target: lldb.SBTarget, arch: str, address: int,
+                 tid: int, name: str = None, is_active: bool = False):
+        self.id = tid
+        self.address = address
+        self.is_active = is_active
+        if name is None:
+            self.name = 'thread_at_' + hex(address)
+        else:
+            self.name = name
+
+        self.target = target
+        self.arch = arch
+        self.register_set = arch_to_regset_type(arch)()
+
+    @traced
+    @abstractmethod
+    def fetch_registers(self) -> RegisterSetCommon:
+        """
+        Method that fetches saved thread's registers.
+
+        :returns RegisterSet object filled with register values.
+        """
+        raise NotImplementedError
+
+    @traced
+    def get_info(self) -> Dict[str, Any]:
+        """
+        Method for getting basic thread information.
+
+        Possible fields:
+                tid (mandatory) -> thread ID
+                name (optional) -> thread name
+                queue (optional) -> thread dispatch queue name
+                state (mandatory) -> thread state (set to 'stopped' for now)
+                stop_reason (mandatory) -> thread stop reason (usually set to 'none')
+                    Possible values:
+                        'breakpoint' -> is the thread is stopped at a breakpoint
+                        'none' -> thread is just stopped because the process is stopped
+                        'trace' -> the thread just single stepped
+                register_data_addr (optional) -> the address of the register data in memory (optional key/value pair)
+                    Specifying this key/value pair for a thread will avoid a call to get_register_data()
+                    and can be used when your registers are in a thread context structure that is contiguous
+                    in memory. Don't specify this if your register layout in memory doesn't match the layout
+                    described by the dictionary returned from a call to the get_register_info() method.
+
+        :returns Dictionary with basic thread information.
+        """
+        return {
+            'tid': self.id,
+            'name': self.name,
+            'state': 'stopped',
+            'stop_reason': 'none'
+        }
+
+    def __eq__(self, other):
+        if isinstance(other, OSThread):
+            return self.id == other.id
+        return False
Index: lldb/examples/python/lldb-os-plugin/core/register_set_common.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/core/register_set_common.py
@@ -0,0 +1,68 @@
+from typing import Any, Dict, Union
+
+from abc import ABCMeta, abstractmethod
+
+from utilities.tracing import traced
+
+
+class RegisterSetCommon(metaclass=ABCMeta):
+    """
+    Class that provides functionality to work with architecture-specific register set.
+
+    Each architecture should define a register set and register_info in format DynamicRegisterInfo expects.
+    """
+
+    register_info = {'sets': [], 'registers': []}
+
+    @traced
+    @abstractmethod
+    def reset_register_values(self) -> None:
+        """
+        Method that makes a reset of all registers.
+        """
+        raise NotImplementedError
+
+    @traced
+    @abstractmethod
+    def get_packed_register_state(self) -> bytes:
+        """
+        Method that packs register values as bytes in a specific format.
+
+        You could use python module 'struct' to convert register values to a sequence of bytes.
+
+        :returns Sequence of bytes representing register values.
+        """
+        raise NotImplementedError
+
+    @traced
+    @abstractmethod
+    def set_register_value(self, key: Union[str, int], value: Any) -> None:
+        """
+        Method that sets register value by key.
+
+        :param key: Key to find register (register's name of number)
+        :type key: str or int
+        :param Any value: Value to assign to register.
+        """
+        raise NotImplementedError
+
+    @traced
+    @abstractmethod
+    def fill(self, registers: Dict[Union[str, int], Any]) -> None:
+        """
+        Method that takes register values and fills the register set.
+
+        :param registers: Dictionary (register's name or number => value).
+        :type registers: Dict[str or int, Any]
+        """
+        raise NotImplementedError
+
+    @classmethod
+    @traced
+    def get_register_info(cls) -> Dict:
+        """
+        Method that returns register info for this register set.
+
+        :returns Dictionary in a format DynamicRegisterInfo expects.
+        """
+        return cls.register_info
Index: lldb/examples/python/lldb-os-plugin/core/osplugin_common.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/core/osplugin_common.py
@@ -0,0 +1,108 @@
+import os
+import yaml
+import logging.config
+
+from typing import Dict, List, Any
+
+import lldb
+
+from arch.common import *
+from arch.arc import *
+from core.register_set_common import RegisterSetCommon
+from core.cvalue import *
+from core.operating_system import OperatingSystemInterface
+from utilities.tracing import traced
+
+
+# Set up logging.
+_cur_dir = os.path.dirname(__file__)
+_path = os.path.join(_cur_dir, '..', 'logging.yaml')
+with open(_path, 'r') as f:
+    _config = yaml.safe_load(f.read())
+    _log_file = _config['handlers']['debug_file_handler']['filename']
+    _config['handlers']['debug_file_handler']['filename'] = os.path.join(_cur_dir, '..', 'logs', _log_file)
+logging.config.dictConfig(_config)
+
+logger = logging.getLogger(__name__)
+
+
+class OSPlugInCommon:
+    """
+    Class that provides data for an instance of a LLDB 'OperatingSystemPython' plug-in class.
+
+    OSPlugInCommon is used to communicate with LLDB. Each new OS needs to create its own OperatingSystemPython
+    class implementing OperatingSystemInterface and inheriting from OSPluginCommon.
+
+    IMPORTANT: do not override following methods when inheriting from this classas LLDB uses them
+    to communicate with the plug-in:
+        create_thread
+        get_thread_info
+        get_register_info
+        get_register_data
+
+    :param lldb.SBProcess process: SBProcess object representing executing program.
+    :param OperatingSystemInterface operating_system: An object that implements OperatingSystemInterfacez.
+
+    :var lldb.SBTarget target: SBTarget object representing debug target.
+    :var Arch arch: An object representing target architecture.
+    """
+    def __init__(self, process: lldb.SBProcess, operating_system: OperatingSystemInterface):
+        # Initialization needs a valid.SBProcess object.
+        if not (isinstance(process, lldb.SBProcess) and process.IsValid()):
+            error_str = 'Invalid SBProcess object'
+            logger.critical(error_str)
+            raise ValueError(error_str)
+
+        self.__process = process
+        self.__operating_system = operating_system
+        self.__thread_cache = dict()
+
+        self.target = self.__process.target
+
+        self.arch = self.target.triple.split('-')[0].lower()
+
+        self.__register_set_type = arch_to_regset_type(self.arch)
+
+        logger.info('OSPlugIn has been initialized')
+
+    @traced
+    def __fetch_hardware_registers(self, cpu: int = 0) -> RegisterSetCommon:
+        hardware_registers = self.__process.GetThreadAtIndex(cpu).GetFrameAtIndex(0).GetRegisters()
+        registers = dict()
+
+        regnum = 0
+        for i in range(hardware_registers.GetSize()):
+            _set = value(hardware_registers.GetValueAtIndex(i))
+            for reg in _set:
+                registers[regnum] = unsigned(reg)
+                regnum += 1
+
+        reg_set = self.__register_set_type()
+        reg_set.fill(registers)
+
+        return reg_set
+
+    @traced
+    def create_thread(self, tid: int, context: int) -> Dict:
+        return self.__operating_system.create_thread(tid, context).get_info()
+
+    @traced
+    def get_thread_info(self) -> List[Dict[str, Any]]:
+        self.__operating_system.update()
+
+        res = []
+        for thread in self.__operating_system.get_threads():
+            self.__thread_cache[thread.id] = thread
+            res.append(thread.get_info())
+
+        return res
+
+    @traced
+    def get_register_info(self) -> Dict:
+        return self.__register_set_type.register_info
+
+    @traced
+    def get_register_data(self, tid: int) -> bytes:
+        if self.__thread_cache[tid].is_active:
+            return self.__fetch_hardware_registers().get_packed_register_state()
+        return self.__operating_system.get_registers(tid).get_packed_register_state()
Index: lldb/examples/python/lldb-os-plugin/core/operating_system.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/core/operating_system.py
@@ -0,0 +1,56 @@
+from typing import Iterable
+
+from abc import ABCMeta, abstractmethod
+
+from core.thread import *
+from utilities.tracing import traced
+
+
+class OperatingSystemInterface(metaclass=ABCMeta):
+    """
+    Class that defines an interface each operating system needs to implement to be supported by this plug-in.
+    """
+
+    @traced
+    @abstractmethod
+    def update(self) -> None:
+        """
+        Method that updates operating system's thread list. It is used by OSPlugInCommon to keep the data up-to-date.
+        """
+        raise NotImplementedError
+
+    @traced
+    @abstractmethod
+    def get_threads(self) -> Iterable[OSThread]:
+        """
+        Method that returns current threads in the operating system.
+
+        :returns An iterable object with OSThread objects.
+        """
+        raise NotImplementedError
+
+    @traced
+    @abstractmethod
+    def get_registers(self, tid: int) -> RegisterSetCommon:
+        """
+        Method that fetches registers for the specified thread.
+
+        :param int tid: ID of a thread to fetch registers from.
+
+        :returns RegisterSet object filled with register values.
+        """
+        raise NotImplementedError
+
+    @traced
+    @abstractmethod
+    def create_thread(self, tid: int, context: int) -> OSThread:
+        """
+        Method that created operating system's thread. It is used by OSPlugInCommon to create thread info as operating
+        system defines it.
+
+        :param int tid: ID of a thread.
+        :param int context: pointer to the thread's context.
+
+        :returns Created thread object.
+        """
+        raise NotImplementedError
Index: lldb/examples/python/lldb-os-plugin/core/cvalue.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/core/cvalue.py
@@ -0,0 +1,306 @@
+from typing import Any
+
+from utilities.tracing import traced
+
+import lldb
+
+
+class value:
+    """
+    A class designed to wrap lldb.SBValue() objects so the resulting object
+    can be used as a variable. So if you have a Point structure variable in
+    your code in the current frame named "pt", you can initialize an instance
+    of this class with it:
+
+    pt = lldb.value(lldb.frame.FindVariable("pt"))
+    print pt
+    print pt.x
+    print pt.y
+
+    pt = lldb.value(lldb.frame.FindVariable("rectangle_array"))
+    print rectangle_array[12]
+    print rectangle_array[5].origin.x
+
+    If you need to access wrapped SBValue, use get_sbvalue() method since this class does not have any public
+    attributes in order to avoid collisions with SBValue's fields.
+
+    :param lldb.SBValue sb_value: SBValue object to wrap.
+    """
+    def __init__(self, sb_value: lldb.SBValue):
+        self.sbvalue = sb_value
+
+    def get_sbvalue(self) -> lldb.SBValue:
+        """
+        Access wrapped value.
+
+        :returns Unwrapped SBValue object.
+        """
+        return object.__getattribute__(self, 'sbvalue')
+
+    def __nonzero__(self):
+        return self.get_sbvalue().__nonzero__() and self._get_value_as_unsigned() != 0
+
+    def __repr__(self):
+        self.get_sbvalue().__str__()
+
+    def __str__(self):
+        summary = self.get_sbvalue().GetSummary()
+        if summary:
+            return summary.strip('"')
+        return self.get_sbvalue().__str__()
+
+    def __len__(self):
+        return self.get_sbvalue().GetNumChildren()
+
+    def __iter__(self):
+        return ValueIter(self)
+
+    def __getitem__(self, key):
+        # Allow array access if this value has children.
+        if isinstance(key, slice):
+            start = int(key.start)
+            end = int(key.stop)
+            step = 1 if key.step is None else int(key.step)
+            return [self[i] for i in range(start, end, step)]
+        if isinstance(key, value):
+            key = int(key)
+        if isinstance(key, int):
+            child_sbvlaue = self.get_sbvalue().GetChildAtIndex(key)
+            if child_sbvlaue and child_sbvlaue.IsValid():
+                return value(child_sbvlaue)
+            raise IndexError(f"Index '{key}' is out of range")
+        raise TypeError(f"Cannot fetch array item for type '{type(key)}'. "
+                        "Accepted types: slice, value, int")
+
+    def __getattribute__(self, item):
+        if item in dir(value):
+            return object.__getattribute__(self, item)
+
+        child_sbvalue = self.get_sbvalue().GetChildMemberWithName(item)
+        if child_sbvalue and child_sbvalue.IsValid():
+            return value(child_sbvalue)
+        raise AttributeError(f"Attribute '{item}' is not defined")
+
+    def __int__(self):
+        if self.get_sbvalue().GetType().IsPointerType():
+            return self._get_value_as_unsigned()
+
+        is_num, is_sign = lldb.is_numeric_type(self.get_sbvalue().GetType().GetCanonicalType().GetBasicType())
+        if is_num and not is_sign:
+            return self._get_value_as_unsigned()
+        return self._get_value_as_signed()
+
+    def __add__(self, other):
+        return int(self) + int(other)
+
+    def __radd__(self, other):
+        return int(self) + int(other)
+
+    def __sub__(self, other):
+        return int(self) - int(other)
+
+    def __rsub__(self, other):
+        return int(other) - int(self)
+
+    def __mul__(self, other):
+        return int(self) * int(other)
+
+    def __rmul__(self, other):
+        return int(self) * int(other)
+
+    def __floordiv__(self, other):
+        return int(self) // int(other)
+
+    def __mod__(self, other):
+        return int(self) % int(other)
+
+    def __rmod__(self, other):
+        return int(other) % int(self)
+
+    def __divmod__(self, other):
+        return int(self) % int(other)
+
+    def __rdivmod__(self, other):
+        return int(other) % int(self)
+
+    def __pow__(self, other, modulo=None):
+        if modulo is None:
+            return int(self) ** int(other)
+        return self.__pow__(other) % int(modulo)
+
+    def __lshift__(self, other):
+        return int(self) << int(other)
+
+    def __rshift__(self, other):
+        return int(self) >> int(other)
+
+    def __and__(self, other):
+        return int(self) & int(other)
+
+    def __rand(self, other):
+        return int(self) & int(other)
+
+    def __xor__(self, other):
+        return int(self) ^ int(other)
+
+    def __or__(self, other):
+        return int(self) | int(other)
+
+    def __div__(self, other):
+        return int(self) / int(other)
+
+    def __rdiv__(self, other):
+        return int(other) / int(self)
+
+    def __truediv__(self, other):
+        return int(self) / int(other)
+
+    def __iadd__(self, other):
+        result = self.__add__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __isub__(self, other):
+        result = self.__sub__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __imul__(self, other):
+        result = self.__mul__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __idiv__(self, other):
+        result = self.__div__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __itruediv__(self, other):
+        result = self.__truediv__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __ifloordiv__(self, other):
+        result = self.__floordiv__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __imod__(self, other):
+        result = self.__and__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __ipow__(self, other, modulo=None):
+        result = self.__pow__(other, modulo)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __ilshift__(self, other):
+        result = self.__lshift__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __irshift__(self, other):
+        result = self.__rshift__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __iand__(self, other):
+        result = self.__and__(other)
+        self.get_sbvalue().SetValueFromCString(str(result))
+        return result
+
+    def __eq__(self, other):
+        if isinstance(other, int):
+            return int(self) == other
+        elif isinstance(other, str):
+            return str(self) == other
+        elif isinstance(other, value):
+            self_err = lldb.SBError()
+            other_err = lldb.SBError()
+            self_val = self.get_sbvalue().GetValueAsUnsigned(self_err)
+            if self_err.fail:
+                raise ValueError('Unable to extract value of self')
+            other_val = other.get_sbvalue().GetValueAsUnsigned(other_err)
+            if other_err.fail:
+                raise ValueError('Unable to extract value of other')
+            return self_val == other_val
+        raise TypeError(f"Unknown type: '{type(other)}', No equality operation defined")
+
+    def _get_value_as_unsigned(self):
+        sbvalue = self.get_sbvalue()
+
+        serr = lldb.SBError()
+        retval = sbvalue.GetValueAsUnsigned(serr)
+        if serr.success:
+            return retval
+        raise ValueError("Failed to read unsigned data. "
+                         f"{sbvalue}(type={sbvalue.GetType()}) "
+                         f"Error description: {serr.GetCString()}")
+
+    def _get_value_as_signed(self):
+        sbvalue = self.get_sbvalue()
+
+        serr = lldb.SBError()
+        retval = sbvalue.GetValueAsSigned(serr)
+        if serr.success:
+            return retval
+        raise ValueError("Failed to read signed data. "
+                         f"{sbvalue}(type={sbvalue.GetType()}) "
+                         f"Error description: {serr.GetCString()}")
+
+
+class ValueIter:
+
+    def __init__(self, val: value):
+        self.index = 0
+        self.val = val
+        self.length = len(val)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.index == self.length:
+            raise StopIteration
+        child_value = self.val[self.index]
+        self.index += 1
+        return child_value
+
+
+@traced
+def unsigned(val: Any) -> int:
+    """
+    Helper function to get unsigned value from core.value
+
+    :param Any val: value (see value class above) to be representated as integer.
+
+    :returns Unsigned int representation of the val.
+
+    :raises ValueError if the type cannot be represented as unsigned integer.
+    """
+    if isinstance(val, value):
+        return val._get_value_as_unsigned()
+    return int(val)
+
+
+@traced
+def dereference(val: value) -> value:
+    """
+    Get a dereferenced object for a pointer type object.
+
+    Example:
+        val = dereference(ptr_obj) # python
+        is same as:
+            ptr_obj = (int *)0x1234; # C
+            val = *ptr_obj; # C
+
+    :param value val: value object representing a pointer type.
+
+    :returns Dereferenced value.
+
+    :raises TypeError if the type cannot be dereferenced.
+    """
+    if val.get_sbvalue().GetType().IsPointerType():
+        return value(val.get_sbvalue().Dereference())
+    raise TypeError('Cannot dereference a non-pointer type')
Index: lldb/examples/python/lldb-os-plugin/arch/common.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/arch/common.py
@@ -0,0 +1,38 @@
+from typing import Type
+
+from core.register_set_common import *
+
+
+"""
+This file contains common functionality for arch-specific code, helper functions,
+and definitions of architecture names.
+
+IMPORTANT:
+Architecture name should be a valid name of that architecture in the LLVM.
+By convention:
+    1) arch-specific code must be placed under $(ARCH_NAME).py file;
+    2) each architecture should implement a class named 'RegisterSet'
+       inherited from core.RegisterSetCommon class.
+"""
+
+ARC_ARCH = 'arc'
+
+
+def arch_to_regset_type(arch: str) -> Type[RegisterSetCommon]:
+    """
+    Helper function that for an architecture returns a type of corresponding register set.
+
+    :param Arch arch: Target architecture.
+
+    :returns Type of register set for that architecture.
+
+    :raises ValueError if the architecture is not supported by the plug-in.
+    """
+    import importlib.util
+    spec = importlib.util.find_spec('arch.' + arch)
+    if spec is None:
+        raise ValueError(f'Unsupported architecture: {arch}')
+
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module.RegisterSet
Index: lldb/examples/python/lldb-os-plugin/arch/arc.py
===================================================================
--- /dev/null
+++ lldb/examples/python/lldb-os-plugin/arch/arc.py
@@ -0,0 +1,65 @@
+import logging
+
+from typing import Dict, Union, Any
+
+import os
+import sys
+import struct
+
+from core.register_set_common import *
+
+logger = logging.getLogger(__name__)
+
+import lldb
+# Find the location of registers description file.
+_interp = lldb.debugger.GetCommandInterpreter()
+_result = lldb.SBCommandReturnObject()
+_interp.HandleCommand('settings show plugin.process.icd.hardware.registers', _result)
+sys.path.append(os.path.dirname(_result.GetOutput().split()[-1][1:]))
+try:
+    import registers_description as rd
+except ImportError:
+    logger.critical("ARC architecture is chosen, but "
+                    "'plugin.process.icd.hardware.registers' setting points to wrong location")
+    raise
+
+
+class RegisterSet(RegisterSetCommon):
+
+    register_info = rd.get_dynamic_setting(None,  # We do not actually need a target for that.
+                                           'target-registers-description')
+
+    regname_to_regnum = dict()
+    for index, reg_info in enumerate(register_info['registers']):
+        regname_to_regnum[reg_info['name']] = index
+        if 'alt-name' in reg_info:
+            regname_to_regnum[reg_info['alt-name']] = index
+
+    regname_to_regnum['lp_count'] = regname_to_regnum['lp-count']
+
+    def __init__(self, init_values: Dict[Union[str, int], int] = None):
+        super().__init__()
+
+        self._register_count = len(self.register_info['registers'])
+        self._registers = [None] * self._register_count
+
+        if init_values is not None:
+            self.fill(init_values)
+
+    def reset_register_values(self) -> None:
+        self._registers = [None] * self._register_count
+
+    def get_packed_register_state(self) -> bytes:
+        self._registers = [0 if elem is None else elem for elem in self._registers]
+        return struct.pack('%uI' % self._register_count, *self._registers)
+
+    def set_register_value(self, key: Union[str, int], value: int) -> None:
+        if isinstance(key, str):
+            key = self.regname_to_regnum[key]
+        self._registers[key] = value
+
+    def fill(self, registers: Dict[Union[str, int], int]) -> None:
+        self.reset_register_values()
+
+        for k, v in registers.items():
+            self.set_register_value(k, v)
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to