ovsdb-server now accepts "monitor_cond_update" request. On conditions update we insert all rows of table in a new changes list - OVSDB_MONITOR_ALL that are being indexed by the transaction-id at the moment of insertion. JSON cache is being used only for empty condition monitor sessions. Sees ovsdb-server (1) man page for details of monitor_cond_update.
Signed-off-by: Liran Schour <lir...@il.ibm.com> --- v2->v3: * ovsdb_monitor_table_condition_update() accepts only single json condition * Allow non-monitored columns in cond_update. * Flush monitor session after monitor_cond_update to guarantee empty changes list * Bug fix: use json cache when all condition are empty * Simplify inserting row change to changes lists --- ovsdb/jsonrpc-server.c | 154 +++++++++++++++++++++++++-- ovsdb/monitor.c | 283 ++++++++++++++++++++++++++++++++++++++++++------- ovsdb/monitor.h | 26 +++-- 3 files changed, 415 insertions(+), 48 deletions(-) diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index cd6a70a..15dc406 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -87,6 +87,10 @@ static void ovsdb_jsonrpc_trigger_complete_done( static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create( struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params, enum ovsdb_monitor_version, const struct json *request_id); +static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_update( + struct ovsdb_jsonrpc_session *s, + struct json *params, + const struct json *request_id); static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel( struct ovsdb_jsonrpc_session *, struct json_array *params, @@ -407,7 +411,8 @@ static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_session_get_memory_usage( const struct ovsdb_jsonrpc_session *, struct simap *usage); static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *, - struct jsonrpc_msg *); + struct jsonrpc_msg *, + bool *); static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *, struct jsonrpc_msg *); @@ -463,13 +468,14 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) if (!jsonrpc_session_get_backlog(s->js)) { struct jsonrpc_msg *msg; + bool needs_flush = false; ovsdb_jsonrpc_monitor_flush_all(s); msg = jsonrpc_session_recv(s->js); if (msg) { if (msg->type == JSONRPC_REQUEST) { - ovsdb_jsonrpc_session_got_request(s, msg); + ovsdb_jsonrpc_session_got_request(s, msg, &needs_flush); } else if (msg->type == JSONRPC_NOTIFY) { ovsdb_jsonrpc_session_got_notify(s, msg); } else { @@ -480,6 +486,9 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) jsonrpc_msg_destroy(msg); } } + if (needs_flush) { + ovsdb_jsonrpc_monitor_flush_all(s); + } } return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT; } @@ -840,10 +849,12 @@ execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db, static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s, - struct jsonrpc_msg *request) + struct jsonrpc_msg *request, + bool *needs_flush) { struct jsonrpc_msg *reply; + *needs_flush = false; if (!strcmp(request->method, "transact")) { struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply); if (!reply) { @@ -861,6 +872,10 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s, version, request->id); } + } else if (!strcmp(request->method, "monitor_cond_update")) { + reply = ovsdb_jsonrpc_monitor_cond_update(s, request->params, + request->id); + *needs_flush = true; } else if (!strcmp(request->method, "monitor_cancel")) { reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params), request->id); @@ -1050,6 +1065,8 @@ struct ovsdb_jsonrpc_monitor { struct ovsdb_monitor *dbmon; uint64_t unflushed; /* The first transaction that has not been flushed to the jsonrpc remote client. */ + bool all_rows; /* Indicates if in the next flush we request all + rows (due to a condition change) */ enum ovsdb_monitor_version version; struct ovsdb_monitor_session_condition *condition;/* Session's condition */ }; @@ -1213,6 +1230,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db, m->condition = ovsdb_monitor_session_condition_create(); } m->unflushed = 0; + m->all_rows = false; m->version = version; hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0)); m->monitor_id = json_clone(monitor_id); @@ -1294,6 +1312,124 @@ error: return jsonrpc_create_error(json, request_id); } +static struct ovsdb_error * +ovsdb_jsonrpc_parse_monitor_cond_update_request( + struct ovsdb_jsonrpc_monitor *m, + const struct ovsdb_table *table, + const struct json *cond_update_req) +{ + const struct ovsdb_table_schema *ts = table->schema; + const struct json *condition, *columns; + struct ovsdb_parser parser; + struct ovsdb_error *error; + + ovsdb_parser_init(&parser, cond_update_req, "table %s", ts->name); + columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL); + condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL); + + error = ovsdb_parser_finish(&parser); + if (error) { + return error; + } + + if (columns) { + error = ovsdb_syntax_error(cond_update_req, NULL, "changing columns " + "is unsupported"); + return error; + } + error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table, + condition); + + return error; +} + +static struct jsonrpc_msg * +ovsdb_jsonrpc_monitor_cond_update(struct ovsdb_jsonrpc_session *s, + struct json *params, + const struct json *request_id) +{ + struct ovsdb_error *error; + struct ovsdb_jsonrpc_monitor *m; + struct json *monitor_cond_update_reqs; + struct shash_node *node; + struct json *json; + + if (json_array(params)->n != 3) { + error = ovsdb_syntax_error(params, NULL, "invalid parameters"); + goto error; + } + + m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]); + if (!m) { + error = ovsdb_syntax_error(request_id, NULL, + "unknown monitor session"); + goto error; + } + + monitor_cond_update_reqs = params->u.array.elems[2]; + if (monitor_cond_update_reqs->type != JSON_OBJECT) { + error = + ovsdb_syntax_error(NULL, NULL, + "monitor-cond-change-requests must be object"); + goto error; + } + + SHASH_FOR_EACH (node, json_object(monitor_cond_update_reqs)) { + const struct ovsdb_table *table; + const struct json *mr_value; + size_t i; + + table = ovsdb_get_table(m->db, node->name); + if (!table) { + error = ovsdb_syntax_error(NULL, NULL, + "no table named %s", node->name); + goto error; + } + if (!ovsdb_monitor_table_exists(m->dbmon, table)) { + error = ovsdb_syntax_error(NULL, NULL, + "no table named %s in monitor session", + node->name); + goto error; + } + + mr_value = node->data; + if (mr_value->type == JSON_ARRAY) { + const struct json_array *array = &mr_value->u.array; + + for (i = 0; i < array->n; i++) { + error = ovsdb_jsonrpc_parse_monitor_cond_update_request( + m, table, array->elems[i]); + if (error) { + goto error; + } + } + } else { + error = ovsdb_syntax_error( + NULL, NULL, + "table %s no monitor-cond-change JSON array", + node->name); + goto error; + } + } + + ovsdb_monitor_get_all_rows(m->dbmon, m->unflushed); + m->all_rows = true; + + /* Change monitor id */ + hmap_remove(&s->monitors, &m->node); + json_destroy(m->monitor_id); + m->monitor_id = json_clone(params->u.array.elems[1]); + hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0)); + + return jsonrpc_create_reply(json_object_create(), request_id); + +error: + + json = ovsdb_error_to_json(error); + ovsdb_error_destroy(error); + return jsonrpc_create_error(json, request_id); +} + static struct jsonrpc_msg * ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s, struct json_array *params, @@ -1330,8 +1466,14 @@ static struct json * ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m, bool initial) { - return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed, - m->condition, m->version); + struct json * json = ovsdb_monitor_get_update(m->dbmon, initial, + m->all_rows, + &m->unflushed, + m->condition, + m->version); + + m->all_rows = false; + return json; } static bool @@ -1340,7 +1482,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s) struct ovsdb_jsonrpc_monitor *m; HMAP_FOR_EACH (m, node, &s->monitors) { - if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) { + if (m->all_rows || ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) { return true; } } diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c index 1614d67..d087a5a 100644 --- a/ovsdb/monitor.c +++ b/ovsdb/monitor.c @@ -120,6 +120,11 @@ struct ovsdb_monitor_changes { hmap. */ }; +enum ovsdb_monitor_changes_type { + OVSDB_MONITOR_CHANGES, + OVSDB_MONITOR_ALL +}; + /* A particular table being monitored. */ struct ovsdb_monitor_table { const struct ovsdb_table *table; @@ -142,6 +147,9 @@ struct ovsdb_monitor_table { /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */ struct hmap changes; + /* Contains 'ovsdb_monitor_changes' of all rows in table at transaction + point in time. indexed by 'transaction'. */ + struct hmap all; }; typedef struct json * @@ -153,12 +161,15 @@ typedef struct json * static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon); static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes( - struct ovsdb_monitor_table *mt, uint64_t next_txn); + struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type, + uint64_t next_txn); static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes( - struct ovsdb_monitor_table *mt, uint64_t unflushed); + struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type, + uint64_t unflushed); static void ovsdb_monitor_changes_destroy( struct ovsdb_monitor_changes *changes); static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, + enum ovsdb_monitor_changes_type type, uint64_t unflushed); static uint32_t @@ -262,8 +273,8 @@ ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes, * * If 'row' is NULL, returns NULL. */ static struct ovsdb_datum * -clone_monitor_row_data(const struct ovsdb_monitor_table *mt, - const struct ovsdb_row *row) +clone_monitor_ovsdb_row_data(const struct ovsdb_monitor_table *mt, + const struct ovsdb_row *row) { struct ovsdb_datum *data; size_t i; @@ -284,6 +295,44 @@ clone_monitor_row_data(const struct ovsdb_monitor_table *mt, return data; } +/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as + * copies of the data in 'fields' drawn from the columns represented by + * mt->columns[]. Returns the array. + * + * If 'row' is NULL, returns NULL. */ +static struct ovsdb_datum * +clone_monitor_row_data(const struct ovsdb_monitor_table *mt, + const struct ovsdb_datum *fields) +{ + struct ovsdb_datum *data; + size_t i; + + if (!fields) { + return NULL; + } + + data = xmalloc(mt->n_columns * sizeof *data); + for (i = 0; i < mt->n_columns; i++) { + const struct ovsdb_column *c = mt->columns[i].column; + const struct ovsdb_datum *src = &fields[c->index]; + struct ovsdb_datum *dst = &data[i]; + const struct ovsdb_type *type = &c->type; + + ovsdb_datum_clone(dst, src, type); + } + return data; +} + +static void +clone_monitor_row(const struct ovsdb_monitor_table *mt, + struct ovsdb_monitor_row *to, + const struct ovsdb_monitor_row *from) +{ + to->uuid = from->uuid; + to->old = clone_monitor_row_data(mt, from->old); + to->new = clone_monitor_row_data(mt,from->new); +} + /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from * in 'row' drawn from the columns represented by mt->columns[]. */ static void @@ -393,6 +442,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m, mt->dbmon = m; shash_add(&m->tables, table->schema->name, mt); hmap_init(&mt->changes); + hmap_init(&mt->all); mt->columns_index_map = xmalloc(sizeof(unsigned int) * shash_count(&table->schema->columns)); for (i = 0; i < shash_count(&table->schema->columns); i++) { @@ -472,6 +522,13 @@ ovsdb_monitor_add_all_condition_columns( } } +bool +ovsdb_monitor_table_exists(struct ovsdb_monitor *m, + const struct ovsdb_table *table) +{ + return shash_find_data(&m->tables, table->schema->name); +} + /* Check for duplicated column names. Return the first * duplicated column's name if found. Otherwise return * NULL. */ @@ -499,9 +556,12 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m, static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, + enum ovsdb_monitor_changes_type type, uint64_t next_txn) { struct ovsdb_monitor_changes *changes; + struct hmap *changes_hmap = + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all; changes = xzalloc(sizeof *changes); @@ -509,19 +569,22 @@ ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, changes->mt = mt; changes->n_refs = 1; hmap_init(&changes->rows); - hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn)); + hmap_insert(changes_hmap, &changes->hmap_node, hash_uint64(next_txn)); return changes; }; static struct ovsdb_monitor_changes * ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt, + enum ovsdb_monitor_changes_type type, uint64_t transaction) { struct ovsdb_monitor_changes *changes; + struct hmap *changes_hmap = + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all; size_t hash = hash_uint64(transaction); - HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) { + HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, changes_hmap) { if (changes->transaction == transaction) { return changes; } @@ -533,13 +596,16 @@ ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt, /* Stop currently tracking changes to table 'mt' since 'transaction'. */ static void ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt, + enum ovsdb_monitor_changes_type type, uint64_t transaction) { + struct hmap *changes_hmap = + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all; struct ovsdb_monitor_changes *changes = - ovsdb_monitor_table_find_changes(mt, transaction); + ovsdb_monitor_table_find_changes(mt, type, transaction); if (changes) { if (--changes->n_refs == 0) { - hmap_remove(&mt->changes, &changes->hmap_node); + hmap_remove(changes_hmap, &changes->hmap_node); ovsdb_monitor_changes_destroy(changes); } } @@ -549,15 +615,16 @@ ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt, */ static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, + enum ovsdb_monitor_changes_type type, uint64_t transaction) { struct ovsdb_monitor_changes *changes; - changes = ovsdb_monitor_table_find_changes(mt, transaction); + changes = ovsdb_monitor_table_find_changes(mt, type, transaction); if (changes) { changes->n_refs++; } else { - ovsdb_monitor_table_add_changes(mt, transaction); + ovsdb_monitor_table_add_changes(mt, type, transaction); } } @@ -674,6 +741,46 @@ ovsdb_monitor_get_table_conditions( return true; } +struct ovsdb_error * +ovsdb_monitor_table_condition_update( + struct ovsdb_monitor *dbmon, + struct ovsdb_monitor_session_condition *condition, + const struct ovsdb_table *table, + const struct json *cond_json) +{ + struct ovsdb_monitor_table_condition *mtc = + shash_find_data(&condition->tables, table->schema->name); + struct ovsdb_error *error; + struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER; + bool empty = ovsdb_condition_empty(&mtc->new_condition); + + if (!condition) { + return NULL; + } + + error = ovsdb_condition_from_json(table->schema, cond_json, + NULL, &cond); + if (error) { + return error; + } + + ovsdb_condition_destroy(&mtc->new_condition); + ovsdb_condition_clone(&mtc->new_condition, &cond); + + if (empty && !ovsdb_condition_empty(&mtc->new_condition)) { + condition->n_empty_cnd--; + } + if (!empty && ovsdb_condition_empty(&mtc->new_condition)) { + condition->n_empty_cnd++; + } + + ovsdb_monitor_condition_add_columns(dbmon, + table, + &mtc->new_condition); + + return NULL; +} + static enum ovsdb_monitor_selection ovsdb_monitor_row_update_type_condition( const struct ovsdb_monitor_table *mt, @@ -827,7 +934,7 @@ ovsdb_monitor_compose_row_update( * for 'row' within * 'mt', or NULL if no row update should be sent. * * The caller should specify 'initial' as true if the returned JSON is - * going to be used as part of the initial reply to a "monitor2" request, + * going to be used as part of the initial reply to a "monitor_cond" request, * false if it is going to be used as part of an "update2" notification. * * 'changed' must be a scratch buffer for internal use that is at least @@ -916,7 +1023,7 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon) static struct json* ovsdb_monitor_compose_update( struct ovsdb_monitor *dbmon, - bool initial, uint64_t transaction, + bool initial, bool all_rows, uint64_t transaction, const struct ovsdb_monitor_session_condition *condition, compose_row_update_cb_func row_update) { @@ -932,7 +1039,16 @@ ovsdb_monitor_compose_update( struct ovsdb_monitor_changes *changes; struct json *table_json = NULL; - changes = ovsdb_monitor_table_find_changes(mt, transaction); + if (!all_rows) { + changes = ovsdb_monitor_table_find_changes(mt, + OVSDB_MONITOR_CHANGES, + transaction); + } else { + /* Get changes that includes all rows from all_txn point in time */ + changes = ovsdb_monitor_table_find_changes(mt, + OVSDB_MONITOR_ALL, + transaction); + } if (!changes) { continue; } @@ -968,7 +1084,8 @@ ovsdb_monitor_compose_update( /* Returns JSON for a <table-updates> object (as described in RFC 7047) * for all the outstanding changes within 'monitor' that starts from - * '*unflushed' transaction id. + * '*unflushed'. + * If all_rows is true all_rows in the db that match conditions will be sent. * * The caller should specify 'initial' as true if the returned JSON is going to * be used as part of the initial reply to a "monitor" request, false if it is @@ -976,7 +1093,8 @@ ovsdb_monitor_compose_update( struct json * ovsdb_monitor_get_update( struct ovsdb_monitor *dbmon, - bool initial, uint64_t *unflushed, + bool initial, bool all_rows, + uint64_t *unflushed, const struct ovsdb_monitor_session_condition *condition, enum ovsdb_monitor_version version) { @@ -988,7 +1106,7 @@ ovsdb_monitor_get_update( /* Return a clone of cached json if one exists. Otherwise, * generate a new one and add it to the cache. */ - if (!condition || (condition && ovsdb_can_cache(condition))) { + if (!condition || (!all_rows && condition && ovsdb_can_cache(condition))) { cache_node = ovsdb_monitor_json_cache_search(dbmon, version, prev_txn); } if (cache_node) { @@ -996,17 +1114,19 @@ ovsdb_monitor_get_update( } else { if (version == OVSDB_MONITOR_V1) { json = - ovsdb_monitor_compose_update(dbmon, initial, prev_txn, - condition, + ovsdb_monitor_compose_update(dbmon, initial, all_rows, + prev_txn, condition, ovsdb_monitor_compose_row_update); } else { ovs_assert(version == OVSDB_MONITOR_V2); json = - ovsdb_monitor_compose_update(dbmon, initial, prev_txn, - condition, + ovsdb_monitor_compose_update(dbmon, initial, all_rows, + prev_txn, condition, ovsdb_monitor_compose_row_update2); } - if (!condition || (condition && ovsdb_can_cache(condition))) { + + if (!condition || + (!all_rows && condition && ovsdb_can_cache(condition))) { ovsdb_monitor_json_cache_insert(dbmon, version, prev_txn, json); } } @@ -1014,9 +1134,28 @@ ovsdb_monitor_get_update( /* Maintain transaction id of 'changes'. */ SHASH_FOR_EACH (node, &dbmon->tables) { struct ovsdb_monitor_table *mt = node->data; + struct ovsdb_condition *old_condition, *new_condition; - ovsdb_monitor_table_untrack_changes(mt, prev_txn); - ovsdb_monitor_table_track_changes(mt, next_txn); + if (!all_rows) { + ovsdb_monitor_table_untrack_changes(mt, + OVSDB_MONITOR_CHANGES, + prev_txn); + } else { + ovsdb_monitor_table_untrack_changes(mt, + OVSDB_MONITOR_ALL, + prev_txn); + } + ovsdb_monitor_table_track_changes(mt, OVSDB_MONITOR_CHANGES, next_txn); + + if (ovsdb_monitor_get_table_conditions(mt, + condition, + &old_condition, + &new_condition)) { + if (ovsdb_condition_cmp(old_condition, new_condition)) { + ovsdb_condition_destroy(old_condition); + ovsdb_condition_clone(old_condition, new_condition); + } + } } *unflushed = next_txn; @@ -1097,8 +1236,8 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old, change = xzalloc(sizeof *change); hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid)); change->uuid = *uuid; - change->old = clone_monitor_row_data(mt, old); - change->new = clone_monitor_row_data(mt, new); + change->old = clone_monitor_ovsdb_row_data(mt, old); + change->new = clone_monitor_ovsdb_row_data(mt, new); } else { if (new) { update_monitor_row_data(mt, new, change->new); @@ -1115,6 +1254,20 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old, } } +static void +ovsdb_monitor_changes_clone_insert_row(const struct ovsdb_monitor_row *row, + const struct ovsdb_monitor_table *mt, + struct ovsdb_monitor_changes *changes) +{ + struct ovsdb_monitor_row *change; + + ovs_assert(ovsdb_monitor_changes_row_find(changes, &row->uuid) == NULL); + + change = xzalloc(sizeof *change); + hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(&row->uuid)); + clone_monitor_row(mt, change, row); +} + static bool ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt, const unsigned long int *changed) @@ -1162,6 +1315,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, struct ovsdb_table *table = new ? new->table : old->table; struct ovsdb_monitor_table *mt; struct ovsdb_monitor_changes *changes; + enum ovsdb_monitor_changes_efficacy efficacy; + enum ovsdb_monitor_selection type; if (!aux->mt || table != aux->mt->table) { aux->mt = shash_find_data(&m->tables, table->schema->name); @@ -1173,16 +1328,17 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, } mt = aux->mt; - HMAP_FOR_EACH(changes, hmap_node, &mt->changes) { - enum ovsdb_monitor_changes_efficacy efficacy; - enum ovsdb_monitor_selection type; + type = ovsdb_monitor_row_update_type(false, old, new); + efficacy = ovsdb_monitor_changes_classify(type, mt, changed); - type = ovsdb_monitor_row_update_type(false, old, new); - efficacy = ovsdb_monitor_changes_classify(type, mt, changed); - if (efficacy > OVSDB_CHANGES_NO_EFFECT) { + if (efficacy > OVSDB_CHANGES_NO_EFFECT) { + /* insert row change to changes lists */ + HMAP_FOR_EACH(changes, hmap_node, &mt->changes) { + ovsdb_monitor_changes_update(old, new, mt, changes); + } + HMAP_FOR_EACH(changes, hmap_node, &mt->all) { ovsdb_monitor_changes_update(old, new, mt, changes); } - if (aux->efficacy < efficacy) { aux->efficacy = efficacy; } @@ -1194,10 +1350,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, void ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) { - struct ovsdb_monitor_aux aux; struct shash_node *node; - ovsdb_monitor_init_aux(&aux, dbmon); SHASH_FOR_EACH (node, &dbmon->tables) { struct ovsdb_monitor_table *mt = node->data; @@ -1205,9 +1359,14 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) struct ovsdb_row *row; struct ovsdb_monitor_changes *changes; - changes = ovsdb_monitor_table_find_changes(mt, 0); + changes = ovsdb_monitor_table_find_changes(mt, + OVSDB_MONITOR_CHANGES, + 0); if (!changes) { - changes = ovsdb_monitor_table_add_changes(mt, 0); + changes = + ovsdb_monitor_table_add_changes(mt, + OVSDB_MONITOR_CHANGES, + 0); HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) { ovsdb_monitor_changes_update(NULL, row, mt, changes); } @@ -1218,6 +1377,53 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) } } +/* Record all rows in DB and mark this changes at unflushed tranaction id */ +void +ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon, + uint64_t unflushed) +{ + struct shash_node *node; + + SHASH_FOR_EACH (node, &dbmon->tables) { + struct ovsdb_monitor_table *mt = node->data; + struct ovsdb_row *new; + struct ovsdb_monitor_changes *changes, *all_changes; + + all_changes = ovsdb_monitor_table_find_changes(mt, + OVSDB_MONITOR_ALL, + unflushed); + if (!all_changes) { + all_changes = ovsdb_monitor_table_add_changes(mt, + OVSDB_MONITOR_ALL, + unflushed); + changes = ovsdb_monitor_table_find_changes(mt, + OVSDB_MONITOR_CHANGES, + unflushed); + HMAP_FOR_EACH (new, hmap_node, &mt->table->rows) { + struct ovsdb_monitor_row *row = NULL; + if (changes) { + /* Check if we have a change record for this row */ + row = ovsdb_monitor_changes_row_find( + changes, + ovsdb_row_get_uuid(new)); + } + if (row) { + ovsdb_monitor_changes_clone_insert_row(row, mt, + all_changes); + } else { + ovsdb_monitor_changes_update(new, new, mt, all_changes); + } + } + } else { + all_changes->n_refs++; + } + + ovsdb_monitor_table_untrack_changes(mt, + OVSDB_MONITOR_CHANGES, + unflushed); + } +} + void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon, struct ovsdb_jsonrpc_monitor *jsonrpc_monitor) @@ -1367,7 +1573,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) hmap_remove(&mt->changes, &changes->hmap_node); ovsdb_monitor_changes_destroy(changes); } + HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->all) { + hmap_remove(&mt->changes, &changes->hmap_node); + ovsdb_monitor_changes_destroy(changes); + } hmap_destroy(&mt->changes); + hmap_destroy(&mt->all); free(mt->columns); free(mt->columns_index_map); free(mt); diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h index 0529e5a..935c65f 100644 --- a/ovsdb/monitor.h +++ b/ovsdb/monitor.h @@ -57,6 +57,10 @@ void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon, void ovsdb_monitor_add_table(struct ovsdb_monitor *m, const struct ovsdb_table *table); +bool +ovsdb_monitor_table_exists(struct ovsdb_monitor *m, + const struct ovsdb_table *table); + void ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon, const struct ovsdb_table *table, const struct ovsdb_column *column, @@ -70,12 +74,12 @@ const char * OVS_WARN_UNUSED_RESULT ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *, const struct ovsdb_table *); -struct json *ovsdb_monitor_get_update( - struct ovsdb_monitor *dbmon, - bool initial, - uint64_t *unflushed_transaction, - const struct ovsdb_monitor_session_condition *condition, - enum ovsdb_monitor_version version); +struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon, + bool initial, + bool all_rows, + uint64_t *unflushed_transaction, + const struct ovsdb_monitor_session_condition *condition, + enum ovsdb_monitor_version version); void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon, const struct ovsdb_table *table, @@ -105,8 +109,18 @@ ovsdb_monitor_table_condition_add( const struct ovsdb_table *table, const struct json *json_cnd); +void ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon, + uint64_t unflushed); + void ovsdb_monitor_session_condition_bind( const struct ovsdb_monitor_session_condition *, const struct ovsdb_monitor *); +struct ovsdb_error * +ovsdb_monitor_table_condition_update( + struct ovsdb_monitor *dbmon, + struct ovsdb_monitor_session_condition *condition, + const struct ovsdb_table *table, + const struct json *cond_json); + #endif -- 2.1.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev