I am not much of a python person. Hope some one more knowledgable with python can review this.
On Sat, Jan 16, 2016 at 12:16 AM, Liran Schour <lir...@il.ibm.com> wrote: > Python idl works now with "monitor_cond" method. Add test > for backward compatibility with old "monitor" method. > > Signed-off-by: Liran Schour <lir...@il.ibm.com> > --- > python/ovs/db/data.py | 12 ++++ > python/ovs/db/idl.py | 167 > ++++++++++++++++++++++++++++++++++++++++++++++---- > tests/ovsdb-idl.at | 97 +++++++++++++++++++++++++++++ > 3 files changed, 265 insertions(+), 11 deletions(-) > > diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py > index 6baff38..0d382e1 100644 > --- a/python/ovs/db/data.py > +++ b/python/ovs/db/data.py > @@ -386,6 +386,18 @@ class Datum(object): > s.append(tail) > return ''.join(s) > > + def diff(self, datum): > + if self.type.n_max > 1 or len(self.values) == 0: > + for k, v in datum.values.iteritems(): > + if k in self.values and v == self.values[k]: > + del self.values[k] > + else: > + self.values[k] = v > + else: > + return datum > + > + return self > + > def as_list(self): > if self.type.is_map(): > return [[k.value, v.value] for k, v in > self.values.iteritems()] > diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py > index 17ed15b..e95f42e 100644 > --- a/python/ovs/db/idl.py > +++ b/python/ovs/db/idl.py > @@ -30,6 +30,9 @@ ROW_CREATE = "create" > ROW_UPDATE = "update" > ROW_DELETE = "delete" > > +class Update_version(): > + OVSDB_UPDATE = 0, > + OVSDB_UPDATE2 = 1. > > class Idl(object): > """Open vSwitch Database Interface Definition Language (OVSDB IDL). > @@ -83,6 +86,10 @@ class Idl(object): > currently being constructed, if there is one, or None otherwise. > """ > > + IDL_S_INITIAL = 0 > + IDL_S_MONITOR_REQUESTED = 1 > + IDL_S_MONITOR_COND_REQUESTED = 2 > + > def __init__(self, remote, schema): > """Creates and returns a connection to the database named > 'db_name' on > 'remote', which should be in a form acceptable to > @@ -113,6 +120,8 @@ class Idl(object): > self._monitor_request_id = None > self._last_seqno = None > self.change_seqno = 0 > + self.uuid = uuid.uuid1() > + self.state = self.IDL_S_INITIAL > > # Database locking. > self.lock_name = None # Name of lock we need, None if > none. > @@ -131,6 +140,7 @@ class Idl(object): > table.need_table = False > table.rows = {} > table.idl = self > + table.condition = [] > > def close(self): > """Closes the connection to the database. The IDL will no longer > @@ -177,11 +187,15 @@ class Idl(object): > if msg is None: > break > if (msg.type == ovs.jsonrpc.Message.T_NOTIFY > + and msg.method == "update2" > + and len(msg.params) == 2): > + # Database contents changed. > + self.__parse_update(msg.params[1], > Update_version.OVSDB_UPDATE2) > + elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY > and msg.method == "update" > - and len(msg.params) == 2 > - and msg.params[0] is None): > + and len(msg.params) == 2): > # Database contents changed. > - self.__parse_update(msg.params[1]) > + self.__parse_update(msg.params[1], > Update_version.OVSDB_UPDATE) > elif (msg.type == ovs.jsonrpc.Message.T_REPLY > and self._monitor_request_id is not None > and self._monitor_request_id == msg.id): > @@ -190,8 +204,13 @@ class Idl(object): > self.change_seqno += 1 > self._monitor_request_id = None > self.__clear() > - self.__parse_update(msg.result) > - except error.Error as e: > + if self.state == self.IDL_S_MONITOR_COND_REQUESTED: > + self.__parse_update(msg.result, > Update_version.OVSDB_UPDATE2) > + else: > + assert self.state == self.IDL_S_MONITOR_REQUESTED > + self.__parse_update(msg.result, > Update_version.OVSDB_UPDATE) > + > + except error.Error, e: > vlog.err("%s: parse error in received schema: %s" > % (self._session.get_name(), e)) > self.__error() > @@ -211,6 +230,11 @@ class Idl(object): > elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == > "echo": > # Reply to our echo request. Ignore it. > pass > + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and > + self.state == self.IDL_S_MONITOR_COND_REQUESTED and > + self._monitor_request_id == msg.id): > + if msg.error == "unknown method": > + self.__send_monitor_request() > elif (msg.type in (ovs.jsonrpc.Message.T_ERROR, > ovs.jsonrpc.Message.T_REPLY) > and self.__txn_process_reply(msg)): > @@ -225,6 +249,21 @@ class Idl(object): > > return initial_change_seqno != self.change_seqno > > + def cond_update(self, table_name, add, remove): > + """Change conditions for this IDL session. If session is not > already > + connected, add condtion to table and submit it on > send_monitor_request. > + Otherwise send monitor_cond_update method with the requested > changes.""" > + table = self.tables.get(table_name) > + if not table: > + raise error.Error('Unknown table "%s"' % table_name) > + if self._session.is_connected(): > + self.__send_cond_update(table, add ,remove) > + else: > + if remove: > + raise error.Error('Non-empty remove condition on > unconnected' > + 'idl session') > + self.__add_condition(table, add) > + > def wait(self, poller): > """Arranges for poller.block() to wake up when self.run() has > something > to do or when activity occurs on a transaction on 'self'.""" > @@ -280,6 +319,21 @@ class Idl(object): > :type updates: Row > """ > > + def __send_cond_update(self, table, add ,remove): > + monitor_cond_update = {table.name: [{"added": add, > + "removed": remove}]} > + old_uuid = str(self.uuid) > + self.uuid = uuid.uuid1() > + params = [old_uuid, str(self.uuid), monitor_cond_update] > + msg = ovs.jsonrpc.Message.create_request("monitor_cond_update", > params) > + msg_id = msg.id > + self._session.send(msg) > + > + def __add_condition(self, table, add): > + for clause in add: > + if not clause in table.condition: > + table.condition.append(clause) > + > def __clear(self): > changed = False > > @@ -337,6 +391,13 @@ class Idl(object): > self.is_lock_contended = True > > def __send_monitor_request(self): > + if self.state == self.IDL_S_INITIAL: > + self.state = self.IDL_S_MONITOR_COND_REQUESTED > + method = "monitor_cond" > + else: > + self.state = self.IDL_S_MONITOR_REQUESTED > + method = "monitor" > + > monitor_requests = {} > for table in self.tables.itervalues(): > columns = [] > @@ -346,23 +407,26 @@ class Idl(object): > (column not in self.readonly[table.name])): > columns.append(column) > monitor_requests[table.name] = {"columns": columns} > + if method == "monitor_cond" and table.condition: > + monitor_requests[table.name]["where"] = table.condition > + table.condition = None > + > msg = ovs.jsonrpc.Message.create_request( > - "monitor", [self._db.name, None, monitor_requests]) > + method, [self._db.name, str(self.uuid), monitor_requests]) > self._monitor_request_id = msg.id > self._session.send(msg) > > - def __parse_update(self, update): > + def __parse_update(self, update, version): > try: > - self.__do_parse_update(update) > - except error.Error as e: > + self.__do_parse_update(update, version) > + except error.Error, e: > vlog.err("%s: error parsing update: %s" > % (self._session.get_name(), e)) > > - def __do_parse_update(self, table_updates): > + def __do_parse_update(self, table_updates, version): > if type(table_updates) != dict: > raise error.Error("<table-updates> is not an object", > table_updates) > - > for table_name, table_update in table_updates.iteritems(): > table = self.tables.get(table_name) > if not table: > @@ -387,6 +451,11 @@ class Idl(object): > 'is not an object' > % (table_name, uuid_string)) > > + if version == Update_version.OVSDB_UPDATE2: > + if self.__process_update2(table, uuid, row_update): > + self.change_seqno += 1 > + continue > + > parser = ovs.db.parser.Parser(row_update, "row-update") > old = parser.get_optional("old", [dict]) > new = parser.get_optional("new", [dict]) > @@ -399,6 +468,46 @@ class Idl(object): > if self.__process_update(table, uuid, old, new): > self.change_seqno += 1 > > + def __process_update2(self, table, uuid, row_update): > + row = table.rows.get(uuid) > + changed = False > + if "delete" in row_update: > + if row: > + del table.rows[uuid] > + self.notify(ROW_DELETE, row) > + changed = True > + else: > + # XXX rate-limit > + vlog.warn("cannot delete missing row %s from table" > + "%s" % (uuid, table.name)) > + elif "insert" in row_update or "initial" in row_update: > + if row: > + vlog.warn("cannot add existing row %s from table" > + " %s" % (uuid, table.name)) > + del table.rows[uuid] > + row = self.__create_row(table, uuid) > + if "insert" in row_update: > + row_update = row_update['insert'] > + else: > + row_update = row_update['initial'] > + self.__add_default(table, row_update) > + if self.__row_update(table, row, row_update): > + changed = True > + self.notify(ROW_CREATE, row) > + elif "modify" in row_update: > + if not row: > + raise error.Error('Modify non-existing row') > + > + self.__apply_diff(table, row, row_update['modify']) > + self.notify(ROW_UPDATE, row, > + Row.from_json(self, table, > + uuid, row_update['modify'])) > + changed = True > + else: > + raise error.Error('<row-update> unknown operation', > + row_update) > + return changed > + > def __process_update(self, table, uuid, old, new): > """Returns True if a column changed, False otherwise.""" > row = table.rows.get(uuid) > @@ -439,6 +548,42 @@ class Idl(object): > self.notify(op, row, Row.from_json(self, table, uuid, > old)) > return changed > > + def __add_default(self, table, row_update): > + for column in table.columns.itervalues(): > + if column.name not in row_update: > + if ((table.name not in self.readonly) or > + (table.name in self.readonly) and > + (column.name not in self.readonly[table.name])): > + if column.type.n_min != 0 and not > column.type.is_map(): > + if column.type.key.type == ovs.db.types.UuidType: > + row_update[column.name] = > ovs.ovsuuid.to_json(column.type.key.type.default) > + else: > + row_update[column.name] = > column.type.key.type.default > + > + def __apply_diff(self, table, row, row_diff): > + data = {} > + for column_name, datum_json in row_diff.iteritems(): > + column = table.columns.get(column_name) > + if not column: > + # XXX rate-limit > + vlog.warn("unknown column %s updating table %s" > + % (column_name, table.name)) > + continue > + > + try: > + datum = ovs.db.data.Datum.from_json(column.type, > datum_json) > + except error.Error, e: > + # XXX rate-limit > + vlog.warn("error parsing column %s in table %s: %s" > + % (column_name, table.name, e)) > + continue > + > + datum = row._data[column_name].diff(datum) > + if datum != row._data[column_name]: > + row._data[column_name] = datum > + if column.alert: > + changed = True > + > def __row_update(self, table, row, row_json): > changed = False > for column_name, datum_json in row_json.iteritems(): > diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at > index 4baac46..2238653 100644 > --- a/tests/ovsdb-idl.at > +++ b/tests/ovsdb-idl.at > @@ -646,6 +646,103 @@ OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially > populated], > 003: done > ]]) > > +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND_PY], > + [AT_SETUP([$1 - Python]) > + AT_SKIP_IF([test $HAVE_PYTHON = no]) > + AT_KEYWORDS([ovsdb server idl Python monitor $4]) > + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema], > + [0], [stdout], [ignore]) > + AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach > --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket > --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) > + AT_CHECK([ovs-appctl -t "`pwd`"/unixctl > ovsdb-server/disable-monitor-cond]) > + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl > $srcdir/idltest.ovsschema unix:socket $2], > + [0], [stdout], [ignore], [kill `cat pid`]) > + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$5],,, [[| > $5]]), > + [0], [$3], [], [kill `cat pid`]) > + OVSDB_SERVER_SHUTDOWN > + AT_CLEANUP]) > + > + > +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND], > + [OVSDB_CHECK_IDL_WO_MONITOR_COND_PY($@)]) > + > + > +OVSDB_CHECK_IDL_WO_MONITOR_COND([simple idl disable monitor-cond], > + [['["idltest", > + {"op": "insert", > + "table": "simple", > + "row": {"i": 1, > + "r": 2.0, > + "b": true, > + "s": "mystring", > + "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"], > + "ia": ["set", [1, 2, 3]], > + "ra": ["set", [-0.5]], > + "ba": ["set", [true]], > + "sa": ["set", ["abc", "def"]], > + "ua": ["set", [["uuid", > "69443985-7806-45e2-b35f-574a04e720f9"], > + ["uuid", > "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}}, > + {"op": "insert", > + "table": "simple", > + "row": {}}]' \ > + '["idltest", > + {"op": "update", > + "table": "simple", > + "where": [], > + "row": {"b": true}}]' \ > + '["idltest", > + {"op": "update", > + "table": "simple", > + "where": [], > + "row": {"r": 123.5}}]' \ > + '["idltest", > + {"op": "insert", > + "table": "simple", > + "row": {"i": -1, > + "r": 125, > + "b": false, > + "s": "", > + "ia": ["set", [1]], > + "ra": ["set", [1.5]], > + "ba": ["set", [false]], > + "sa": ["set", []], > + "ua": ["set", []]}}]' \ > + '["idltest", > + {"op": "update", > + "table": "simple", > + "where": [["i", "<", 1]], > + "row": {"s": "newstring"}}]' \ > + '["idltest", > + {"op": "delete", > + "table": "simple", > + "where": [["i", "==", 0]]}]' \ > + 'reconnect']], > + [[000: empty > +001: > {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]} > +002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> > +002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] > sa=[abc def] ua=[<4> <5>] uuid=<0> > +003: {"error":null,"result":[{"count":2}]} > +004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> > +004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] > sa=[abc def] ua=[<4> <5>] uuid=<0> > +005: {"error":null,"result":[{"count":2}]} > +006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> > +006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] > sa=[abc def] ua=[<4> <5>] uuid=<0> > +007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]} > +008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] > uuid=<6> > +008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> > +008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] > sa=[abc def] ua=[<4> <5>] uuid=<0> > +009: {"error":null,"result":[{"count":2}]} > +010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] > sa=[] ua=[] uuid=<6> > +010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] > uuid=<1> > +010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] > sa=[abc def] ua=[<4> <5>] uuid=<0> > +011: {"error":null,"result":[{"count":1}]} > +012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] > sa=[] ua=[] uuid=<6> > +012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] > sa=[abc def] ua=[<4> <5>] uuid=<0> > +013: reconnect > +014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] > sa=[] ua=[] uuid=<6> > +014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] > sa=[abc def] ua=[<4> <5>] uuid=<0> > +015: done > +]]) > + > m4_define([OVSDB_CHECK_IDL_TRACK_C], > [AT_SETUP([$1 - C]) > AT_KEYWORDS([ovsdb server idl tracking positive $5]) > -- > 2.1.4 > > > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev