Ian Bicking wrote:
> I got a puzzler for y'all.  I want to allow the editing of functions
> in-place.  I won't go into the reason (it's for HTConsole --
> http://blog.ianbicking.org/introducing-htconsole.html), except that I
> really want to edit it all in-process and in-memory.  So I want the
> identity of the function to remain the same, even as I edit the body
> and hopefully the signature too.
> 
> Well, the reason is that I want to edit any function object, without
> having to know who has a reference to the function.  This way editing a
> function or a method or a property.fget can all be done in the same
> way.
> 
> The func_code attributes of functions is writable, but I don't know how
> to create the proper code object.  Just compiling a new body isn't good
> enough.
> 


The experimental module below updates many of the builtin types in-place, 
including functions.  Functions in particular are handled by 
update_FunctionType.  I use this in my own development environment to edit live 
code i.e., as a better reload.


Michael


"""Update live code with new source.

Example:
     >>> source1 = "def func1(a): return 2*a"
     >>> namespace = {}
     >>> exec source1 in namespace
     >>> func1 = namespace["func1"]
     >>> func1(2)
     4
     >>> source2 = "def func1(a, b): return 2*a+b"
     >>> exec_update(namespace, source2, verbosity=2)
     Updating: <type 'dict'>,
     .func1 Updated
     True
     >>> func1(2,2)
     6
     >>>
"""


import types
import gc

class UpdateException(Exception): pass

def exec_update(namespace, source, name="", verbosity = 1):
     module_proxy.verbosity = verbosity
     if verbosity == 2:
         print "Updating: %s, %s" % (type(namespace), name)
     if isinstance(namespace, types.ModuleType):
         proxy = module_proxy(namespace)
     elif isinstance(namespace, (type, types.ClassType)):
         proxy = cls_proxy(namespace)
     elif isinstance(namespace, dict):
         proxy = dict_proxy(namespace, name)
     else:
         raise UpdateException, "Unrecognized namespace type: %s" % 
type(namespace)
     exec source in proxy.dict, proxy
     return True


class module_proxy(object):
     DO_NOT_COPY = ()
     verbosity = 1
     def __init__(self, module, name = ""):
         self.dict = module.__dict__
         self.namespace = module
         self.name = getattr(module,"__name__", name)

     def __contains__(self, key):
         return key in self.dict

     def _setitem(self,key,value):
         self.dict[key] = value

     def _delitem(self, key):
         del self.dict[key]

     def __getitem__(self, key):
         return self.dict[key]

     def __setitem__(self, key, value):
         try:
             obj  = self.dict[key]
         except KeyError:
             if self.verbosity >= 1:
                 print "** %s.%s=%s (Binding)" % (self.name, key, repr(value))
             self._setitem(key, value)
             return True
         try:
             if update(obj, value):
                 if self.verbosity >= 1:
                     print "%s.%s Updated" % (self.name, key)
                 return True
             else:
                 if self.verbosity >= 2:
                     print "%s.%s No change" % (self.name, key)
                 return False
         except UpdateException:
             if self.verbosity >= 1:
                 print "** %s.%s=>%s (Rebinding)" % (self.name, key, 
repr(value))
             self._setitem(key, value)
             return True

     def __delitem__(self, key):
         if self.verbosity >= 1:
             print "** del %s.%s" % (self.name, key)
         self._delitem(key)

     def update_ns(self, other_dict, delete_missing=False, skip = ()):
         dirty = False
         if delete_missing:
             for attr in self.dict.keys():
                 if not((attr in other_dict) or (attr in skip)):
                     dirty = True
                     del self[attr]
         for to_attr, to_obj in other_dict.iteritems():
             if to_attr in skip:
                 continue
             dirty |= self.__setitem__(to_attr, to_obj)
         return dirty

class dict_proxy(module_proxy):
     def __init__(self, my_dict, name = ""):
         self.dict = my_dict
         self.namespace = None
         self.name = name


class cls_proxy(module_proxy):

     def _setitem(self,key,value):
         setattr(self.namespace,key,value)

     def _delitem(self, key):
         delattr(self.namespace, key)

     def update_cls(self, other):
         DONOTCOPY = set(["__name__","__bases__","__base__","__dict__",
                         "__doc__","__weakref__","__module__"])

         # This will often fail if they are not equal, so find out first!
         obj = self.namespace
         try:
             obj.__bases__ = other.__bases__
         except TypeError, err:
             raise UpdateException, err

         fromdict = obj.__dict__
         todict = other.__dict__
         obj_slots = set(getattr(obj, "__slots__", ()))
         other_slots = set(getattr(other, "__slots__", ()))
         if  other_slots > obj_slots:
             raise UpdateException, "Can't add slots %s" % \
                                             list(other_slots - obj_slots)

         return self.update_ns(todict, delete_missing=True, skip = DONOTCOPY | 
obj_slots)

##TODO - Instances with __slots__ (see decimal.Decimal)
class instance_proxy(dict_proxy):
     def __init__(self, my_obj, name = ""):
         self.dict = my_obj.__dict__
         self.namespace = my_obj
         self.name = name


