Changeset: 58f0bc5ca6e3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/58f0bc5ca6e3
Modified Files:
        sql/storage/bat/bat_storage.c
        sql/storage/store.c
Branch: iso
Log Message:

Merged with Jul2021


diffs (truncated from 2202 to 300 lines):

diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -1253,18 +1253,22 @@ BUNappendmulti(BAT *b, const void *value
 
        BATrmprop(b, GDK_UNIQUE_ESTIMATE);
        b->theap->dirty |= count > 0;
+       MT_rwlock_wrlock(&b->thashlock);
        for (BUN i = 0; i < count; i++) {
                void *t = b->ttype && b->tvarsized ? ((void **) values)[i] :
                        (void *) ((char *) values + i * Tsize(b));
                setcolprops(b, t);
                gdk_return rc = bunfastapp_nocheck(b, p, t, Tsize(b));
-               if (rc != GDK_SUCCEED)
+               if (rc != GDK_SUCCEED) {
+                       MT_rwlock_wrunlock(&b->thashlock);
                        return rc;
+               }
                if (b->thash) {
-                       HASHappend(b, p, t);
+                       HASHappend_locked(b, p, t);
                }
                p++;
        }
+       MT_rwlock_wrunlock(&b->thashlock);
 
        IMPSdestroy(b); /* no support for inserts in imprints yet */
        OIDXdestroy(b);
@@ -1405,7 +1409,6 @@ BUNinplacemulti(BAT *b, const oid *posit
                         * clear it */
                        b->tnil = false;
                }
-               HASHdelete(b, p, val);  /* first delete old value from hash */
                if (b->ttype != TYPE_void && ATOMlinear(b->ttype)) {
                        const ValRecord *prop;
 
@@ -1449,6 +1452,9 @@ BUNinplacemulti(BAT *b, const oid *posit
                }
                OIDXdestroy(b);
                IMPSdestroy(b);
+
+               MT_rwlock_wrlock(&b->thashlock);
+               HASHdelete_locked(b, p, val);   /* first delete old value from 
hash */
                if (b->tvarsized && b->ttype) {
                        var_t _d;
                        ptr _ptr;
@@ -1469,13 +1475,17 @@ BUNinplacemulti(BAT *b, const oid *posit
                                break;
 #endif
                        }
-                       if (ATOMreplaceVAR(b, &_d, t) != GDK_SUCCEED)
+                       if (ATOMreplaceVAR(b, &_d, t) != GDK_SUCCEED) {
+                               MT_rwlock_wrunlock(&b->thashlock);
                                return GDK_FAIL;
+                       }
                        if (b->twidth < SIZEOF_VAR_T &&
                            (b->twidth <= 2 ? _d - GDK_VAROFFSET : _d) >= 
((size_t) 1 << (8 * b->twidth))) {
                                /* doesn't fit in current heap, upgrade it */
-                               if (GDKupgradevarheap(b, _d, 0, false) != 
GDK_SUCCEED)
+                               if (GDKupgradevarheap(b, _d, 0, false) != 
GDK_SUCCEED) {
+                                       MT_rwlock_wrunlock(&b->thashlock);
                                        return GDK_FAIL;
+                               }
                        }
                        _ptr = BUNtloc(bi, p);
                        switch (b->twidth) {
@@ -1498,10 +1508,11 @@ BUNinplacemulti(BAT *b, const oid *posit
                        mskSetVal(b, p, * (msk *) t);
                } else {
                        assert(BATatoms[b->ttype].atomPut == NULL);
-                       if (ATOMfix(b->ttype, t) != GDK_SUCCEED)
+                       if (ATOMfix(b->ttype, t) != GDK_SUCCEED ||
+                           ATOMunfix(b->ttype, BUNtloc(bi, p)) != GDK_SUCCEED) 
{
+                               MT_rwlock_wrunlock(&b->thashlock);
                                return GDK_FAIL;
-                       if (ATOMunfix(b->ttype, BUNtloc(bi, p)) != GDK_SUCCEED)
-                               return GDK_FAIL;
+                       }
                        switch (ATOMsize(b->ttype)) {
                        case 0:      /* void */
                                break;
@@ -1530,7 +1541,8 @@ BUNinplacemulti(BAT *b, const oid *posit
                        }
                }
 
-               HASHinsert(b, p, t);    /* insert new value into hash */
+               HASHinsert_locked(b, p, t);     /* insert new value into hash */
+               MT_rwlock_wrunlock(&b->thashlock);
 
                tt = b->ttype;
                prv = p > 0 ? p - 1 : BUN_NONE;
@@ -1704,40 +1716,63 @@ BUNfnd(BAT *b, const void *v)
                if (BATordered(b) || BATordered_rev(b))
                        return SORTfnd(b, v);
        }
-       bi = bat_iterator(b);
-       switch (ATOMbasetype(b->ttype)) {
-       case TYPE_bte:
-               HASHfnd_bte(r, bi, v);
-               break;
-       case TYPE_sht:
-               HASHfnd_sht(r, bi, v);
-               break;
-       case TYPE_int:
-               HASHfnd_int(r, bi, v);
-               break;
-       case TYPE_flt:
-               HASHfnd_flt(r, bi, v);
-               break;
-       case TYPE_dbl:
-               HASHfnd_dbl(r, bi, v);
-               break;
-       case TYPE_lng:
-               HASHfnd_lng(r, bi, v);
-               break;
+       if (BAThash(b) == GDK_SUCCEED) {
+               MT_rwlock_rdlock(&b->thashlock);
+               if (b->thash == NULL) {
+                       MT_rwlock_rdunlock(&b->thashlock);
+                       goto hashfnd_failed;
+               }
+               bi = bat_iterator(b);
+               switch (ATOMbasetype(b->ttype)) {
+               case TYPE_bte:
+                       HASHloop_bte(bi, b->thash, r, v)
+                               break;
+                       break;
+               case TYPE_sht:
+                       HASHloop_sht(bi, b->thash, r, v)
+                               break;
+                       break;
+               case TYPE_int:
+                       HASHloop_int(bi, b->thash, r, v)
+                               break;
+                       break;
+               case TYPE_flt:
+                       HASHloop_flt(bi, b->thash, r, v)
+                               break;
+                       break;
+               case TYPE_dbl:
+                       HASHloop_dbl(bi, b->thash, r, v)
+                               break;
+                       break;
+               case TYPE_lng:
+                       HASHloop_lng(bi, b->thash, r, v)
+                               break;
+                       break;
 #ifdef HAVE_HGE
-       case TYPE_hge:
-               HASHfnd_hge(r, bi, v);
-               break;
+               case TYPE_hge:
+                       HASHloop_hge(bi, b->thash, r, v)
+                               break;
+                       break;
 #endif
-       case TYPE_str:
-               HASHfnd_str(r, bi, v);
-               break;
-       default:
-               HASHfnd(r, bi, v);
+               case TYPE_uuid:
+                       HASHloop_uuid(bi, b->thash, r, v)
+                               break;
+                       break;
+               case TYPE_str:
+                       HASHloop_str(bi, b->thash, r, v)
+                               break;
+                       break;
+               default:
+                       HASHloop(bi, b->thash, r, v)
+                               break;
+                       break;
+               }
+               MT_rwlock_rdunlock(&b->thashlock);
+               return r;
        }
-       return r;
   hashfnd_failed:
        /* can't build hash table, search the slow way */
+       GDKclrerr();
        return slowfnd(b, v);
 }
 
@@ -2036,44 +2071,41 @@ BATroles(BAT *b, const char *tnme)
 /* rather than deleting X.new, we comply with the commit protocol and
  * move it to backup storage */
 static gdk_return
-backup_new(Heap *hp, int lockbat)
+backup_new(Heap *hp)
 {
-       int batret, bakret, xx, ret = 0;
+       int batret, bakret, ret = -1;
        char *batpath, *bakpath;
        struct stat st;
 
-       /* file actions here interact with the global commits */
-       for (xx = 0; xx <= lockbat; xx++)
-               MT_lock_set(&GDKtrimLock(xx));
-
        /* check for an existing X.new in BATDIR, BAKDIR and SUBDIR */
        batpath = GDKfilepath(hp->farmid, BATDIR, hp->filename, ".new");
        bakpath = GDKfilepath(hp->farmid, BAKDIR, hp->filename, ".new");
-       if (batpath == NULL || bakpath == NULL) {
-               ret = -1;
-               goto bailout;
-       }
-       batret = MT_stat(batpath, &st);
-       bakret = MT_stat(bakpath, &st);
+       if (batpath != NULL && bakpath != NULL) {
+               /* file actions here interact with the global commits */
+               MT_lock_set(&GDKtmLock);
+
+               batret = MT_stat(batpath, &st);
+               bakret = MT_stat(bakpath, &st);
 
-       if (batret == 0 && bakret) {
-               /* no backup yet, so move the existing X.new there out
-                * of the way */
-               if ((ret = MT_rename(batpath, bakpath)) < 0)
-                       GDKsyserror("backup_new: rename %s to %s failed\n",
-                                   batpath, bakpath);
-               TRC_DEBUG(IO_, "rename(%s,%s) = %d\n", batpath, bakpath, ret);
-       } else if (batret == 0) {
-               /* there is a backup already; just remove the X.new */
-               if ((ret = MT_remove(batpath)) != 0)
-                       GDKsyserror("backup_new: remove %s failed\n", batpath);
-               TRC_DEBUG(IO_, "remove(%s) = %d\n", batpath, ret);
+               if (batret == 0 && bakret) {
+                       /* no backup yet, so move the existing X.new there out
+                        * of the way */
+                       if ((ret = MT_rename(batpath, bakpath)) < 0)
+                               GDKsyserror("backup_new: rename %s to %s 
failed\n",
+                                           batpath, bakpath);
+                       TRC_DEBUG(IO_, "rename(%s,%s) = %d\n", batpath, 
bakpath, ret);
+               } else if (batret == 0) {
+                       /* there is a backup already; just remove the X.new */
+                       if ((ret = MT_remove(batpath)) != 0)
+                               GDKsyserror("backup_new: remove %s failed\n", 
batpath);
+                       TRC_DEBUG(IO_, "remove(%s) = %d\n", batpath, ret);
+               } else {
+                       ret = 0;
+               }
+               MT_lock_unset(&GDKtmLock);
        }
-  bailout:
        GDKfree(batpath);
        GDKfree(bakpath);
-       for (xx = lockbat; xx >= 0; xx--)
-               MT_lock_unset(&GDKtrimLock(xx));
        return ret ? GDK_FAIL : GDK_SUCCEED;
 }
 
@@ -2093,7 +2125,7 @@ HEAPchangeaccess(Heap *hp, int dstmode, 
        }
        if (hp->storage == STORE_MMAP) {        /* 6=>4 */
                hp->dirty = true;
-               return backup_new(hp, BBP_THREADMASK) != GDK_SUCCEED ? 
STORE_INVALID : STORE_MMAP;      /* only called for existing bats */
+               return backup_new(hp) != GDK_SUCCEED ? STORE_INVALID : 
STORE_MMAP;      /* only called for existing bats */
        }
        return hp->storage;     /* 7=>5 */
 }
@@ -2105,7 +2137,7 @@ HEAPcommitpersistence(Heap *hp, bool wri
        if (existing) {         /* existing, ie will become transient */
                if (hp->storage == STORE_MMAP && hp->newstorage == STORE_PRIV 
&& writable) {    /* 6=>2 */
                        hp->dirty = true;
-                       return backup_new(hp, -1) != GDK_SUCCEED ? 
STORE_INVALID : STORE_MMAP;  /* only called for existing bats */
+                       return backup_new(hp) != GDK_SUCCEED ? STORE_INVALID : 
STORE_MMAP;      /* only called for existing bats */
                }
                return hp->newstorage;  /* 4=>0,5=>1,7=>3,c=>a no change */
        }
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -899,6 +899,7 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool 
                        if (BATextend(b, grows) != GDK_SUCCEED)
                                return GDK_FAIL;
                }
+               MT_rwlock_wrlock(&b->thashlock);
                if (BATatoms[b->ttype].atomFix == NULL &&
                    b->ttype != TYPE_void &&
                    n->ttype != TYPE_void &&
@@ -908,7 +909,7 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool 
                               Tloc(n, ci.seq - hseq),
                               cnt * Tsize(n));
                        for (BUN i = 0; b->thash && i < cnt; i++) {
-                               HASHappend(b, r, Tloc(b, r));
+                               HASHappend_locked(b, r, Tloc(b, r));
                                r++;
                        }
                        BATsetcount(b, BATcount(b) + cnt);
@@ -919,13 +920,16 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool 
                                cnt--;
                                BUN p = canditer_next(&ci) - hseq;
                                const void *t = BUNtail(ni, p);
-                               if (bunfastapp_nocheck(b, r, t, Tsize(b)) != 
GDK_SUCCEED)
+                               if (bunfastapp_nocheck(b, r, t, Tsize(b)) != 
GDK_SUCCEED) {
+                                       MT_rwlock_wrunlock(&b->thashlock);
                                        return GDK_FAIL;
+                               }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to