Changeset: 58f0bc5ca6e3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/58f0bc5ca6e3
Modified Files:
	sql/storage/bat/bat_storage.c
	sql/storage/store.c
Branch: iso
Log Message:
Merged with Jul2021 diffs (truncated from 2202 to 300 lines): diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c --- a/gdk/gdk_bat.c +++ b/gdk/gdk_bat.c @@ -1253,18 +1253,22 @@ BUNappendmulti(BAT *b, const void *value BATrmprop(b, GDK_UNIQUE_ESTIMATE); b->theap->dirty |= count > 0; + MT_rwlock_wrlock(&b->thashlock); for (BUN i = 0; i < count; i++) { void *t = b->ttype && b->tvarsized ? ((void **) values)[i] : (void *) ((char *) values + i * Tsize(b)); setcolprops(b, t); gdk_return rc = bunfastapp_nocheck(b, p, t, Tsize(b)); - if (rc != GDK_SUCCEED) + if (rc != GDK_SUCCEED) { + MT_rwlock_wrunlock(&b->thashlock); return rc; + } if (b->thash) { - HASHappend(b, p, t); + HASHappend_locked(b, p, t); } p++; } + MT_rwlock_wrunlock(&b->thashlock); IMPSdestroy(b); /* no support for inserts in imprints yet */ OIDXdestroy(b); @@ -1405,7 +1409,6 @@ BUNinplacemulti(BAT *b, const oid *posit * clear it */ b->tnil = false; } - HASHdelete(b, p, val); /* first delete old value from hash */ if (b->ttype != TYPE_void && ATOMlinear(b->ttype)) { const ValRecord *prop; @@ -1449,6 +1452,9 @@ BUNinplacemulti(BAT *b, const oid *posit } OIDXdestroy(b); IMPSdestroy(b); + + MT_rwlock_wrlock(&b->thashlock); + HASHdelete_locked(b, p, val); /* first delete old value from hash */ if (b->tvarsized && b->ttype) { var_t _d; ptr _ptr; @@ -1469,13 +1475,17 @@ BUNinplacemulti(BAT *b, const oid *posit break; #endif } - if (ATOMreplaceVAR(b, &_d, t) != GDK_SUCCEED) + if (ATOMreplaceVAR(b, &_d, t) != GDK_SUCCEED) { + MT_rwlock_wrunlock(&b->thashlock); return GDK_FAIL; + } if (b->twidth < SIZEOF_VAR_T && (b->twidth <= 2 ? 
_d - GDK_VAROFFSET : _d) >= ((size_t) 1 << (8 * b->twidth))) { /* doesn't fit in current heap, upgrade it */ - if (GDKupgradevarheap(b, _d, 0, false) != GDK_SUCCEED) + if (GDKupgradevarheap(b, _d, 0, false) != GDK_SUCCEED) { + MT_rwlock_wrunlock(&b->thashlock); return GDK_FAIL; + } } _ptr = BUNtloc(bi, p); switch (b->twidth) { @@ -1498,10 +1508,11 @@ BUNinplacemulti(BAT *b, const oid *posit mskSetVal(b, p, * (msk *) t); } else { assert(BATatoms[b->ttype].atomPut == NULL); - if (ATOMfix(b->ttype, t) != GDK_SUCCEED) + if (ATOMfix(b->ttype, t) != GDK_SUCCEED || + ATOMunfix(b->ttype, BUNtloc(bi, p)) != GDK_SUCCEED) { + MT_rwlock_wrunlock(&b->thashlock); return GDK_FAIL; - if (ATOMunfix(b->ttype, BUNtloc(bi, p)) != GDK_SUCCEED) - return GDK_FAIL; + } switch (ATOMsize(b->ttype)) { case 0: /* void */ break; @@ -1530,7 +1541,8 @@ BUNinplacemulti(BAT *b, const oid *posit } } - HASHinsert(b, p, t); /* insert new value into hash */ + HASHinsert_locked(b, p, t); /* insert new value into hash */ + MT_rwlock_wrunlock(&b->thashlock); tt = b->ttype; prv = p > 0 ? 
p - 1 : BUN_NONE; @@ -1704,40 +1716,63 @@ BUNfnd(BAT *b, const void *v) if (BATordered(b) || BATordered_rev(b)) return SORTfnd(b, v); } - bi = bat_iterator(b); - switch (ATOMbasetype(b->ttype)) { - case TYPE_bte: - HASHfnd_bte(r, bi, v); - break; - case TYPE_sht: - HASHfnd_sht(r, bi, v); - break; - case TYPE_int: - HASHfnd_int(r, bi, v); - break; - case TYPE_flt: - HASHfnd_flt(r, bi, v); - break; - case TYPE_dbl: - HASHfnd_dbl(r, bi, v); - break; - case TYPE_lng: - HASHfnd_lng(r, bi, v); - break; + if (BAThash(b) == GDK_SUCCEED) { + MT_rwlock_rdlock(&b->thashlock); + if (b->thash == NULL) { + MT_rwlock_rdunlock(&b->thashlock); + goto hashfnd_failed; + } + bi = bat_iterator(b); + switch (ATOMbasetype(b->ttype)) { + case TYPE_bte: + HASHloop_bte(bi, b->thash, r, v) + break; + break; + case TYPE_sht: + HASHloop_sht(bi, b->thash, r, v) + break; + break; + case TYPE_int: + HASHloop_int(bi, b->thash, r, v) + break; + break; + case TYPE_flt: + HASHloop_flt(bi, b->thash, r, v) + break; + break; + case TYPE_dbl: + HASHloop_dbl(bi, b->thash, r, v) + break; + break; + case TYPE_lng: + HASHloop_lng(bi, b->thash, r, v) + break; + break; #ifdef HAVE_HGE - case TYPE_hge: - HASHfnd_hge(r, bi, v); - break; + case TYPE_hge: + HASHloop_hge(bi, b->thash, r, v) + break; + break; #endif - case TYPE_str: - HASHfnd_str(r, bi, v); - break; - default: - HASHfnd(r, bi, v); + case TYPE_uuid: + HASHloop_uuid(bi, b->thash, r, v) + break; + break; + case TYPE_str: + HASHloop_str(bi, b->thash, r, v) + break; + break; + default: + HASHloop(bi, b->thash, r, v) + break; + break; + } + MT_rwlock_rdunlock(&b->thashlock); + return r; } - return r; hashfnd_failed: /* can't build hash table, search the slow way */ + GDKclrerr(); return slowfnd(b, v); } @@ -2036,44 +2071,41 @@ BATroles(BAT *b, const char *tnme) /* rather than deleting X.new, we comply with the commit protocol and * move it to backup storage */ static gdk_return -backup_new(Heap *hp, int lockbat) +backup_new(Heap *hp) { - int batret, 
bakret, xx, ret = 0; + int batret, bakret, ret = -1; char *batpath, *bakpath; struct stat st; - /* file actions here interact with the global commits */ - for (xx = 0; xx <= lockbat; xx++) - MT_lock_set(&GDKtrimLock(xx)); - /* check for an existing X.new in BATDIR, BAKDIR and SUBDIR */ batpath = GDKfilepath(hp->farmid, BATDIR, hp->filename, ".new"); bakpath = GDKfilepath(hp->farmid, BAKDIR, hp->filename, ".new"); - if (batpath == NULL || bakpath == NULL) { - ret = -1; - goto bailout; - } - batret = MT_stat(batpath, &st); - bakret = MT_stat(bakpath, &st); + if (batpath != NULL && bakpath != NULL) { + /* file actions here interact with the global commits */ + MT_lock_set(&GDKtmLock); + + batret = MT_stat(batpath, &st); + bakret = MT_stat(bakpath, &st); - if (batret == 0 && bakret) { - /* no backup yet, so move the existing X.new there out - * of the way */ - if ((ret = MT_rename(batpath, bakpath)) < 0) - GDKsyserror("backup_new: rename %s to %s failed\n", - batpath, bakpath); - TRC_DEBUG(IO_, "rename(%s,%s) = %d\n", batpath, bakpath, ret); - } else if (batret == 0) { - /* there is a backup already; just remove the X.new */ - if ((ret = MT_remove(batpath)) != 0) - GDKsyserror("backup_new: remove %s failed\n", batpath); - TRC_DEBUG(IO_, "remove(%s) = %d\n", batpath, ret); + if (batret == 0 && bakret) { + /* no backup yet, so move the existing X.new there out + * of the way */ + if ((ret = MT_rename(batpath, bakpath)) < 0) + GDKsyserror("backup_new: rename %s to %s failed\n", + batpath, bakpath); + TRC_DEBUG(IO_, "rename(%s,%s) = %d\n", batpath, bakpath, ret); + } else if (batret == 0) { + /* there is a backup already; just remove the X.new */ + if ((ret = MT_remove(batpath)) != 0) + GDKsyserror("backup_new: remove %s failed\n", batpath); + TRC_DEBUG(IO_, "remove(%s) = %d\n", batpath, ret); + } else { + ret = 0; + } + MT_lock_unset(&GDKtmLock); } - bailout: GDKfree(batpath); GDKfree(bakpath); - for (xx = lockbat; xx >= 0; xx--) - MT_lock_unset(&GDKtrimLock(xx)); return 
ret ? GDK_FAIL : GDK_SUCCEED; } @@ -2093,7 +2125,7 @@ HEAPchangeaccess(Heap *hp, int dstmode, } if (hp->storage == STORE_MMAP) { /* 6=>4 */ hp->dirty = true; - return backup_new(hp, BBP_THREADMASK) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */ + return backup_new(hp) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */ } return hp->storage; /* 7=>5 */ } @@ -2105,7 +2137,7 @@ HEAPcommitpersistence(Heap *hp, bool wri if (existing) { /* existing, ie will become transient */ if (hp->storage == STORE_MMAP && hp->newstorage == STORE_PRIV && writable) { /* 6=>2 */ hp->dirty = true; - return backup_new(hp, -1) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */ + return backup_new(hp) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */ } return hp->newstorage; /* 4=>0,5=>1,7=>3,c=>a no change */ } diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c --- a/gdk/gdk_batop.c +++ b/gdk/gdk_batop.c @@ -899,6 +899,7 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool if (BATextend(b, grows) != GDK_SUCCEED) return GDK_FAIL; } + MT_rwlock_wrlock(&b->thashlock); if (BATatoms[b->ttype].atomFix == NULL && b->ttype != TYPE_void && n->ttype != TYPE_void && @@ -908,7 +909,7 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool Tloc(n, ci.seq - hseq), cnt * Tsize(n)); for (BUN i = 0; b->thash && i < cnt; i++) { - HASHappend(b, r, Tloc(b, r)); + HASHappend_locked(b, r, Tloc(b, r)); r++; } BATsetcount(b, BATcount(b) + cnt); @@ -919,13 +920,16 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool cnt--; BUN p = canditer_next(&ci) - hseq; const void *t = BUNtail(ni, p); - if (bunfastapp_nocheck(b, r, t, Tsize(b)) != GDK_SUCCEED) + if (bunfastapp_nocheck(b, r, t, Tsize(b)) != GDK_SUCCEED) { + MT_rwlock_wrunlock(&b->thashlock); return GDK_FAIL; + } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list