Changeset: 3f7be6f05660 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/3f7be6f05660
Modified Files:
	gdk/gdk_logger.c
Branch: default
Log Message:
Merge with Jun2023 branch. diffs (truncated from 305 to 300 lines): diff --git a/gdk/ChangeLog.Jun2023 b/gdk/ChangeLog.Jun2023 --- a/gdk/ChangeLog.Jun2023 +++ b/gdk/ChangeLog.Jun2023 @@ -1,3 +1,10 @@ # ChangeLog file for GDK # This file is updated with Maddlog +* Tue Nov 7 2023 Sjoerd Mullender <sjo...@acm.org> +- When saving the SQL catalog during a low-level commit, we should + only save the part of the catalog that corresponds to the part of the + write-ahead log that has been processed. What we did was save more, + which resulted in the catalog containing references to tables and + columns whose disk presence is otherwise only in the write-ahead log. + diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -94,7 +94,7 @@ typedef struct logformat_t { typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return; -static gdk_return bm_commit(logger *lg, uint32_t *updated, BUN maxupdated); +static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated); static gdk_return tr_grow(trans *tr); #define log_lock(lg) MT_lock_set(&(lg)->lock) @@ -1089,6 +1089,8 @@ log_open_output(logger *lg) new_range->next = NULL; logged_range *current = lg->current; assert(current && current->next == NULL); + new_range->cnt = current->cnt; + new_range->deleted = current->deleted; current->next = new_range; ATOMIC_INC(&lg->nr_open_files); return GDK_SUCCEED; @@ -1381,11 +1383,11 @@ log_readlogs(logger *lg, const char *fil } static gdk_return -log_commit(logger *lg, uint32_t *updated, BUN maxupdated) +log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated) { TRC_DEBUG(WAL, "commit"); - return bm_commit(lg, updated, maxupdated); + return bm_commit(lg, pending, updated, maxupdated); } static gdk_return @@ -1661,19 +1663,25 @@ cleanup_and_swap(logger *lg, int *r, con GDKclrerr(); lg->cnt = BATcount(lg->catalog_bid); lg->deleted -= cleanup; + for (logged_range *p = lg->pending; p; p = p->next) { + 
p->cnt -= cleanup; + p->deleted -= cleanup; + } return rcnt; } /* this function is called with log_lock() held; it releases the lock * before returning */ static gdk_return -bm_subcommit(logger *lg, uint32_t *updated, BUN maxupdated) +bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated) { + BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid); + BUN dcnt = pending ? pending->deleted : BATcount(lg->dcatalog); BUN p, q; BAT *catalog_bid = lg->catalog_bid; BAT *catalog_id = lg->catalog_id; BAT *dcatalog = lg->dcatalog; - BUN nn = 13 + BATcount(catalog_bid); + BUN nn = 13 + cnt; bat *n = GDKmalloc(sizeof(bat) * nn); bat *r = GDKmalloc(sizeof(bat) * nn); BUN *sizes = GDKmalloc(sizeof(BUN) * nn); @@ -1716,27 +1724,31 @@ bm_subcommit(logger *lg, uint32_t *updat n[i++] = col; } /* now commit catalog, so it's also up to date on disk */ - sizes[i] = lg->cnt; + sizes[i] = cnt; n[i++] = catalog_bid->batCacheid; - sizes[i] = lg->cnt; + sizes[i] = cnt; n[i++] = catalog_id->batCacheid; - sizes[i] = BATcount(dcatalog); + sizes[i] = dcnt; n[i++] = dcatalog->batCacheid; - if (cleanup && - (rcnt = cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, - catalog_id, dcatalog, cleanup, updated, - maxupdated)) < 0) { - GDKfree(n); - GDKfree(r); - GDKfree(sizes); - log_unlock(lg); - return GDK_FAIL; + if (cleanup) { + if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts, + catalog_bid, catalog_id, dcatalog, + cleanup, updated, + maxupdated)) < 0) { + GDKfree(n); + GDKfree(r); + GDKfree(sizes); + log_unlock(lg); + return GDK_FAIL; + } + cnt -= cleanup; + dcnt -= cleanup; } if (dcatalog != lg->dcatalog) { - i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, BATcount(lg->catalog_bid)); - i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, BATcount(lg->catalog_bid)); - i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog)); + i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt); + 
i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt); + i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, dcnt); } if (lg->seqs_id) { sizes[i] = BATcount(lg->seqs_id); @@ -1994,7 +2006,7 @@ log_load(const char *fn, const char *log log_lock(lg); /* bm_subcommit releases the lock */ - if (bm_subcommit(lg, NULL, 0) != GDK_SUCCEED) { + if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) { /* cannot commit catalog, so remove log */ if (MT_remove(filename) < 0) GDKsyserror("remove %s failed\n", filename); @@ -2144,7 +2156,7 @@ log_load(const char *fn, const char *log } dbg = ATOMIC_GET(&GDKdebug); ATOMIC_AND(&GDKdebug, ~CHECKMASK); - if (needcommit && bm_commit(lg, NULL, 0) != GDK_SUCCEED) { + if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) { GDKerror("Logger_new: commit failed"); goto error; } @@ -2169,7 +2181,7 @@ log_load(const char *fn, const char *log } dbg = ATOMIC_GET(&GDKdebug); ATOMIC_AND(&GDKdebug, ~CHECKMASK); - if (log_commit(lg, NULL, 0) != GDK_SUCCEED) { + if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) { goto error; } ATOMIC_SET(&GDKdebug, dbg); @@ -2333,7 +2345,7 @@ log_destroy(logger *lg) if (LOG_DISABLED(lg)) { lg->saved_id = lg->id; lg->saved_tid = lg->tid; - log_commit(lg, NULL, 0); + log_commit(lg, NULL, NULL, 0); } if (lg->catalog_bid) { log_lock(lg); @@ -2394,7 +2406,10 @@ log_create(int debug, const char *fn, co return NULL; } assert(lg->current == NULL); - logged_range dummy = { 0 }; + logged_range dummy = { + .cnt = BATcount(lg->catalog_bid), + .deleted = BATcount(lg->dcatalog), + }; lg->current = &dummy; if (log_open_output(lg) != GDK_SUCCEED) { log_destroy(lg); @@ -2407,12 +2422,12 @@ log_create(int debug, const char *fn, co return lg; } -static ulng +static logged_range * log_next_logfile(logger *lg, ulng ts) { int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) ? 
1000 : 100; if (!lg->pending || !lg->pending->next) - return 0; + return NULL; if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges && (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) && (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) { @@ -2422,9 +2437,9 @@ log_next_logfile(logger *lg, ulng ts) p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts) && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++) p = p->next; - return p->id; + return p; } - return 0; + return NULL; } static void @@ -2483,13 +2498,14 @@ log_activate(logger *lg) gdk_return log_flush(logger *lg, ulng ts) { - ulng lid = log_next_logfile(lg, ts), olid = lg->saved_id; + logged_range *pending = log_next_logfile(lg, ts); + ulng lid = pending ? pending->id : 0, olid = lg->saved_id; if (LOG_DISABLED(lg)) { lg->saved_id = lid; lg->saved_tid = lg->tid; if (lid) log_cleanup_range(lg, lg->saved_id); - if (log_commit(lg, NULL, 0) != GDK_SUCCEED) + if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) TRC_ERROR(GDK, "failed to commit"); return GDK_SUCCEED; } @@ -2577,7 +2593,7 @@ log_flush(logger *lg, ulng ts) rotation_lock(lg); /* protect against concurrent log_tflush rotate check */ lg->saved_id = lid; rotation_unlock(lg); - if (log_commit(lg, updated, nupdated) != GDK_SUCCEED) { + if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) { TRC_ERROR(GDK, "failed to commit"); res = LOG_ERR; rotation_lock(lg); @@ -3088,19 +3104,20 @@ log_tflush(logger *lg, ulng file_id, uln { if (lg->flushnow) { rotation_lock(lg); + logged_range *p = lg->current; assert(lg->flush_ranges == lg->current); assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts)); log_tdone(lg, lg->current, commit_ts); ATOMIC_SET(&lg->current->flushed_ts, commit_ts); lg->id++; - lg->flushnow = 0; + lg->flushnow = false; if (log_open_output(lg) != GDK_SUCCEED) GDKfatal("Could not 
create new log file\n"); /* TODO: does not have to be fatal (yet) */ do_rotate(lg); rotation_unlock(lg); (void) do_flush_range_cleanup(lg); assert(lg->flush_ranges == lg->current); - return log_commit(lg, NULL, 0); + return log_commit(lg, p, NULL, 0); } if (LOG_DISABLED(lg)) @@ -3208,15 +3225,14 @@ log_tsequence(logger *lg, int seq, lng v } static gdk_return -bm_commit(logger *lg, uint32_t *updated, BUN maxupdated) +bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated) { - BUN p; log_lock(lg); BAT *b = lg->catalog_bid; const log_bid *bids; bids = (log_bid *) Tloc(b, 0); - for (p = b->batInserted; p < BATcount(b); p++) { + for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) { log_bid bid = bids[p]; BAT *lb; @@ -3234,7 +3250,7 @@ bm_commit(logger *lg, uint32_t *updated, TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid)); } /* bm_subcommit releases the lock */ - return bm_subcommit(lg, updated, maxupdated); + return bm_subcommit(lg, pending, updated, maxupdated); } static gdk_return @@ -3264,6 +3280,8 @@ log_add_bat(logger *lg, BAT *b, log_id i BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED) return GDK_FAIL; lg->cnt++; + if (lg->current) + lg->current->cnt++; BBPretain(bid); return GDK_SUCCEED; } @@ -3287,6 +3305,8 @@ log_del_bat(logger *lg, log_bid bid) return GDK_FAIL; if (BUNappend(lg->dcatalog, &pos, true) == GDK_SUCCEED) { lg->deleted++; + if (lg->current) + lg->current->deleted++; return GDK_SUCCEED; } return GDK_FAIL; diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h --- a/gdk/gdk_logger_internals.h +++ b/gdk/gdk_logger_internals.h @@ -21,6 +21,8 @@ typedef struct logged_range_t { ATOMIC_TYPE refcount; struct logged_range_t *next; stream *output_log; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org