Changeset: 7095e1950097 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/7095e1950097
Modified Files:
        gdk/gdk_bbp.c
        sql/storage/bat/bat_storage.c
        sql/storage/objectset.c
        sql/storage/store.c
Branch: Jul2021
Log Message:

add some more locks
fixes for savepoints


diffs (146 lines):

diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -1928,6 +1928,8 @@ BBPuncacheit(bat i, bool unloaddesc)
        if (BBPcheck(i, "BBPuncacheit")) {
                BAT *b = BBP_desc(i);
 
+               assert(unloaddesc || BBP_refs(i) == 0);
+
                if (b) {
                        if (BBP_cache(i)) {
                                TRC_DEBUG(BAT_, "uncache %d (%s)\n", (int) i, 
BBPname(i));
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -3052,6 +3052,8 @@ commit_update_col( sql_trans *tr, sql_ch
        if (!commit_ts) { /* rollback */
                sql_delta *d = change->data, *o = ATOMIC_PTR_GET(&c->data);
 
+               if (change->ts && c->t->base.new) /* handled by create col */
+                       return ok;
                if (o != d) {
                        while(o && o->next != d)
                                o = o->next;
@@ -3073,8 +3075,11 @@ commit_update_col( sql_trans *tr, sql_ch
                                delta->next = NULL;
                        }
                }
-               if (ok == LOG_OK && delta == d && oldest == commit_ts)
+               if (ok == LOG_OK && delta == d && oldest == commit_ts) {
+                       lock_column(tr->store, c->base.id);
                        ok = merge_delta(delta);
+                       unlock_column(tr->store, c->base.id);
+               }
        } else if (ok == LOG_OK && tr->parent) /* move delta into older and 
cleanup current save points */
                ATOMIC_PTR_SET(&c->data, savepoint_commit_delta(delta, 
commit_ts));
        return ok;
@@ -3131,6 +3136,8 @@ commit_update_idx( sql_trans *tr, sql_ch
        if (!commit_ts) { /* rollback */
                sql_delta *d = change->data, *o = ATOMIC_PTR_GET(&i->data);
 
+               if (change->ts && i->t->base.new) /* handled by create col */
+                       return ok;
                if (o != d) {
                        while(o && o->next != d)
                                o = o->next;
@@ -3152,8 +3159,11 @@ commit_update_idx( sql_trans *tr, sql_ch
                                delta->next = NULL;
                        }
                }
-               if (ok == LOG_OK && delta == d && oldest == commit_ts)
+               if (ok == LOG_OK && delta == d && oldest == commit_ts) {
+                       lock_column(tr->store, i->base.id);
                        ok = merge_delta(delta);
+                       unlock_column(tr->store, i->base.id);
+               }
        } else if (ok == LOG_OK && tr->parent) /* cleanup older save points */
                ATOMIC_PTR_SET(&i->data, savepoint_commit_delta(delta, 
commit_ts));
        return ok;
@@ -3229,6 +3239,10 @@ commit_update_del( sql_trans *tr, sql_ch
        lock_table(tr->store, t->base.id);
        if (!commit_ts) { /* rollback */
                if (dbat->cs.ts == tr->tid) {
+                       if (change->ts && t->base.new) { /* handled by the 
create table */
+                               unlock_table(tr->store, t->base.id);
+                               return ok;
+                       }
                        storage *d = change->data, *o = 
ATOMIC_PTR_GET(&t->data);
 
                        if (o != d) {
@@ -3272,6 +3286,8 @@ tc_gc_col( sql_store Store, sql_change *
        /* savepoint commit (did it merge ?) */
        if (ATOMIC_PTR_GET(&c->data) != change->data || isTempTable(c->t)) /* 
data is freed by commit */
                return 1;
+       if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older 
stuff on savepoint commits */
+               return 0;
        sql_delta *d = (sql_delta*)change->data;
        if (d->next) {
                if (d->cs.ts > oldest)
@@ -3295,6 +3311,8 @@ tc_gc_idx( sql_store Store, sql_change *
        /* savepoint commit (did it merge ?) */
        if (ATOMIC_PTR_GET(&i->data) != change->data || isTempTable(i->t)) /* 
data is freed by commit */
                return 1;
+       if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older 
stuff on savepoint commits */
+               return 0;
        sql_delta *d = (sql_delta*)change->data;
        if (d->next) {
                if (d->cs.ts > oldest)
@@ -3316,6 +3334,8 @@ tc_gc_del( sql_store Store, sql_change *
        /* savepoint commit (did it merge ?) */
        if (ATOMIC_PTR_GET(&t->data) != change->data || isTempTable(t)) /* data 
is freed by commit */
                return 1;
+       if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older 
stuff on savepoint commits */
+               return 0;
        storage *d = (storage*)change->data;
        if (d->next) {
                if (d->cs.ts > oldest)
diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -590,6 +590,8 @@ tc_gc_objectversion(sql_store store, sql
        assert(!change->handled);
        objectversion *ov = (objectversion*)change->data;
 
+       if (oldest && oldest >= TRANSACTION_ID_BASE)
+               return 0;
        int res = os_cleanup( (sqlstore*) store, ov, oldest);
        change->handled = (res)?true:false;
        return res;
@@ -602,7 +604,7 @@ tc_commit_objectversion(sql_trans *tr, s
        if (commit_ts) {
                assert(ov->ts == tr->tid);
                ov->ts = commit_ts;
-               change->committed = true;
+               change->committed = commit_ts < TRANSACTION_ID_BASE ? true: 
false;
                (void)oldest;
        }
        else {
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3436,7 +3436,7 @@ sql_trans_commit(sql_trans *tr)
                int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 100000;
                int flush = (tr->logchanges > min_changes && !store->changes);
                /* log changes should only be done if there is something to log 
*/
-               if (tr->logchanges > 0) {
+               if (!tr->parent && tr->logchanges > 0) {
                        ok = store->logger_api.log_tstart(store, flush);
                        /* log */
                        for(node *n=tr->changes->h; n && ok == LOG_OK; n = 
n->next) {
@@ -3455,11 +3455,14 @@ sql_trans_commit(sql_trans *tr)
                } else {
                        store_lock(store);
                        commit_ts = tr->parent ? tr->parent->tid : 
store_timestamp(store);
+                       oldest = tr->parent ? commit_ts : oldest;
+                       if (tr->parent)
+                               tr->parent->logchanges += tr->logchanges;
                }
                tr->logchanges = 0;
                TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT 
") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
                /* apply committed changes */
-               if (ATOMIC_GET(&store->nr_active) == 1) {
+               if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) {
                        oldest = commit_ts;
                        store_pending_changes(store, oldest);
                }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to