Python idl works now with "monitor_cond" method. Add test for backward compatibility with old "monitor" method.
Signed-off-by: Liran Schour <lir...@il.ibm.com> --- python/ovs/db/data.py | 12 ++++ python/ovs/db/idl.py | 167 ++++++++++++++++++++++++++++++++++++++++++++++---- tests/ovsdb-idl.at | 97 +++++++++++++++++++++++++++++ 3 files changed, 265 insertions(+), 11 deletions(-) diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py index 6baff38..0d382e1 100644 --- a/python/ovs/db/data.py +++ b/python/ovs/db/data.py @@ -386,6 +386,18 @@ class Datum(object): s.append(tail) return ''.join(s) + def diff(self, datum): + if self.type.n_max > 1 or len(self.values) == 0: + for k, v in datum.values.iteritems(): + if k in self.values and v == self.values[k]: + del self.values[k] + else: + self.values[k] = v + else: + return datum + + return self + def as_list(self): if self.type.is_map(): return [[k.value, v.value] for k, v in self.values.iteritems()] diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index 17ed15b..e95f42e 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -30,6 +30,9 @@ ROW_CREATE = "create" ROW_UPDATE = "update" ROW_DELETE = "delete" +class Update_version(): + OVSDB_UPDATE = 0, + OVSDB_UPDATE2 = 1. class Idl(object): """Open vSwitch Database Interface Definition Language (OVSDB IDL). @@ -83,6 +86,10 @@ class Idl(object): currently being constructed, if there is one, or None otherwise. """ + IDL_S_INITIAL = 0 + IDL_S_MONITOR_REQUESTED = 1 + IDL_S_MONITOR_COND_REQUESTED = 2 + def __init__(self, remote, schema): """Creates and returns a connection to the database named 'db_name' on 'remote', which should be in a form acceptable to @@ -113,6 +120,8 @@ class Idl(object): self._monitor_request_id = None self._last_seqno = None self.change_seqno = 0 + self.uuid = uuid.uuid1() + self.state = self.IDL_S_INITIAL # Database locking. self.lock_name = None # Name of lock we need, None if none. @@ -131,6 +140,7 @@ class Idl(object): table.need_table = False table.rows = {} table.idl = self + table.condition = [] def close(self): """Closes the connection to the database. The IDL will no longer @@ -177,11 +187,15 @@ class Idl(object): if msg is None: break if (msg.type == ovs.jsonrpc.Message.T_NOTIFY + and msg.method == "update2" + and len(msg.params) == 2): + # Database contents changed. + self.__parse_update(msg.params[1], Update_version.OVSDB_UPDATE2) + elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.method == "update" - and len(msg.params) == 2 - and msg.params[0] is None): + and len(msg.params) == 2): # Database contents changed. - self.__parse_update(msg.params[1]) + self.__parse_update(msg.params[1], Update_version.OVSDB_UPDATE) elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._monitor_request_id is not None and self._monitor_request_id == msg.id): @@ -190,8 +204,13 @@ class Idl(object): self.change_seqno += 1 self._monitor_request_id = None self.__clear() - self.__parse_update(msg.result) - except error.Error as e: + if self.state == self.IDL_S_MONITOR_COND_REQUESTED: + self.__parse_update(msg.result, Update_version.OVSDB_UPDATE2) + else: + assert self.state == self.IDL_S_MONITOR_REQUESTED + self.__parse_update(msg.result, Update_version.OVSDB_UPDATE) + + except error.Error, e: vlog.err("%s: parse error in received schema: %s" % (self._session.get_name(), e)) self.__error() @@ -211,6 +230,11 @@ class Idl(object): elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo": # Reply to our echo request. Ignore it. pass + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and + self.state == self.IDL_S_MONITOR_COND_REQUESTED and + self._monitor_request_id == msg.id): + if msg.error == "unknown method": + self.__send_monitor_request() elif (msg.type in (ovs.jsonrpc.Message.T_ERROR, ovs.jsonrpc.Message.T_REPLY) and self.__txn_process_reply(msg)): @@ -225,6 +249,21 @@ class Idl(object): return initial_change_seqno != self.change_seqno + def cond_update(self, table_name, add, remove): + """Change conditions for this IDL session. If session is not already + connected, add condtion to table and submit it on send_monitor_request. + Otherwise send monitor_cond_update method with the requested changes.""" + table = self.tables.get(table_name) + if not table: + raise error.Error('Unknown table "%s"' % table_name) + if self._session.is_connected(): + self.__send_cond_update(table, add ,remove) + else: + if remove: + raise error.Error('Non-empty remove condition on unconnected' + 'idl session') + self.__add_condition(table, add) + def wait(self, poller): """Arranges for poller.block() to wake up when self.run() has something to do or when activity occurs on a transaction on 'self'.""" @@ -280,6 +319,21 @@ class Idl(object): :type updates: Row """ + def __send_cond_update(self, table, add ,remove): + monitor_cond_update = {table.name: [{"added": add, + "removed": remove}]} + old_uuid = str(self.uuid) + self.uuid = uuid.uuid1() + params = [old_uuid, str(self.uuid), monitor_cond_update] + msg = ovs.jsonrpc.Message.create_request("monitor_cond_update", params) + msg_id = msg.id + self._session.send(msg) + + def __add_condition(self, table, add): + for clause in add: + if not clause in table.condition: + table.condition.append(clause) + def __clear(self): changed = False @@ -337,6 +391,13 @@ class Idl(object): self.is_lock_contended = True def __send_monitor_request(self): + if self.state == self.IDL_S_INITIAL: + self.state = self.IDL_S_MONITOR_COND_REQUESTED + method = "monitor_cond" + else: + self.state = self.IDL_S_MONITOR_REQUESTED + method = "monitor" + monitor_requests = {} for table in self.tables.itervalues(): columns = [] @@ -346,23 +407,26 @@ class Idl(object): (column not in self.readonly[table.name])): columns.append(column) monitor_requests[table.name] = {"columns": columns} + if method == "monitor_cond" and table.condition: + monitor_requests[table.name]["where"] = table.condition + table.condition = None + msg = ovs.jsonrpc.Message.create_request( - "monitor", [self._db.name, None, monitor_requests]) + method, [self._db.name, str(self.uuid), monitor_requests]) self._monitor_request_id = msg.id self._session.send(msg) - def __parse_update(self, update): + def __parse_update(self, update, version): try: - self.__do_parse_update(update) - except error.Error as e: + self.__do_parse_update(update, version) + except error.Error, e: vlog.err("%s: error parsing update: %s" % (self._session.get_name(), e)) - def __do_parse_update(self, table_updates): + def __do_parse_update(self, table_updates, version): if type(table_updates) != dict: raise error.Error("<table-updates> is not an object", table_updates) - for table_name, table_update in table_updates.iteritems(): table = self.tables.get(table_name) if not table: @@ -387,6 +451,11 @@ class Idl(object): 'is not an object' % (table_name, uuid_string)) + if version == Update_version.OVSDB_UPDATE2: + if self.__process_update2(table, uuid, row_update): + self.change_seqno += 1 + continue + parser = ovs.db.parser.Parser(row_update, "row-update") old = parser.get_optional("old", [dict]) new = parser.get_optional("new", [dict]) @@ -399,6 +468,46 @@ class Idl(object): if self.__process_update(table, uuid, old, new): self.change_seqno += 1 + def __process_update2(self, table, uuid, row_update): + row = table.rows.get(uuid) + changed = False + if "delete" in row_update: + if row: + del table.rows[uuid] + self.notify(ROW_DELETE, row) + changed = True + else: + # XXX rate-limit + vlog.warn("cannot delete missing row %s from table" + "%s" % (uuid, table.name)) + elif "insert" in row_update or "initial" in row_update: + if row: + vlog.warn("cannot add existing row %s from table" + " %s" % (uuid, table.name)) + del table.rows[uuid] + row = self.__create_row(table, uuid) + if "insert" in row_update: + row_update = row_update['insert'] + else: + row_update = row_update['initial'] + self.__add_default(table, row_update) + if self.__row_update(table, row, row_update): + changed = True + self.notify(ROW_CREATE, row) + elif "modify" in row_update: + if not row: + raise error.Error('Modify non-existing row') + + self.__apply_diff(table, row, row_update['modify']) + self.notify(ROW_UPDATE, row, + Row.from_json(self, table, + uuid, row_update['modify'])) + changed = True + else: + raise error.Error('<row-update> unknown operation', + row_update) + return changed + def __process_update(self, table, uuid, old, new): """Returns True if a column changed, False otherwise.""" row = table.rows.get(uuid) @@ -439,6 +548,42 @@ class Idl(object): self.notify(op, row, Row.from_json(self, table, uuid, old)) return changed + def __add_default(self, table, row_update): + for column in table.columns.itervalues(): + if column.name not in row_update: + if ((table.name not in self.readonly) or + (table.name in self.readonly) and + (column.name not in self.readonly[table.name])): + if column.type.n_min != 0 and not column.type.is_map(): + if column.type.key.type == ovs.db.types.UuidType: + row_update[column.name] = ovs.ovsuuid.to_json(column.type.key.type.default) + else: + row_update[column.name] = column.type.key.type.default + + def __apply_diff(self, table, row, row_diff): + data = {} + for column_name, datum_json in row_diff.iteritems(): + column = table.columns.get(column_name) + if not column: + # XXX rate-limit + vlog.warn("unknown column %s updating table %s" + % (column_name, table.name)) + continue + + try: + datum = ovs.db.data.Datum.from_json(column.type, datum_json) + except error.Error, e: + # XXX rate-limit + vlog.warn("error parsing column %s in table %s: %s" + % (column_name, table.name, e)) + continue + + datum = row._data[column_name].diff(datum) + if datum != row._data[column_name]: + row._data[column_name] = datum + if column.alert: + changed = True + def __row_update(self, table, row, row_json): changed = False for column_name, datum_json in row_json.iteritems(): diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at index 4baac46..2238653 100644 --- a/tests/ovsdb-idl.at +++ b/tests/ovsdb-idl.at @@ -646,6 +646,103 @@ OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated], 003: done ]]) +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND_PY], + [AT_SETUP([$1 - Python]) + AT_SKIP_IF([test $HAVE_PYTHON = no]) + AT_KEYWORDS([ovsdb server idl Python monitor $4]) + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema], + [0], [stdout], [ignore]) + AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) + AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/disable-monitor-cond]) + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema unix:socket $2], + [0], [stdout], [ignore], [kill `cat pid`]) + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$5],,, [[| $5]]), + [0], [$3], [], [kill `cat pid`]) + OVSDB_SERVER_SHUTDOWN + AT_CLEANUP]) + + +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND], + [OVSDB_CHECK_IDL_WO_MONITOR_COND_PY($@)]) + + +OVSDB_CHECK_IDL_WO_MONITOR_COND([simple idl disable monitor-cond], + [['["idltest", + {"op": "insert", + "table": "simple", + "row": {"i": 1, + "r": 2.0, + "b": true, + "s": "mystring", + "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"], + "ia": ["set", [1, 2, 3]], + "ra": ["set", [-0.5]], + "ba": ["set", [true]], + "sa": ["set", ["abc", "def"]], + "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"], + ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}}, + {"op": "insert", + "table": "simple", + "row": {}}]' \ + '["idltest", + {"op": "update", + "table": "simple", + "where": [], + "row": {"b": true}}]' \ + '["idltest", + {"op": "update", + "table": "simple", + "where": [], + "row": {"r": 123.5}}]' \ + '["idltest", + {"op": "insert", + "table": "simple", + "row": {"i": -1, + "r": 125, + "b": false, + "s": "", + "ia": ["set", [1]], + "ra": ["set", [1.5]], + "ba": ["set", [false]], + "sa": ["set", []], + "ua": ["set", []]}}]' \ + '["idltest", + {"op": "update", + "table": "simple", + "where": [["i", "<", 1]], + "row": {"s": "newstring"}}]' \ + '["idltest", + {"op": "delete", + "table": "simple", + "where": [["i", "==", 0]]}]' \ + 'reconnect']], + [[000: empty +001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]} +002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +003: {"error":null,"result":[{"count":2}]} +004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +005: {"error":null,"result":[{"count":2}]} +006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]} +008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +009: {"error":null,"result":[{"count":2}]} +010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> +010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +011: {"error":null,"result":[{"count":1}]} +012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +013: reconnect +014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> +014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> +015: done +]]) + m4_define([OVSDB_CHECK_IDL_TRACK_C], [AT_SETUP([$1 - C]) AT_KEYWORDS([ovsdb server idl tracking positive $5]) -- 2.1.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev