Changeset: 6173f57cb268 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/6173f57cb268
Modified Files:
        gdk/gdk_bbp.c
        monetdb5/modules/atoms/json.c
Branch: Dec2023
Log Message:

Perform the JSON upgrade by creating new heaps


diffs (290 lines):

diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -1476,21 +1476,199 @@ movestrbats(void)
 #endif
 
 #ifdef GDKLIBRARY_JSON
+static gdk_return jsonupgradebat(BAT *b,
+                                      json_storage_conversion fixJSONStorage) {
+       const char *nme = BBP_physical(b->batCacheid);
+       char *srcdir = GDKfilepath(NOFARM, BATDIR, nme, NULL);
+
+        if (srcdir == NULL) {
+               TRC_CRITICAL(GDK, "GDKfilepath failed\n");
+               return GDK_FAIL;
+        }
+
+       char *s;
+       if ((s = strrchr(srcdir, DIR_SEP)) != NULL)
+               *s = 0;
+        const char *bnme;
+        if ((bnme = strrchr(nme, DIR_SEP)) != NULL) {
+               bnme++;
+        } else {
+               bnme = nme;
+        }
+
+        long_str filename;
+       snprintf(filename, sizeof(filename), "BACKUP%c%s", DIR_SEP, bnme);
+
+       /* A json column should not normally have any index structures */
+       HASHdestroy(b);
+       IMPSdestroy(b);
+       OIDXdestroy(b);
+       PROPdestroy(b);
+       STRMPdestroy(b);
+       RTREEdestroy(b);
+
+       /* bakup the current heaps */
+       if (GDKmove(b->theap->farmid, srcdir, bnme, "tail",
+                    BAKDIR, bnme, "tail", false) != GDK_SUCCEED) {
+                GDKfree(srcdir);
+               TRC_CRITICAL(GDK, "cannot make backup of %s.tail\n", nme);
+               return GDK_FAIL;
+        }
+       GDKclrerr();
+        if (GDKmove(b->theap->farmid, srcdir, bnme, "theap",
+                    BAKDIR, bnme, "theap", true) != GDK_SUCCEED) {
+                GDKfree(srcdir);
+               TRC_CRITICAL(GDK, "cannot make backup of %s.theap\n", nme);
+               return GDK_FAIL;
+        }
+
+
+        /* load the old heaps */
+        Heap h1 = *b->theap;
+        h1.base = NULL;
+        h1.dirty = false;
+        strconcat_len(h1.filename, sizeof(h1.filename), filename, ".tail", 
NULL);
+        if (HEAPload(&h1, filename, "tail", false) != GDK_SUCCEED) {
+                GDKfree(srcdir);
+                TRC_CRITICAL(GDK, "loading old tail heap "
+                             "for BAT %d failed\n", b->batCacheid);
+               return GDK_FAIL;
+        }
+
+        Heap vh1 = *b->tvheap;
+        vh1.base = NULL;
+       vh1.dirty = false;
+        strconcat_len(vh1.filename, sizeof(vh1.filename), filename, ".theap", 
NULL);
+        if (HEAPload(&vh1, filename, "theap", false) != GDK_SUCCEED) {
+                GDKfree(srcdir);
+               HEAPfree(&h1, false);
+                TRC_CRITICAL(GDK, "loading old string heap "
+                             "for BAT %d failed\n", b->batCacheid);
+               return GDK_FAIL;
+        }
+
+       /* create the new heaps */
+        Heap *h2 = GDKmalloc(sizeof(Heap));
+        Heap *vh2 = GDKmalloc(sizeof(Heap));
+        if (h2 == NULL || vh2 == NULL) {
+               GDKfree(h2);
+               GDKfree(vh2);
+               GDKfree(srcdir);
+               HEAPfree(&h1, false);
+               HEAPfree(&vh1, false);
+                TRC_CRITICAL(GDK, "allocating new heaps "
+                             "for BAT %d failed\n", b->batCacheid);
+               return GDK_FAIL;
+        }
+        *h2 = *b->theap;
+       h2->base = NULL;
+        if (HEAPalloc(h2, b->batCapacity, b->twidth) != GDK_SUCCEED) {
+                GDKfree(h2);
+                GDKfree(vh2);
+                GDKfree(srcdir);
+               HEAPfree(&h1, false);
+               HEAPfree(&vh1, false);
+               TRC_CRITICAL(GDK, "allocating new tail heap "
+                            "for BAT %d failed\n", b->batCacheid);
+               return GDK_FAIL;
+
+        }
+        h2->dirty = true;
+       h2->free = h1.free;
+
+       *vh2 = *b->tvheap;
+       strconcat_len(vh2->filename, sizeof(vh2->filename), nme, ".theap", 
NULL);
+       strHeap(vh2, b->batCapacity);
+       if (vh2->base == NULL) {
+               GDKfree(srcdir);
+               HEAPfree(&h1, false);
+               HEAPfree(&vh1, false);
+               HEAPfree(h2, false);
+               GDKfree(h2);
+               GDKfree(vh2);
+               TRC_CRITICAL(GDK, "allocating new string heap "
+                            "for BAT %d failed\n", b->batCacheid);
+               return GDK_FAIL;
+       }
+        vh2->dirty = true;
+        ATOMIC_INIT(&h2->refs, 1);
+        ATOMIC_INIT(&vh2->refs, 1);
+        Heap *ovh = b->tvheap;
+        b->tvheap = vh2;
+       vh2 = NULL;
+
+        for (BUN i = 0; i < b->batCount; i++) {
+                var_t o = ((var_t *) h1.base)[i];
+                const char *s = vh1.base + o;
+               char *ns;
+                if (fixJSONStorage(&ns, &s) != GDK_SUCCEED) {
+                       GDKfree(srcdir);
+                       HEAPfree(&h1, false);
+                       HEAPfree(&vh1, false);
+                        HEAPdecref(h2, false);
+                        HEAPdecref(b->tvheap, false);
+                       b->tvheap = ovh;
+                       TRC_CRITICAL(GDK, "converting value "
+                                    "in BAT %d failed\n", b->batCacheid);
+                       return GDK_FAIL;
+                }
+               var_t no = strPut(b, &o, ns);
+               GDKfree(ns);
+                if (no == 0) {
+                       GDKfree(srcdir);
+                       HEAPfree(&h1, false);
+                       HEAPfree(&vh1, false);
+                        HEAPdecref(h2, false);
+                        HEAPdecref(b->tvheap, false);
+                       b->tvheap = ovh;
+                       TRC_CRITICAL(GDK, "storing new value "
+                                    "in BAT %d failed\n", b->batCacheid);
+                       return GDK_FAIL;
+
+                }
+               ((var_t *)h2->base)[i] = no;
+        }
+
+       /* cleanup */
+        HEAPfree(&h1, false);
+        HEAPfree(&vh1, false);
+        if (HEAPsave(h2, nme, BATtailname(b), true, h2->free, NULL) !=
+            GDK_SUCCEED) {
+                HEAPdecref(h2, false);
+               HEAPdecref(b->tvheap, false);
+                b->tvheap = ovh;
+                GDKfree(srcdir);
+                TRC_CRITICAL(GDK, "saving heap failed\n");
+               return GDK_FAIL;
+        }
+
+        if (HEAPsave(b->tvheap, nme, "theap", true, b->tvheap->free,
+                     &b->theaplock) != GDK_SUCCEED) {
+                HEAPfree(b->tvheap, false);
+                b->tvheap = ovh;
+                GDKfree(srcdir);
+                TRC_CRITICAL(GDK, "saving string failed\n");
+               return GDK_FAIL;
+        }
+
+        HEAPdecref(b->theap, false);
+        b->theap = h2;
+       HEAPfree(h2, false);
+        HEAPdecref(ovh, false);
+       HEAPfree(b->tvheap, false);
+       GDKfree(srcdir);
+
+       return GDK_SUCCEED;
+}
+
 gdk_return
 BBPjson_upgrade(json_storage_conversion fixJSONStorage) {
-       (void) fixJSONStorage;
-#if 0
        bat bid;
        BAT *b;
-       bat *cmlst;
-       int cnt = 1;
        int JSON_type = ATOMindex("json");
 
-       if ((cmlst = GDKmalloc(ATOMIC_GET(&BBPsize) * sizeof(bat))) == NULL) {
-               TRC_CRITICAL(GDK, "json storage upgrade: failed to allocate 
space");
-               return GDK_FAIL;
-       }
-       cmlst[0] = 0;
+       BBPlock();
+
        for (bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
                if ((b = BBP_desc(bid)) == NULL) {
                        /* not a valid BAT */
@@ -1507,57 +1685,18 @@ BBPjson_upgrade(json_storage_conversion 
                        continue;
                }
                fprintf(stderr, "Upgrading json bat %d\n", bid);
-
-               BAT *b = BATdescriptor(bid);
-               BAT *newb;
-               BATiter bi;
-               struct canditer ci;
-               oid x;
-               str out = NULL;
-
-               newb = COLnew(0, b->ttype, b->batCapacity, PERSISTENT);
-               if (newb == NULL) {
-                       TRC_CRITICAL(GDK, "json storage upgrade: new bat 
creation failed");
+                if (jsonupgradebat(b, fixJSONStorage) != GDK_SUCCEED) {
+                        BBPunlock();
+                       GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL);
                        return GDK_FAIL;
-               }
-
-               canditer_init(&ci, b, NULL);
-               bi = bat_iterator(b);
-               for (BUN i = 0; i < ci.ncand; i++) {
-                       x = canditer_next(&ci);
-                       const char *cs = BUNtvar(bi, x);
-                       if (!strNil(cs)) {
-                               if(fixJSONStorage(&out, &cs) != GDK_SUCCEED) {
-                                       TRC_CRITICAL(GDK, "could not convert 
json string for %s", cs);
-                                       GDKfree(cmlst);
-                                       return GDK_FAIL;
-                               }
-                               if (BUNappend(newb, out, false) != GDK_SUCCEED) 
{
-                                       TRC_CRITICAL(GDK, "json storage 
upgrade: appending value to bat failed");
-                                       GDKfree(out);
-                                       GDKfree(cmlst);
-                                       return GDK_FAIL;
-                               }
-                               GDKfree(out);
-                               out = NULL;
-                       }
-               }
-               bat_iterator_end(&bi);
-               if (BBPsave(newb) != GDK_SUCCEED) {
-                       GDKfree(cmlst);
-                       return GDK_FAIL;
-               }
-               cmlst[cnt++] = newb->batCacheid;
-               BBPunfix(newb->batCacheid);
-               BBPunfix(bid);
+                }
+
        }
-       if (TMsubcommit_list(cmlst, NULL, cnt, -1, -1) != GDK_SUCCEED) {
-               GDKfree(cmlst);
+       BBPunlock();
+       if (TMcommit() != GDK_SUCCEED) {
+               TRC_CRITICAL(GDK, "failed to commit changes\n");
                return GDK_FAIL;
        }
-       GDKfree(cmlst);
-#endif // 0
-       /* We did the upgrade, remove the signal file */
        GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL);
        return GDK_SUCCEED;
 }
diff --git a/monetdb5/modules/atoms/json.c b/monetdb5/modules/atoms/json.c
--- a/monetdb5/modules/atoms/json.c
+++ b/monetdb5/modules/atoms/json.c
@@ -602,7 +602,7 @@ JSONprelude(void)
                /* The file exists so we need to run the upgrade code */
                if (BBPjson_upgrade(upgradeJSONStorage) != GDK_SUCCEED) {
                        GDKfree(jsonupgrade);
-                       throw(MAL, "json.prelude", SQLSTATE(HY013) 
MAL_MALLOC_FAIL); // Fix exception reason
+                       throw(MAL, "json.prelude", "JSON storage upgrade 
failed"); // Fix exception reason
                }
        }
        GDKfree(jsonupgrade);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to