Changeset: 39dfededbb3d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/39dfededbb3d
Modified Files:
        gdk/gdk_bbp.c
        gdk/gdk_tm.c
        sql/include/sql_catalog.h
        sql/storage/bat/bat_storage.c
        sql/storage/objectset.c
        sql/storage/sql_storage.h
        sql/storage/store.c
        tools/merovingian/client/monetdb.c
Branch: default
Log Message:

Merge with Jan2022.


diffs (truncated from 653 to 300 lines):

diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -3337,7 +3337,7 @@ dirty_bat(bat *i, bool subcommit)
 static gdk_return
 file_move(int farmid, const char *srcdir, const char *dstdir, const char *name, const char *ext)
 {
-       if (GDKmove(farmid, srcdir, name, ext, dstdir, name, ext, true) == GDK_SUCCEED) {
+       if (GDKmove(farmid, srcdir, name, ext, dstdir, name, ext, false) == GDK_SUCCEED) {
                return GDK_SUCCEED;
        } else {
                char *path;
@@ -3792,7 +3792,7 @@ BBPsync(int cnt, bat *restrict subcommit
                                break;
                        }
                        if (BBP_status(i) & BBPEXISTING) {
-                               if (b != NULL) {
+                               if (b != NULL && b->batInserted > 0) {
                                        if (BBPbackup(b, subcommit != NULL) != GDK_SUCCEED) {
                                                if (lock)
                                                        MT_lock_unset(&GDKswapLock(i));
@@ -3834,6 +3834,7 @@ BBPsync(int cnt, bat *restrict subcommit
                                assert(sizes == NULL || bi.width == 0 || (bi.type == TYPE_msk ? ((size + 31) / 32) * 4 : size << bi.shift) <= bi.hfree);
                                if (size > bi.count) /* includes sizes==NULL */
                                        size = bi.count;
+                               bi.b->batInserted = size;
                                if (b && size != 0) {
                                        /* wait for BBPSAVING so that we
                                         * can set it, wait for
diff --git a/gdk/gdk_tm.c b/gdk/gdk_tm.c
--- a/gdk/gdk_tm.c
+++ b/gdk/gdk_tm.c
@@ -43,31 +43,6 @@
  * situation is COLD-abort: quit the server and restart, so you get
  * the recovered disk images.
  */
-/* in the commit prelude, the delta status in the memory image of all
- * bats is commited */
-static gdk_return
-prelude(int cnt, bat *restrict subcommit, BUN *restrict sizes)
-{
-       int i = 0;
-
-       while (++i < cnt) {
-               bat bid = subcommit ? subcommit[i] : i;
-
-               if (BBP_status(bid) & BBPPERSISTENT) {
-                       BAT *b = BBPquickdesc(bid);
-
-                       if (b) {
-                               MT_lock_set(&b->theaplock);
-                               assert(!isVIEW(b));
-                               assert(b->batRole == PERSISTENT);
-                               assert(sizes == NULL || sizes[i] <= BATcount(b));
-                               BATcommit(b, sizes ? sizes[i] : BUN_NONE);
-                               MT_lock_unset(&b->theaplock);
-                       }
-               }
-       }
-       return GDK_SUCCEED;
-}
 
 /* in the commit epilogue, the BBP-status of the bats is changed to
  * reflect their presence in the succeeded checkpoint.  Also bats from
@@ -146,8 +121,7 @@ TMcommit(void)
 
        /* commit with the BBP globally locked */
        BBPlock();
-       if (prelude(getBBPsize(), NULL, NULL) == GDK_SUCCEED &&
-           BBPsync(getBBPsize(), NULL, NULL, getBBPlogno(), getBBPtransid()) == GDK_SUCCEED) {
+       if (BBPsync(getBBPsize(), NULL, NULL, getBBPlogno(), getBBPtransid()) == GDK_SUCCEED) {
                epilogue(getBBPsize(), NULL, true);
                ret = GDK_SUCCEED;
        }
@@ -212,15 +186,13 @@ TMsubcommit_list(bat *restrict subcommit
                        }
                }
        }
-       if (prelude(cnt, subcommit, sizes) == GDK_SUCCEED) {    /* save the new bats outside the lock */
-               /* lock just prevents other global (sub-)commits */
-               MT_lock_set(&GDKtmLock);
-               if (BBPsync(cnt, subcommit, sizes, logno, transid) == GDK_SUCCEED) { /* write BBP.dir (++) */
-                       epilogue(cnt, subcommit, false);
-                       ret = GDK_SUCCEED;
-               }
-               MT_lock_unset(&GDKtmLock);
+       /* lock just prevents other global (sub-)commits */
+       MT_lock_set(&GDKtmLock);
+       if (BBPsync(cnt, subcommit, sizes, logno, transid) == GDK_SUCCEED) { /* write BBP.dir (++) */
+               epilogue(cnt, subcommit, false);
+               ret = GDK_SUCCEED;
        }
+       MT_lock_unset(&GDKtmLock);
        return ret;
 }
 
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -365,6 +365,7 @@ segments2cs(sql_trans *tr, segments *seg
 static void
 merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
 {
+       sqlstore* store = tr->store;
        segment *cur = s->segs->h, *seg = NULL;
        for (; cur; cur = cur->next) {
                if (cur->ts == tr->tid) {
@@ -372,24 +373,47 @@ merge_segments(storage *s, sql_trans *tr
                                cur->oldts = 0;
                        cur->ts = commit_ts;
                }
-               if (cur->ts <= oldest && cur->ts < TRANSACTION_ID_BASE) { /* possibly merge range */
-                       if (!seg) { /* skip first */
-                               seg = cur;
-                       } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
-                               /* merge with previous */
-                               seg->end = cur->end;
-                               seg->next = cur->next;
-                               if (cur == s->segs->t)
-                                       s->segs->t = seg;
-                               if (commit_ts == oldest)
-                                       _DELETE(cur);
-                               else
-                                       mark4destroy(cur, change, commit_ts);
-                               cur = seg;
-                       } else {
-                               seg = cur; /* begin of new merge */
+               if (!seg) {
+                       /* first segment */
+                       seg = cur;
+               }
+               else if (seg->ts < TRANSACTION_ID_BASE) {
+                       /* possible merge since both deleted flags are equal */
+                       if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
+                               int merge = 1;
+                               node *n = store->active->h;
+                               for (int i = 0; i < store->active->cnt; i++, n = n->next) {
+                                       ulng active = ((sql_trans*)n->data)->ts;
+                                       if(active == seg->ts || active == cur->ts)
+                                               continue; /* pretent that a recently committed transaction has already committed and is no longer active */
+                                       if (active == tr->ts)
+                                               continue; /* pretent that committing transaction has already committed and is no longer active */
+                                       if (seg->ts < active && cur->ts < active)
+                                               break;
+                                       if (seg->ts > active && cur->ts > active)
+                                               continue;
+
+                                       assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
+                                       /* cannot safely merge since there is an active transaction between the segments */
+                                       merge = false;
+                                       break;
+                               }
+                               /* merge segments */
+                               if (merge) {
+                                       seg->end = cur->end;
+                                       seg->next = cur->next;
+                                       if (cur == s->segs->t)
+                                               s->segs->t = seg;
+                                       if (commit_ts == oldest)
+                                               _DELETE(cur);
+                                       else
+                                               mark4destroy(cur, change, commit_ts);
+                                       cur = seg;
+                                       continue;
+                               }
                        }
                }
+               seg = cur;
        }
 }
 
@@ -3414,12 +3438,18 @@ static int
 log_segments(sql_trans *tr, segments *segs, sqlid id)
 {
        /* log segments */
+       lock_table(tr->store, id);
        for (segment *seg = segs->h; seg; seg=seg->next) {
+               unlock_table(tr->store, id);
                if (seg->ts == tr->tid && seg->end-seg->start) {
-                       if (log_segment(tr, seg, id) != LOG_OK)
+                       if (log_segment(tr, seg, id) != LOG_OK) {
+                               unlock_table(tr->store, id);
                                return LOG_ERR;
+                       }
                }
+               lock_table(tr->store, id);
        }
+       unlock_table(tr->store, id);
        return LOG_OK;
 }
 
@@ -3924,7 +3954,11 @@ log_table_append(sql_trans *tr, sql_tabl
                return LOG_ERR;
 
        size_t nr_appends = 0;
+
+       lock_table(tr->store, t->base.id);
        for (segment *seg = segs->h; seg; seg=seg->next) {
+               unlock_table(tr->store, t->base.id);
+
                if (seg->ts == tr->tid && seg->end-seg->start) {
                        if (!seg->deleted) {
                                if (log_segment(tr, seg, t->base.id) != LOG_OK)
@@ -3933,7 +3967,9 @@ log_table_append(sql_trans *tr, sql_tabl
                                nr_appends += (seg->end - seg->start);
                        }
                }
+               lock_table(tr->store, t->base.id);
        }
+       unlock_table(tr->store, t->base.id);
 
        for (node *n = ol_first_node(t->columns); n && ok; n = n->next) {
                sql_column *c = n->data;
@@ -3944,7 +3980,9 @@ log_table_append(sql_trans *tr, sql_tabl
                        continue;
                }
 
+               lock_table(tr->store, t->base.id);
                if (!cs->cleared) for (segment *cur = segs->h; cur && ok; cur = cur->next) {
+                       unlock_table(tr->store, t->base.id);
                        if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
                                /* append col*/
                                BAT *ins = temp_descriptor(cs->bid);
@@ -3953,7 +3991,9 @@ log_table_append(sql_trans *tr, sql_tabl
                                ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
                                bat_destroy(ins);
                        }
+                       lock_table(tr->store, t->base.id);
                }
+               unlock_table(tr->store, t->base.id);
 
                if (cs->ebid) {
                        BAT *ins = temp_descriptor(cs->ebid);
@@ -3974,7 +4014,9 @@ log_table_append(sql_trans *tr, sql_tabl
                        column_storage *cs = ATOMIC_PTR_GET(&i->data);
 
                        if (cs) {
+                               lock_table(tr->store, t->base.id);
                                for (segment *cur = segs->h; cur && ok; cur = cur->next) {
+                                       unlock_table(tr->store, t->base.id);
                                        if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
                                                /* append idx */
                                                BAT *ins = temp_descriptor(cs->bid);
@@ -3983,7 +4025,9 @@ log_table_append(sql_trans *tr, sql_tabl
                                                ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
                                                bat_destroy(ins);
                                        }
+                                       lock_table(tr->store, t->base.id);
                                }
+                               unlock_table(tr->store, t->base.id);
                        }
                }
        }
@@ -4001,8 +4045,6 @@ log_storage(sql_trans *tr, sql_table *t,
        if (ok == LOG_OK && cleared)
                ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
        if (ok == LOG_OK)
-               ok = segments2cs(tr, s->segs, &s->cs);
-       if (ok == LOG_OK)
                ok = log_segments(tr, s->segs, t->base.id);
        if (ok == LOG_OK && !cleared)
                ok = log_table_append(tr, t, s->segs);
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -7032,7 +7032,7 @@ sql_trans_begin(sql_session *s)
        tr->active = 1;
 
        (void) ATOMIC_INC(&store->nr_active);
-       list_append(store->active, s);
+       list_append(store->active, tr);
 
        TRC_DEBUG(SQL_STORE, "Exit sql_trans_begin for transaction: " ULLFMT "\n", tr->tid);
        store_unlock(store);
@@ -7050,20 +7050,20 @@ sql_trans_end(sql_session *s, int ok)
                sql_trans_rollback(s->tr, false);
        }
        assert(s->tr->active);
+       sqlstore *store = s->tr->store;
+       store_lock(store);
        s->tr->active = 0;
        s->tr->status = 0;
        s->auto_commit = s->ac_on_commit;
-       sqlstore *store = s->tr->store;
-       store_lock(store);
-       list_remove_data(store->active, NULL, s);
+       list_remove_data(store->active, NULL, s->tr);
        ATOMIC_SET(&store->lastactive, GDKusec());
        (void) ATOMIC_DEC(&store->nr_active);
        ulng oldest = store_get_timestamp(store);
        if (store->active && store->active->h) {
                for(node *n = store->active->h; n; n = n->next) {
-                       sql_session *s = n->data;
-                       if (s->tr->ts < oldest)
-                               oldest = s->tr->ts;
+                       sql_trans *tr = n->data;
+                       if (tr->ts < oldest)
+                               oldest = tr->ts;
                }
        }
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to