Changeset: 85c1d1a898f1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/85c1d1a898f1
Modified Files:
        sql/storage/bat/bat_storage.c
        sql/storage/store.c
Branch: Jul2021
Log Message:

use column id based locks for appends
only start final cleanup after all transactions finished
small segment cleanup improvements


diffs (119 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -71,6 +71,9 @@ unlock_table(sqlstore *store, sqlid id)
        MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
 }
 
+#define lock_column(store, id) lock_table(store, id)
+#define unlock_column(store, id) unlock_table(store, id)
+
 static int
 tc_gc_seg( sql_store Store, sql_change *change, ulng commit_ts, ulng oldest)
 {
@@ -243,7 +246,10 @@ merge_segments(segments *segs, sql_trans
                                seg->next = cur->next;
                                if (cur == segs->t)
                                        segs->t = seg;
-                               mark4destroy(cur, change, commit_ts);
+                               if (commit_ts == oldest)
+                                       _DELETE(cur);
+                               else
+                                       mark4destroy(cur, change, commit_ts);
                                cur = seg;
                        } else {
                                seg = cur; /* begin of new merge */
@@ -1219,11 +1225,11 @@ dup_storage( sql_trans *tr, storage *oba
 }
 
 static int
-append_col_execute(sql_trans *tr, sql_delta *delta, sql_table *table, size_t 
offset, void *incoming_data, bool is_bat, size_t cnt)
+append_col_execute(sql_trans *tr, sql_delta *delta, sqlid id, size_t offset, 
void *incoming_data, bool is_bat, size_t cnt)
 {
        int ok = LOG_OK;
 
-       lock_table(tr->store, table->base.id);
+       lock_column(tr->store, id);
        if (is_bat) {
                BAT *bat = incoming_data;
 
@@ -1232,7 +1238,7 @@ append_col_execute(sql_trans *tr, sql_de
        } else {
                ok = delta_append_val(delta, offset, incoming_data, cnt);
        }
-       unlock_table(tr->store, table->base.id);
+       unlock_column(tr->store, id);
        return ok;
 }
 
@@ -1250,7 +1256,7 @@ append_col(sql_trans *tr, sql_column *c,
        if ((!inTransaction(tr, c->t) && (odelta != delta || !in_transaction || 
isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
                trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, 
isLocalTemp(c->t)?NULL:&log_update_col);
 
-       return append_col_execute(tr, delta, c->t, offset, i, tpe == TYPE_bat, 
cnt);
+       return append_col_execute(tr, delta, c->base.id, offset, i, tpe == 
TYPE_bat, cnt);
 }
 
 static int
@@ -1267,7 +1273,7 @@ append_idx(sql_trans *tr, sql_idx * i, s
        if ((!inTransaction(tr, i->t) && (odelta != delta || !in_transaction || 
isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
                trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, 
isLocalTemp(i->t)?NULL:&log_update_idx);
 
-       return append_col_execute(tr, delta, i->t, offset, data, tpe == 
TYPE_bat, cnt);
+       return append_col_execute(tr, delta, i->base.id, offset, data, tpe == 
TYPE_bat, cnt);
 }
 
 static int
@@ -2084,7 +2090,7 @@ commit_create_del( sql_trans *tr, sql_ch
                return ok;
        if(!isTempTable(t)) {
                storage *dbat = ATOMIC_PTR_GET(&t->data);
-               merge_segments(dbat->segs, tr, change, commit_ts, oldest);
+               merge_segments(dbat->segs, tr, change, commit_ts, commit_ts/* 
create is we are alone */ /*oldest*/);
                assert(dbat->cs.ts == tr->tid);
                dbat->cs.ts = commit_ts;
                if (ok == LOG_OK) {
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -1930,16 +1930,26 @@ store_exit(sqlstore *store)
        TRC_DEBUG(SQL_STORE, "Store locked\n");
 
        if (store->cat) {
-               MT_lock_unset(&store->flush);
-               MT_lock_unset(&store->lock);
+               while (ATOMIC_GET(&store->nr_active) > 0) {
+                       const int sleeptime = 100;
+                       MT_lock_unset(&store->flush);
+                       MT_lock_unset(&store->lock);
+                       MT_sleep_ms(sleeptime);
+                       MT_lock_set(&store->lock);
+                       MT_lock_set(&store->flush);
+               }
                if (store->changes) {
                        ulng oldest = store_timestamp(store)+1;
                        for(node *n=store->changes->h; n; n = n->next) {
                                sql_change *c = n->data;
 
-                               if (c->cleanup && !c->cleanup(store, c, oldest, 
oldest))
-                                       assert(0);
-                               else
+                               if (c->cleanup && !c->cleanup(store, c, oldest, 
oldest)) {
+                                       /* try again with newer oldest, should 
cleanup any pending issues */
+                                       if (!c->cleanup(store, c, oldest+1, 
oldest+1))
+                                               printf("not deleted\n");
+                                       else
+                                               _DELETE(c);
+                               } else
                                        _DELETE(c);
                        }
                        list_destroy(store->changes);
@@ -1948,8 +1958,6 @@ store_exit(sqlstore *store)
                os_destroy(store->cat->schemas, store);
                _DELETE(store->cat);
                sequences_exit();
-               MT_lock_set(&store->lock);
-               MT_lock_set(&store->flush);
        }
        store->logger_api.destroy(store);
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to