Changeset: d8e8844f39e3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/d8e8844f39e3
Branch: default
Log Message:
Merges second head diffs (truncated from 372 to 300 lines): diff --git a/gdk/ChangeLog.Jun2023 b/gdk/ChangeLog.Jun2023 --- a/gdk/ChangeLog.Jun2023 +++ b/gdk/ChangeLog.Jun2023 @@ -1,3 +1,10 @@ # ChangeLog file for GDK # This file is updated with Maddlog +* Tue Nov 7 2023 Sjoerd Mullender <sjo...@acm.org> +- When saving the SQL catalog during a low-level commit, we should + only save the part of the catalog that corresponds to the part of the + write-ahead log that has been processed. What we did was save more, + which resulted in the catalog containing references to tables and + columns whose disk presence is otherwise only in the write-ahead log. + diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -94,7 +94,7 @@ typedef struct logformat_t { typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return; -static gdk_return bm_commit(logger *lg, uint32_t *updated, BUN maxupdated); +static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated); static gdk_return tr_grow(trans *tr); #define log_lock(lg) MT_lock_set(&(lg)->lock) @@ -1089,6 +1089,8 @@ log_open_output(logger *lg) new_range->next = NULL; logged_range *current = lg->current; assert(current && current->next == NULL); + new_range->cnt = current->cnt; + new_range->deleted = current->deleted; current->next = new_range; ATOMIC_INC(&lg->nr_open_files); return GDK_SUCCEED; @@ -1381,11 +1383,11 @@ log_readlogs(logger *lg, const char *fil } static gdk_return -log_commit(logger *lg, uint32_t *updated, BUN maxupdated) +log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated) { TRC_DEBUG(WAL, "commit"); - return bm_commit(lg, updated, maxupdated); + return bm_commit(lg, pending, updated, maxupdated); } static gdk_return @@ -1492,7 +1494,7 @@ log_switch_bat(BAT *old, BAT *new, const static gdk_return bm_get_counts(logger *lg) { - BUN p, q, deleted = 0; + BUN p, q; const log_bid *bids = (const log_bid *) 
Tloc(lg->catalog_bid, 0); BATloop(lg->catalog_bid, p, q) { @@ -1505,7 +1507,6 @@ bm_get_counts(logger *lg) assert(b); cnt = BATcount(b); } else { - deleted++; lid = BBP_desc(bids[p]) ? 1 : -1; } if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED) @@ -1513,8 +1514,6 @@ bm_get_counts(logger *lg) if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED) return GDK_FAIL; } - lg->deleted = deleted; - lg->cnt = BATcount(lg->catalog_bid); return GDK_SUCCEED; } @@ -1632,7 +1631,7 @@ cleanup_and_swap(logger *lg, int *r, con r[rcnt++] = lg->catalog_id->batCacheid; r[rcnt++] = lg->dcatalog->batCacheid; - assert(lg->deleted - cleanup == BATcount(ndels)); + assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels)); logbat_destroy(lg->catalog_bid); logbat_destroy(lg->catalog_id); @@ -1659,21 +1658,25 @@ cleanup_and_swap(logger *lg, int *r, con strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL); if (BBPrename(lg->catalog_lid, bak) < 0) GDKclrerr(); - lg->cnt = BATcount(lg->catalog_bid); - lg->deleted -= cleanup; + for (logged_range *p = lg->pending; p; p = p->next) { + p->cnt -= cleanup; + p->deleted -= cleanup; + } return rcnt; } /* this function is called with log_lock() held; it releases the lock * before returning */ static gdk_return -bm_subcommit(logger *lg, uint32_t *updated, BUN maxupdated) +bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated) { + BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid); + BUN dcnt = pending ? 
pending->deleted : BATcount(lg->dcatalog); BUN p, q; BAT *catalog_bid = lg->catalog_bid; BAT *catalog_id = lg->catalog_id; BAT *dcatalog = lg->dcatalog; - BUN nn = 13 + BATcount(catalog_bid); + BUN nn = 13 + cnt; bat *n = GDKmalloc(sizeof(bat) * nn); bat *r = GDKmalloc(sizeof(bat) * nn); BUN *sizes = GDKmalloc(sizeof(BUN) * nn); @@ -1716,27 +1719,31 @@ bm_subcommit(logger *lg, uint32_t *updat n[i++] = col; } /* now commit catalog, so it's also up to date on disk */ - sizes[i] = lg->cnt; + sizes[i] = cnt; n[i++] = catalog_bid->batCacheid; - sizes[i] = lg->cnt; + sizes[i] = cnt; n[i++] = catalog_id->batCacheid; - sizes[i] = BATcount(dcatalog); + sizes[i] = dcnt; n[i++] = dcatalog->batCacheid; - if (cleanup && - (rcnt = cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, - catalog_id, dcatalog, cleanup, updated, - maxupdated)) < 0) { - GDKfree(n); - GDKfree(r); - GDKfree(sizes); - log_unlock(lg); - return GDK_FAIL; + if (cleanup) { + if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts, + catalog_bid, catalog_id, dcatalog, + cleanup, updated, + maxupdated)) < 0) { + GDKfree(n); + GDKfree(r); + GDKfree(sizes); + log_unlock(lg); + return GDK_FAIL; + } + cnt -= cleanup; + dcnt -= cleanup; } if (dcatalog != lg->dcatalog) { - i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, BATcount(lg->catalog_bid)); - i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, BATcount(lg->catalog_bid)); - i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog)); + i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt); + i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt); + i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, dcnt); } if (lg->seqs_id) { sizes[i] = BATcount(lg->seqs_id); @@ -1994,7 +2001,7 @@ log_load(const char *fn, const char *log log_lock(lg); /* bm_subcommit releases the lock */ - if (bm_subcommit(lg, NULL, 0) != GDK_SUCCEED) { + if (bm_subcommit(lg, NULL, NULL, 0) != 
GDK_SUCCEED) { /* cannot commit catalog, so remove log */ if (MT_remove(filename) < 0) GDKsyserror("remove %s failed\n", filename); @@ -2144,7 +2151,7 @@ log_load(const char *fn, const char *log } dbg = ATOMIC_GET(&GDKdebug); ATOMIC_AND(&GDKdebug, ~CHECKMASK); - if (needcommit && bm_commit(lg, NULL, 0) != GDK_SUCCEED) { + if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) { GDKerror("Logger_new: commit failed"); goto error; } @@ -2169,7 +2176,7 @@ log_load(const char *fn, const char *log } dbg = ATOMIC_GET(&GDKdebug); ATOMIC_AND(&GDKdebug, ~CHECKMASK); - if (log_commit(lg, NULL, 0) != GDK_SUCCEED) { + if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) { goto error; } ATOMIC_SET(&GDKdebug, dbg); @@ -2333,7 +2340,7 @@ log_destroy(logger *lg) if (LOG_DISABLED(lg)) { lg->saved_id = lg->id; lg->saved_tid = lg->tid; - log_commit(lg, NULL, 0); + log_commit(lg, NULL, NULL, 0); } if (lg->catalog_bid) { log_lock(lg); @@ -2394,7 +2401,10 @@ log_create(int debug, const char *fn, co return NULL; } assert(lg->current == NULL); - logged_range dummy = { 0 }; + logged_range dummy = { + .cnt = BATcount(lg->catalog_bid), + .deleted = BATcount(lg->dcatalog), + }; lg->current = &dummy; if (log_open_output(lg) != GDK_SUCCEED) { log_destroy(lg); @@ -2407,12 +2417,12 @@ log_create(int debug, const char *fn, co return lg; } -static ulng +static logged_range * log_next_logfile(logger *lg, ulng ts) { int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) ? 
1000 : 100; if (!lg->pending || !lg->pending->next) - return 0; + return NULL; if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges && (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) && (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) { @@ -2422,9 +2432,9 @@ log_next_logfile(logger *lg, ulng ts) p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts) && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++) p = p->next; - return p->id; + return p; } - return 0; + return NULL; } static void @@ -2483,13 +2493,14 @@ log_activate(logger *lg) gdk_return log_flush(logger *lg, ulng ts) { - ulng lid = log_next_logfile(lg, ts), olid = lg->saved_id; + logged_range *pending = log_next_logfile(lg, ts); + ulng lid = pending ? pending->id : 0, olid = lg->saved_id; if (LOG_DISABLED(lg)) { lg->saved_id = lid; lg->saved_tid = lg->tid; if (lid) log_cleanup_range(lg, lg->saved_id); - if (log_commit(lg, NULL, 0) != GDK_SUCCEED) + if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) TRC_ERROR(GDK, "failed to commit"); return GDK_SUCCEED; } @@ -2577,7 +2588,7 @@ log_flush(logger *lg, ulng ts) rotation_lock(lg); /* protect against concurrent log_tflush rotate check */ lg->saved_id = lid; rotation_unlock(lg); - if (log_commit(lg, updated, nupdated) != GDK_SUCCEED) { + if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) { TRC_ERROR(GDK, "failed to commit"); res = LOG_ERR; rotation_lock(lg); @@ -3088,19 +3099,20 @@ log_tflush(logger *lg, ulng file_id, uln { if (lg->flushnow) { rotation_lock(lg); + logged_range *p = lg->current; assert(lg->flush_ranges == lg->current); assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts)); log_tdone(lg, lg->current, commit_ts); ATOMIC_SET(&lg->current->flushed_ts, commit_ts); lg->id++; - lg->flushnow = 0; + lg->flushnow = false; if (log_open_output(lg) != GDK_SUCCEED) GDKfatal("Could not 
create new log file\n"); /* TODO: does not have to be fatal (yet) */ do_rotate(lg); rotation_unlock(lg); (void) do_flush_range_cleanup(lg); assert(lg->flush_ranges == lg->current); - return log_commit(lg, NULL, 0); + return log_commit(lg, p, NULL, 0); } if (LOG_DISABLED(lg)) @@ -3208,15 +3220,14 @@ log_tsequence(logger *lg, int seq, lng v } static gdk_return -bm_commit(logger *lg, uint32_t *updated, BUN maxupdated) +bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated) { - BUN p; log_lock(lg); BAT *b = lg->catalog_bid; const log_bid *bids; bids = (log_bid *) Tloc(b, 0); - for (p = b->batInserted; p < BATcount(b); p++) { + for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) { _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org