Andy Zhou <az...@ovn.org> wrote on 05/02/2016 12:00:27 PM: > > On Wed, Feb 3, 2016 at 5:53 AM, Liran Schour <lir...@il.ibm.com> wrote: > ovsdb-server now accepts "monitor_cond_update" request. On conditions update > we insert all rows of table in a new changes list - OVSDB_MONITOR_ALL that are > being indexed by the transaction-id at the moment of insertion. > JSON cache is being used only for empty condition monitor sessions. > Sees ovsdb-server (1) man page for details of monitor_cond_update. > > Signed-off-by: Liran Schour <lir...@il.ibm.com> > > Ah. I see why you want to only compare monitored columns. It makes > handling condition updates > easier. :-) > > There is no test for this feature. It seems to be a too big of a > feature to be missing test coverage. >
Will add a test for sharing ovsdb_monitor between 2 sessions with different non-monitored columns. > --- > v2->v3: > * ovsdb_monitor_table_condition_update() accepts only single json condition > * Allow non-monitored columns in cond_update. > * Flush monitor session after monitor_cond_update to guarantee empty > changes list > * Bug fix: use json cache when all condition are empty > * Simplify inserting row change to changes lists > --- > ovsdb/jsonrpc-server.c | 154 +++++++++++++++++++++++++-- > ovsdb/monitor.c | 283 ++++++++++++++++++++++++++++++++++++++ > ++++------- > ovsdb/monitor.h | 26 +++-- > 3 files changed, 415 insertions(+), 48 deletions(-) > > diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c > index cd6a70a..15dc406 100644 > --- a/ovsdb/jsonrpc-server.c > +++ b/ovsdb/jsonrpc-server.c > @@ -87,6 +87,10 @@ static void ovsdb_jsonrpc_trigger_complete_done( > static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create( > struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params, > enum ovsdb_monitor_version, const struct json *request_id); > +static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_update( > + struct ovsdb_jsonrpc_session *s, > + struct json *params, > + const struct json *request_id); > static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel( > struct ovsdb_jsonrpc_session *, > struct json_array *params, > @@ -407,7 +411,8 @@ static void ovsdb_jsonrpc_session_wait(struct > ovsdb_jsonrpc_session *); > static void ovsdb_jsonrpc_session_get_memory_usage( > const struct ovsdb_jsonrpc_session *, struct simap *usage); > static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *, > - struct jsonrpc_msg *); > + struct jsonrpc_msg *, > + bool *); > static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *, > struct jsonrpc_msg *); > > @@ -463,13 +468,14 @@ ovsdb_jsonrpc_session_run(struct > ovsdb_jsonrpc_session *s) > > if (!jsonrpc_session_get_backlog(s->js)) { > struct jsonrpc_msg *msg; > + bool 
needs_flush = false; > > ovsdb_jsonrpc_monitor_flush_all(s); > > msg = jsonrpc_session_recv(s->js); > if (msg) { > if (msg->type == JSONRPC_REQUEST) { > - ovsdb_jsonrpc_session_got_request(s, msg); > + ovsdb_jsonrpc_session_got_request(s, msg, &needs_flush); > } else if (msg->type == JSONRPC_NOTIFY) { > ovsdb_jsonrpc_session_got_notify(s, msg); > } else { > @@ -480,6 +486,9 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) > jsonrpc_msg_destroy(msg); > } > } > + if (needs_flush) { > + ovsdb_jsonrpc_monitor_flush_all(s); > + } > } > return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT; > } > @@ -840,10 +849,12 @@ execute_transaction(struct > ovsdb_jsonrpc_session *s, struct ovsdb *db, > > static void > ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s, > - struct jsonrpc_msg *request) > + struct jsonrpc_msg *request, > + bool *needs_flush) > { > struct jsonrpc_msg *reply; > > + *needs_flush = false; > if (!strcmp(request->method, "transact")) { > struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply); > if (!reply) { > @@ -861,6 +872,10 @@ ovsdb_jsonrpc_session_got_request(struct > ovsdb_jsonrpc_session *s, > version, > request->id); > } > + } else if (!strcmp(request->method, "monitor_cond_update")) { > + reply = ovsdb_jsonrpc_monitor_cond_update(s, request->params, > + request->id); > + *needs_flush = true; > } else if (!strcmp(request->method, "monitor_cancel")) { > reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params), > request->id); > @@ -1050,6 +1065,8 @@ struct ovsdb_jsonrpc_monitor { > struct ovsdb_monitor *dbmon; > uint64_t unflushed; /* The first transaction that has not been > flushed to the jsonrpc > remote client. 
*/ > + bool all_rows; /* Indicates if in the next flush > we request all > + rows (due to a condition > change) */ > enum ovsdb_monitor_version version; > struct ovsdb_monitor_session_condition *condition;/* Session's > condition */ > }; > @@ -1213,6 +1230,7 @@ ovsdb_jsonrpc_monitor_create(struct > ovsdb_jsonrpc_session *s, struct ovsdb *db, > m->condition = ovsdb_monitor_session_condition_create(); > } > m->unflushed = 0; > + m->all_rows = false; > m->version = version; > hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0)); > m->monitor_id = json_clone(monitor_id); > @@ -1294,6 +1312,124 @@ error: > return jsonrpc_create_error(json, request_id); > } > > +static struct ovsdb_error * > +ovsdb_jsonrpc_parse_monitor_cond_update_request( > + struct ovsdb_jsonrpc_monitor *m, > + const struct ovsdb_table *table, > + const struct json *cond_update_req) > +{ > + const struct ovsdb_table_schema *ts = table->schema; > + const struct json *condition, *columns; > + struct ovsdb_parser parser; > + struct ovsdb_error *error; > + > + ovsdb_parser_init(&parser, cond_update_req, "table %s", ts->name); > + columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | > OP_OPTIONAL); > + condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | > OP_OPTIONAL); > + > + error = ovsdb_parser_finish(&parser); > + if (error) { > + return error; > + } > + > + if (columns) { > + error = ovsdb_syntax_error(cond_update_req, NULL, "changingcolumns " > + "is unsupported"); > + return error; > + } > + error = ovsdb_monitor_table_condition_update(m->dbmon, > m->condition, table, > + condition); > + > + return error; > +} > + > +static struct jsonrpc_msg * > +ovsdb_jsonrpc_monitor_cond_update(struct ovsdb_jsonrpc_session *s, > + struct json *params, > + const struct json *request_id) > +{ > + struct ovsdb_error *error; > + struct ovsdb_jsonrpc_monitor *m; > + struct json *monitor_cond_update_reqs; > + struct shash_node *node; > + struct json *json; > + > + if (json_array(params)->n != 
3) { > + error = ovsdb_syntax_error(params, NULL, "invalid parameters"); > + goto error; > + } > + > + m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]); > + if (!m) { > + error = ovsdb_syntax_error(request_id, NULL, > + "unknown monitor session"); > + goto error; > + } > + > + monitor_cond_update_reqs = params->u.array.elems[2]; > + if (monitor_cond_update_reqs->type != JSON_OBJECT) { > + error = > + ovsdb_syntax_error(NULL, NULL, > + "monitor-cond-change-requests must > be object"); > + goto error; > + } > + > + SHASH_FOR_EACH (node, json_object(monitor_cond_update_reqs)) { > + const struct ovsdb_table *table; > + const struct json *mr_value; > + size_t i; > + > + table = ovsdb_get_table(m->db, node->name); > + if (!table) { > + error = ovsdb_syntax_error(NULL, NULL, > + "no table named %s", node->name); > + goto error; > + } > + if (!ovsdb_monitor_table_exists(m->dbmon, table)) { > + error = ovsdb_syntax_error(NULL, NULL, > + "no table named %s in > monitor session", > + node->name); > + goto error; > + } > + > + mr_value = node->data; > + if (mr_value->type == JSON_ARRAY) { > + const struct json_array *array = &mr_value->u.array; > + > + for (i = 0; i < array->n; i++) { > + error = ovsdb_jsonrpc_parse_monitor_cond_update_request( > + m, table, array->elems[i]); > + if (error) { > + goto error; > + } > + } > + } else { > + error = ovsdb_syntax_error( > + NULL, NULL, > + "table %s no monitor-cond-change JSON array", > + node->name); > + goto error; > + } > + } > + > + ovsdb_monitor_get_all_rows(m->dbmon, m->unflushed); > + m->all_rows = true; > + > + /* Change monitor id */ > + hmap_remove(&s->monitors, &m->node); > + json_destroy(m->monitor_id); > + m->monitor_id = json_clone(params->u.array.elems[1]); > + hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0)); > + > + return jsonrpc_create_reply(json_object_create(), request_id); > + > +error: > + > + json = ovsdb_error_to_json(error); > + ovsdb_error_destroy(error); > + return 
jsonrpc_create_error(json, request_id); > +} > + > static struct jsonrpc_msg * > ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s, > struct json_array *params, > @@ -1330,8 +1466,14 @@ static struct json * > ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m, > bool initial) > { > - return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed, > - m->condition, m->version); > + struct json * json = ovsdb_monitor_get_update(m->dbmon, initial, > + m->all_rows, > + &m->unflushed, > + m->condition, > + m->version); > + > + m->all_rows = false; > + return json; > } > > static bool > @@ -1340,7 +1482,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct > ovsdb_jsonrpc_session *s) > struct ovsdb_jsonrpc_monitor *m; > > HMAP_FOR_EACH (m, node, &s->monitors) { > - if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) { > + if (m->all_rows || ovsdb_monitor_needs_flush(m->dbmon, > m->unflushed)) { > return true; > } > } > diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c > index 1614d67..d087a5a 100644 > --- a/ovsdb/monitor.c > +++ b/ovsdb/monitor.c > @@ -120,6 +120,11 @@ struct ovsdb_monitor_changes { > hmap. */ > }; > > +enum ovsdb_monitor_changes_type { > + OVSDB_MONITOR_CHANGES, > + OVSDB_MONITOR_ALL > +}; > + > /* A particular table being monitored. */ > struct ovsdb_monitor_table { > const struct ovsdb_table *table; > @@ -142,6 +147,9 @@ struct ovsdb_monitor_table { > > /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */ > struct hmap changes; > + /* Contains 'ovsdb_monitor_changes' of all rows in table at transaction > + point in time. indexed by 'transaction'. 
*/ > + struct hmap all; > }; > > typedef struct json * > @@ -153,12 +161,15 @@ typedef struct json * > > static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon); > static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes( > - struct ovsdb_monitor_table *mt, uint64_t next_txn); > + struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type, > + uint64_t next_txn); > static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes( > - struct ovsdb_monitor_table *mt, uint64_t unflushed); > + struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type, > + uint64_t unflushed); > static void ovsdb_monitor_changes_destroy( > struct ovsdb_monitor_changes *changes); > static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, > + enum ovsdb_monitor_changes_type type, > uint64_t unflushed); > > static uint32_t > @@ -262,8 +273,8 @@ ovsdb_monitor_changes_row_find(const struct > ovsdb_monitor_changes *changes, > * > * If 'row' is NULL, returns NULL. */ > static struct ovsdb_datum * > -clone_monitor_row_data(const struct ovsdb_monitor_table *mt, > - const struct ovsdb_row *row) > +clone_monitor_ovsdb_row_data(const struct ovsdb_monitor_table *mt, > + const struct ovsdb_row *row) > { > struct ovsdb_datum *data; > size_t i; > @@ -284,6 +295,44 @@ clone_monitor_row_data(const struct > ovsdb_monitor_table *mt, > return data; > } > > +/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as > + * copies of the data in 'fields' drawn from the columns represented by > + * mt->columns[]. Returns the array. > + * > + * If 'row' is NULL, returns NULL. 
*/ > +static struct ovsdb_datum * > +clone_monitor_row_data(const struct ovsdb_monitor_table *mt, > + const struct ovsdb_datum *fields) > +{ > + struct ovsdb_datum *data; > + size_t i; > + > + if (!fields) { > + return NULL; > + } > + > + data = xmalloc(mt->n_columns * sizeof *data); > + for (i = 0; i < mt->n_columns; i++) { > + const struct ovsdb_column *c = mt->columns[i].column; > + const struct ovsdb_datum *src = &fields[c->index]; > + struct ovsdb_datum *dst = &data[i]; > + const struct ovsdb_type *type = &c->type; > + > + ovsdb_datum_clone(dst, src, type); > + } > + return data; > +} > + > +static void > +clone_monitor_row(const struct ovsdb_monitor_table *mt, > + struct ovsdb_monitor_row *to, > + const struct ovsdb_monitor_row *from) > +{ > + to->uuid = from->uuid; > + to->old = clone_monitor_row_data(mt, from->old); > + to->new = clone_monitor_row_data(mt,from->new); > +} > + > /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of > the data from > * in 'row' drawn from the columns represented by mt->columns[]. */ > static void > @@ -393,6 +442,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m, > mt->dbmon = m; > shash_add(&m->tables, table->schema->name, mt); > hmap_init(&mt->changes); > + hmap_init(&mt->all); > mt->columns_index_map = > xmalloc(sizeof(unsigned int) * shash_count(&table->schema->columns)); > for (i = 0; i < shash_count(&table->schema->columns); i++) { > @@ -472,6 +522,13 @@ ovsdb_monitor_add_all_condition_columns( > } > } > > +bool > +ovsdb_monitor_table_exists(struct ovsdb_monitor *m, > + const struct ovsdb_table *table) > +{ > + return shash_find_data(&m->tables, table->schema->name); > +} > + > /* Check for duplicated column names. Return the first > * duplicated column's name if found. Otherwise return > * NULL. 
*/ > @@ -499,9 +556,12 @@ ovsdb_monitor_table_check_duplicates(struct > ovsdb_monitor *m, > > static struct ovsdb_monitor_changes * > ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, > + enum ovsdb_monitor_changes_type type, > uint64_t next_txn) > { > struct ovsdb_monitor_changes *changes; > + struct hmap *changes_hmap = > + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all; > > changes = xzalloc(sizeof *changes); > > @@ -509,19 +569,22 @@ ovsdb_monitor_table_add_changes(struct > ovsdb_monitor_table *mt, > changes->mt = mt; > changes->n_refs = 1; > hmap_init(&changes->rows); > - hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn)); > + hmap_insert(changes_hmap, &changes->hmap_node, hash_uint64(next_txn)); > > return changes; > }; > > static struct ovsdb_monitor_changes * > ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt, > + enum ovsdb_monitor_changes_type type, > uint64_t transaction) > { > struct ovsdb_monitor_changes *changes; > + struct hmap *changes_hmap = > + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all; > size_t hash = hash_uint64(transaction); > > - HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) { > + HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, changes_hmap) { > if (changes->transaction == transaction) { > return changes; > } > @@ -533,13 +596,16 @@ ovsdb_monitor_table_find_changes(struct > ovsdb_monitor_table *mt, > /* Stop currently tracking changes to table 'mt' since 'transaction'. */ > static void > ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt, > + enum ovsdb_monitor_changes_type type, > uint64_t transaction) > { > + struct hmap *changes_hmap = > + type == OVSDB_MONITOR_CHANGES ? 
&mt->changes : &mt->all; > struct ovsdb_monitor_changes *changes = > - ovsdb_monitor_table_find_changes(mt, transaction); > + ovsdb_monitor_table_find_changes(mt, type, transaction); > if (changes) { > if (--changes->n_refs == 0) { > - hmap_remove(&mt->changes, &changes->hmap_node); > + hmap_remove(changes_hmap, &changes->hmap_node); > ovsdb_monitor_changes_destroy(changes); > } > } > @@ -549,15 +615,16 @@ ovsdb_monitor_table_untrack_changes(struct > ovsdb_monitor_table *mt, > */ > static void > ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, > + enum ovsdb_monitor_changes_type type, > uint64_t transaction) > { > struct ovsdb_monitor_changes *changes; > > - changes = ovsdb_monitor_table_find_changes(mt, transaction); > + changes = ovsdb_monitor_table_find_changes(mt, type, transaction); > if (changes) { > changes->n_refs++; > } else { > - ovsdb_monitor_table_add_changes(mt, transaction); > + ovsdb_monitor_table_add_changes(mt, type, transaction); > } > } > > @@ -674,6 +741,46 @@ ovsdb_monitor_get_table_conditions( > return true; > } > > +struct ovsdb_error * > +ovsdb_monitor_table_condition_update( > + struct ovsdb_monitor *dbmon, > + struct ovsdb_monitor_session_condition > *condition, > + const struct ovsdb_table *table, > + const struct json *cond_json) > +{ > + struct ovsdb_monitor_table_condition *mtc = > + shash_find_data(&condition->tables, table->schema->name); > + struct ovsdb_error *error; > + struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER; > + bool empty = ovsdb_condition_empty(&mtc->new_condition); > + > + if (!condition) { > + return NULL; > + } > + > + error = ovsdb_condition_from_json(table->schema, cond_json, > + NULL, &cond); > + if (error) { > + return error; > + } > + > + ovsdb_condition_destroy(&mtc->new_condition); > + ovsdb_condition_clone(&mtc->new_condition, &cond); > I feel like I am missing something here: Should we copy copy the > "old" new_condition > to "old_condition" here? 
The old condition indicates the condition we had when the last update notification was sent. We need to keep it as is and only update it with the new condition after a new update notification is sent. > + > + if (empty && !ovsdb_condition_empty(&mtc->new_condition)) { > + condition->n_empty_cnd--; > + } > + if (!empty && ovsdb_condition_empty(&mtc->new_condition)) { > + condition->n_empty_cnd++; > + } > + > + ovsdb_monitor_condition_add_columns(dbmon, > + table, > + &mtc->new_condition); > + > + return NULL; > +} > + > static enum ovsdb_monitor_selection > ovsdb_monitor_row_update_type_condition( > const struct ovsdb_monitor_table *mt, > @@ -827,7 +934,7 @@ ovsdb_monitor_compose_row_update( > * for 'row' within * 'mt', or NULL if no row update should be sent. > * > * The caller should specify 'initial' as true if the returned JSON is > - * going to be used as part of the initial reply to a "monitor2" request, > + * going to be used as part of the initial reply to a "monitor_cond" request, > * false if it is going to be used as part of an "update2" notification. 
> * > * 'changed' must be a scratch buffer for internal use that is at least > @@ -916,7 +1023,7 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon) > static struct json* > ovsdb_monitor_compose_update( > struct ovsdb_monitor *dbmon, > - bool initial, uint64_t transaction, > + bool initial, bool all_rows, uint64_t transaction, > const struct ovsdb_monitor_session_condition > *condition, > compose_row_update_cb_func row_update) > { > @@ -932,7 +1039,16 @@ ovsdb_monitor_compose_update( > struct ovsdb_monitor_changes *changes; > struct json *table_json = NULL; > > - changes = ovsdb_monitor_table_find_changes(mt, transaction); > + if (!all_rows) { > + changes = ovsdb_monitor_table_find_changes(mt, > + OVSDB_MONITOR_CHANGES, > + transaction); > + } else { > + /* Get changes that includes all rows from all_txn > point in time */ > + changes = ovsdb_monitor_table_find_changes(mt, > + OVSDB_MONITOR_ALL, > + transaction); > + } > if (!changes) { > continue; > } > @@ -968,7 +1084,8 @@ ovsdb_monitor_compose_update( > > /* Returns JSON for a <table-updates> object (as described in RFC 7047) > * for all the outstanding changes within 'monitor' that starts from > - * '*unflushed' transaction id. > + * '*unflushed'. > + * If all_rows is true all_rows in the db that match conditions will be sent. > * > * The caller should specify 'initial' as true if the returned JSON > is going to > * be used as part of the initial reply to a "monitor" request, > false if it is > @@ -976,7 +1093,8 @@ ovsdb_monitor_compose_update( > struct json * > ovsdb_monitor_get_update( > struct ovsdb_monitor *dbmon, > - bool initial, uint64_t *unflushed, > + bool initial, bool all_rows, > + uint64_t *unflushed, > const struct ovsdb_monitor_session_condition *condition, > enum ovsdb_monitor_version version) > { > @@ -988,7 +1106,7 @@ ovsdb_monitor_get_update( > > /* Return a clone of cached json if one exists. Otherwise, > * generate a new one and add it to the cache. 
*/ > - if (!condition || (condition && ovsdb_can_cache(condition))) { > + if (!condition || (!all_rows && condition && ovsdb_can_cache > (condition))) { > cache_node = ovsdb_monitor_json_cache_search(dbmon, > version, prev_txn); > } > if (cache_node) { > @@ -996,17 +1114,19 @@ ovsdb_monitor_get_update( > } else { > if (version == OVSDB_MONITOR_V1) { > json = > - ovsdb_monitor_compose_update(dbmon, initial, prev_txn, > - condition, > + ovsdb_monitor_compose_update(dbmon, initial, all_rows, > + prev_txn, condition, > > ovsdb_monitor_compose_row_update); > } else { > ovs_assert(version == OVSDB_MONITOR_V2); > json = > - ovsdb_monitor_compose_update(dbmon, initial, prev_txn, > - condition, > + ovsdb_monitor_compose_update(dbmon, initial, all_rows, > + prev_txn, condition, > > ovsdb_monitor_compose_row_update2); > } > - if (!condition || (condition && ovsdb_can_cache(condition))) { > + > + if (!condition || > + (!all_rows && condition && ovsdb_can_cache(condition))) { > ovsdb_monitor_json_cache_insert(dbmon, version, prev_txn, json); > } > } > @@ -1014,9 +1134,28 @@ ovsdb_monitor_get_update( > /* Maintain transaction id of 'changes'. 
*/ > SHASH_FOR_EACH (node, &dbmon->tables) { > struct ovsdb_monitor_table *mt = node->data; > + struct ovsdb_condition *old_condition, *new_condition; > > - ovsdb_monitor_table_untrack_changes(mt, prev_txn); > - ovsdb_monitor_table_track_changes(mt, next_txn); > + if (!all_rows) { > + ovsdb_monitor_table_untrack_changes(mt, > + OVSDB_MONITOR_CHANGES, > + prev_txn); > + } else { > + ovsdb_monitor_table_untrack_changes(mt, > + OVSDB_MONITOR_ALL, > + prev_txn); > + } > + ovsdb_monitor_table_track_changes(mt, > OVSDB_MONITOR_CHANGES, next_txn); > + > + if (ovsdb_monitor_get_table_conditions(mt, > + condition, > + &old_condition, > + &new_condition)) { > + if (ovsdb_condition_cmp(old_condition, new_condition)) { > + ovsdb_condition_destroy(old_condition); > + ovsdb_condition_clone(old_condition, new_condition); > + } > + } > } > *unflushed = next_txn; > > @@ -1097,8 +1236,8 @@ ovsdb_monitor_changes_update(const struct > ovsdb_row *old, > change = xzalloc(sizeof *change); > hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid)); > change->uuid = *uuid; > - change->old = clone_monitor_row_data(mt, old); > - change->new = clone_monitor_row_data(mt, new); > + change->old = clone_monitor_ovsdb_row_data(mt, old); > + change->new = clone_monitor_ovsdb_row_data(mt, new); > } else { > if (new) { > update_monitor_row_data(mt, new, change->new); > @@ -1115,6 +1254,20 @@ ovsdb_monitor_changes_update(const struct > ovsdb_row *old, > } > } > > +static void > +ovsdb_monitor_changes_clone_insert_row(const struct ovsdb_monitor_row *row, > + const struct ovsdb_monitor_table *mt, > + struct ovsdb_monitor_changes*changes) > +{ > + struct ovsdb_monitor_row *change; > + > + ovs_assert(ovsdb_monitor_changes_row_find(changes, &row->uuid) == NULL); > + > + change = xzalloc(sizeof *change); > + hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(&row->uuid)); > + clone_monitor_row(mt, change, row); > +} > + > static bool > ovsdb_monitor_columns_changed(const struct 
ovsdb_monitor_table *mt, > const unsigned long int *changed) > @@ -1162,6 +1315,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, > struct ovsdb_table *table = new ? new->table : old->table; > struct ovsdb_monitor_table *mt; > struct ovsdb_monitor_changes *changes; > + enum ovsdb_monitor_changes_efficacy efficacy; > + enum ovsdb_monitor_selection type; > > if (!aux->mt || table != aux->mt->table) { > aux->mt = shash_find_data(&m->tables, table->schema->name); > @@ -1173,16 +1328,17 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, > } > mt = aux->mt; > > - HMAP_FOR_EACH(changes, hmap_node, &mt->changes) { > - enum ovsdb_monitor_changes_efficacy efficacy; > - enum ovsdb_monitor_selection type; > + type = ovsdb_monitor_row_update_type(false, old, new); > + efficacy = ovsdb_monitor_changes_classify(type, mt, changed); > > - type = ovsdb_monitor_row_update_type(false, old, new); > - efficacy = ovsdb_monitor_changes_classify(type, mt, changed); > - if (efficacy > OVSDB_CHANGES_NO_EFFECT) { > + if (efficacy > OVSDB_CHANGES_NO_EFFECT) { > + /* insert row change to changes lists */ > + HMAP_FOR_EACH(changes, hmap_node, &mt->changes) { > + ovsdb_monitor_changes_update(old, new, mt, changes); > + } > + HMAP_FOR_EACH(changes, hmap_node, &mt->all) { > ovsdb_monitor_changes_update(old, new, mt, changes); > } > - > if (aux->efficacy < efficacy) { > aux->efficacy = efficacy; > } > @@ -1194,10 +1350,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, > void > ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) > { > - struct ovsdb_monitor_aux aux; > struct shash_node *node; > > - ovsdb_monitor_init_aux(&aux, dbmon); > SHASH_FOR_EACH (node, &dbmon->tables) { > struct ovsdb_monitor_table *mt = node->data; > > @@ -1205,9 +1359,14 @@ ovsdb_monitor_get_initial(const struct > ovsdb_monitor *dbmon) > struct ovsdb_row *row; > struct ovsdb_monitor_changes *changes; > > - changes = ovsdb_monitor_table_find_changes(mt, 0); > + changes = 
ovsdb_monitor_table_find_changes(mt, > + OVSDB_MONITOR_CHANGES, > + 0); > if (!changes) { > - changes = ovsdb_monitor_table_add_changes(mt, 0); > + changes = > + ovsdb_monitor_table_add_changes(mt, > + OVSDB_MONITOR_CHANGES, > + 0); > HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) { > ovsdb_monitor_changes_update(NULL, row, mt, changes); > } > @@ -1218,6 +1377,53 @@ ovsdb_monitor_get_initial(const struct > ovsdb_monitor *dbmon) > } > } > > +/* Record all rows in DB and mark this changes at unflushed tranaction id */ > +void > +ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon, > + uint64_t unflushed) > +{ > + struct shash_node *node; > + > + SHASH_FOR_EACH (node, &dbmon->tables) { > + struct ovsdb_monitor_table *mt = node->data; > + struct ovsdb_row *new; > + struct ovsdb_monitor_changes *changes, *all_changes; > + > + all_changes = ovsdb_monitor_table_find_changes(mt, > + OVSDB_MONITOR_ALL, > + unflushed); > + if (!all_changes) { > + all_changes = ovsdb_monitor_table_add_changes(mt, > + OVSDB_MONITOR_ALL, > + unflushed); > + changes = ovsdb_monitor_table_find_changes(mt, > + OVSDB_MONITOR_CHANGES, > + unflushed); > + HMAP_FOR_EACH (new, hmap_node, &mt->table->rows) { > + struct ovsdb_monitor_row *row = NULL; > + if (changes) { > + /* Check if we have a change record for this row */ > + row = ovsdb_monitor_changes_row_find( > + changes, > + ovsdb_row_get_uuid(new)); > + } > + if (row) { > + ovsdb_monitor_changes_clone_insert_row(row, mt, > + all_changes); > + } else { > + ovsdb_monitor_changes_update(new, new, mt, all_changes); > + } > + } > + } else { > + all_changes->n_refs++; > + } > + > + ovsdb_monitor_table_untrack_changes(mt, > + OVSDB_MONITOR_CHANGES, > + unflushed); > + } > +} > + > I don't feel I fully understand the design here. It seems > all_changes are only generate in the special case of cond update. > Then I am not sure why it should be > part of ovsdb_monitor_table . At any rate. this function can use > some more comments. 
> It can be simplified. After cond_update we flush the monitor sessions, so we can be sure that no new insertions were made to the all_changes list. Therefore that list is not necessary at all, and we can collect all rows only when computing the update2 message in ovsdb_monitor_get_update(). I will make this modification and add comments that clarify the design and assumptions. > ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon, > struct ovsdb_jsonrpc_monitor *jsonrpc_monitor) > @@ -1367,7 +1573,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) > hmap_remove(&mt->changes, &changes->hmap_node); > ovsdb_monitor_changes_destroy(changes); > } > + HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->all) { > + hmap_remove(&mt->changes, &changes->hmap_node); > + ovsdb_monitor_changes_destroy(changes); > + } > hmap_destroy(&mt->changes); > + hmap_destroy(&mt->all); > free(mt->columns); > free(mt->columns_index_map); > free(mt); > diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h > index 0529e5a..935c65f 100644 > --- a/ovsdb/monitor.h > +++ b/ovsdb/monitor.h > @@ -57,6 +57,10 @@ void ovsdb_monitor_remove_jsonrpc_monitor(struct > ovsdb_monitor *dbmon, > void ovsdb_monitor_add_table(struct ovsdb_monitor *m, > const struct ovsdb_table *table); > > +bool > +ovsdb_monitor_table_exists(struct ovsdb_monitor *m, > + const struct ovsdb_table *table); > + > void ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon, > const struct ovsdb_table *table, > const struct ovsdb_column *column, > @@ -70,12 +74,12 @@ const char * OVS_WARN_UNUSED_RESULT > ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *, > const struct ovsdb_table *); > > -struct json *ovsdb_monitor_get_update( > - struct ovsdb_monitor *dbmon, > - bool initial, > - uint64_t *unflushed_transaction, > - const struct ovsdb_monitor_session_condition *condition, > - enum ovsdb_monitor_version version); > +struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon, > + bool initial, > + bool all_rows, > + uint64_t 
*unflushed_transaction, > + const struct > ovsdb_monitor_session_condition *condition, > + enum ovsdb_monitor_version version); > > void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon, > const struct ovsdb_table *table, > @@ -105,8 +109,18 @@ ovsdb_monitor_table_condition_add( > const struct ovsdb_table *table, > const struct json *json_cnd); > > +void ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon, > + uint64_t unflushed); > + > void ovsdb_monitor_session_condition_bind( > const struct ovsdb_monitor_session_condition *, > const struct ovsdb_monitor *); > > +struct ovsdb_error * > +ovsdb_monitor_table_condition_update( > + struct ovsdb_monitor *dbmon, > + struct ovsdb_monitor_session_condition *condition, > + const struct ovsdb_table *table, > + const struct json *cond_json); > + > #endif > -- > 2.1.4 > > > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev