Changeset: 860a4dec7dde for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=860a4dec7dde Modified Files: gdk/gdk_hash.c gdk/gdk_orderidx.c Branch: Jul2017 Log Message:
We need to use locks when persisting indexes in the background. diffs (253 lines): diff --git a/gdk/gdk_hash.c b/gdk/gdk_hash.c --- a/gdk/gdk_hash.c +++ b/gdk/gdk_hash.c @@ -273,44 +273,42 @@ BATcheckhash(BAT *b) } #ifdef PERSISTENTHASH -struct hashsync { - Heap *hp; - bat id; -}; - static void BAThashsync(void *arg) { - struct hashsync *hs = arg; - Heap *hp = hs->hp; + BAT *b = arg; + Heap *hp; int fd; lng t0 = 0; const char *failed = " failed"; ALGODEBUG t0 = GDKusec(); - if (HEAPsave(hp, hp->filename, NULL) == GDK_SUCCEED && - (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) { - ((size_t *) hp->base)[0] |= 1 << 24; - if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) { - failed = ""; /* not failed */ - if (!(GDKdebug & FORCEMITOMASK)) { + MT_lock_set(&GDKhashLock(b->batCacheid)); + if (b->thash != NULL && (hp = b->thash->heap) != NULL) { + if (HEAPsave(hp, hp->filename, NULL) == GDK_SUCCEED && + (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) { + ((size_t *) hp->base)[0] |= 1 << 24; + if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) { + failed = ""; /* not failed */ + if (!(GDKdebug & FORCEMITOMASK)) { #if defined(NATIVE_WIN32) - _commit(fd); + _commit(fd); #elif defined(HAVE_FDATASYNC) - fdatasync(fd); + fdatasync(fd); #elif defined(HAVE_FSYNC) - fsync(fd); + fsync(fd); #endif + } + } else { + perror("write hash"); } - } else { - perror("write hash"); + close(fd); } - close(fd); + ALGODEBUG fprintf(stderr, "#BAThash: persisting hash %s (" LLFMT " usec)%s\n", hp->filename, GDKusec() - t0, failed); } - BBPunfix(hs->id); - GDKfree(arg); - ALGODEBUG fprintf(stderr, "#BAThash: persisting hash %s (" LLFMT " usec)%s\n", hp->filename, GDKusec() - t0, failed); + MT_lock_unset(&GDKhashLock(b->batCacheid)); + BBPunfix(b->batCacheid); } #endif @@ -500,25 +498,19 @@ BAThash(BAT *b, BUN masksize) memset((char *) h->Link + q * h->width, 0, (h->lim - q) * h->width); #endif hp->parentid = b->batCacheid; + b->thash = h; #ifdef PERSISTENTHASH if (BBP_status(b->batCacheid) & BBPEXISTING) { MT_Id tid; - struct hashsync *hs = GDKmalloc(sizeof(*hs)); - if (hs != NULL) { - BBPfix(b->batCacheid); - hs->id = b->batCacheid; - hs->hp = hp; - if (MT_create_thread(&tid, BAThashsync, hs, - MT_THR_DETACHED) < 0) { - /* couldn't start thread: clean up */ - BBPunfix(b->batCacheid); - GDKfree(hs); - } + BBPfix(b->batCacheid); + if (MT_create_thread(&tid, BAThashsync, b, + MT_THR_DETACHED) < 0) { + /* couldn't start thread: clean up */ + BBPunfix(b->batCacheid); } } else ALGODEBUG fprintf(stderr, "#BAThash: NOT persisting hash %d\n", b->batCacheid); #endif - b->thash = h; ALGODEBUG { t1 = GDKusec(); fprintf(stderr, "#BAThash: hash construction " LLFMT " usec\n", t1 - t0); @@ -575,27 +567,28 @@ void HASHdestroy(BAT *b) { if (b) { - if (b->thash == (Hash *) 1) { + Hash *hs = b->thash; + b->thash = NULL; + if (hs == (Hash *) 1) { GDKunlink(BBPselectfarm(b->batRole, b->ttype, hashheap), BATDIR, BBP_physical(b->batCacheid), "thash"); - } else if (b->thash) { + } else if (hs) { bat p = VIEWtparent(b); BAT *hp = NULL; if (p) hp = BBP_cache(p); - if ((!hp || b->thash != hp->thash) && b->thash != (Hash *) -1) { - ALGODEBUG if (*(size_t *) b->thash->heap->base & (1 << 24)) + if ((!hp || hs != hp->thash) && hs != (Hash *) -1) { + ALGODEBUG if (*(size_t *) hs->heap->base & (1 << 24)) fprintf(stderr, "#HASHdestroy: removing persisted hash %d\n", b->batCacheid); - HEAPfree(b->thash->heap, 1); - GDKfree(b->thash->heap); - GDKfree(b->thash); + HEAPfree(hs->heap, 1); + GDKfree(hs->heap); + GDKfree(hs); } } - b->thash = NULL; } } diff --git a/gdk/gdk_orderidx.c b/gdk/gdk_orderidx.c --- a/gdk/gdk_orderidx.c +++ b/gdk/gdk_orderidx.c @@ -13,44 +13,40 @@ #define ORDERIDX_VERSION ((oid) 3) #ifdef PERSISTENTIDX -struct idxsync { - Heap *hp; - bat id; - const char *func; -}; - static void BATidxsync(void *arg) { - struct idxsync *hs = arg; - Heap *hp = hs->hp; + BAT *b = arg; + Heap *hp; int fd; lng t0 = 0; ALGODEBUG t0 = GDKusec(); - if (HEAPsave(hp, hp->filename, NULL) != GDK_SUCCEED || - (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) < 0) { - BBPunfix(hs->id); - GDKfree(arg); - return; - } - ((oid *) hp->base)[0] |= (oid) 1 << 24; - if (write(fd, hp->base, SIZEOF_SIZE_T) < 0) - perror("write orderidx"); - if (!(GDKdebug & FORCEMITOMASK)) { + MT_lock_set(&GDKhashLock(b->batCacheid)); + if ((hp = b->torderidx) != NULL) { + if (HEAPsave(hp, hp->filename, NULL) == GDK_SUCCEED && + (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) { + ((oid *) hp->base)[0] |= (oid) 1 << 24; + if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) { + if (!(GDKdebug & FORCEMITOMASK)) { #if defined(NATIVE_WIN32) - _commit(fd); + _commit(fd); #elif defined(HAVE_FDATASYNC) - fdatasync(fd); + fdatasync(fd); #elif defined(HAVE_FSYNC) - fsync(fd); + fsync(fd); #endif + } + } else { + perror("write orderidx"); + } + close(fd); + } + ALGODEBUG fprintf(stderr, "#BATidxsync: persisting orderidx %s (" LLFMT " usec)\n", hp->filename, GDKusec() - t0); } - close(fd); - BBPunfix(hs->id); - ALGODEBUG fprintf(stderr, "#%s: persisting orderidx %s (" LLFMT " usec)\n", hs->func, hp->filename, GDKusec() - t0); - GDKfree(arg); + MT_lock_unset(&GDKhashLock(b->batCacheid)); + BBPunfix(b->batCacheid); } #endif @@ -153,14 +149,9 @@ persistOIDX(BAT *b) if ((BBP_status(b->batCacheid) & BBPEXISTING) && b->batInserted == b->batCount) { MT_Id tid; - struct idxsync *hs = GDKmalloc(sizeof(*hs)); - if (hs != NULL) { - BBPfix(b->batCacheid); - hs->id = b->batCacheid; - hs->hp = b->torderidx; - hs->func = "BATorderidx"; - MT_create_thread(&tid, BATidxsync, hs, MT_THR_DETACHED); - } + BBPfix(b->batCacheid); + if (MT_create_thread(&tid, BATidxsync, b, MT_THR_DETACHED) < 0) + BBPunfix(b->batCacheid); } else ALGODEBUG fprintf(stderr, "#BATorderidx: NOT persisting index %d\n", b->batCacheid); #else @@ -479,24 +470,19 @@ GDKmergeidx(BAT *b, BAT**a, int n_ar) GDKfree(q); } + b->torderidx = m; #ifdef PERSISTENTIDX if ((BBP_status(b->batCacheid) & BBPEXISTING) && b->batInserted == b->batCount) { MT_Id tid; - struct idxsync *hs = GDKmalloc(sizeof(*hs)); - if (hs != NULL) { - BBPfix(b->batCacheid); - hs->id = b->batCacheid; - hs->hp = m; - hs->func = "GDKmergeidx"; - MT_create_thread(&tid, BATidxsync, hs, MT_THR_DETACHED); - } + BBPfix(b->batCacheid); + if (MT_create_thread(&tid, BATidxsync, b, MT_THR_DETACHED) < 0) + BBPunfix(b->batCacheid); } else ALGODEBUG fprintf(stderr, "#GDKmergeidx: NOT persisting index %d\n", b->batCacheid); #endif b->batDirtydesc = TRUE; - b->torderidx = m; MT_lock_unset(&GDKhashLock(b->batCacheid)); return GDK_SUCCEED; } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list