Changeset: b5e837ffc07d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b5e837ffc07d
Modified Files:
        gdk/gdk.h
        gdk/gdk_align.c
        gdk/gdk_bat.c
        gdk/gdk_batop.c
        gdk/gdk_bbp.c
        gdk/gdk_mosaic.c
        gdk/gdk_private.h
        gdk/gdk_utils.c
        monetdb5/modules/mosaic/mosaic.c
        monetdb5/modules/mosaic/mosaic_dictionary.c
Branch: mosaic
Log Message:

Step towards persistent mosaics.
Modelled after the orderidx code.


diffs (truncated from 345 to 300 lines):

diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -1983,7 +1983,7 @@ gdk_export gdk_return BAThash(BAT *b, BU
 
 /* support routines for the mosaic approach */
 #define MOSAIC_VERSION 20140808
-gdk_export gdk_return MOSalloc(BAT *b, BUN cap);
+gdk_export gdk_return BATmosaic(BAT *b, BUN cap);
 gdk_export void MOSdestroy(BAT *b);
 gdk_export int BATcheckmosaic(BAT *b);
 
diff --git a/gdk/gdk_align.c b/gdk/gdk_align.c
--- a/gdk/gdk_align.c
+++ b/gdk/gdk_align.c
@@ -336,6 +336,10 @@ VIEWunlink(BAT *b)
                /* unlink imprints shared with parent */
                if (tpb && b->timprints && b->timprints == tpb->timprints)
                        b->timprints = NULL;
+
+               /* unlink mosaic shared with parent */
+               if (tpb && b->tmosaic && b->tmosaic == tpb->tmosaic)
+                       b->tmosaic = NULL;
        }
 }
 
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -377,6 +377,7 @@ BATextend(BAT *b, BUN newcap)
        HASHdestroy(b);
        IMPSdestroy(b);
        OIDXdestroy(b);
+       MOSdestroy(b);
        return GDK_SUCCEED;
 }
 
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -389,6 +389,7 @@ BATappend(BAT *b, BAT *n, bit force)
 
        IMPSdestroy(b);         /* imprints do not support updates yet */
        OIDXdestroy(b);
+       MOSdestroy(b);
 
        /* append two void,void bats */
        if (b->ttype == TYPE_void && BATtdense(b)) {
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -4088,6 +4088,11 @@ BBPdiskscan(const char *parent)
 #else
                        delete = TRUE;
 #endif
+               } else if (strncmp(p + 1, "tmosaic", 9) == 0) {
+                       BAT *b = getdesc(bid);
+                       delete = b == NULL;
+                       if (!delete)
+                               b->tmosaic = (Heap *) 1;
                } else if (strncmp(p + 1, "timprints", 9) == 0) {
                        BAT *b = getdesc(bid);
                        delete = b == NULL;
diff --git a/gdk/gdk_mosaic.c b/gdk/gdk_mosaic.c
--- a/gdk/gdk_mosaic.c
+++ b/gdk/gdk_mosaic.c
@@ -14,19 +14,93 @@
 #include "gdk.h"
 #include "gdk_private.h"
 
+#ifdef PERSISTENTMOSAIC
+struct mosaicsync {
+    Heap *hp;
+    bat id;
+    const char *func;
+};
+
+static void
+BATmosaicsync(void *arg)
+{
+    struct mosaicsync *hs = arg;
+    Heap *hp = hs->hp;
+    int fd;
+    lng t0 = GDKusec();
+
+    if (HEAPsave(hp, hp->filename, NULL) != GDK_SUCCEED ||
+        (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) < 0) {
+        BBPunfix(hs->id);
+        GDKfree(arg);
+        return;
+    }
+    ((oid *) hp->base)[0] |= (oid) 1 << 24;
+    if (write(fd, hp->base, SIZEOF_SIZE_T) < 0)
+        perror("write mosaic");
+    if (!(GDKdebug & FORCEMITOMASK)) {
+#if defined(NATIVE_WIN32)
+        _commit(fd);
+#elif defined(HAVE_FDATASYNC)
+        fdatasync(fd);
+#elif defined(HAVE_FSYNC)
+        fsync(fd);
+#endif
+    }
+    close(fd);
+    BBPunfix(hs->id);
+    ALGODEBUG fprintf(stderr, "#%s: persisting mosaic %s (" LLFMT " usec)\n", 
hs->func, hp->filename, GDKusec() - t0);
+    GDKfree(arg);
+}
+#endif
+
 gdk_return
-MOSalloc(BAT *bn, BUN cap)
+BATmosaic(BAT *bn, BUN cap)
 {
-    const char *nme = BBP_physical(bn->batCacheid);
+    const char *nme;
+       Heap *m;
 
-    if ( (bn->tmosaic = (Heap*)GDKzalloc(sizeof(Heap))) == NULL ||
-        (bn->tmosaic->filename = GDKfilepath(NOFARM, NULL, nme, "mosaic")) == 
NULL)
+    MT_lock_set(&GDKmosaicLock(bn->batCacheid));
+       if( bn->tmosaic){
+               MT_lock_unset(&GDKmosaicLock(bn->batCacheid));
+               return GDK_SUCCEED;
+       }
+
+    nme = BBP_physical(bn->batCacheid);
+    if ( (m = (Heap*)GDKzalloc(sizeof(Heap))) == NULL ||
+               (m->farmid = BBPselectfarm(bn->batRole, bn->ttype, varheap)) < 
0 ||
+        (m->filename = GDKfilepath(NOFARM, NULL, nme, "mosaic")) == NULL){
+                       if( m)
+                               GDKfree(m->filename);
+                       GDKfree(m);
+                       MT_lock_unset(&GDKmosaicLock(bn->batCacheid));
+                       return GDK_FAIL;
+               }
+       
+    if( HEAPalloc(m, cap, Tsize(bn)) != GDK_SUCCEED){
+               MT_lock_unset(&GDKmosaicLock(bn->batCacheid));
         return GDK_FAIL;
-       
-    if( HEAPalloc(bn->tmosaic, cap, Tsize(bn)) != GDK_SUCCEED)
-        return GDK_FAIL;
-    bn->tmosaic->parentid = bn->batCacheid;
-    bn->tmosaic->farmid = BBPselectfarm(bn->batRole, bn->ttype, varheap);
+       }
+    m->parentid = bn->batCacheid;
+
+#ifdef PERSISTENTMOSAIC
+    if ((BBP_status(bn->batCacheid) & BBPEXISTING) &&
+        bn->batInserted == bn->batCount) {
+        MT_Id tid;
+        struct mosaicsync *hs = GDKmalloc(sizeof(*hs));
+        if (hs != NULL) {
+            BBPfix(bn->batCacheid);
+            hs->id = bn->batCacheid;
+            hs->hp = m;
+            hs->func = "BATmosaic";
+            MT_create_thread(&tid, BATmosaicsync, hs, MT_THR_DETACHED);
+        }
+    } else
+        ALGODEBUG fprintf(stderr, "#BATmosaic: NOT persisting index %d\n", 
bn->batCacheid);
+#endif
+       bn->batDirtydesc = TRUE;
+       bn->tmosaic = m;
+       MT_lock_unset(&GDKmosaicLock(bn->batCacheid));
     return GDK_SUCCEED;
 }
 
@@ -51,19 +125,26 @@ BATcheckmosaic(BAT *b)
        int ret;
        lng t;
 
+    if (VIEWtparent(b)) {
+        assert(b->tmosaic == NULL);
+        b = BBPdescriptor(VIEWtparent(b));
+    }
+
        assert(b->batCacheid > 0);
        t = GDKusec();
-       MT_lock_set(&GDKhashLock(abs(b->batCacheid)));
+       MT_lock_set(&GDKmosaicLock(abs(b->batCacheid)));
        t = GDKusec() - t;
-       if (b->tmosaic == NULL) {
+       if (b->tmosaic == (Heap *) 1) {
                Heap *hp;
                const char *nme = BBP_physical(b->batCacheid);
                const char *ext = "mosaic";
                int fd;
 
+               b->tmosaic = NULL;
                if ((hp = GDKzalloc(sizeof(*hp))) != NULL &&
                    (hp->farmid = BBPselectfarm(b->batRole, b->ttype, 
mosaicheap)) >= 0 &&
                    (hp->filename = GDKmalloc(strlen(nme) + 10)) != NULL) {
+
                        sprintf(hp->filename, "%s.%s", nme, ext);
 
                        /* check whether a persisted mosaic can be found */
@@ -79,7 +160,7 @@ BATcheckmosaic(BAT *b)
                                        close(fd);
                                        b->tmosaic = hp;
                                        ALGODEBUG fprintf(stderr, 
"#BATcheckmosaic: reusing persisted mosaic %d\n", b->batCacheid);
-                                       
MT_lock_unset(&GDKhashLock(abs(b->batCacheid)));
+                                       
MT_lock_unset(&GDKmosaicLock(abs(b->batCacheid)));
                                        return 1;
                                }
                                close(fd);
@@ -92,7 +173,7 @@ BATcheckmosaic(BAT *b)
                GDKclrerr();    /* we're not currently interested in errors */
        }
        ret = b->tmosaic != NULL;
-       MT_lock_unset(&GDKhashLock(abs(b->batCacheid)));
+       MT_lock_unset(&GDKmosaicLock(abs(b->batCacheid)));
        ALGODEBUG if (ret) fprintf(stderr, "#BATcheckmosaic: already has mosaic 
%d, waited " LLFMT " usec\n", b->batCacheid, t);
        return ret;
 }
diff --git a/gdk/gdk_private.h b/gdk/gdk_private.h
--- a/gdk/gdk_private.h
+++ b/gdk/gdk_private.h
@@ -15,6 +15,7 @@
 #define DISABLE_PARENT_HASH 1
 /* #define PERSISTENTHASH 1 */
 #define PERSISTENTIDX 1
+#define PERSISTENTMOSAIC 2
 
 #include "gdk_system_private.h"
 
@@ -229,6 +230,7 @@ typedef struct {
        MT_Lock swap;
        MT_Lock hash;
        MT_Lock imprints;
+       MT_Lock mosaic;
 } batlock_t;
 
 typedef struct {
@@ -296,6 +298,8 @@ extern MT_Lock MT_system_lock;
 #define GDKswapLock(x)  GDKbatLock[(x)&BBP_BATMASK].swap
 #define GDKhashLock(x)  GDKbatLock[(x)&BBP_BATMASK].hash
 #define GDKimprintsLock(x)  GDKbatLock[(x)&BBP_BATMASK].imprints
+#define GDKmosaicLock(x)  GDKbatLock[(x)&BBP_BATMASK].mosaic
+
 #if SIZEOF_SIZE_T == 8
 #define threadmask(y)  ((int) ((mix_int((unsigned int) y) ^ mix_int((unsigned 
int) (y >> 32))) & BBP_THREADMASK))
 #else
diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -514,6 +514,7 @@ GDKinit(opt *set, int setlen)
                MT_lock_init(&GDKbatLock[i].swap, "GDKswapLock");
                MT_lock_init(&GDKbatLock[i].hash, "GDKhashLock");
                MT_lock_init(&GDKbatLock[i].imprints, "GDKimprintsLock");
+               MT_lock_init(&GDKbatLock[i].mosaic, "GDKmosaicLock");
        }
        for (i = 0; i <= BBP_THREADMASK; i++) {
                MT_lock_init(&GDKbbpLock[i].alloc, "GDKcacheLock");
diff --git a/monetdb5/modules/mosaic/mosaic.c b/monetdb5/modules/mosaic/mosaic.c
--- a/monetdb5/modules/mosaic/mosaic.c
+++ b/monetdb5/modules/mosaic/mosaic.c
@@ -354,10 +354,11 @@ MOSoptimizerCost(Client cntxt, MOStask t
 str
 MOScompressInternal(Client cntxt, bat *ret, bat *bid, MOStask task, int debug)
 {
-       BAT *bsrc;              // the BAT to be augmented with a compressed 
heap
+       BAT *o = NULL, *bsrc;           // the BAT to be augmented with a 
compressed heap
        str msg = MAL_SUCCEED;
        int cand;
        int tpe, typewidth;
+       lng t0,t1;
        
        *ret = 0;
 
@@ -377,7 +378,7 @@ MOScompressInternal(Client cntxt, bat *r
        case TYPE_flt:
        case TYPE_dbl:
        case TYPE_str:
-               typewidth = ATOMsize(tpe) * 8;
+               typewidth = ATOMsize(tpe) * 8; // size in bits
                break;
        default:
                // don't compress them
@@ -385,7 +386,25 @@ MOScompressInternal(Client cntxt, bat *r
                return msg;
        }
 
-       if ( BATcount(bsrc) < MIN_INPUT_COUNT  || BATcheckmosaic(bsrc)){
+    if (BATcheckmosaic(bsrc)){
+               /* already compressed */
+               BBPkeepref(*ret = bsrc->batCacheid);
+               return msg;
+       }
+    assert(bsrc->tmosaic == NULL);
+
+    if (VIEWtparent(bsrc)) {
+        bat p = VIEWtparent(bsrc);
+        o = bsrc;
+        bsrc = BATdescriptor(p);
+        if (BATcheckmosaic(bsrc)) {
+            BBPunfix(bsrc->batCacheid);
+            return MAL_SUCCEED;
+        }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to