Changeset: 09b9a4218fa8 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/09b9a4218fa8
Modified Files:
        gdk/gdk_align.c
        gdk/gdk_bat.c
        gdk/gdk_batop.c
        gdk/gdk_bbp.c
        gdk/gdk_heap.c
        gdk/gdk_imprints.c
        gdk/gdk_private.h
        gdk/gdk_tm.c
Branch: Jan2022
Log Message:

Merge with Jul2021 branch.


diffs (truncated from 593 to 300 lines):

diff --git a/gdk/gdk_align.c b/gdk/gdk_align.c
--- a/gdk/gdk_align.c
+++ b/gdk/gdk_align.c
@@ -166,6 +166,7 @@ BATmaterialize(BAT *b)
 {
        BUN cnt;
        Heap *tail;
+       Heap *h, *vh = NULL;
        BUN p, q;
        oid t, *x;
 
@@ -211,7 +212,7 @@ BATmaterialize(BAT *b)
        ATOMIC_INIT(&tail->refs, 1);
        /* point of no return */
        MT_lock_set(&b->theaplock);
-       assert(ATOMIC_GET(&b->theap->refs) > 0);
+       assert((ATOMIC_GET(&b->theap->refs) & HEAPREFS) > 0);
        /* can only look at tvheap when lock is held */
        if (complex_cand(b)) {
                assert(b->batRole == TRANSIENT);
@@ -246,18 +247,21 @@ BATmaterialize(BAT *b)
                        }
                        assert(n == q);
                }
-               HEAPdecref(b->tvheap, true);
+               vh = b->tvheap;
                b->tvheap = NULL;
        }
-       HEAPdecref(b->theap, false);
+       h = b->theap;
        b->theap = tail;
        b->tbaseoff = 0;
        b->theap->dirty = true;
        b->tunique_est = is_oid_nil(t) ? 1.0 : (double) b->batCount;
+       MT_lock_unset(&b->theaplock);
        b->ttype = TYPE_oid;
        BATsetdims(b, 0);
        BATsetcount(b, b->batCount);
-       MT_lock_unset(&b->theaplock);
+       HEAPdecref(h, false);
+       if (vh)
+               HEAPdecref(vh, true);
 
        return GDK_SUCCEED;
 }
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -675,12 +675,12 @@ BATfree(BAT *b)
                b->tunique_est = (double) nunique;
        }
        if (b->theap) {
-               assert(ATOMIC_GET(&b->theap->refs) == 1);
+               assert((ATOMIC_GET(&b->theap->refs) & HEAPREFS) == 1);
                assert(b->theap->parentid == b->batCacheid);
                HEAPfree(b->theap, false);
        }
        if (b->tvheap) {
-               assert(ATOMIC_GET(&b->tvheap->refs) == 1);
+               assert((ATOMIC_GET(&b->tvheap->refs) & HEAPREFS) == 1);
                assert(b->tvheap->parentid == b->batCacheid);
                HEAPfree(b->tvheap, false);
        }
@@ -698,6 +698,14 @@ BATdestroy(BAT *b)
                ATOMIC_DESTROY(&b->tvheap->refs);
                GDKfree(b->tvheap);
        }
+       ValPtr p = BATgetprop_nolock(b, (enum prop_t) 21);
+       if (p != NULL) {
+               Heap *h = p->val.pval;
+               ATOMIC_AND(&h->refs, ~DELAYEDREMOVE);
+               /* the bat has not been committed, so we cannot remove
+                * the old tail file */
+               HEAPdecref(h, false);
+       }
        PROPdestroy_nolock(b);
        MT_lock_destroy(&b->theaplock);
        MT_lock_destroy(&b->batIdxLock);
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -41,9 +41,10 @@ unshare_varsized_heap(BAT *b)
                ATOMIC_INIT(&h->refs, 1);
                MT_lock_set(&b->theaplock);
                int parent = b->tvheap->parentid;
-               HEAPdecref(b->tvheap, false);
+               Heap *oh = b->tvheap;
                b->tvheap = h;
                MT_lock_unset(&b->theaplock);
+               HEAPdecref(oh, false);
                BBPunshare(parent);
                BBPunfix(parent);
        }
@@ -439,11 +440,12 @@ append_varsized_bat(BAT *b, BAT *n, stru
                }
                bat parid = b->tvheap->parentid;
                BBPunshare(parid);
+               ATOMIC_INIT(&h->refs, 1);
                MT_lock_set(&b->theaplock);
-               HEAPdecref(b->tvheap, false);
-               ATOMIC_INIT(&h->refs, 1);
+               Heap *oh = b->tvheap;
                b->tvheap = h;
                MT_lock_unset(&b->theaplock);
+               HEAPdecref(oh, false);
                BBPunfix(parid);
        }
        if (BATcount(b) == 0 && BATatoms[b->ttype].atomFix == NULL &&
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -3294,6 +3294,13 @@ BBPdestroy(BAT *b)
        }
        if (tp || vtp)
                VIEWunlink(b);
+       ValPtr p = BATgetprop(b, (enum prop_t) 21);
+       if (p != NULL) {
+               Heap *h = p->val.pval;
+               BATrmprop(b, (enum prop_t) 21);
+               ATOMIC_AND(&h->refs, ~DELAYEDREMOVE);
+               HEAPdecref(h, true);
+       }
        BATdelete(b);
 
        BBPclear(b->batCacheid, true);  /* if destroyed; de-register from BBP */
@@ -3778,34 +3785,21 @@ BBPbackup(BAT *b, bool subcommit)
 }
 
 static inline void
-BBPcheckHeap(bool subcommit, Heap *h)
+BBPcheckHeap(Heap *h)
 {
        struct stat statb;
        char *path;
 
-       if (subcommit) {
-               char *s = strrchr(h->filename, DIR_SEP);
-               if (s)
-                       s++;
-               else
-                       s = h->filename;
-               path = GDKfilepath(0, BAKDIR, s, NULL);
-               if (path == NULL)
-                       return;
-               if (MT_stat(path, &statb) < 0) {
-                       GDKfree(path);
-                       path = GDKfilepath(0, BATDIR, h->filename, NULL);
-                       if (path == NULL)
-                               return;
-                       if (MT_stat(path, &statb) < 0) {
-                               assert(0);
-                               GDKsyserror("cannot stat file %s (expected size 
%zu)\n",
-                                           path, h->free);
-                               GDKfree(path);
-                               return;
-                       }
-               }
-       } else {
+       char *s = strrchr(h->filename, DIR_SEP);
+       if (s)
+               s++;
+       else
+               s = h->filename;
+       path = GDKfilepath(0, BAKDIR, s, NULL);
+       if (path == NULL)
+               return;
+       if (MT_stat(path, &statb) < 0) {
+               GDKfree(path);
                path = GDKfilepath(0, BATDIR, h->filename, NULL);
                if (path == NULL)
                        return;
@@ -3828,7 +3822,7 @@ BBPcheckHeap(bool subcommit, Heap *h)
 }
 
 static void
-BBPcheckBBPdir(bool subcommit)
+BBPcheckBBPdir(void)
 {
        FILE *fp;
        int lineno = 0;
@@ -3836,10 +3830,14 @@ BBPcheckBBPdir(bool subcommit)
        unsigned bbpversion;
        lng logno, transid;
 
-       fp = GDKfileopen(0, BATDIR, "BBP", "dir", "r");
+       fp = GDKfileopen(0, BAKDIR, "BBP", "dir", "r");
        assert(fp != NULL);
-       if (fp == NULL)
-               return;
+       if (fp == NULL) {
+               fp = GDKfileopen(0, BATDIR, "BBP", "dir", "r");
+               assert(fp != NULL);
+               if (fp == NULL)
+                       return;
+       }
        bbpversion = BBPheader(fp, &lineno, &bbpsize, &logno, &transid);
        if (bbpversion == 0) {
                fclose(fp);
@@ -3895,9 +3893,9 @@ BBPcheckBBPdir(bool subcommit)
                        continue;
                }
                if (b.theap->free > 0)
-                       BBPcheckHeap(subcommit, b.theap);
+                       BBPcheckHeap(b.theap);
                if (b.tvheap != NULL && b.tvheap->free > 0)
-                       BBPcheckHeap(subcommit, b.tvheap);
+                       BBPcheckHeap(b.tvheap);
        }
 }
 
@@ -3932,6 +3930,9 @@ BBPsync(int cnt, bat *restrict subcommit
 
        TRC_DEBUG_IF(PERF) t0 = t1 = GDKms();
 
+       if ((GDKdebug & TAILCHKMASK) && !GDKinmemory(0))
+               BBPcheckBBPdir();
+
        ret = BBPprepare(subcommit != NULL);
 
        /* PHASE 1: safeguard everything in a backup-dir */
@@ -4029,7 +4030,21 @@ BBPsync(int cnt, bat *restrict subcommit
                        if (size > bi.count) /* includes sizes==NULL */
                                size = bi.count;
                        bi.b->batInserted = size;
-                       if (b && size != 0) {
+                       if (bi.b->ttype >= 0 && ATOMvarsized(bi.b->ttype)) {
+                               /* see epilogue() for other part of this */
+                               MT_lock_set(&bi.b->theaplock);
+                               /* remember the tail we're saving */
+                               if (BATsetprop_nolock(bi.b, (enum prop_t) 20, 
TYPE_ptr, &bi.h) == NULL) {
+                                       GDKerror("setprop failed\n");
+                                       ret = GDK_FAIL;
+                               } else {
+                                       if (BATgetprop_nolock(bi.b, (enum 
prop_t) 21) == NULL)
+                                               BATsetprop_nolock(bi.b, (enum 
prop_t) 21, TYPE_ptr, &(void *){(Heap *) 1});
+                                       HEAPincref(bi.h);
+                               }
+                               MT_lock_unset(&bi.b->theaplock);
+                       }
+                       if (ret == GDK_SUCCEED && b && size != 0) {
                                /* wait for BBPSAVING so that we
                                 * can set it, wait for
                                 * BBPUNLOADING before
@@ -4076,9 +4091,6 @@ BBPsync(int cnt, bat *restrict subcommit
                 * succeeded, so no changing of ret after this
                 * call anymore */
 
-               if ((GDKdebug & TAILCHKMASK) && !GDKinmemory(0))
-                       BBPcheckBBPdir(subcommit != NULL);
-
                if (MT_rename(bakdir, deldir) < 0 &&
                    /* maybe there was an old deldir, so remove and try again */
                    (GDKremovedir(0, DELDIR) != GDK_SUCCEED ||
@@ -4106,7 +4118,25 @@ BBPsync(int cnt, bat *restrict subcommit
        TRC_DEBUG(PERF, "%s (ready time %d)\n",
                  ret == GDK_SUCCEED ? "" : " failed",
                  (t0 = GDKms()) - t1);
+
   bailout:
+       if (ret != GDK_SUCCEED) {
+               /* clean up extra refs we created */
+               for (int idx = 1; idx < cnt; idx++) {
+                       bat i = subcommit ? subcommit[idx] : idx;
+                       BAT *b = BBP_desc(i);
+                       if (b && ATOMvarsized(b->ttype)) {
+                               MT_lock_set(&b->theaplock);
+                               ValPtr p = BATgetprop_nolock(b, (enum prop_t) 
20);
+                               if (p != NULL) {
+                                       HEAPdecref(p->val.pval, false);
+                                       BATrmprop_nolock(b, (enum prop_t) 20);
+                               }
+                               MT_lock_unset(&b->theaplock);
+                       }
+               }
+       }
+
        /* turn off the BBPSYNCING bits for all bats, even when things
         * didn't go according to plan (i.e., don't check for ret ==
         * GDK_SUCCEED) */
diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -92,7 +92,8 @@ HEAPgrow(MT_Lock *lock, Heap **hp, size_
        Heap *new;
 
        MT_lock_set(lock);
-       if (ATOMIC_GET(&(*hp)->refs) == 1) {
+       ATOMIC_BASE_TYPE refs = ATOMIC_GET(&(*hp)->refs);
+       if ((refs & HEAPREFS) == 1) {
                gdk_return rc = HEAPextend((*hp), size, mayshare);
                MT_lock_unset(lock);
                return rc;
@@ -103,13 +104,12 @@ HEAPgrow(MT_Lock *lock, Heap **hp, size_
                *new = (Heap) {
                        .farmid = old->farmid,
                        .dirty = true,
-                       .remove = old->remove,
                        .parentid = old->parentid,
                        .wasempty = old->wasempty,
                };
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to