For each monitored table, maintain the minimal transaction ID that is still tracked by clauses. Run lazy cleanup: while iterating over the clause-tracked rows, remove every row whose transaction ID is lower than the table's minimal tracked transaction ID.
Signed-off-by: Liran Schour <lir...@il.ibm.com> --- ovsdb/monitor.c | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c index b545cef..aaae483 100644 --- a/ovsdb/monitor.c +++ b/ovsdb/monitor.c @@ -167,6 +167,12 @@ struct ovsdb_monitor_changes { * hmap. */ }; +/* A transaction ID that is tracked by clauses */ +struct ovsdb_monitor_txn_id { + int n_refs; + uint64_t transaction; +}; + /* A particular table being monitored. */ struct ovsdb_monitor_table { const struct ovsdb_table *table; @@ -189,9 +195,16 @@ struct ovsdb_monitor_table { /* Contains "ovsdb_monitor_json_cache_node"s.*/ struct hmap json_cache; + /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */ struct hmap changes; bool clauses_tracking; + + /* Maintain minimal transaction ID being tracked by clauses */ + struct ovsdb_monitor_txn_id *txn_ids; + size_t allocated_txn_ids; + size_t n_txn_ids; + /* Contains ovsdb_tracked_clauses * */ struct shash trk_clauses; }; @@ -226,6 +239,8 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old, uint64_t transaction); static void ovsdb_monitor_row_set_old(struct ovsdb_monitor_row *row, uint64_t transaction); +static void +ovsdb_monitor_table_track(struct ovsdb_monitor_table *mt, uint64_t transaction); static uint32_t json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn) @@ -661,6 +676,8 @@ ovsdb_monitor_condition_bind(struct ovsdb_monitor *dbmon, ovsdb_monitor_condition_update_tracking(mtc->mt, &mtc->old_condition, NULL); + /* Track the next unflushed txn */ + ovsdb_monitor_table_track(mt, mt->dbmon->n_transactions + 1); } } } @@ -792,6 +809,67 @@ ovsdb_monitor_table_find_clause_changes( return changes; } +static int +compare_txn_ids(const void *a_, const void *b_) +{ + const struct ovsdb_monitor_txn_id *a = a_; + const struct ovsdb_monitor_txn_id *b = b_; + + return a->transaction == b->transaction ? 0 : + a->transaction < b->transaction ? 
-1 : 1; +} + +/* Maintain minimal unflushed transaction by a sorted list. First element in + * the list is the lowest unflushed transaction id. */ +static void +ovsdb_monitor_table_track(struct ovsdb_monitor_table *mt, uint64_t transaction) +{ + struct ovsdb_monitor_txn_id *txn_id; + int i; + + for (i = mt->n_txn_ids - 1; i >= 0; i--) { + txn_id = &mt->txn_ids[i]; + if (txn_id->transaction == transaction) { + txn_id->n_refs++; + return; + } + } + + if (mt->n_txn_ids >= mt->allocated_txn_ids) { + mt->txn_ids = x2nrealloc(mt->txn_ids, &mt->allocated_txn_ids, + sizeof *mt->txn_ids); + } + + txn_id = &mt->txn_ids[mt->n_txn_ids++]; + txn_id->n_refs = 1; + txn_id->transaction = transaction; + + qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids, + compare_txn_ids); +} + +/* Maintain sorted list of tracked transactions. First element in the list + * is the lowest unflushed transaction id. */ +static void +ovsdb_monitor_table_untrack(struct ovsdb_monitor_table *mt, uint64_t transaction) +{ + int i; + + for(i = 0; i < mt->n_txn_ids; i++) { + struct ovsdb_monitor_txn_id *txn_id = &mt->txn_ids[i]; + if (txn_id->transaction == transaction) { + if (--txn_id->n_refs == 0) { + txn_id->transaction = ULONG_LONG_MAX; + qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids, + compare_txn_ids); + mt->n_txn_ids--; + } + return; + } + } + OVS_NOT_REACHED(); +} + static void ovsdb_monitor_clauses_changes_update(const struct ovsdb_row *old, const struct ovsdb_row *new, @@ -1035,6 +1113,9 @@ ovsdb_monitor_table_condition_updated(struct ovsdb_monitor_table *mt, ovsdb_monitor_condition_update_tracking(mtc->mt, &mtc->old_condition, &mtc->new_condition); + /* Track the next unflushed txn */ + ovsdb_monitor_table_track(mt, mt->dbmon->n_transactions + 1); + if (old_true) { condition->n_true_cnd--; } @@ -1433,6 +1514,14 @@ ovsdb_monitor_compose_update( ovsdb_monitor_row_set_old(row, transaction); if (row->transaction < transaction) { + uint64_t lowest_txn_id = + mt->txn_ids[0].transaction; + + 
/* Check for lazy cleanup */ + if (row->transaction < lowest_txn_id) { + hmap_remove(&changes->rows, &row->hmap_node); + ovsdb_monitor_row_destroy(mt, row); + } continue; } if (uuid_exists(&row_uuids, &row->uuid)) { @@ -1451,6 +1540,11 @@ ovsdb_monitor_compose_update( } } } + + /* Update tracking of transaction ID */ + ovsdb_monitor_table_untrack(mt, transaction); + ovsdb_monitor_table_track(mt, mt->dbmon->n_transactions + 1); + row_uuids_destroy(&row_uuids); } else { changes = ovsdb_monitor_table_find_changes(mt, transaction); @@ -1820,6 +1914,9 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) changes->n_refs++; } } + if (mt->clauses_tracking) { + ovsdb_monitor_table_track(mt, 0); + } } } @@ -1973,6 +2070,8 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) ovsdb_monitor_changes_destroy(changes); } hmap_destroy(&mt->changes); + + free(mt->txn_ids); free(mt->columns); free(mt->columns_index_map); free(mt); -- 2.1.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev