The current Python interface to Xenstore is just a thin binding to the
C libxenstore library. This means that it is architecture-specific and
makes it awkward to use in platform-independent code like the XenServer
guest agent.
The Xenstore protocol is simple and quite straightforward to implement
natively in Python.
Design & Implementation
Users create an instance of the Xenstore object. This is a singleton
that holds all the global state (xenbus fd etc).
Users invoke methods e.g. read() that block awaiting a response from
xenstored. Methods either return the result or raise XenstoreException
with a string representation of the error e.g. 'ENOENT'.
The methods attempt to present complex results using native datatypes
(lists, dictionaries) rather than the usual Xenstore string values e.g.
{6: 'read-write', 9: 'none'}
rather than
'b6n9'
Transactions are managed using instances of the Transaction object. Each
instance must be utilized thus, either
t.start()
# zero or more operations
t.commit()
or
t.start()
# zero or more operations
t.abort()
Users should handle commit() raising XenstoreException('EAGAIN')
appropriately, e.g. replaying the whole start(), operations, commit()
sequence.
Watches are complicated because they are not synchronous. When the first
watch is registered a background thread is started. This thread takes
responsibility for the reading of all data from the xenbus fd. Responses
for operations are forwarded to the foreground thread using a Queue.
Users of watch() supply the path to watch, a callback function and
(optionally) parameters. The callback function is invoked with the path
and the optional parameters. watch() returns a string token that is
passed, together with the watched path, to unwatch() to remove the watch.
#!/bin/env python
import os
import Queue
import select
import struct
import tempfile
import threading
try:
    _XenstoreErrorBase = StandardError  # Python 2
except NameError:
    # StandardError was removed in Python 3; fall back to its parent class
    # so that defining the exception does not itself raise NameError.
    _XenstoreErrorBase = Exception


class XenstoreException(_XenstoreErrorBase):
    """
    Raised when a Xenstore operation is answered with an error.

    The exception message is the error name reported by xenstored,
    e.g. 'ENOENT' or 'EAGAIN'.
    """
def handle_watches(xs):
    """
    Background thread that reads from xenbus, acting on watch events and
    forwarding operation responses to the foreground thread.

    $xs is the Xenstore handle. The loop runs until xs.shutdown_event is
    set; xs.pipe_r is part of the select() set only so that a write to the
    other end of the pipe wakes the thread up to re-check that event.

    Watch events invoke the callback registered under the event's token,
    in this thread. Every other message is handed to the foreground thread
    through xs.queue: the 16-byte header first, then the payload if there
    is one.
    """
    import traceback

    while not xs.shutdown_event.isSet():
        readable, _, __ = select.select([xs.pipe_r, xs.xb_fd], [], [])
        if xs.xb_fd not in readable:
            # woken through the pipe only; loop round to re-check shutdown
            continue

        # read and unpack header: (operation, request id, tx id, length)
        hdr = xs.xb_fd.read(16)
        op, _, __, length = struct.unpack('=IIII', hdr)
        value = None
        if length > 0:
            value = xs.xb_fd.read(length)

        if op == xs.XS_WATCH_EVENT:
            path, token, _ = value.split('\0', 2)
            # Single lookup: the foreground thread may unwatch() the token
            # at any moment, so "in" followed by indexing could KeyError.
            watch = xs.watches.get(token)
            if watch is not None:
                try:
                    # invoke user callback
                    watch['cb'](path, *watch['cbargs'])
                except Exception:
                    # A faulty callback must not kill this thread: it is
                    # the only reader of the xenbus fd, so every later
                    # operation would block forever waiting on the queue.
                    traceback.print_exc()
        else:
            # response to an operation, handle in main thread
            xs.queue.put(hdr)
            if value:
                xs.queue.put(value)
class Xenstore(object):
    """
    Parent Xenstore handle (singleton).

    Holds the xenbus file handle and, once the first watch has been
    registered, the background reader thread together with the queue it
    uses to pass operation responses back to the caller.

    Operations block until xenstored answers and raise XenstoreException
    (message = error name, e.g. 'ENOENT') when the answer is XS_ERROR.
    """

    # from xen/include/public/io/xs_wire.h
    XS_READ = 2
    XS_GET_PERMS = 3
    XS_WATCH = 4
    XS_UNWATCH = 5
    XS_TRANSACTION_START = 6
    XS_TRANSACTION_END = 7
    XS_WRITE = 11
    XS_MKDIR = 12
    XS_RM = 13
    XS_SET_PERMS = 14
    XS_WATCH_EVENT = 15
    XS_ERROR = 16

    # location of the xenbus device depends on the host operating system
    if os.uname()[0] == 'Linux':
        DEV_PATH = '/proc/xen/xenbus'
    elif os.uname()[0] == 'NetBSD':
        DEV_PATH = '/kern/xen/xenbus'
    else:
        DEV_PATH = '/dev/xen/xenbus'

    PERM_NONE, PERM_READ, PERM_WRITE, PERM_READ_WRITE = range(4)

    # wire letter <-> PERM_* constant
    perm_map = {'n': PERM_NONE, 'r': PERM_READ, 'w': PERM_WRITE,
                'b': PERM_READ_WRITE}
    rev_perm_map = {PERM_NONE: 'n', PERM_READ: 'r', PERM_WRITE: 'w',
                    PERM_READ_WRITE: 'b'}

    __single = None

    def __new__(classtype, *args, **kwargs):
        # Create the instance on first use, hand the same one back after.
        if classtype != type(classtype.__single):
            # object.__new__ accepts no extra arguments
            classtype.__single = object.__new__(classtype)
        return classtype.__single

    def __init__(self):
        # Python calls __init__ on every Xenstore() even though __new__
        # returned the existing instance. Without this guard each call
        # (e.g. from Transaction()) would reopen the device and wipe the
        # watch thread/queue state of the live handle.
        if getattr(self, '_initialised', False):
            return
        self.xb_fd = open(self.DEV_PATH, 'r+', 0)
        self.watch_thread = None
        self.queue = None
        self.watches = {}
        self._initialised = True

    def __del__(self):
        # xb_fd is missing if open() failed in __init__
        fd = getattr(self, 'xb_fd', None)
        if fd is not None:
            fd.close()

    def _recv(self, size):
        """
        Return the next piece of a response: from the queue when the
        background thread owns the fd, otherwise $size bytes read directly.
        """
        if self.queue:
            return self.queue.get()
        return self.xb_fd.read(size)

    def do_op(self, ctxt, op, value = '', req = 0):
        """
        Perform Xenstore operation $op and return response.

        $ctxt is a Transaction (its tx_id is sent) or None for tx id 0.
        $value is the already-encoded request payload.
        Returns the raw response payload, or None if the response is empty.
        Raises XenstoreException if xenstored answers with XS_ERROR.
        """
        if ctxt:
            tx_id = ctxt.tx_id
        else:
            tx_id = 0

        self.xb_fd.write(struct.pack('=IIII', op, req, tx_id, len(value)))
        if value:
            self.xb_fd.write(value)

        r_op, _, __, length = struct.unpack('=IIII', self._recv(16))
        payload = None
        if length > 0:
            payload = self._recv(length)

        if r_op == self.XS_ERROR:
            # strip the trailing NUL; tolerate an error with no payload
            raise XenstoreException((payload or '')[:-1])
        return payload

    def _read(self, ctxt, *vars):
        values = {}
        for var in vars:
            values[var] = self.do_op(ctxt, self.XS_READ, var + '\0')
        if len(vars) == 1:
            return values[vars[0]]
        return values

    def read(self, *vars):
        """
        Return a dictionary of Xenstore values for $vars (or a string
        if only a single key is passed in).
        """
        return self._read(None, *vars)

    def _mkdir(self, ctxt, *vars):
        for var in vars:
            self.do_op(ctxt, self.XS_MKDIR, var + '\0')

    def mkdir(self, *vars):
        """
        Create empty directories in Xenstore.
        """
        self._mkdir(None, *vars)

    def _rm(self, ctxt, *vars):
        for var in vars:
            self.do_op(ctxt, self.XS_RM, var + '\0')

    def rm(self, *vars):
        """
        Remove all keys in $vars from Xenstore.
        """
        self._rm(None, *vars)

    def write(self, vars, ctxt = None):
        """
        Update Xenstore with the keys and values in dictionary $vars.
        One XS_WRITE is issued per key, inside transaction $ctxt if given.
        """
        for key, val in vars.items():
            self.do_op(ctxt, self.XS_WRITE, key + '\0' + val)

    def _get_permissions(self, ctxt, *paths):
        result = {}
        for path in paths:
            raw = self.do_op(ctxt, self.XS_GET_PERMS, path + '\0')
            perm_d = {}
            # reply is a run of NUL-terminated "<letter><id>" entries
            for entry in raw[:-1].split('\0'):
                perm_d[int(entry[1:])] = self.perm_map[entry[0]]
            result[path] = perm_d
        if len(paths) == 1:
            return result[paths[0]]
        return result

    def get_permissions(self, *paths):
        """
        Return a dictionary mapping each path to its permission
        dictionary {id: PERM_*} (or just the permission dictionary if
        only a single path is passed in).
        """
        return self._get_permissions(None, *paths)

    def set_permissions(self, path, perms, ctxt = None):
        """
        Set the permissions of $path based on the dictionary $perms
        ({id: PERM_*}), inside transaction $ctxt if given.
        Values that are not a PERM_* constant are sent as 'n' (none).
        """
        # NOTE(review): entries are sent in dict iteration order; if
        # xenstored gives the first entry special meaning (owner), callers
        # cannot control it through a plain dict - confirm.
        perm_l = []
        for ident, perm in perms.items():
            perm_l.append("%c%d" % (self.rev_perm_map.get(perm, 'n'), ident))
        # pass ctxt through so the change is part of the transaction
        self.do_op(ctxt, self.XS_SET_PERMS, path + '\0' +
                   '\0'.join(perm_l) + '\0')

    def watch(self, path, callback, *callback_args):
        """
        Watch $path (and its subordinates), invoking $callback($path,
        $callback_args) in a background thread.
        Returns a token to be used by unwatch().

        The callback runs in the thread that reads all xenbus traffic, so
        it must not itself call Xenstore operations: the response could
        never be delivered while the callback is running.
        """
        if not self.watch_thread:
            # first watch: hand the fd over to the background reader
            self.watch_thread = threading.Thread(target=handle_watches,
                                                 args=(self,))
            self.watch_thread.setDaemon(True)
            self.shutdown_event = threading.Event()
            self.queue = Queue.Queue(2)
            self.pipe_r, self.pipe_w = os.pipe()
            self.watches = {}
            self.watch_thread.start()

        # the temporary file's name doubles as a unique token
        fh = tempfile.NamedTemporaryFile(prefix = 'xs')
        token = fh.name
        # register before issuing XS_WATCH so an early event finds it
        self.watches[token] = {'fh': fh, 'path': path, 'cb': callback,
                               'cbargs': callback_args}
        try:
            self.do_op(None, self.XS_WATCH, path + '\0' + token + '\0')
        except Exception:
            # registration failed: do not leave the token or file behind
            del self.watches[token]
            fh.close()
            raise
        return token

    def unwatch(self, path, token):
        """
        Stop watching the path monitored by the watch() call that
        returned $token. Unknown tokens are ignored.
        """
        if token in self.watches:
            self.do_op(None, self.XS_UNWATCH, path + '\0' + token + '\0')
            self.watches[token]['fh'].close()
            del self.watches[token]

    def unwatch_all(self):
        """
        Remove every watch registered through watch().
        """
        # iterate over a copy: unwatch() deletes from self.watches
        for token, entry in list(self.watches.items()):
            self.unwatch(entry['path'], token)

    def watch_stop(self):
        """
        Stop the background thread (if running); later operations read
        their responses directly from the xenbus fd again.
        """
        if not self.watch_thread:
            return
        self.shutdown_event.set()
        # wake the thread out of select() so it notices the event
        os.write(self.pipe_w, 'x')
        self.watch_thread.join()
        self.watch_thread = None
        self.queue = None
        os.close(self.pipe_r)
        os.close(self.pipe_w)
