Changeset: 7bcba8c27acc for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/7bcba8c27acc
Modified Files:
        monetdb5/mal/mal_resource.c
        sql/backends/monet5/sql_upgrades.c
Branch: scatter
Log Message:

merged with jul2021


diffs (truncated from 1231 to 300 lines):

diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -2331,6 +2331,7 @@ decref(bat i, bool logical, bool release
            (BBP_lrefs(i) > 0 &&
             (b == NULL ||
              (BATdirty(b) && (BBP_status(i) & BBPHOT)) ||
+             (BBP_status(i) & BBPSYNCING) || /* no swap during (sub)commit */
              (BBP_status(i) & (BBPPERSISTENT | BBPHOT)) == BBPHOT ||
              GDKinmemory(b->theap->farmid)))) {
                /* bat cannot be swapped out */
@@ -3097,6 +3098,7 @@ BBPsync(int cnt, bat *restrict subcommit
        gdk_return ret = GDK_SUCCEED;
        int t0 = 0, t1 = 0;
        str bakdir, deldir;
+       const bool lock = locked_by == 0 || locked_by != MT_getpid();
 
        if(!(bakdir = GDKfilepath(0, NULL, subcommit ? SUBDIR : BAKDIR, NULL)))
                return GDK_FAIL;
@@ -3115,18 +3117,40 @@ BBPsync(int cnt, bat *restrict subcommit
 
                while (++idx < cnt) {
                        bat i = subcommit ? subcommit[idx] : idx;
+                       if (lock)
+                               MT_lock_set(&GDKswapLock(i));
+                       /* set flag that we're syncing, i.e. that we'll
+                        * be between moving heap to backup dir and
+                        * saving the new version */
+                       BBP_status_on(i, BBPSYNCING);
+                       /* wait until unloading is finished before
+                        * attempting to make a backup */
+                       while (BBP_status(i) & BBPUNLOADING) {
+                               if (lock)
+                                       MT_lock_unset(&GDKswapLock(i));
+                               BBPspin(i, __func__, BBPUNLOADING);
+                               if (lock)
+                                       MT_lock_set(&GDKswapLock(i));
+                       }
                        BAT *b = dirty_bat(&i, subcommit != NULL);
                        if (i <= 0)
                                break;
                        if (BBP_status(i) & BBPEXISTING) {
-                               if (b != NULL && BBPbackup(b, subcommit != NULL) != GDK_SUCCEED)
+                               if (b != NULL && BBPbackup(b, subcommit != NULL) != GDK_SUCCEED) {
+                                       BBP_status_off(i, BBPSYNCING);
+                                       if (lock)
+                                               MT_lock_unset(&GDKswapLock(i));
                                        break;
+                               }
                        } else if (subcommit && (b = BBP_desc(i)) && BBP_status(i) & BBPDELETED) {
                                char o[10];
                                char *f;
                                snprintf(o, sizeof(o), "%o", (unsigned) b->batCacheid);
                                f = GDKfilepath(b->theap->farmid, BAKDIR, o, gettailname(b));
                                if (f == NULL) {
+                                       BBP_status_off(i, BBPSYNCING);
+                                       if (lock)
+                                               MT_lock_unset(&GDKswapLock(i));
                                        ret = GDK_FAIL;
                                        goto bailout;
                                }
@@ -3135,6 +3159,9 @@ BBPsync(int cnt, bat *restrict subcommit
                                GDKfree(f);
                                f = GDKfilepath(b->theap->farmid, BAKDIR, o, "theap");
                                if (f == NULL) {
+                                       BBP_status_off(i, BBPSYNCING);
+                                       if (lock)
+                                               MT_lock_unset(&GDKswapLock(i));
                                        ret = GDK_FAIL;
                                        goto bailout;
                                }
@@ -3142,6 +3169,8 @@ BBPsync(int cnt, bat *restrict subcommit
                                        file_move(b->theap->farmid, BAKDIR, SUBDIR, o, "theap");
                                GDKfree(f);
                        }
+                       if (lock)
+                               MT_lock_unset(&GDKswapLock(i));
                }
                if (idx < cnt)
                        ret = GDK_FAIL;
@@ -3162,6 +3191,8 @@ BBPsync(int cnt, bat *restrict subcommit
                                if (b != NULL && BATsave(b) != GDK_SUCCEED)
                                        break;  /* write error */
                        }
+                       /* we once again have a saved heap */
+                       BBP_status_off(i, BBPSYNCING);
                }
                if (idx < cnt)
                        ret = GDK_FAIL;
diff --git a/gdk/gdk_bbp.h b/gdk/gdk_bbp.h
--- a/gdk/gdk_bbp.h
+++ b/gdk/gdk_bbp.h
@@ -52,6 +52,7 @@
 #define BBPWAITING      (BBPUNLOADING|BBPLOADING|BBPSAVING|BBPDELETING)
 
 #define BBPHOT         4096    /* bat is "hot", i.e. is still in active use */
+#define BBPSYNCING     8192
 
 #define BBPTRIM_ALL    (((size_t)1) << (sizeof(size_t)*8 - 2)) /* very large positive size_t */
 
diff --git a/gdk/gdk_imprints.c b/gdk/gdk_imprints.c
--- a/gdk/gdk_imprints.c
+++ b/gdk/gdk_imprints.c
@@ -720,8 +720,7 @@ IMPSimprintsize(BAT *b)
 {
        lng sz = 0;
        if (b->timprints && b->timprints != (Imprints *) 1) {
-               sz = b->timprints->impcnt * b->timprints->bits / 8;
-               sz += b->timprints->dictcnt * sizeof(cchdc_t);
+               sz = (lng) b->timprints->imprints.free;
        }
        return sz;
 }
diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c
--- a/gdk/gdk_join.c
+++ b/gdk/gdk_join.c
@@ -1796,7 +1796,7 @@ mergejoin(BAT **r1p, BAT **r2p, BAT *l, 
                lordering = l->tsorted && (r->tsorted || !equal_order) ? 1 : -1;
                rordering = equal_order ? lordering : -lordering;
                if (!l->tnonil && !nil_matches && !nil_on_miss) {
-                       nl = binsearch(NULL, 0, l->ttype, lvals, lvars, lwidth, 0, BATcount(l), nil, l->tsorted ? 1 : -1, 1);
+                       nl = binsearch(NULL, 0, l->ttype, lvals, lvars, lwidth, 0, BATcount(l), nil, l->tsorted ? 1 : -1, l->tsorted ? 1 : 0);
                        nl = canditer_search(lci, nl + l->hseqbase, true);
                        if (l->tsorted) {
                                canditer_setidx(lci, nl);
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -777,7 +777,6 @@ la_apply(logger *lg, logaction *c)
        case LOG_CREATE:
                if (!lg->flushing)
                        ret = la_bat_create(lg, c);
-               lg->cnt++;
                break;
        case LOG_DESTROY:
                if (!lg->flushing)
@@ -934,38 +933,44 @@ logger_create_types_file(logger *lg, con
 static gdk_return
 logger_open_output(logger *lg)
 {
-       char id[32];
-       char *filename;
+       logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range));
 
-       if (LOG_DISABLED(lg)) {
-               lg->end = 0;
-               if (lg->id) /* go back to last used id */
-                       lg->id--;
-               return GDK_SUCCEED;
-       }
-       if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
-               TRC_CRITICAL(GDK, "filename is too large\n");
-               return GDK_FAIL;
-       }
-       if (!(filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id))) {
+       if (!new_range) {
                TRC_CRITICAL(GDK, "allocation failure\n");
                return GDK_FAIL;
        }
 
-       lg->output_log = open_wstream(filename);
-       if (lg->output_log) {
-               short byteorder = 1234;
-               mnstr_write(lg->output_log, &byteorder, sizeof(byteorder), 1);
-       }
-       lg->end = 0;
+       if (LOG_DISABLED(lg)) {
+               lg->end = 0;
+       } else {
+               char id[32];
+               char *filename;
 
-       logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range));
+               if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
+                       TRC_CRITICAL(GDK, "filename is too large\n");
+                       GDKfree(new_range);
+                       return GDK_FAIL;
+               }
+               if (!(filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id))) {
+                       TRC_CRITICAL(GDK, "allocation failure\n");
+                       GDKfree(new_range);
+                       return GDK_FAIL;
+               }
 
-       if (lg->output_log == NULL || mnstr_errnr(lg->output_log) || !new_range) {
-               TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
-               GDKfree(new_range);
+               lg->output_log = open_wstream(filename);
+               if (lg->output_log) {
+                       short byteorder = 1234;
+                       mnstr_write(lg->output_log, &byteorder, sizeof(byteorder), 1);
+               }
+               lg->end = 0;
+
+               if (lg->output_log == NULL || mnstr_errnr(lg->output_log)) {
+                       TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
+                       GDKfree(new_range);
+                       GDKfree(filename);
+                       return GDK_FAIL;
+               }
                GDKfree(filename);
-               return GDK_FAIL;
        }
        new_range->id = lg->id;
        new_range->first_tid = lg->tid;
@@ -977,7 +982,6 @@ logger_open_output(logger *lg)
        lg->current = new_range;
        if (!lg->pending)
                lg->pending = new_range;
-       GDKfree(filename);
        return GDK_SUCCEED;
 }
 
@@ -1403,7 +1407,6 @@ bm_subcommit(logger *lg)
                n[i++] = col;
        }
        /* now commit catalog, so it's also up to date on disk */
-       assert(!LOG_DISABLED(lg) || BATcount(catalog_bid) == lg->cnt);
        sizes[i] = lg->cnt;
        n[i++] = catalog_bid->batCacheid;
        sizes[i] = lg->cnt;
@@ -2100,14 +2103,6 @@ logger_cleanup_range(logger *lg)
 gdk_return
 logger_activate(logger *lg)
 {
-       if (LOG_DISABLED(lg)) {
-               if (lg->saved_id+1 == lg->id) {
-                       lg->saved_id++;
-                       lg->saved_tid = lg->tid;
-                       logger_cleanup_range(lg);
-               }
-               return GDK_SUCCEED;
-       }
        if (lg->end > 0 && lg->saved_id+1 == lg->id) {
                lg->id++;
                logger_close_output(lg);
@@ -2127,6 +2122,9 @@ logger_flush(logger *lg, ulng ts)
                lg->saved_tid = lg->tid;
                if (lid)
                        logger_cleanup_range(lg);
+               if (logger_commit(lg) != GDK_SUCCEED) {
+                       TRC_ERROR(GDK, "failed to commit");
+               }
                return GDK_SUCCEED;
        }
        if (lg->saved_id >= lid)
@@ -2285,6 +2283,8 @@ internal_log_bat(logger *lg, BAT *b, log
 
        if (LOG_DISABLED(lg) || !nr) {
                /* logging is switched off */
+               if (LOG_DISABLED(lg))
+                       lg->end += nr;
                if (nr)
                        return la_bat_update_count(lg, id, offset+cnt);
                return GDK_SUCCEED;
@@ -2371,7 +2371,7 @@ log_bat_persists(logger *lg, BAT *b, log
                        return GDK_FAIL;
                }
        } else {
-               lg->cnt++;
+               lg->end++;
        }
        if (lg->debug & 1)
                fprintf(stderr, "#persists id (%d) bat (%d)\n", id, b->batCacheid);
@@ -2397,7 +2397,7 @@ log_bat_transient(logger *lg, log_id id)
                        return GDK_FAIL;
                }
        } else {
-               lg->cnt--;
+               lg->end++;
        }
        if (lg->debug & 1)
                fprintf(stderr, "#Logged destroyed bat (%d) %d\n", id,
@@ -2440,6 +2440,7 @@ log_delta(logger *lg, BAT *uid, BAT *uva
        assert(nr);
 
        if (LOG_DISABLED(lg)) {
+               lg->end += nr;
                /* logging is switched off */
                logger_unlock(lg);
                return GDK_SUCCEED;
@@ -2489,9 +2490,10 @@ log_bat_clear(logger *lg, int id)
 {
        logformat l;
 
-       if (LOG_DISABLED(lg))
+       if (LOG_DISABLED(lg)) {
+               lg->end++;
                return la_bat_update_count(lg, id, 0);
-       //      return GDK_SUCCEED;
+       }
 
        l.flag = LOG_CLEAR;
        l.id = id;
@@ -2551,8 +2553,10 @@ log_tend(logger *lg)
                return logger_commit(lg);
        }
 
-       if (LOG_DISABLED(lg))
+       if (LOG_DISABLED(lg)) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to