Changeset: b5e837ffc07d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b5e837ffc07d
Modified Files:
	gdk/gdk.h
	gdk/gdk_align.c
	gdk/gdk_bat.c
	gdk/gdk_batop.c
	gdk/gdk_bbp.c
	gdk/gdk_mosaic.c
	gdk/gdk_private.h
	gdk/gdk_utils.c
	monetdb5/modules/mosaic/mosaic.c
	monetdb5/modules/mosaic/mosaic_dictionary.c
Branch: mosaic
Log Message:
Step towards persistent mosaics Modelled after the use of orderidx code. diffs (truncated from 345 to 300 lines): diff --git a/gdk/gdk.h b/gdk/gdk.h --- a/gdk/gdk.h +++ b/gdk/gdk.h @@ -1983,7 +1983,7 @@ gdk_export gdk_return BAThash(BAT *b, BU /* support routines for the mosaic approach */ #define MOSAIC_VERSION 20140808 -gdk_export gdk_return MOSalloc(BAT *b, BUN cap); +gdk_export gdk_return BATmosaic(BAT *b, BUN cap); gdk_export void MOSdestroy(BAT *b); gdk_export int BATcheckmosaic(BAT *b); diff --git a/gdk/gdk_align.c b/gdk/gdk_align.c --- a/gdk/gdk_align.c +++ b/gdk/gdk_align.c @@ -336,6 +336,10 @@ VIEWunlink(BAT *b) /* unlink imprints shared with parent */ if (tpb && b->timprints && b->timprints == tpb->timprints) b->timprints = NULL; + + /* unlink mosaic shared with parent */ + if (tpb && b->tmosaic && b->tmosaic == tpb->tmosaic) + b->tmosaic = NULL; } } diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c --- a/gdk/gdk_bat.c +++ b/gdk/gdk_bat.c @@ -377,6 +377,7 @@ BATextend(BAT *b, BUN newcap) HASHdestroy(b); IMPSdestroy(b); OIDXdestroy(b); + MOSdestroy(b); return GDK_SUCCEED; } diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c --- a/gdk/gdk_batop.c +++ b/gdk/gdk_batop.c @@ -389,6 +389,7 @@ BATappend(BAT *b, BAT *n, bit force) IMPSdestroy(b); /* imprints do not support updates yet */ OIDXdestroy(b); + MOSdestroy(b); /* append two void,void bats */ if (b->ttype == TYPE_void && BATtdense(b)) { diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c --- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -4088,6 +4088,11 @@ BBPdiskscan(const char *parent) #else delete = TRUE; #endif + } else if (strncmp(p + 1, "tmosaic", 9) == 0) { + BAT *b = getdesc(bid); + delete = b == NULL; + if (!delete) + b->tmosaic = (Heap *) 1; } else if (strncmp(p + 1, "timprints", 9) == 0) { BAT *b = getdesc(bid); delete = b == NULL; diff --git a/gdk/gdk_mosaic.c b/gdk/gdk_mosaic.c --- a/gdk/gdk_mosaic.c +++ b/gdk/gdk_mosaic.c @@ -14,19 +14,93 @@ #include "gdk.h" #include "gdk_private.h" +#ifdef PERSISTENTMOSAIC +struct 
mosaicsync { + Heap *hp; + bat id; + const char *func; +}; + +static void +BATmosaicsync(void *arg) +{ + struct mosaicsync *hs = arg; + Heap *hp = hs->hp; + int fd; + lng t0 = GDKusec(); + + if (HEAPsave(hp, hp->filename, NULL) != GDK_SUCCEED || + (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) < 0) { + BBPunfix(hs->id); + GDKfree(arg); + return; + } + ((oid *) hp->base)[0] |= (oid) 1 << 24; + if (write(fd, hp->base, SIZEOF_SIZE_T) < 0) + perror("write mosaic"); + if (!(GDKdebug & FORCEMITOMASK)) { +#if defined(NATIVE_WIN32) + _commit(fd); +#elif defined(HAVE_FDATASYNC) + fdatasync(fd); +#elif defined(HAVE_FSYNC) + fsync(fd); +#endif + } + close(fd); + BBPunfix(hs->id); + ALGODEBUG fprintf(stderr, "#%s: persisting mosaic %s (" LLFMT " usec)\n", hs->func, hp->filename, GDKusec() - t0); + GDKfree(arg); +} +#endif + gdk_return -MOSalloc(BAT *bn, BUN cap) +BATmosaic(BAT *bn, BUN cap) { - const char *nme = BBP_physical(bn->batCacheid); + const char *nme; + Heap *m; - if ( (bn->tmosaic = (Heap*)GDKzalloc(sizeof(Heap))) == NULL || - (bn->tmosaic->filename = GDKfilepath(NOFARM, NULL, nme, "mosaic")) == NULL) + MT_lock_set(&GDKmosaicLock(bn->batCacheid)); + if( bn->tmosaic){ + MT_lock_unset(&GDKmosaicLock(bn->batCacheid)); + return GDK_SUCCEED; + } + + nme = BBP_physical(bn->batCacheid); + if ( (m = (Heap*)GDKzalloc(sizeof(Heap))) == NULL || + (m->farmid = BBPselectfarm(bn->batRole, bn->ttype, varheap)) < 0 || + (m->filename = GDKfilepath(NOFARM, NULL, nme, "mosaic")) == NULL){ + if( m) + GDKfree(m->filename); + GDKfree(m); + MT_lock_unset(&GDKmosaicLock(bn->batCacheid)); + return GDK_FAIL; + } + + if( HEAPalloc(m, cap, Tsize(bn)) != GDK_SUCCEED){ + MT_lock_unset(&GDKmosaicLock(bn->batCacheid)); return GDK_FAIL; - - if( HEAPalloc(bn->tmosaic, cap, Tsize(bn)) != GDK_SUCCEED) - return GDK_FAIL; - bn->tmosaic->parentid = bn->batCacheid; - bn->tmosaic->farmid = BBPselectfarm(bn->batRole, bn->ttype, varheap); + } + m->parentid = bn->batCacheid; + +#ifdef 
PERSISTENTMOSAIC + if ((BBP_status(bn->batCacheid) & BBPEXISTING) && + bn->batInserted == bn->batCount) { + MT_Id tid; + struct mosaicsync *hs = GDKmalloc(sizeof(*hs)); + if (hs != NULL) { + BBPfix(bn->batCacheid); + hs->id = bn->batCacheid; + hs->hp = m; + hs->func = "BATmosaic"; + MT_create_thread(&tid, BATmosaicsync, hs, MT_THR_DETACHED); + } + } else + ALGODEBUG fprintf(stderr, "#BATmosaic: NOT persisting index %d\n", bn->batCacheid); +#endif + bn->batDirtydesc = TRUE; + bn->tmosaic = m; + MT_lock_unset(&GDKmosaicLock(bn->batCacheid)); return GDK_SUCCEED; } @@ -51,19 +125,26 @@ BATcheckmosaic(BAT *b) int ret; lng t; + if (VIEWtparent(b)) { + assert(b->tmosaic == NULL); + b = BBPdescriptor(VIEWtparent(b)); + } + assert(b->batCacheid > 0); t = GDKusec(); - MT_lock_set(&GDKhashLock(abs(b->batCacheid))); + MT_lock_set(&GDKmosaicLock(abs(b->batCacheid))); t = GDKusec() - t; - if (b->tmosaic == NULL) { + if (b->tmosaic == (Heap *) 1) { Heap *hp; const char *nme = BBP_physical(b->batCacheid); const char *ext = "mosaic"; int fd; + b->tmosaic = NULL; if ((hp = GDKzalloc(sizeof(*hp))) != NULL && (hp->farmid = BBPselectfarm(b->batRole, b->ttype, mosaicheap)) >= 0 && (hp->filename = GDKmalloc(strlen(nme) + 10)) != NULL) { + sprintf(hp->filename, "%s.%s", nme, ext); /* check whether a persisted mosaic can be found */ @@ -79,7 +160,7 @@ BATcheckmosaic(BAT *b) close(fd); b->tmosaic = hp; ALGODEBUG fprintf(stderr, "#BATcheckmosaic: reusing persisted mosaic %d\n", b->batCacheid); - MT_lock_unset(&GDKhashLock(abs(b->batCacheid))); + MT_lock_unset(&GDKmosaicLock(abs(b->batCacheid))); return 1; } close(fd); @@ -92,7 +173,7 @@ BATcheckmosaic(BAT *b) GDKclrerr(); /* we're not currently interested in errors */ } ret = b->tmosaic != NULL; - MT_lock_unset(&GDKhashLock(abs(b->batCacheid))); + MT_lock_unset(&GDKmosaicLock(abs(b->batCacheid))); ALGODEBUG if (ret) fprintf(stderr, "#BATcheckmosaic: already has mosaic %d, waited " LLFMT " usec\n", b->batCacheid, t); return ret; } diff --git 
a/gdk/gdk_private.h b/gdk/gdk_private.h --- a/gdk/gdk_private.h +++ b/gdk/gdk_private.h @@ -15,6 +15,7 @@ #define DISABLE_PARENT_HASH 1 /* #define PERSISTENTHASH 1 */ #define PERSISTENTIDX 1 +#define PERSISTENTMOSAIC 2 #include "gdk_system_private.h" @@ -229,6 +230,7 @@ typedef struct { MT_Lock swap; MT_Lock hash; MT_Lock imprints; + MT_Lock mosaic; } batlock_t; typedef struct { @@ -296,6 +298,8 @@ extern MT_Lock MT_system_lock; #define GDKswapLock(x) GDKbatLock[(x)&BBP_BATMASK].swap #define GDKhashLock(x) GDKbatLock[(x)&BBP_BATMASK].hash #define GDKimprintsLock(x) GDKbatLock[(x)&BBP_BATMASK].imprints +#define GDKmosaicLock(x) GDKbatLock[(x)&BBP_BATMASK].mosaic + #if SIZEOF_SIZE_T == 8 #define threadmask(y) ((int) ((mix_int((unsigned int) y) ^ mix_int((unsigned int) (y >> 32))) & BBP_THREADMASK)) #else diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c --- a/gdk/gdk_utils.c +++ b/gdk/gdk_utils.c @@ -514,6 +514,7 @@ GDKinit(opt *set, int setlen) MT_lock_init(&GDKbatLock[i].swap, "GDKswapLock"); MT_lock_init(&GDKbatLock[i].hash, "GDKhashLock"); MT_lock_init(&GDKbatLock[i].imprints, "GDKimprintsLock"); + MT_lock_init(&GDKbatLock[i].mosaic, "GDKmosaicLock"); } for (i = 0; i <= BBP_THREADMASK; i++) { MT_lock_init(&GDKbbpLock[i].alloc, "GDKcacheLock"); diff --git a/monetdb5/modules/mosaic/mosaic.c b/monetdb5/modules/mosaic/mosaic.c --- a/monetdb5/modules/mosaic/mosaic.c +++ b/monetdb5/modules/mosaic/mosaic.c @@ -354,10 +354,11 @@ MOSoptimizerCost(Client cntxt, MOStask t str MOScompressInternal(Client cntxt, bat *ret, bat *bid, MOStask task, int debug) { - BAT *bsrc; // the BAT to be augmented with a compressed heap + BAT *o = NULL, *bsrc; // the BAT to be augmented with a compressed heap str msg = MAL_SUCCEED; int cand; int tpe, typewidth; + lng t0,t1; *ret = 0; @@ -377,7 +378,7 @@ MOScompressInternal(Client cntxt, bat *r case TYPE_flt: case TYPE_dbl: case TYPE_str: - typewidth = ATOMsize(tpe) * 8; + typewidth = ATOMsize(tpe) * 8; // size in bits break; default: // don't 
compress them @@ -385,7 +386,25 @@ MOScompressInternal(Client cntxt, bat *r return msg; } - if ( BATcount(bsrc) < MIN_INPUT_COUNT || BATcheckmosaic(bsrc)){ + if (BATcheckmosaic(bsrc)){ + /* already compressed */ + BBPkeepref(*ret = bsrc->batCacheid); + return msg; + } + assert(bsrc->tmosaic == NULL); + + if (VIEWtparent(bsrc)) { + bat p = VIEWtparent(bsrc); + o = bsrc; + bsrc = BATdescriptor(p); + if (BATcheckmosaic(bsrc)) { + BBPunfix(bsrc->batCacheid); + return MAL_SUCCEED; + } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list