Maintain the minimal transaction ID per "==" table. Run lazy cleanup while going over the monitor rows tracked by clauses remove all rows that have a lower transaction than the minimal table transaction ID.
Signed-off-by: Liran Schour <lir...@il.ibm.com> --- ovsdb/monitor.c | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 96 insertions(+), 6 deletions(-) diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c index 46d2d13..929b29e 100644 --- a/ovsdb/monitor.c +++ b/ovsdb/monitor.c @@ -148,6 +148,12 @@ struct ovsdb_monitor_changes { hmap. */ }; +/* Track of tracked txn_ids */ +struct ovsdb_monitor_txn_id { + int n_refs; + uint64_t transaction; +}; + /* A particular table being monitored. */ struct ovsdb_monitor_table { const struct ovsdb_table *table; @@ -173,6 +179,11 @@ struct ovsdb_monitor_table { /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */ struct hmap changes; bool track_clauses; + /* Holds tracked txn_ids, first element is the minimal txn_id + * that is being tracked */ + struct ovsdb_monitor_txn_id *txn_ids; + size_t allocated_txn_ids; + size_t n_txn_ids; /* Contains ovsdb_tracked_clauses * */ struct shash trk_clauses; }; @@ -769,17 +780,74 @@ ovsdb_monitor_table_find_clause_changes( return changes; } +static int +compare_txn_ids(const void *a_, const void *b_) +{ + const struct ovsdb_monitor_txn_id *a = a_; + const struct ovsdb_monitor_txn_id *b = b_; + + return a->transaction == b->transaction ? 0 : + a->transaction < b->transaction ? -1 : 1; +} + +/* Maintain minimal unflushed transaction by a sorted list. First element in + * the list is the lowest unflushed transaction id. */ +static void +ovsdb_monitor_table_track(struct ovsdb_monitor_table *mt, uint64_t transaction) +{ + struct ovsdb_monitor_txn_id *txn_id; + int i; + + for (i = mt->n_txn_ids - 1; i >= 0; i--) { + txn_id = &mt->txn_ids[i]; + if (txn_id->transaction == transaction) { + txn_id->n_refs++; + return; + } + } + + if (mt->n_txn_ids >= mt->allocated_txn_ids) { + mt->txn_ids = x2nrealloc(mt->txn_ids, &mt->allocated_txn_ids, + sizeof *mt->txn_ids); + } + + txn_id = &mt->txn_ids[mt->n_txn_ids++]; + txn_id->n_refs = 1; + txn_id->transaction = transaction; + + qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids, + compare_txn_ids); +} + +/* Maintain sorted list of tracked transactions. First element in the list + * is the lowest unflushed transaction id. */ +static void +ovsdb_monitor_table_untrack(struct ovsdb_monitor_table *mt, uint64_t transaction) +{ + int i; + + for(i = 0; i < mt->n_txn_ids; i++) { + struct ovsdb_monitor_txn_id *txn_id = &mt->txn_ids[i]; + if (txn_id->transaction == transaction) { + if (--txn_id->n_refs == 0) { + txn_id->transaction = ULONG_LONG_MAX; + qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids, + compare_txn_ids); + mt->n_txn_ids--; + } + return; + } + } + OVS_NOT_REACHED(); +} + static void ovsdb_monitor_clauses_track_update(const struct ovsdb_row *old, const struct ovsdb_row *new, struct ovsdb_monitor_table *mt) { - //struct ovsdb_monitor_txn_id *trk; - /* First element in list is minimal tracked transaction */ - //INIT_CONTAINER(trk, &mt->tracked_txns, node); struct shash_node *node; struct ovsdb_monitor_changes *changes; - //uint64_t unflushed = trk->txn_id; /* Insert row to tracked columns */ SHASH_FOR_EACH(node, &mt->trk_clauses) { @@ -1406,6 +1474,13 @@ ovsdb_monitor_compose_update( ovsdb_monitor_row_set_old(row, transaction); if (row->transaction < transaction) { + uint64_t lowest_txn_id = mt->txn_ids[0].transaction; + + /* Check for lazy cleanup */ + if (row->transaction < lowest_txn_id) { + hmap_remove(&changes->rows, &row->hmap_node); + ovsdb_monitor_row_destroy(mt, row); + } continue; } if (uuid_exists(&row_uuids, &row->uuid)) { @@ -1452,7 +1527,8 @@ ovsdb_monitor_compose_update( static struct json* ovsdb_monitor_compose_all_rows_update( struct ovsdb_monitor *dbmon, - struct ovsdb_monitor_session_condition *condition) + struct ovsdb_monitor_session_condition *condition, + uint64_t next_txn) { struct shash_node *node; struct json *json = NULL; @@ -1477,6 +1553,9 @@ ovsdb_monitor_compose_all_rows_update( } } ovsdb_monitor_table_condition_updated(mt, condition); + if (mt->track_clauses) { + ovsdb_monitor_table_track(mt, next_txn); + } } free(changed); @@ -1526,7 +1605,7 @@ ovsdb_monitor_get_update( } } else { json = - ovsdb_monitor_compose_all_rows_update(dbmon, condition); + ovsdb_monitor_compose_all_rows_update(dbmon, condition, next_txn); } } @@ -1536,7 +1615,13 @@ ovsdb_monitor_get_update( ovsdb_monitor_table_untrack_changes(mt, prev_txn); ovsdb_monitor_table_track_changes(mt, next_txn); + if (mt->track_clauses) { + /* We are tracking columns on this table */ + ovsdb_monitor_table_untrack(mt, prev_txn); + ovsdb_monitor_table_track(mt, next_txn); + } } + *unflushed = next_txn; return json; @@ -1780,6 +1865,9 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) changes->n_refs++; } } + if (mt->track_clauses) { + ovsdb_monitor_table_track(mt, 0); + } } } @@ -1933,6 +2021,8 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) ovsdb_monitor_changes_destroy(changes); } hmap_destroy(&mt->changes); + + free(mt->txn_ids); free(mt->columns); free(mt->columns_index_map); free(mt); -- 2.1.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev