On Mon, Feb 1, 2016 at 12:33 PM, Liran Schour <lir...@il.ibm.com> wrote:

> Python idl works now with "monitor_cond" method. Add test
> for backward compatibility with old "monitor" method.
>
> Signed-off-by: Liran Schour <lir...@il.ibm.com>
>
> ---
> v2->v3:
> *cond_update() receives a single condition
> ---
>  python/ovs/db/data.py |  18 ++++-
>  python/ovs/db/idl.py  | 181
> ++++++++++++++++++++++++++++++++++++++++++++------
>  tests/ovsdb-idl.at    |  97 +++++++++++++++++++++++++++
>  3 files changed, 271 insertions(+), 25 deletions(-)
>
> diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py
> index 3075ee6..162ab19 100644
> --- a/python/ovs/db/data.py
> +++ b/python/ovs/db/data.py
> @@ -146,7 +146,7 @@ class Atom(object):
>                  % (self.to_string(), base.enum.to_string()))
>          elif base.type in [ovs.db.types.IntegerType,
> ovs.db.types.RealType]:
>              if ((base.min is None or self.value >= base.min) and
> -                (base.max is None or self.value <= base.max)):
> +                    (base.max is None or self.value <= base.max)):
>                  pass
>              elif base.min is not None and base.max is not None:
>                  raise ConstraintViolation(
> @@ -155,7 +155,7 @@ class Atom(object):
>              elif base.min is not None:
>                  raise ConstraintViolation(
>                      "%s is less than minimum allowed value %.15g"
> -                            % (self.to_string(), base.min))
> +                    % (self.to_string(), base.min))
>              else:
>                  raise ConstraintViolation(
>                      "%s is greater than maximum allowed value %.15g"
> @@ -313,7 +313,7 @@ class Datum(object):
>          that this function accepts."""
>          is_map = type_.is_map()
>          if (is_map or
> -            (type(json) == list and len(json) > 0 and json[0] == "set")):
> +                (type(json) == list and len(json) > 0 and json[0] ==
> "set")):
>              if is_map:
>                  class_ = "map"
>              else:
> @@ -388,6 +388,18 @@ class Datum(object):
>              s.append(tail)
>          return ''.join(s)
>
> +    def diff(self, datum):
> +        if self.type.n_max > 1 or len(self.values) == 0:
> +            for k, v in six.iteritems(datum.values):
> +                if k in self.values and v == self.values[k]:
> +                    del self.values[k]
> +                else:
> +                    self.values[k] = v
> +        else:
> +            return datum
> +
> +        return self
> +
>      def as_list(self):
>          if self.type.is_map():
>              return [[k.value, v.value] for k, v in
> six.iteritems(self.values)]
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 3187db9..693ec91 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -32,6 +32,9 @@ ROW_CREATE = "create"
>  ROW_UPDATE = "update"
>  ROW_DELETE = "delete"
>
> +OVSDB_UPDATE = 0
> +OVSDB_UPDATE2 = 1
> +
>
>  class Idl(object):
>      """Open vSwitch Database Interface Definition Language (OVSDB IDL).
> @@ -85,6 +88,10 @@ class Idl(object):
>        currently being constructed, if there is one, or None otherwise.
>  """
>
> +    IDL_S_INITIAL = 0
> +    IDL_S_MONITOR_REQUESTED = 1
> +    IDL_S_MONITOR_COND_REQUESTED = 2
> +
>      def __init__(self, remote, schema):
>          """Creates and returns a connection to the database named
> 'db_name' on
>          'remote', which should be in a form acceptable to
> @@ -115,6 +122,8 @@ class Idl(object):
>          self._monitor_request_id = None
>          self._last_seqno = None
>          self.change_seqno = 0
> +        self.uuid = uuid.uuid1()
> +        self.state = self.IDL_S_INITIAL
>
>          # Database locking.
>          self.lock_name = None          # Name of lock we need, None if
> none.
> @@ -133,6 +142,7 @@ class Idl(object):
>              table.need_table = False
>              table.rows = {}
>              table.idl = self
> +            table.condition = []
>
>      def close(self):
>          """Closes the connection to the database.  The IDL will no longer
> @@ -179,11 +189,15 @@ class Idl(object):
>              if msg is None:
>                  break
>              if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> -                and msg.method == "update"
> -                and len(msg.params) == 2
> -                and msg.params[0] is None):
> +                    and msg.method == "update2"
> +                    and len(msg.params) == 2):
> +                # Database contents changed.
> +                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
> +            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> +                    and msg.method == "update"
> +                    and len(msg.params) == 2):
>                  # Database contents changed.
> -                self.__parse_update(msg.params[1])
> +                self.__parse_update(msg.params[1], OVSDB_UPDATE)
>              elif (msg.type == ovs.jsonrpc.Message.T_REPLY
>                    and self._monitor_request_id is not None
>                    and self._monitor_request_id == msg.id):
> @@ -192,10 +206,15 @@ class Idl(object):
>                      self.change_seqno += 1
>                      self._monitor_request_id = None
>                      self.__clear()
> -                    self.__parse_update(msg.result)
> -                except error.Error as e:
> +                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
> +                        self.__parse_update(msg.result, OVSDB_UPDATE2)
> +                    else:
> +                        assert self.state == self.IDL_S_MONITOR_REQUESTED
> +                        self.__parse_update(msg.result, OVSDB_UPDATE)
> +
> +                except error.Error, e:
>                      vlog.err("%s: parse error in received schema: %s"
> -                              % (self._session.get_name(), e))
> +                             % (self._session.get_name(), e))
>                      self.__error()
>              elif (msg.type == ovs.jsonrpc.Message.T_REPLY
>                    and self._lock_request_id is not None
> @@ -213,6 +232,11 @@ class Idl(object):
>              elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id ==
> "echo":
>                  # Reply to our echo request.  Ignore it.
>                  pass
> +            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> +                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
> +                  self._monitor_request_id == msg.id):
> +                if msg.error == "unknown method":
> +                    self.__send_monitor_request()
>              elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
>                                 ovs.jsonrpc.Message.T_REPLY)
>                    and self.__txn_process_reply(msg)):
> @@ -227,6 +251,19 @@ class Idl(object):
>
>          return initial_change_seqno != self.change_seqno
>
> +    def cond_update(self, table_name, cond):
> +        """Change conditions for this IDL session. If session is not
> already
> +        connected, add condtion to table and submit it on
> send_monitor_request.
> +        Otherwise  send monitor_cond_update method with the requested
> +        changes."""
> +        table = self.tables.get(table_name)
> +        if not table:
> +            raise error.Error('Unknown table "%s"' % table_name)
> +        if self._session.is_connected():
> +            self.__send_cond_update(table, cond)
> +        else:
> +            table.condition = cond
> +
>      def wait(self, poller):
>          """Arranges for poller.block() to wake up when self.run() has
> something
>          to do or when activity occurs on a transaction on 'self'."""
> @@ -282,6 +319,14 @@ class Idl(object):
>          :type updates:  Row
>          """
>
> +    def __send_cond_update(self, table, cond):
> +        monitor_cond_update = {table.name: [{"where": cond}]}
> +        old_uuid = str(self.uuid)
> +        self.uuid = uuid.uuid1()
> +        params = [old_uuid, str(self.uuid), monitor_cond_update]
> +        msg = ovs.jsonrpc.Message.create_request("monitor_cond_update",
> params)
> +        self._session.send(msg)
> +
>      def __clear(self):
>          changed = False
>
> @@ -331,36 +376,47 @@ class Idl(object):
>
>      def __parse_lock_notify(self, params, new_has_lock):
>          if (self.lock_name is not None
> -            and type(params) in (list, tuple)
> -            and params
> -            and params[0] == self.lock_name):
> +                and type(params) in (list, tuple)
> +                and params
> +                and params[0] == self.lock_name):
>              self.__update_has_lock(new_has_lock)
>              if not new_has_lock:
>                  self.is_lock_contended = True
>
>      def __send_monitor_request(self):
> +        if self.state == self.IDL_S_INITIAL:
> +            self.state = self.IDL_S_MONITOR_COND_REQUESTED
> +            method = "monitor_cond"
> +        else:
> +            self.state = self.IDL_S_MONITOR_REQUESTED
> +            method = "monitor"
> +
>          monitor_requests = {}
>          for table in six.itervalues(self.tables):
>              columns = []
>              for column in six.iterkeys(table.columns):
>                  if ((table.name not in self.readonly) or
> -                    (table.name in self.readonly) and
> -                    (column not in self.readonly[table.name])):
> +                        (table.name in self.readonly) and
> +                        (column not in self.readonly[table.name])):
>                      columns.append(column)
>              monitor_requests[table.name] = {"columns": columns}
> +            if method == "monitor_cond" and table.condition:
> +                monitor_requests[table.name]["where"] = table.condition
> +                table.condition = None
> +
>          msg = ovs.jsonrpc.Message.create_request(
> -            "monitor", [self._db.name, None, monitor_requests])
> +            method, [self._db.name, str(self.uuid), monitor_requests])
>          self._monitor_request_id = msg.id
>          self._session.send(msg)
>
> -    def __parse_update(self, update):
> +    def __parse_update(self, update, version):
>          try:
> -            self.__do_parse_update(update)
> -        except error.Error as e:
> +            self.__do_parse_update(update, version)
> +        except error.Error, e:
>              vlog.err("%s: error parsing update: %s"
>                       % (self._session.get_name(), e))
>
> -    def __do_parse_update(self, table_updates):
> +    def __do_parse_update(self, table_updates, version):
>          if type(table_updates) != dict:
>              raise error.Error("<table-updates> is not an object",
>                                table_updates)
> @@ -389,6 +445,11 @@ class Idl(object):
>                                        'is not an object'
>                                        % (table_name, uuid_string))
>
> +                if version == OVSDB_UPDATE2:
> +                    if self.__process_update2(table, uuid, row_update):
> +                        self.change_seqno += 1
> +                    continue
> +
>                  parser = ovs.db.parser.Parser(row_update, "row-update")
>                  old = parser.get_optional("old", [dict])
>                  new = parser.get_optional("new", [dict])
> @@ -401,6 +462,46 @@ class Idl(object):
>                  if self.__process_update(table, uuid, old, new):
>                      self.change_seqno += 1
>
> +    def __process_update2(self, table, uuid, row_update):
> +        row = table.rows.get(uuid)
> +        changed = False
> +        if "delete" in row_update:
> +            if row:
> +                del table.rows[uuid]
> +                self.notify(ROW_DELETE, row)
> +                changed = True
> +            else:
> +                # XXX rate-limit
> +                vlog.warn("cannot delete missing row %s from table"
> +                          "%s" % (uuid, table.name))
> +        elif "insert" in row_update or "initial" in row_update:
> +            if row:
> +                vlog.warn("cannot add existing row %s from table"
> +                          " %s" % (uuid, table.name))
> +                del table.rows[uuid]
> +            row = self.__create_row(table, uuid)
> +            if "insert" in row_update:
> +                row_update = row_update['insert']
> +            else:
> +                row_update = row_update['initial']
> +            self.__add_default(table, row_update)
> +            if self.__row_update(table, row, row_update):
> +                changed = True
> +                self.notify(ROW_CREATE, row)
> +        elif "modify" in row_update:
> +            if not row:
> +                raise error.Error('Modify non-existing row')
> +
> +            self.__apply_diff(table, row, row_update['modify'])
> +            self.notify(ROW_UPDATE, row,
> +                        Row.from_json(self, table,
> +                                      uuid, row_update['modify']))
>


I think the third parameter to the "notify" function (i.e. updates)
should be the Row with the old values (i.e. before the update diff
is applied).

When "notify" is called, the 2nd param "row" is the row object with the
diff already applied and the 3rd param contains the modified values, so
the notify handler function will not be able to figure out what columns
changed in the Row.

Have a look here [1]. In the "monitor v1 update" path, the Row with the
old values is passed.

As per the documentation of "notify" here [2], what you are doing is
right, but the old implementation is different. I think it's better to
update the documentation as well and be consistent with "v1".

The Neutron OVN plugin implements the "notify" function and expects the 3rd
param for "update" to be the old row before the update is applied. [3]

[1] -
https://github.com/openvswitch/ovs/blob/master/python/ovs/db/idl.py#L441
[2] -
https://github.com/openvswitch/ovs/blob/master/python/ovs/db/idl.py#L281
[3] -
https://github.com/openstack/networking-ovn/blob/master/networking_ovn/ovsdb/ovsdb_monitor.py#L179

https://github.com/openstack/networking-ovn/blob/master/networking_ovn/ovsdb/row_event.py#L49




> +            changed = True
> +        else:
> +            raise error.Error('<row-update> unknown operation',
> +                              row_update)
> +        return changed
> +
>      def __process_update(self, table, uuid, old, new):
>          """Returns True if a column changed, False otherwise."""
>          row = table.rows.get(uuid)
> @@ -441,6 +542,42 @@ class Idl(object):
>                  self.notify(op, row, Row.from_json(self, table, uuid,
> old))
>          return changed
>
> +    def __column_name(self, column):
> +        if column.type.key.type == ovs.db.types.UuidType:
> +            return ovs.ovsuuid.to_json(column.type.key.type.default)
> +        else:
> +            return column.type.key.type.default
> +
> +    def __add_default(self, table, row_update):
> +        for column in table.columns.itervalues():
> +            if column.name not in row_update:
> +                if ((table.name not in self.readonly) or
> +                        (table.name in self.readonly) and
> +                        (column.name not in self.readonly[table.name])):
> +                    if column.type.n_min != 0 and not
> column.type.is_map():
> +                        row_update[column.name] =
> self.__column_name(column)
> +
> +    def __apply_diff(self, table, row, row_diff):
> +        for column_name, datum_json in row_diff.iteritems():
> +            column = table.columns.get(column_name)
> +            if not column:
> +                # XXX rate-limit
> +                vlog.warn("unknown column %s updating table %s"
> +                          % (column_name, table.name))
> +                continue
> +
> +            try:
> +                datum = ovs.db.data.Datum.from_json(column.type,
> datum_json)
> +            except error.Error, e:
> +                # XXX rate-limit
> +                vlog.warn("error parsing column %s in table %s: %s"
> +                          % (column_name, table.name, e))
> +                continue
> +
> +            datum = row._data[column_name].diff(datum)
> +            if datum != row._data[column_name]:
> +                row._data[column_name] = datum
> +
>      def __row_update(self, table, row, row_json):
>          changed = False
>          for column_name, datum_json in six.iteritems(row_json):
> @@ -593,7 +730,7 @@ class Row(object):
>          assert self._idl.txn
>
>          if ((self._table.name in self._idl.readonly) and
> -            (column_name in self._idl.readonly[self._table.name])):
> +                (column_name in self._idl.readonly[self._table.name])):
>              vlog.warn("attempting to write to readonly column %s"
>                        % column_name)
>              return
> @@ -829,8 +966,8 @@ class Transaction(object):
>      def _substitute_uuids(self, json):
>          if type(json) in (list, tuple):
>              if (len(json) == 2
> -                and json[0] == 'uuid'
> -                and ovs.ovsuuid.is_valid_string(json[1])):
> +                    and json[0] == 'uuid'
> +                    and ovs.ovsuuid.is_valid_string(json[1])):
>                  uuid = ovs.ovsuuid.from_string(json[1])
>                  row = self._txn_rows.get(uuid, None)
>                  if row and row._data is None:
> @@ -967,14 +1104,14 @@ class Transaction(object):
>                  for column_name, datum in six.iteritems(row._changes):
>                      if row._data is not None or not datum.is_default():
>                          row_json[column_name] = (
> -                                self._substitute_uuids(datum.to_json()))
> +                            self._substitute_uuids(datum.to_json()))
>
>                          # If anything really changed, consider it an
> update.
>                          # We can't suppress not-really-changed values
> earlier
>                          # or transactions would become nonatomic (see the
> big
>                          # comment inside Transaction._write()).
>                          if (not any_updates and row._data is not None and
> -                            row._data[column_name] != datum):
> +                                row._data[column_name] != datum):
>                              any_updates = True
>
>                  if row._data is None or row_json:
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index 4baac46..2238653 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -646,6 +646,103 @@ OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially
> populated],
>  003: done
>  ]])
>
> +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND_PY],
> +  [AT_SETUP([$1 - Python])
> +   AT_SKIP_IF([test $HAVE_PYTHON = no])
> +   AT_KEYWORDS([ovsdb server idl Python monitor $4])
> +   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> +                  [0], [stdout], [ignore])
> +   AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach
> --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket
> --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> +   AT_CHECK([ovs-appctl -t "`pwd`"/unixctl
> ovsdb-server/disable-monitor-cond])
> +   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py  -t10 idl
> $srcdir/idltest.ovsschema unix:socket $2],
> +            [0], [stdout], [ignore], [kill `cat pid`])
> +   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$5],,, [[|
> $5]]),
> +            [0], [$3], [], [kill `cat pid`])
> +   OVSDB_SERVER_SHUTDOWN
> +   AT_CLEANUP])
> +
> +
> +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND],
> +   [OVSDB_CHECK_IDL_WO_MONITOR_COND_PY($@)])
> +
> +
> +OVSDB_CHECK_IDL_WO_MONITOR_COND([simple idl disable monitor-cond],
> +  [['["idltest",
> +      {"op": "insert",
> +       "table": "simple",
> +       "row": {"i": 1,
> +               "r": 2.0,
> +               "b": true,
> +               "s": "mystring",
> +               "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
> +               "ia": ["set", [1, 2, 3]],
> +               "ra": ["set", [-0.5]],
> +               "ba": ["set", [true]],
> +               "sa": ["set", ["abc", "def"]],
> +               "ua": ["set", [["uuid",
> "69443985-7806-45e2-b35f-574a04e720f9"],
> +                              ["uuid",
> "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
> +      {"op": "insert",
> +       "table": "simple",
> +       "row": {}}]' \
> +    '["idltest",
> +      {"op": "update",
> +       "table": "simple",
> +       "where": [],
> +       "row": {"b": true}}]' \
> +    '["idltest",
> +      {"op": "update",
> +       "table": "simple",
> +       "where": [],
> +       "row": {"r": 123.5}}]' \
> +    '["idltest",
> +      {"op": "insert",
> +       "table": "simple",
> +       "row": {"i": -1,
> +               "r": 125,
> +               "b": false,
> +               "s": "",
> +               "ia": ["set", [1]],
> +               "ra": ["set", [1.5]],
> +               "ba": ["set", [false]],
> +               "sa": ["set", []],
> +               "ua": ["set", []]}}]' \
> +    '["idltest",
> +      {"op": "update",
> +       "table": "simple",
> +       "where": [["i", "<", 1]],
> +       "row": {"s": "newstring"}}]' \
> +    '["idltest",
> +      {"op": "delete",
> +       "table": "simple",
> +       "where": [["i", "==", 0]]}]' \
> +    'reconnect']],
> +  [[000: empty
> +001:
> {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
> +002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +003: {"error":null,"result":[{"count":2}]}
> +004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +005: {"error":null,"result":[{"count":2}]}
> +006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
> +008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[]
> uuid=<6>
> +008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +009: {"error":null,"result":[{"count":2}]}
> +010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> +010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[]
> uuid=<1>
> +010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +011: {"error":null,"result":[{"count":1}]}
> +012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> +012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +013: reconnect
> +014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> +014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +015: done
> +]])
> +
>  m4_define([OVSDB_CHECK_IDL_TRACK_C],
>    [AT_SETUP([$1 - C])
>     AT_KEYWORDS([ovsdb server idl tracking positive $5])
> --
> 2.1.4
>
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to