class Transaction(object):
    """
    Xenstore transaction instance.

    Call start(), then zero or more operations, then commit() or abort().
    commit() may raise XenstoreException('EAGAIN'), in which case the
    whole start(), operations, commit() sequence should be replayed.
    """

    def __init__(self):
        self.xs = Xenstore()
        self.tx_id = 0

    def start(self):
        """
        Start a new transaction.
        """
        # the start request itself is sent with transaction id 0
        self.tx_id = 0
        reply = self.xs.do_op(self, self.xs.XS_TRANSACTION_START, '\0')
        # reply is the new transaction id as a NUL-terminated string
        self.tx_id = int(reply[:-1])

    def commit(self):
        """
        Commit all modifications in this transaction to Xenstore.
        """
        self.xs.do_op(self, self.xs.XS_TRANSACTION_END, 'T\0')

    def abort(self):
        """
        Discard all modifications in this transaction.
        """
        self.xs.do_op(self, self.xs.XS_TRANSACTION_END, 'F\0')

    def read(self, *vars):
        """
        Read $vars within this transaction; same result as Xenstore.read().
        """
        return self.xs._read(self, *vars)

    def mkdir(self, *vars):
        """
        Create empty directories within this transaction.
        """
        self.xs._mkdir(self, *vars)

    def rm(self, *vars):
        """
        Remove all keys in $vars within this transaction.
        """
        self.xs._rm(self, *vars)

    def write(self, vars):
        """
        Write the keys and values in dictionary $vars within this
        transaction.
        """
        self.xs.write(vars, self)

    def get_permissions(self, *paths):
        """
        Return the permissions of $paths within this transaction; same
        result as Xenstore.get_permissions().
        """
        return self.xs._get_permissions(self, *paths)

    def set_permissions(self, path, perms):
        """
        Set the permissions of $path from dictionary $perms, passing this
        transaction as the context.
        """
        self.xs.set_permissions(path, perms, self)
# Example code for development testing. It talks to the real xenbus device
# and creates, modifies and removes keys in Xenstore.
if __name__ == '__main__':
    def watch_cb(key, n):
        # runs in the background watch thread
        print("Watch fired: %s %s" % (key, n))

    # number of EAGAIN collisions to tolerate before the stress loop ends
    repeat_count = 5

    # create xenstore handle
    xs = Xenstore()

    # read three keys, returns three values
    print(xs.read('domid', 'vm', 'name'))

    # write a key/value
    xs.write({'new': 'stuff'})

    # verify contents were written
    assert xs.read('new') == 'stuff'

    # show permissions before and after changing them
    print(xs.get_permissions('new'))
    xs.set_permissions('new', {0: xs.PERM_READ_WRITE,
                               os.getpid(): xs.PERM_READ})
    print(xs.get_permissions('new'))

    xs.mkdir('another-new')

    # write another key
    xs.write({'new/path': 'more stuff'})
    xs.read('new/path')

    # delete key
    xs.rm('new/path')

    # attempt to read deleted key
    try:
        print(xs.read('new/path'))
    except XenstoreException as e:
        print("Failed to read (expected) %s" % e)

    # create and start transaction
    tx = Transaction()
    tx.start()
    tx.write({'tnew': 'committed'})

    # commit and verify value is updated
    tx.commit()
    assert xs.read('tnew') == 'committed'

    # start new transaction
    tx.start()
    tx.write({'tnew': 'aborted'})

    # abort and verify value is NOT updated
    tx.abort()
    assert xs.read('tnew') != 'aborted'

    # watch key
    xs.watch('new', watch_cb, 7)

    # repeatedly run a large transaction to cause collisions between
    # two program instances
    d = {}
    for n in range(1, 200):
        d["collision/key" + str(n)] = "value" + str(n)

    # The loop only ends after repeat_count EAGAIN collisions or an
    # unexpected error; a successful commit simply goes round again.
    while True:
        try:
            tx.start()
            tx.write(d)
            tx.commit()
        except XenstoreException as e:
            if str(e) == 'EAGAIN':
                repeat_count -= 1
                if repeat_count == 0:
                    break
                print("Collision on commit, repeating")
            else:
                print("Unexpected Xenstore error %s" % e)
                break

    print("End of test")
    xs.unwatch_all()

    # stop background watch thread
    xs.watch_stop()
_______________________________________________
Xen-devel mailing list
Xen-devel@lists.xen.org
http://lists.xen.org/xen-devel