I applied this to master. Thank you!
On Tue, Nov 10, 2015 at 05:21:04PM +0000, Ansari, Shad wrote: > (resubmitting this patch for comments/review) > > There is currently no mechanism in IDL to fetch specific column values > on-demand without having to register them for monitoring. In the case > where the column represent a frequently changing entity (e.g. counter), > and the reads are relatively infrequent (e.g. CLI client), there is a > significant overhead in replication. > > This patch adds support in the Python IDL to register a subset of the > columns of a table as "readonly". Readonly columns are not replicated. > Users may "fetch" the readonly columns of a row on-demand. Once fetched, > the columns are not updated until the next fetch by the user. Writes by > the user to readonly columns does not change the value (both locally or > on the server). > > The two main user visible changes in this patch are: > - The SchemaHelper.register_columns() method now takes an optionaly > argument to specify the subset of readonly column(s) > - A new Row.fetch(columns) method to fetch values of readonly columns(s) > > Usage: > ------ > > # Schema file includes all columns, including readonly > schema_helper = ovs.db.idl.SchemaHelper(schema_file) > > # Register interest in columns with 'r' and 's' as readonly > schema_helper.register_columns("simple", [i, r, s], [r, s]) > > # Create Idl and jsonrpc, and wait for update, as usual > ... > > # Fetch value of column 'r' for a specific row > row.fetch('r') > txn.commit_block() > > print row.r > print getattr(row, 'r') > > Signed-off-by: Shad Ansari <shad.ans...@hp.com<mailto:shad.ans...@hp.com>> > --- > python/ovs/db/idl.py | 86 ++++++++++++++++++++++++++++++++++++-- > tests/ovsdb-idl.at | 47 +++++++++++++++++++++ > tests/test-ovsdb.py | 114 +++++++++++++++++++++++++++++++++------------------ > 3 files changed, 204 insertions(+), 43 deletions(-) > > diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py > index f074dbf..c8990c7 100644 > --- a/python/ovs/db/idl.py > +++ b/python/ovs/db/idl.py > @@ -107,6 +107,7 @@ class Idl(object): > schema = schema.get_idl_schema() > > self.tables = schema.tables > + self.readonly = schema.readonly > self._db = schema > self._session = ovs.jsonrpc.Session.open(remote) > self._monitor_request_id = None > @@ -338,7 +339,13 @@ class Idl(object): > def __send_monitor_request(self): > monitor_requests = {} > for table in self.tables.itervalues(): > - monitor_requests[table.name] = {"columns": table.columns.keys()} > + columns = [] > + for column in table.columns.keys(): > + if ((table.name not in self.readonly) or > + (table.name in self.readonly) and > + (column not in self.readonly[table.name])): > + columns.append(column) > + monitor_requests[table.name] = {"columns": columns} > msg = ovs.jsonrpc.Message.create_request( > "monitor", [self._db.name, None, monitor_requests]) > self._monitor_request_id = msg.id > @@ -571,7 +578,11 @@ class Row(object): > if self._data is None: > raise AttributeError("%s instance has no attribute '%s'" % > (self.__class__.__name__, column_name)) > - datum = self._data[column_name] > + if column_name in self._data: > + datum = self._data[column_name] > + else: > + raise AttributeError("%s instance has no attribute '%s'" % > + (self.__class__.__name__, column_name)) > > return datum.to_python(_uuid_to_row) > > @@ -579,6 +590,11 @@ class Row(object): > assert self._changes is not None > assert self._idl.txn > > + if ((self._table.name in self._idl.readonly) and > + (column_name in self._idl.readonly[self._table.name])): > + vlog.warn("attempting to write to readonly column %s" % > column_name) > + return > + > column = self._table.columns[column_name] > try: > datum = ovs.db.data.Datum.from_python(column.type, value, > @@ -655,6 +671,9 @@ class Row(object): > self.__dict__["_changes"] = None > del self._table.rows[self.uuid] > > + def fetch(self, column_name): > + self._idl.txn._fetch(self, column_name) > + > def increment(self, column_name): > """Causes the transaction, when committed, to increment the value of > 'column_name' within this row by 1. 'column_name' must have an > integer > @@ -777,10 +796,12 @@ class Transaction(object): > self._inc_row = None > self._inc_column = None > > + self._fetch_requests = [] > + > self._inserted_rows = {} # Map from UUID to _InsertedRow > > def add_comment(self, comment): > - """Appens 'comment' to the comments that will be passed to the OVSDB > + """Appends 'comment' to the comments that will be passed to the OVSDB > server when this transaction is committed. (The comment will be > committed to the OVSDB log, which "ovsdb-tool show-log" can print in > a > relatively human-readable form.)""" > @@ -947,6 +968,16 @@ class Transaction(object): > if row._data is None or row_json: > operations.append(op) > > + if self._fetch_requests: > + for fetch in self._fetch_requests: > + fetch["index"] = len(operations) - 1 > + operations.append({"op": "select", > + "table": fetch["row"]._table.name, > + "where": self._substitute_uuids( > + > _where_uuid_equals(fetch["row"].uuid)), > + "columns": [fetch["column_name"]]}) > + any_updates = True > + > # Add increment. > if self._inc_row and any_updates: > self._inc_index = len(operations) - 1 > @@ -1057,6 +1088,9 @@ class Transaction(object): > self._inc_row = row > self._inc_column = column > > + def _fetch(self, row, column_name): > + self._fetch_requests.append({"row":row, "column_name":column_name}) > + > def _write(self, row, column, datum): > assert row._changes is not None > > @@ -1139,6 +1173,11 @@ class Transaction(object): > if not soft_errors and not hard_errors and not lock_errors: > if self._inc_row and not self.__process_inc_reply(ops): > hard_errors = True > + if self._fetch_requests: > + if self.__process_fetch_reply(ops): > + self.idl.change_seqno += 1 > + else: > + hard_errors = True > > for insert in self._inserted_rows.itervalues(): > if not self.__process_insert_reply(insert, ops): > @@ -1166,6 +1205,38 @@ class Transaction(object): > else: > return True > > + def __process_fetch_reply(self, ops): > + update = False > + for fetch_request in self._fetch_requests: > + row = fetch_request["row"] > + column_name = fetch_request["column_name"] > + index = fetch_request["index"] > + table = row._table > + > + select = ops[index] > + fetched_rows = select.get("rows") > + if not Transaction.__check_json_type(fetched_rows, (list, tuple), > + '"select" reply "rows"'): > + return False > + if len(fetched_rows) != 1: > + # XXX rate-limit > + vlog.warn('"select" reply "rows" has %d elements ' > + 'instead of 1' % len(rows)) > + continue > + fetched_row = fetched_rows[0] > + if not Transaction.__check_json_type(fetched_row, (dict,), > + '"select" reply row'): > + continue > + > + column = table.columns.get(column_name) > + datum_json = fetched_row.get(column_name) > + datum = ovs.db.data.Datum.from_json(column.type, datum_json) > + > + row._data[column_name] = datum > + update = True > + > + return update > + > def __process_inc_reply(self, ops): > if self._inc_index + 2 > len(ops): > # XXX rate-limit > @@ -1261,16 +1332,21 @@ class SchemaHelper(object): > > self.schema_json = schema_json > self._tables = {} > + self._readonly = {} > self._all = False > > - def register_columns(self, table, columns): > + def register_columns(self, table, columns, readonly=[]): > """Registers interest in the given 'columns' of 'table'. Future > calls > to get_idl_schema() will include 'table':column for each column in > 'columns'. This function automatically avoids adding duplicate > entries > to the schema. > + A subset of 'columns' can be specified as 'readonly'. The readonly > + columns are not replicated but can be fetched on-demand by the user > + with Row.fetch(). > > 'table' must be a string. > 'columns' must be a list of strings. > + 'readonly' must be a list of strings. > """ > > assert type(table) is str > @@ -1278,6 +1354,7 @@ class SchemaHelper(object): > > columns = set(columns) | self._tables.get(table, set()) > self._tables[table] = columns > + self._readonly[table] = readonly > > def register_table(self, table): > """Registers interest in the given all columns of 'table'. Future > calls > @@ -1307,6 +1384,7 @@ class SchemaHelper(object): > self._keep_table_columns(schema, table, columns)) > > schema.tables = schema_tables > + schema.readonly = self._readonly > return schema > > def _keep_table_columns(self, schema, table_name, columns): > diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at > index d3d2aeb..c7b2582 100644 > --- a/tests/ovsdb-idl.at > +++ b/tests/ovsdb-idl.at > @@ -598,3 +598,50 @@ AT_CHECK([grep '"monitor"' stderr | grep -c '"ua"'], > [0], [1 > ]) > OVSDB_SERVER_SHUTDOWN > AT_CLEANUP > + > +m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS_PY], > + [AT_SETUP([$1 - Python fetch]) > + AT_SKIP_IF([test $HAVE_PYTHON = no]) > + AT_KEYWORDS([ovsdb server idl positive Python increment fetch $6]) > + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema], > + [0], [stdout], [ignore]) > + AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach > --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket > --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) > + m4_if([$2], [], [], > + [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], > [ignore], [kill `cat pid`])]) > + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl > $srcdir/idltest.ovsschema unix:socket [$3] $4], > + [0], [stdout], [ignore], [kill `cat pid`]) > + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$7],,, [[| > $7]]), > + [0], [$5], [], [kill `cat pid`]) > + OVSDB_SERVER_SHUTDOWN > + AT_CLEANUP]) > + > +m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS], > + [OVSDB_CHECK_IDL_FETCH_COLUMNS_PY($@)]) > + > +OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated], > + [['["idltest", > + {"op": "insert", > + "table": "simple", > + "row": {"i": 1, > + "r": 2.0, > + "b": true, > + "s": "mystring", > + "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"], > + "ia": ["set", [1, 2, 3]], > + "ra": ["set", [-0.5]], > + "ba": ["set", [true]], > + "sa": ["set", ["abc", "def"]], > + "ua": ["set", [["uuid", > "69443985-7806-45e2-b35f-574a04e720f9"], > + ["uuid", > "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}}, > + {"op": "insert", > + "table": "simple", > + "row": {}}]']], > + [?simple:i,r!], > + ['fetch 0 r'], > + [[000: i=0 uuid=<0> > +000: i=1 uuid=<1> > +001: commit, status=success > +002: i=0 r=0 uuid=<0> > +002: i=1 uuid=<1> > +003: done > +]]) > diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py > index ab951f9..a6897f3 100644 > --- a/tests/test-ovsdb.py > +++ b/tests/test-ovsdb.py > @@ -146,44 +146,53 @@ def do_parse_schema(schema_string): > > def print_idl(idl, step): > - simple = idl.tables["simple"].rows > - l1 = idl.tables["link1"].rows > - l2 = idl.tables["link2"].rows > - > n = 0 > - for row in simple.itervalues(): > - s = ("%03d: i=%s r=%s b=%s s=%s u=%s " > - "ia=%s ra=%s ba=%s sa=%s ua=%s uuid=%s" > - % (step, row.i, row.r, row.b, row.s, row.u, > - row.ia, row.ra, row.ba, row.sa, row.ua, row.uuid)) > - s = re.sub('""|,|u?\'', "", s) > - s = re.sub('UUID\(([^)]+)\)', r'\1', s) > - s = re.sub('False', 'false', s) > - s = re.sub('True', 'true', s) > - s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s) > - print(s) > - n += 1 > - > - for row in l1.itervalues(): > - s = ["%03d: i=%s k=" % (step, row.i)] > - if row.k: > - s.append(str(row.k.i)) > - s.append(" ka=[") > - s.append(' '.join(sorted(str(ka.i) for ka in row.ka))) > - s.append("] l2=") > - if row.l2: > - s.append(str(row.l2[0].i)) > - s.append(" uuid=%s" % row.uuid) > - print(''.join(s)) > - n += 1 > - > - for row in l2.itervalues(): > - s = ["%03d: i=%s l1=" % (step, row.i)] > - if row.l1: > - s.append(str(row.l1[0].i)) > - s.append(" uuid=%s" % row.uuid) > - print(''.join(s)) > - n += 1 > + if "simple" in idl.tables: > + simple_columns = ["i", "r", "b", "s", "u", "ia", > + "ra", "ba", "sa", "ua", "uuid"] > + simple = idl.tables["simple"].rows > + for row in simple.itervalues(): > + s = "%03d:" % step > + for column in simple_columns: > + if hasattr(row, column) and not (type(getattr(row, column)) > + is ovs.db.data.Atom): > + s += " %s=%s" % (column, getattr(row, column)) > + s = re.sub('""|,|u?\'', "", s) > + s = re.sub('UUID\(([^)]+)\)', r'\1', s) > + s = re.sub('False', 'false', s) > + s = re.sub('True', 'true', s) > + s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s) > + print(s) > + n += 1 > + > + if "link1" in idl.tables: > + l1 = idl.tables["link1"].rows > + for row in l1.itervalues(): > + s = ["%03d: i=%s k=" % (step, row.i)] > + if hasattr(row, "k") and row.k: > + s.append(str(row.k.i)) > + if hasattr(row, "ka"): > + s.append(" ka=[") > + s.append(' '.join(sorted(str(ka.i) for ka in row.ka))) > + s.append("] l2=") > + if hasattr(row, "l2") and row.l2: > + s.append(str(row.l2[0].i)) > + if hasattr(row, "uuid"): > + s.append(" uuid=%s" % row.uuid) > + print(''.join(s)) > + n += 1 > + > + if "link2" in idl.tables: > + l2 = idl.tables["link2"].rows > + for row in l2.itervalues(): > + s = ["%03d:" % step] > + s.append(" i=%s l1=" % row.i) > + if hasattr(row, "l1") and row.l1: > + s.append(str(row.l1[0].i)) > + if hasattr(row, "uuid"): > + s.append(" uuid=%s" % row.uuid) > + print(''.join(s)) > + n += 1 > > if not n: > print("%03d: empty" % step) > @@ -228,6 +237,7 @@ def idltest_find_simple(idl, i): > def idl_set(idl, commands, step): > txn = ovs.db.idl.Transaction(idl) > increment = False > + fetch_cmds = [] > events = [] > for command in commands.split(','): > words = command.split() > @@ -307,6 +317,20 @@ def idl_set(idl, commands, step): > sys.stderr.write('"verify" command asks for unknown column ' > '"%s"\n' % args[1]) > sys.exit(1) > + elif name == "fetch": > + if len(args) != 2: > + sys.stderr.write('"fetch" command requires 2 argument\n') > + sys.exit(1) > + > + row = idltest_find_simple(idl, int(args[0])) > + if not row: > + sys.stderr.write('"fetch" command asks for nonexistent > i=%d\n' > + % int(args[0])) > + sys.exit(1) > + > + column = args[1] > + row.fetch(column) > + fetch_cmds.append([row, column]) > elif name == "increment": > if len(args) != 1: > sys.stderr.write('"increment" command requires 1 argument\n') > @@ -366,10 +390,16 @@ def do_idl(schema_file, remote, *commands): > schema_helper = ovs.db.idl.SchemaHelper(schema_file) > if commands and commands[0].startswith("?"): > monitor = {} > + readonly = {} > for x in commands[0][1:].split("?"): > + readonly = [] > table, columns = x.split(":") > - monitor[table] = columns.split(",") > - schema_helper.register_columns(table, monitor[table]) > + columns = columns.split(",") > + for index, column in enumerate(columns): > + if column[-1] == '!': > + columns[index] = columns[index][:-1] > + readonly.append(columns[index]) > + schema_helper.register_columns(table, columns, readonly) > commands = commands[1:] > else: > schema_helper.register_all() > @@ -499,6 +529,12 @@ idl SCHEMA SERVER [?T1:C1,C2...[?T2:C1,C2,...]...] > [TRANSACTION...] > e.g.: > ?simple:b?link1:i,k - Monitor column "b" in table "simple", > and column "i", "k" in table "link1" > + Readonly columns: Suffixing a "!" after a column indicates that the > + column is to be registered "readonly". > + e.g.: > + ?simple:i,b! - Register interest in column "i" (monitoring) and > + column "b" (readonly). > + > > The following options are also available: > -t, --timeout=SECS give up after SECS seconds > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev