Currently, each monitor table contains a single hmap 'changes' to track updates. This patch introduces a new data structure 'ovsdb_monitor_changes' that stores the updated 'rows', tagged by the transaction id of its first commit. Each 'ovsdb_monitor_changes' is reference counted, allowing multiple jsonrpc_monitors to share it.
The next patch will allow each ovsdb monitor table to store a list of 'ovsdb_monitor_changes'. This patch stores only one, same as before. Signed-off-by: Andy Zhou <az...@nicira.com> --- ovsdb/jsonrpc-server.c | 16 ++++-- ovsdb/ovsdb-monitor.c | 133 +++++++++++++++++++++++++++++++++++++++++++------ ovsdb/ovsdb-monitor.h | 3 ++ 3 files changed, 133 insertions(+), 19 deletions(-) diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index ddcacbb..c9d84a5 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -1280,11 +1280,19 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s) } static struct json * -ovsdb_jsonrpc_monitor_compose_update( - struct ovsdb_jsonrpc_monitor *monitor, bool initial) +ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m, + bool initial) { - return ovsdb_monitor_compose_update(monitor->dbmon, initial, - &monitor->unflushed); + uint64_t unflushed; + struct json *json; + + unflushed = initial ? 0 : m->unflushed; + + json = ovsdb_monitor_compose_update(m->dbmon, initial, + &m->unflushed); + ovsdb_monitor_renew_tracking_changes(m->dbmon, unflushed, m->unflushed); + + return json; } static bool diff --git a/ovsdb/ovsdb-monitor.c b/ovsdb/ovsdb-monitor.c index 397aa2a..a1efa66 100644 --- a/ovsdb/ovsdb-monitor.c +++ b/ovsdb/ovsdb-monitor.c @@ -73,6 +73,22 @@ struct ovsdb_monitor_row { struct ovsdb_datum *new; /* New data, NULL for a deleted row. */ }; +/* Contains 'struct ovsdb_monitor_row's for rows that have been + * updated but not yet flushed to all the jsonrpc connections. + * + * 'n_refs' represents the number of jsonrpc connections that have + * not received updates. Generating the update for the last jsonrpc + * connection will also remove rows contained in 'changes'. + * + * 'transaction' stores the first update's transaction id. 
+ * */ +struct ovsdb_monitor_changes { + struct ovsdb_monitor_table *mt; + struct hmap rows; + int n_refs; + uint64_t transaction; +}; + /* A particular table being monitored. */ struct ovsdb_monitor_table { const struct ovsdb_table *table; @@ -87,10 +103,16 @@ struct ovsdb_monitor_table { /* Contains 'struct ovsdb_monitor_row's for rows that have been * updated but not yet flushed to the jsonrpc connection. */ - struct hmap changes; + struct ovsdb_monitor_changes *changes; }; static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon); +static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, + uint64_t next_txn); +static void ovsdb_monitor_changes_destroy_rows( + struct ovsdb_monitor_changes *changes); +static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, + uint64_t transaction); static int compare_ovsdb_monitor_column(const void *a_, const void *b_) @@ -108,7 +130,7 @@ ovsdb_monitor_cast(struct ovsdb_replica *replica) return CONTAINER_OF(replica, struct ovsdb_monitor, replica); } -/* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the +/* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the * given 'uuid', or NULL if there is no such row. 
*/ static struct ovsdb_monitor_row * ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt, @@ -116,7 +138,8 @@ ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt, { struct ovsdb_monitor_row *row; - HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) { + HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), + &mt->changes->rows) { if (uuid_equals(uuid, &row->uuid)) { return row; } @@ -235,7 +258,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m, mt = xzalloc(sizeof *mt); mt->table = table; - hmap_init(&mt->changes); + mt->changes = NULL; shash_add(&m->tables, table->schema->name, mt); } @@ -288,6 +311,70 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m, return NULL; } +static void +ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, + uint64_t next_txn) +{ + struct ovsdb_monitor_changes *changes; + + changes = xzalloc(sizeof *changes); + + changes->transaction = next_txn; + changes->mt = mt; + changes->n_refs = 1; + hmap_init(&changes->rows); + mt->changes = changes; +}; + +/* Stops tracking changes to table 'mt' that began at 'transaction'. + * + * Drops one reference; the rows are destroyed when the last one goes away. */ +static void +ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt, + uint64_t transaction) +{ + struct ovsdb_monitor_changes *changes; + + changes = mt->changes; + + if (changes) { + ovs_assert(changes->transaction == transaction); + if (--changes->n_refs == 0) { + ovsdb_monitor_changes_destroy_rows(changes); + free(changes); + mt->changes = NULL; + } + } +} + +/* Starts tracking changes to table 'mt' from 'transaction', inclusive. 
+ */ +static void +ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, + uint64_t transaction) +{ + struct ovsdb_monitor_changes *changes; + + changes = mt->changes; + if (changes) { + ovs_assert(false); + } else { + ovsdb_monitor_table_add_changes(mt, transaction); + } +} + +static void +ovsdb_monitor_changes_destroy_rows(struct ovsdb_monitor_changes *changes) +{ + struct ovsdb_monitor_row *row, *next; + + HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) { + hmap_remove(&changes->rows, &row->hmap_node); + ovsdb_monitor_row_destroy(changes->mt, row); + } + hmap_destroy(&changes->rows); +} + /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within * 'mt', or NULL if no row update should be sent. * @@ -404,7 +491,11 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon, struct ovsdb_monitor_row *row, *next; struct json *table_json = NULL; - HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) { + if (!mt->changes) { + continue; + } + + HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes->rows) { struct json *row_json; row_json = ovsdb_monitor_compose_row_update( @@ -428,7 +519,7 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon, json_object_put(table_json, uuid, row_json); } - hmap_remove(&mt->changes, &row->hmap_node); + hmap_remove(&mt->changes->rows, &row->hmap_node); ovsdb_monitor_row_destroy(mt, row); } } @@ -495,7 +586,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, change = ovsdb_monitor_row_find(mt, uuid); if (!change) { change = xmalloc(sizeof *change); - hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid)); + hmap_insert(&mt->changes->rows, &change->hmap_node, uuid_hash(uuid)); change->uuid = *uuid; change->old = clone_monitor_row_data(mt, old); change->new = clone_monitor_row_data(mt, new); @@ -508,7 +599,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, if (!change->old) { /* This row was added then deleted. Forget about it. 
*/ - hmap_remove(&mt->changes, &change->hmap_node); + hmap_remove(&mt->changes->rows, &change->hmap_node); free(change); } } @@ -529,6 +620,10 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) if (mt->select & OJMS_INITIAL) { struct ovsdb_row *row; + if (!mt->changes) { + ovsdb_monitor_table_add_changes(mt, 0); + } + HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) { ovsdb_monitor_change_cb(NULL, row, NULL, &aux); } @@ -563,6 +658,20 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon, ovs_assert(true); } +void +ovsdb_monitor_renew_tracking_changes(struct ovsdb_monitor *dbmon, + uint64_t prev_txn, uint64_t next_txn) +{ + struct shash_node *node; + + SHASH_FOR_EACH (node, &dbmon->tables) { + struct ovsdb_monitor_table *mt = node->data; + + ovsdb_monitor_table_untrack_changes(mt, prev_txn); + ovsdb_monitor_table_track_changes(mt, next_txn); + } +} + static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) { @@ -572,14 +681,8 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) SHASH_FOR_EACH (node, &dbmon->tables) { struct ovsdb_monitor_table *mt = node->data; - struct ovsdb_monitor_row *row, *next; - - HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) { - hmap_remove(&mt->changes, &row->hmap_node); - ovsdb_monitor_row_destroy(mt, row); - } - hmap_destroy(&mt->changes); + ovsdb_monitor_changes_destroy_rows(mt->changes); free(mt->columns); free(mt); } diff --git a/ovsdb/ovsdb-monitor.h b/ovsdb/ovsdb-monitor.h index ea2a7aa..c003184 100644 --- a/ovsdb/ovsdb-monitor.h +++ b/ovsdb/ovsdb-monitor.h @@ -57,4 +57,7 @@ bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon, uint64_t next_transaction); void ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon); + +void ovsdb_monitor_renew_tracking_changes(struct ovsdb_monitor *dbmon, + uint64_t prev_txn, uint64_t next_txn); #endif -- 1.9.1 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev