Changeset: dc30e90dbe03 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/dc30e90dbe03
Modified Files:
        gdk/gdk_batop.c
        gdk/gdk_bbp.c
Branch: default
Log Message:

Merge with Jul2021 branch.


diffs (277 lines):

diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -1171,6 +1171,13 @@ BATappend_or_update(BAT *b, BAT *p, cons
                        }
 
                        const void *old = BUNtvar(bi, updid);
+
+                       if (atomcmp(old, new) == 0) {
+                               /* replacing with the same value:
+                                * nothing to do */
+                               continue;
+                       }
+
                        bool isnil = atomcmp(new, nil) == 0;
                        anynil |= isnil;
                        if (b->tnil &&
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -292,10 +292,8 @@ BBPunlock(void)
 
 
 static gdk_return
-BBPinithash(int j)
+BBPinithash(int j, bat size)
 {
-       bat i = (bat) ATOMIC_GET(&BBPsize);
-
        assert(j >= 0 && j <= BBP_THREADMASK);
        for (BBP_mask = 1; (BBP_mask << 1) <= BBPlimit; BBP_mask <<= 1)
                ;
@@ -305,16 +303,16 @@ BBPinithash(int j)
        }
        BBP_mask--;
 
-       while (--i > 0) {
-               const char *s = BBP_logical(i);
+       while (--size > 0) {
+               const char *s = BBP_logical(size);
 
                if (s) {
                        if (*s != '.' && !BBPtmpcheck(s)) {
-                               BBP_insert(i);
+                               BBP_insert(size);
                        }
                } else {
-                       BBP_next(i) = BBP_free(j);
-                       BBP_free(j) = i;
+                       BBP_next(size) = BBP_free(j);
+                       BBP_free(j) = size;
                        if (++j > BBP_THREADMASK)
                                j = 0;
                }
@@ -350,16 +348,16 @@ BBPselectfarm(role_t role, int type, enu
 }
 
 static gdk_return
-BBPextend(int idx, bool buildhash)
+BBPextend(int idx, bool buildhash, bat newsize)
 {
-       if ((bat) ATOMIC_GET(&BBPsize) >= N_BBPINIT * BBPINIT) {
+       if (newsize >= N_BBPINIT * BBPINIT) {
                GDKerror("trying to extend BAT pool beyond the "
                         "limit (%d)\n", N_BBPINIT * BBPINIT);
                return GDK_FAIL;
        }
 
        /* make sure the new size is at least BBPsize large */
-       while (BBPlimit < (bat) ATOMIC_GET(&BBPsize)) {
+       while (BBPlimit < newsize) {
                BUN limit = BBPlimit >> BBPINITLOG;
                assert(BBP[limit] == NULL);
                BBP[limit] = GDKzalloc(BBPINIT * sizeof(BBPrec));
@@ -381,7 +379,7 @@ BBPextend(int idx, bool buildhash)
                BBP_hash = NULL;
                for (i = 0; i <= BBP_THREADMASK; i++)
                        BBP_free(i) = 0;
-               if (BBPinithash(idx) != GDK_SUCCEED)
+               if (BBPinithash(idx, newsize) != GDK_SUCCEED)
                        return GDK_FAIL;
        }
        return GDK_SUCCEED;
@@ -639,9 +637,10 @@ BBPreadEntries(FILE *fp, unsigned bbpver
 
                bid = (bat) batid;
                if (batid >= (uint64_t) ATOMIC_GET(&BBPsize)) {
-                       ATOMIC_SET(&BBPsize, batid + 1);
-                       if ((bat) ATOMIC_GET(&BBPsize) >= BBPlimit)
-                               BBPextend(0, false);
+                       if ((bat) ATOMIC_GET(&BBPsize) + 1 >= BBPlimit &&
+                           BBPextend(0, false, bid + 1) != GDK_SUCCEED)
+                               return GDK_FAIL;
+                       ATOMIC_SET(&BBPsize, bid + 1);
                }
                if (BBP_desc(bid) != NULL) {
                        TRC_CRITICAL(GDK, "duplicate entry in BBP.dir (ID = "
@@ -778,7 +777,7 @@ static gdk_return
 BBPcheckbats(unsigned bbpversion)
 {
        (void) bbpversion;
-       for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
+       for (bat bid = 1, size = (bat) ATOMIC_GET(&BBPsize); bid < size; bid++) {
                struct stat statb;
                BAT *b;
                char *path;
@@ -888,7 +887,7 @@ BBPcheckbats(unsigned bbpversion)
 #endif
 
 static unsigned
-BBPheader(FILE *fp, int *lineno)
+BBPheader(FILE *fp, int *lineno, bat *bbpsize)
 {
        char buf[BUFSIZ];
        int sz, ptrsize, oidsize, intsize;
@@ -946,8 +945,8 @@ BBPheader(FILE *fp, int *lineno)
                return 0;
        }
        sz = (int) (sz * BATMARGIN);
-       if (sz > (bat) ATOMIC_GET(&BBPsize))
-               ATOMIC_SET(&BBPsize, sz);
+       if (sz > *bbpsize)
+               *bbpsize = sz;
        if (bbpversion > GDKLIBRARY_MINMAX_POS) {
                if (fgets(buf, sizeof(buf), fp) == NULL) {
                        TRC_CRITICAL(GDK, "short BBP");
@@ -1343,7 +1342,7 @@ BBPtrim(bool aggressive)
        unsigned flag = BBPUNLOADING | BBPSYNCING | BBPSAVING;
        if (!aggressive)
                flag |= BBPHOT;
-       for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
+       for (bat bid = 1, nbat = (bat) ATOMIC_GET(&BBPsize); bid < nbat; bid++) {
                MT_lock_set(&GDKswapLock(bid));
                BAT *b = NULL;
                bool swap = false;
@@ -1376,7 +1375,7 @@ BBPmanager(void *dummy)
 
        for (;;) {
                int n = 0;
-               for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
+               for (bat bid = 1, nbat = (bat) ATOMIC_GET(&BBPsize); bid < nbat; bid++) {
                        MT_lock_set(&GDKswapLock(bid));
                        if (BBP_refs(bid) == 0 && BBP_lrefs(bid) != 0) {
                                n += (BBP_status(bid) & BBPHOT) != 0;
@@ -1513,20 +1512,23 @@ BBPinit(void)
        /* scan the BBP.dir to obtain current size */
        BBPlimit = 0;
        memset(BBP, 0, sizeof(BBP));
-       ATOMIC_SET(&BBPsize, 1);
-
+
+       bat bbpsize;
+       bbpsize = 1;
        if (GDKinmemory(0)) {
                bbpversion = GDKLIBRARY;
        } else {
-               bbpversion = BBPheader(fp, &lineno);
+               bbpversion = BBPheader(fp, &lineno, &bbpsize);
                if (bbpversion == 0)
                        return GDK_FAIL;
        }
 
-       BBPextend(0, false);            /* allocate BBP records */
+       /* allocate BBP records */
+       if (BBPextend(0, false, bbpsize) != GDK_SUCCEED)
+               return GDK_FAIL;
+       ATOMIC_SET(&BBPsize, bbpsize);
 
        if (!GDKinmemory(0)) {
-               ATOMIC_SET(&BBPsize, 1);
                if (BBPreadEntries(fp, bbpversion, lineno
 #ifdef GDKLIBRARY_HASHASH
                                   , &hashbats, &nhashbats
@@ -1537,7 +1539,7 @@ BBPinit(void)
        }
 
        MT_lock_set(&BBPnameLock);
-       if (BBPinithash(0) != GDK_SUCCEED) {
+       if (BBPinithash(0, (bat) ATOMIC_GET(&BBPsize)) != GDK_SUCCEED) {
                TRC_CRITICAL(GDK, "BBPinithash failed");
                MT_lock_unset(&BBPnameLock);
                return GDK_FAIL;
@@ -2186,7 +2188,12 @@ BBPgetsubdir(str s, bat i)
  * enough list can be found, we create a new entry by either just
  * increasing BBPsize (up to BBPlimit) or extending the BBP (which
  * increases BBPlimit).  Every time this function is called we start
- * searching in a following free list (variable "last"). */
+ * searching in a following free list (variable "last").
+ *
+ * Note that this is the only place in normal, multi-threaded operation
+ * where BBPsize is assigned a value (never decreasing), that the
+ * assignment happens after any necessary memory was allocated and
+ * initialized, and that this happens when the BBPnameLock is held. */
 static gdk_return
 maybeextend(int idx)
 {
@@ -2217,26 +2224,25 @@ maybeextend(int idx)
                BBP_free(idx) = i;
        } else {
                /* let the longest list alone, get a fresh entry */
-               if ((bat) ATOMIC_ADD(&BBPsize, 1) >= BBPlimit) {
-                       if (BBPextend(idx, true) != GDK_SUCCEED) {
-                               /* undo add */
-                               ATOMIC_SUB(&BBPsize, 1);
-                               /* couldn't extend; if there is any
-                                * free entry, take it from the
-                                * longest list after all */
-                               if (l > 0) {
-                                       i = BBP_free(m);
-                                       BBP_free(m) = BBP_next(i);
-                                       BBP_next(i) = 0;
-                                       BBP_free(idx) = i;
-                                       GDKclrerr();
-                               } else {
-                                       /* nothing available */
-                                       return GDK_FAIL;
-                               }
+               bat size = (bat) ATOMIC_GET(&BBPsize);
+               if (size >= BBPlimit &&
+                   BBPextend(idx, true, size + 1) != GDK_SUCCEED) {
+                       /* couldn't extend; if there is any
+                        * free entry, take it from the
+                        * longest list after all */
+                       if (l > 0) {
+                               i = BBP_free(m);
+                               BBP_free(m) = BBP_next(i);
+                               BBP_next(i) = 0;
+                               BBP_free(idx) = i;
+                               GDKclrerr();
+                       } else {
+                               /* nothing available */
+                               return GDK_FAIL;
                        }
                } else {
-                       BBP_free(idx) = (bat) ATOMIC_GET(&BBPsize) - 1;
+                       ATOMIC_SET(&BBPsize, size + 1);
+                       BBP_free(idx) = size;
                }
        }
        last = (last + 1) & BBP_THREADMASK;
@@ -2829,15 +2835,18 @@ decref(bat i, bool logical, bool release
        if (GDKvm_cursize() < GDK_vm_maxsize &&
             ((b && b->theap ? b->theap->size : 0) + (b && b->tvheap ? b->tvheap->size : 0)) < (GDK_vm_maxsize - GDKvm_cursize()) / 32)
                chkflag |= BBPHOT;
-       if (BBP_refs(i) > 0 ||
-           (BBP_lrefs(i) > 0 &&
-            (b == NULL ||
-             BATdirty(b) ||
-             (BBP_status(i) & chkflag) ||
-             !(BBP_status(i) & BBPPERSISTENT) ||
-             GDKinmemory(farmid)))) {
-               /* bat cannot be swapped out */
-       } else if (b ? b->batSharecnt == 0 : (BBP_status(i) & BBPTMP)) {
+       /* only consider unloading if refs is 0; if, in addition, lrefs
+        * is 0, we can definitely unload, else only if some more
+        * conditions are met */
+       if (BBP_refs(i) == 0 &&
+           (BBP_lrefs(i) == 0 ||
+            (b != NULL
+             ? (!BATdirty(b) &&
+                !(BBP_status(i) & chkflag) &&
+                (BBP_status(i) & BBPPERSISTENT) &&
+                !GDKinmemory(farmid) &&
+                b->batSharecnt == 0)
+             : (BBP_status(i) & BBPTMP)))) {
                /* bat will be unloaded now. set the UNLOADING bit
                 * while locked so no other thread thinks it's
                 * available anymore */
@@ -2845,7 +2854,7 @@ decref(bat i, bool logical, bool release
                TRC_DEBUG(BAT_, "%s set to unloading BAT %d (status %u, lrefs %d)\n", func, i, BBP_status(i), BBP_lrefs(i));
                BBP_status_on(i, BBPUNLOADING);
                swap = true;
-       }
+       } /* else: bat cannot be swapped out */
        lrefs = BBP_lrefs(i);
 
        /* unlock before re-locking in unload; as saving a dirty
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to