Changeset: 3f7be6f05660 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/3f7be6f05660
Modified Files:
        gdk/gdk_logger.c
Branch: default
Log Message:

Merge with Jun2023 branch.


diffs (truncated from 305 to 300 lines):

diff --git a/gdk/ChangeLog.Jun2023 b/gdk/ChangeLog.Jun2023
--- a/gdk/ChangeLog.Jun2023
+++ b/gdk/ChangeLog.Jun2023
@@ -1,3 +1,10 @@
 # ChangeLog file for GDK
 # This file is updated with Maddlog
 
+* Tue Nov  7 2023 Sjoerd Mullender <sjo...@acm.org>
+- When saving the SQL catalog during a low-level commit, we should
+  only save the part of the catalog that corresponds to the part of the
+  write-ahead log that has been processed.  What we did was save more,
+  which resulted in the catalog containing references to tables and
+  columns whose disk presence is otherwise only in the write-ahead log.
+
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -94,7 +94,7 @@ typedef struct logformat_t {
 
 typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
 
-static gdk_return bm_commit(logger *lg, uint32_t *updated, BUN maxupdated);
+static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t 
*updated, BUN maxupdated);
 static gdk_return tr_grow(trans *tr);
 
 #define log_lock(lg)   MT_lock_set(&(lg)->lock)
@@ -1089,6 +1089,8 @@ log_open_output(logger *lg)
        new_range->next = NULL;
        logged_range *current = lg->current;
        assert(current && current->next == NULL);
+       new_range->cnt = current->cnt;
+       new_range->deleted = current->deleted;
        current->next = new_range;
        ATOMIC_INC(&lg->nr_open_files);
        return GDK_SUCCEED;
@@ -1381,11 +1383,11 @@ log_readlogs(logger *lg, const char *fil
 }
 
 static gdk_return
-log_commit(logger *lg, uint32_t *updated, BUN maxupdated)
+log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN 
maxupdated)
 {
        TRC_DEBUG(WAL, "commit");
 
-       return bm_commit(lg, updated, maxupdated);
+       return bm_commit(lg, pending, updated, maxupdated);
 }
 
 static gdk_return
@@ -1661,19 +1663,25 @@ cleanup_and_swap(logger *lg, int *r, con
                GDKclrerr();
        lg->cnt = BATcount(lg->catalog_bid);
        lg->deleted -= cleanup;
+       for (logged_range *p = lg->pending; p; p = p->next) {
+               p->cnt -= cleanup;
+               p->deleted -= cleanup;
+       }
        return rcnt;
 }
 
 /* this function is called with log_lock() held; it releases the lock
  * before returning */
 static gdk_return
-bm_subcommit(logger *lg, uint32_t *updated, BUN maxupdated)
+bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN 
maxupdated)
 {
+       BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
+       BUN dcnt = pending ? pending->deleted : BATcount(lg->dcatalog);
        BUN p, q;
        BAT *catalog_bid = lg->catalog_bid;
        BAT *catalog_id = lg->catalog_id;
        BAT *dcatalog = lg->dcatalog;
-       BUN nn = 13 + BATcount(catalog_bid);
+       BUN nn = 13 + cnt;
        bat *n = GDKmalloc(sizeof(bat) * nn);
        bat *r = GDKmalloc(sizeof(bat) * nn);
        BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
@@ -1716,27 +1724,31 @@ bm_subcommit(logger *lg, uint32_t *updat
                n[i++] = col;
        }
        /* now commit catalog, so it's also up to date on disk */
-       sizes[i] = lg->cnt;
+       sizes[i] = cnt;
        n[i++] = catalog_bid->batCacheid;
-       sizes[i] = lg->cnt;
+       sizes[i] = cnt;
        n[i++] = catalog_id->batCacheid;
-       sizes[i] = BATcount(dcatalog);
+       sizes[i] = dcnt;
        n[i++] = dcatalog->batCacheid;
 
-       if (cleanup &&
-           (rcnt = cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid,
-                                    catalog_id, dcatalog, cleanup, updated,
-                                    maxupdated)) < 0) {
-               GDKfree(n);
-               GDKfree(r);
-               GDKfree(sizes);
-               log_unlock(lg);
-               return GDK_FAIL;
+       if (cleanup) {
+               if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
+                                            catalog_bid, catalog_id, dcatalog,
+                                            cleanup, updated,
+                                            maxupdated)) < 0) {
+                       GDKfree(n);
+                       GDKfree(r);
+                       GDKfree(sizes);
+                       log_unlock(lg);
+                       return GDK_FAIL;
+               }
+               cnt -= cleanup;
+               dcnt -= cleanup;
        }
        if (dcatalog != lg->dcatalog) {
-               i = subcommit_list_add(i, n, sizes, 
lg->catalog_bid->batCacheid, BATcount(lg->catalog_bid));
-               i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, 
BATcount(lg->catalog_bid));
-               i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, 
BATcount(lg->dcatalog));
+               i = subcommit_list_add(i, n, sizes, 
lg->catalog_bid->batCacheid, cnt);
+               i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, 
cnt);
+               i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, 
dcnt);
        }
        if (lg->seqs_id) {
                sizes[i] = BATcount(lg->seqs_id);
@@ -1994,7 +2006,7 @@ log_load(const char *fn, const char *log
 
                log_lock(lg);
                /* bm_subcommit releases the lock */
-               if (bm_subcommit(lg, NULL, 0) != GDK_SUCCEED) {
+               if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
                        /* cannot commit catalog, so remove log */
                        if (MT_remove(filename) < 0)
                                GDKsyserror("remove %s failed\n", filename);
@@ -2144,7 +2156,7 @@ log_load(const char *fn, const char *log
        }
        dbg = ATOMIC_GET(&GDKdebug);
        ATOMIC_AND(&GDKdebug, ~CHECKMASK);
-       if (needcommit && bm_commit(lg, NULL, 0) != GDK_SUCCEED) {
+       if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
                GDKerror("Logger_new: commit failed");
                goto error;
        }
@@ -2169,7 +2181,7 @@ log_load(const char *fn, const char *log
                }
                dbg = ATOMIC_GET(&GDKdebug);
                ATOMIC_AND(&GDKdebug, ~CHECKMASK);
-               if (log_commit(lg, NULL, 0) != GDK_SUCCEED) {
+               if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
                        goto error;
                }
                ATOMIC_SET(&GDKdebug, dbg);
@@ -2333,7 +2345,7 @@ log_destroy(logger *lg)
        if (LOG_DISABLED(lg)) {
                lg->saved_id = lg->id;
                lg->saved_tid = lg->tid;
-               log_commit(lg, NULL, 0);
+               log_commit(lg, NULL, NULL, 0);
        }
        if (lg->catalog_bid) {
                log_lock(lg);
@@ -2394,7 +2406,10 @@ log_create(int debug, const char *fn, co
                return NULL;
        }
        assert(lg->current == NULL);
-       logged_range dummy = { 0 };
+       logged_range dummy = {
+               .cnt = BATcount(lg->catalog_bid),
+               .deleted = BATcount(lg->dcatalog),
+       };
        lg->current = &dummy;
        if (log_open_output(lg) != GDK_SUCCEED) {
                log_destroy(lg);
@@ -2407,12 +2422,12 @@ log_create(int debug, const char *fn, co
        return lg;
 }
 
-static ulng
+static logged_range *
 log_next_logfile(logger *lg, ulng ts)
 {
        int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) ? 1000 : 100;
        if (!lg->pending || !lg->pending->next)
-               return 0;
+               return NULL;
        if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != 
lg->current && lg->pending != lg->flush_ranges &&
            (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) 
ATOMIC_GET(&lg->pending->flushed_ts) &&
            (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
@@ -2422,9 +2437,9 @@ log_next_logfile(logger *lg, ulng ts)
                     p->next != lg->flush_ranges && (ulng) 
ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
                     && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
                        p = p->next;
-               return p->id;
+               return p;
        }
-       return 0;
+       return NULL;
 }
 
 static void
@@ -2483,13 +2498,14 @@ log_activate(logger *lg)
 gdk_return
 log_flush(logger *lg, ulng ts)
 {
-       ulng lid = log_next_logfile(lg, ts), olid = lg->saved_id;
+       logged_range *pending = log_next_logfile(lg, ts);
+       ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
        if (LOG_DISABLED(lg)) {
                lg->saved_id = lid;
                lg->saved_tid = lg->tid;
                if (lid)
                        log_cleanup_range(lg, lg->saved_id);
-               if (log_commit(lg, NULL, 0) != GDK_SUCCEED)
+               if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
                        TRC_ERROR(GDK, "failed to commit");
                return GDK_SUCCEED;
        }
@@ -2577,7 +2593,7 @@ log_flush(logger *lg, ulng ts)
                rotation_lock(lg);      /* protect against concurrent 
log_tflush rotate check */
                lg->saved_id = lid;
                rotation_unlock(lg);
-               if (log_commit(lg, updated, nupdated) != GDK_SUCCEED) {
+               if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
                        TRC_ERROR(GDK, "failed to commit");
                        res = LOG_ERR;
                        rotation_lock(lg);
@@ -3088,19 +3104,20 @@ log_tflush(logger *lg, ulng file_id, uln
 {
        if (lg->flushnow) {
                rotation_lock(lg);
+               logged_range *p = lg->current;
                assert(lg->flush_ranges == lg->current);
                assert(ATOMIC_GET(&lg->current->flushed_ts) == 
ATOMIC_GET(&lg->current->last_ts));
                log_tdone(lg, lg->current, commit_ts);
                ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
                lg->id++;
-               lg->flushnow = 0;
+               lg->flushnow = false;
                if (log_open_output(lg) != GDK_SUCCEED)
                        GDKfatal("Could not create new log file\n");    /* 
TODO: does not have to be fatal (yet) */
                do_rotate(lg);
                rotation_unlock(lg);
                (void) do_flush_range_cleanup(lg);
                assert(lg->flush_ranges == lg->current);
-               return log_commit(lg, NULL, 0);
+               return log_commit(lg, p, NULL, 0);
        }
 
        if (LOG_DISABLED(lg))
@@ -3208,15 +3225,14 @@ log_tsequence(logger *lg, int seq, lng v
 }
 
 static gdk_return
-bm_commit(logger *lg, uint32_t *updated, BUN maxupdated)
+bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
 {
-       BUN p;
        log_lock(lg);
        BAT *b = lg->catalog_bid;
        const log_bid *bids;
 
        bids = (log_bid *) Tloc(b, 0);
-       for (p = b->batInserted; p < BATcount(b); p++) {
+       for (BUN p = b->batInserted, cnt = pending ? pending->cnt : 
BATcount(b); p < cnt; p++) {
                log_bid bid = bids[p];
                BAT *lb;
 
@@ -3234,7 +3250,7 @@ bm_commit(logger *lg, uint32_t *updated,
                TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
        }
        /* bm_subcommit releases the lock */
-       return bm_subcommit(lg, updated, maxupdated);
+       return bm_subcommit(lg, pending, updated, maxupdated);
 }
 
 static gdk_return
@@ -3264,6 +3280,8 @@ log_add_bat(logger *lg, BAT *b, log_id i
            BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
                return GDK_FAIL;
        lg->cnt++;
+       if (lg->current)
+               lg->current->cnt++;
        BBPretain(bid);
        return GDK_SUCCEED;
 }
@@ -3287,6 +3305,8 @@ log_del_bat(logger *lg, log_bid bid)
                return GDK_FAIL;
        if (BUNappend(lg->dcatalog, &pos, true) == GDK_SUCCEED) {
                lg->deleted++;
+               if (lg->current)
+                       lg->current->deleted++;
                return GDK_SUCCEED;
        }
        return GDK_FAIL;
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -21,6 +21,8 @@ typedef struct logged_range_t {
        ATOMIC_TYPE refcount;
        struct logged_range_t *next;
        stream *output_log;
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to