# Specialized updaters by type
# Each updater must return: True, False or raise UpdateException
#   False: no change required - new object is equal to the old
#   True: object was successfully updated
#   UpdateException - object could not be updated in place.  Caller must
#    then decide whether to re-bind

def update_classmethod(obj, other):
     # Massive hack.  classmethod (and staticmethod) do not expose their
     # underlying callable to Python.  But they *seem* always to have
     # one reference - which must be to the callable.
     obj_refs = gc.get_referents(obj)
     other_refs = gc.get_referents(other)
     if len(obj_refs) != len(other_refs) != 1:
         raise UpdateException, "Too many references from classmethod"
     return update(obj_refs[0], other_refs[0])

update_staticmethod = update_classmethod

def update_classmethod_FunctionType(obj, other):
     obj_refs = gc.get_referents(obj)
     if len(obj_refs) != 1:
         raise UpdateException, "Too many references from classmethod"
     return update(obj_refs[0], other)

def update_property(obj, other):
     # These are read-only attributes, so unless we can update them
     # in-place, update_property will fail
     dirty = False
     dirty |= update(obj.fget, other.fget)
     dirty |= update(obj.fset, other.fset)
     dirty |= update(obj.fdel, other.fdel)
     dirty |= update(obj.__doc__, other.__doc__)
     return dirty


def update_FunctionType(obj, other):
     # Note this applies only to Python-defined functions
     dirty = False
     for attr in ("func_code", "func_defaults", "func_dict", "func_doc", 
"func_globals"):
         try:
             obj_attr, other_attr = getattr(obj, attr), getattr(other,attr)
         except AttributeError, err:
             # missing attribute means can't update the function
             raise UpdateException, err
         try:
             # If possible, update the function attributes in-place
             # this preserves, in particular, func_defaults
             dirty |= update(obj_attr, other_attr)
         except UpdateException:
             # If we can't update the function attribute in place
             # go ahead and overwrite it
             dirty = True
             setattr(obj, attr, other_attr)
     return dirty

def update_MethodType(obj, other):
     # No need to update the class/object bindings
     return update_FunctionType(obj.im_func, other.im_func)

def update_MethodType_FunctionType(obj, other):
     return update_FunctionType(obj.im_func, other)

def update_list(obj, other):
     if obj == other:
         return False
     obj[:] = other
     return True

def update_dict(obj, other):
     if obj == other:
         return False
     obj.clear()
     obj.update(other)
     return True

def update_deque(obj, other):
     if obj == other:
         return False
     obj.clear()
     obj.extend(other)
     return True

def update_array(obj, other):
     if obj == other:
         return False
     if obj.typecode != other.typecode:
         raise UpdateException, "Can't change typecode of array"
     obj[:] = other
     return True


def update_tuple(obj, other):
     # We go to some additional lengths with tuples
     if obj == other:
         return False
     if len(obj) == len(other):
         dirty = False
         for ob1, ob2 in zip(obj, other):
             # This will raise UpdateException if any of the members
             # are not updatable
             dirty |= update(ob1, ob2)
         return dirty
     raise UpdateException, "Unequal tuples"

update_set = update_dict

def update_type(obj, other):
     proxy_cls = cls_proxy(obj)
     return proxy_cls.update_cls(other)

update_ClassType = update_type


def update(obj, other):
     if obj is other:
         return False
     type_pair = type(obj), type(other)
     #name = getattr(obj,"__name__", "")
     try:
         update_method = _method_cache[type_pair]
     except KeyError:
         pass
     else:
         #print "UpdateType %r" % obj
         return update_method(obj, other)

     if type_pair[0] == type_pair[1]:
         if isinstance(obj, (object, types.InstanceType)):
             try:
                 obj_dict, other_dict = obj.__dict__, other.__dict__
             except AttributeError:
                 pass
             else:
                 obj_dict_proxy = instance_proxy(obj)
                 return obj_dict_proxy.update_ns(other_dict, True)

         if isinstance(obj, IMMUTABLE_TYPES):
             if obj == other:
                 return False

     raise UpdateException, "Can't update %s to %s" % type_pair

# Can't update these, but if they compare equal, there's no need to
import datetime
IMMUTABLE_TYPES = (bool, int, long, float, complex,
         buffer, basestring, slice,
         types.BuiltinFunctionType, types.BuiltinMethodType, types.CodeType,
         types.DictProxyType,
         datetime.date, datetime.time, datetime.timedelta, datetime.tzinfo)

del datetime

def _get_types():
     from collections import deque
     from array import array
     d= locals()
     d.update(types.__dict__)
     return d

def _get_method_cache():
     cache = {}
     type_dict = _get_types()
     for name, obj in globals().items():
         if name.startswith("update_"):
             fromtypename, totypename = name.rsplit("_", 2)[-2:]
             if fromtypename == "update":
                 fromtypename = totypename
             fromtype = eval(fromtypename, type_dict)
             totype = eval(totypename, type_dict)
             cache[(fromtype,totype)] = obj
     return cache

_method_cache = _get_method_cache()

-- 
http://mail.python.org/mailman/listinfo/python-list

Reply via email to