Changeset: 39dfededbb3d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/39dfededbb3d
Modified Files:
	gdk/gdk_bbp.c
	gdk/gdk_tm.c
	sql/include/sql_catalog.h
	sql/storage/bat/bat_storage.c
	sql/storage/objectset.c
	sql/storage/sql_storage.h
	sql/storage/store.c
	tools/merovingian/client/monetdb.c
Branch: default
Log Message:
Merge with Jan2022. diffs (truncated from 653 to 300 lines): diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c --- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -3337,7 +3337,7 @@ dirty_bat(bat *i, bool subcommit) static gdk_return file_move(int farmid, const char *srcdir, const char *dstdir, const char *name, const char *ext) { - if (GDKmove(farmid, srcdir, name, ext, dstdir, name, ext, true) == GDK_SUCCEED) { + if (GDKmove(farmid, srcdir, name, ext, dstdir, name, ext, false) == GDK_SUCCEED) { return GDK_SUCCEED; } else { char *path; @@ -3792,7 +3792,7 @@ BBPsync(int cnt, bat *restrict subcommit break; } if (BBP_status(i) & BBPEXISTING) { - if (b != NULL) { + if (b != NULL && b->batInserted > 0) { if (BBPbackup(b, subcommit != NULL) != GDK_SUCCEED) { if (lock) MT_lock_unset(&GDKswapLock(i)); @@ -3834,6 +3834,7 @@ BBPsync(int cnt, bat *restrict subcommit assert(sizes == NULL || bi.width == 0 || (bi.type == TYPE_msk ? ((size + 31) / 32) * 4 : size << bi.shift) <= bi.hfree); if (size > bi.count) /* includes sizes==NULL */ size = bi.count; + bi.b->batInserted = size; if (b && size != 0) { /* wait for BBPSAVING so that we * can set it, wait for diff --git a/gdk/gdk_tm.c b/gdk/gdk_tm.c --- a/gdk/gdk_tm.c +++ b/gdk/gdk_tm.c @@ -43,31 +43,6 @@ * situation is COLD-abort: quit the server and restart, so you get * the recovered disk images. */ -/* in the commit prelude, the delta status in the memory image of all - * bats is commited */ -static gdk_return -prelude(int cnt, bat *restrict subcommit, BUN *restrict sizes) -{ - int i = 0; - - while (++i < cnt) { - bat bid = subcommit ? subcommit[i] : i; - - if (BBP_status(bid) & BBPPERSISTENT) { - BAT *b = BBPquickdesc(bid); - - if (b) { - MT_lock_set(&b->theaplock); - assert(!isVIEW(b)); - assert(b->batRole == PERSISTENT); - assert(sizes == NULL || sizes[i] <= BATcount(b)); - BATcommit(b, sizes ? 
sizes[i] : BUN_NONE); - MT_lock_unset(&b->theaplock); - } - } - } - return GDK_SUCCEED; -} /* in the commit epilogue, the BBP-status of the bats is changed to * reflect their presence in the succeeded checkpoint. Also bats from @@ -146,8 +121,7 @@ TMcommit(void) /* commit with the BBP globally locked */ BBPlock(); - if (prelude(getBBPsize(), NULL, NULL) == GDK_SUCCEED && - BBPsync(getBBPsize(), NULL, NULL, getBBPlogno(), getBBPtransid()) == GDK_SUCCEED) { + if (BBPsync(getBBPsize(), NULL, NULL, getBBPlogno(), getBBPtransid()) == GDK_SUCCEED) { epilogue(getBBPsize(), NULL, true); ret = GDK_SUCCEED; } @@ -212,15 +186,13 @@ TMsubcommit_list(bat *restrict subcommit } } } - if (prelude(cnt, subcommit, sizes) == GDK_SUCCEED) { /* save the new bats outside the lock */ - /* lock just prevents other global (sub-)commits */ - MT_lock_set(&GDKtmLock); - if (BBPsync(cnt, subcommit, sizes, logno, transid) == GDK_SUCCEED) { /* write BBP.dir (++) */ - epilogue(cnt, subcommit, false); - ret = GDK_SUCCEED; - } - MT_lock_unset(&GDKtmLock); + /* lock just prevents other global (sub-)commits */ + MT_lock_set(&GDKtmLock); + if (BBPsync(cnt, subcommit, sizes, logno, transid) == GDK_SUCCEED) { /* write BBP.dir (++) */ + epilogue(cnt, subcommit, false); + ret = GDK_SUCCEED; } + MT_lock_unset(&GDKtmLock); return ret; } diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -365,6 +365,7 @@ segments2cs(sql_trans *tr, segments *seg static void merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) { + sqlstore* store = tr->store; segment *cur = s->segs->h, *seg = NULL; for (; cur; cur = cur->next) { if (cur->ts == tr->tid) { @@ -372,24 +373,47 @@ merge_segments(storage *s, sql_trans *tr cur->oldts = 0; cur->ts = commit_ts; } - if (cur->ts <= oldest && cur->ts < TRANSACTION_ID_BASE) { /* possibly merge range */ - if (!seg) { /* skip first */ - seg = cur; - } else 
if (seg->end == cur->start && seg->deleted == cur->deleted) { - /* merge with previous */ - seg->end = cur->end; - seg->next = cur->next; - if (cur == s->segs->t) - s->segs->t = seg; - if (commit_ts == oldest) - _DELETE(cur); - else - mark4destroy(cur, change, commit_ts); - cur = seg; - } else { - seg = cur; /* begin of new merge */ + if (!seg) { + /* first segment */ + seg = cur; + } + else if (seg->ts < TRANSACTION_ID_BASE) { + /* possible merge since both deleted flags are equal */ + if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) { + int merge = 1; + node *n = store->active->h; + for (int i = 0; i < store->active->cnt; i++, n = n->next) { + ulng active = ((sql_trans*)n->data)->ts; + if(active == seg->ts || active == cur->ts) + continue; /* pretent that a recently committed transaction has already committed and is no longer active */ + if (active == tr->ts) + continue; /* pretent that committing transaction has already committed and is no longer active */ + if (seg->ts < active && cur->ts < active) + break; + if (seg->ts > active && cur->ts > active) + continue; + + assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts)); + /* cannot safely merge since there is an active transaction between the segments */ + merge = false; + break; + } + /* merge segments */ + if (merge) { + seg->end = cur->end; + seg->next = cur->next; + if (cur == s->segs->t) + s->segs->t = seg; + if (commit_ts == oldest) + _DELETE(cur); + else + mark4destroy(cur, change, commit_ts); + cur = seg; + continue; + } } } + seg = cur; } } @@ -3414,12 +3438,18 @@ static int log_segments(sql_trans *tr, segments *segs, sqlid id) { /* log segments */ + lock_table(tr->store, id); for (segment *seg = segs->h; seg; seg=seg->next) { + unlock_table(tr->store, id); if (seg->ts == tr->tid && seg->end-seg->start) { - if (log_segment(tr, seg, id) != LOG_OK) + if (log_segment(tr, seg, id) != LOG_OK) { + unlock_table(tr->store, id); return LOG_ERR; + } } + 
lock_table(tr->store, id); } + unlock_table(tr->store, id); return LOG_OK; } @@ -3924,7 +3954,11 @@ log_table_append(sql_trans *tr, sql_tabl return LOG_ERR; size_t nr_appends = 0; + + lock_table(tr->store, t->base.id); for (segment *seg = segs->h; seg; seg=seg->next) { + unlock_table(tr->store, t->base.id); + if (seg->ts == tr->tid && seg->end-seg->start) { if (!seg->deleted) { if (log_segment(tr, seg, t->base.id) != LOG_OK) @@ -3933,7 +3967,9 @@ log_table_append(sql_trans *tr, sql_tabl nr_appends += (seg->end - seg->start); } } + lock_table(tr->store, t->base.id); } + unlock_table(tr->store, t->base.id); for (node *n = ol_first_node(t->columns); n && ok; n = n->next) { sql_column *c = n->data; @@ -3944,7 +3980,9 @@ log_table_append(sql_trans *tr, sql_tabl continue; } + lock_table(tr->store, t->base.id); if (!cs->cleared) for (segment *cur = segs->h; cur && ok; cur = cur->next) { + unlock_table(tr->store, t->base.id); if (cur->ts == tr->tid && !cur->deleted && cur->start < end) { /* append col*/ BAT *ins = temp_descriptor(cs->bid); @@ -3953,7 +3991,9 @@ log_table_append(sql_trans *tr, sql_tabl ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends); bat_destroy(ins); } + lock_table(tr->store, t->base.id); } + unlock_table(tr->store, t->base.id); if (cs->ebid) { BAT *ins = temp_descriptor(cs->ebid); @@ -3974,7 +4014,9 @@ log_table_append(sql_trans *tr, sql_tabl column_storage *cs = ATOMIC_PTR_GET(&i->data); if (cs) { + lock_table(tr->store, t->base.id); for (segment *cur = segs->h; cur && ok; cur = cur->next) { + unlock_table(tr->store, t->base.id); if (cur->ts == tr->tid && !cur->deleted && cur->start < end) { /* append idx */ BAT *ins = temp_descriptor(cs->bid); @@ -3983,7 +4025,9 @@ log_table_append(sql_trans *tr, sql_tabl ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends); bat_destroy(ins); } + lock_table(tr->store, t->base.id); } + unlock_table(tr->store, t->base.id); } } } @@ -4001,8 
+4045,6 @@ log_storage(sql_trans *tr, sql_table *t, if (ok == LOG_OK && cleared) ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id); if (ok == LOG_OK) - ok = segments2cs(tr, s->segs, &s->cs); - if (ok == LOG_OK) ok = log_segments(tr, s->segs, t->base.id); if (ok == LOG_OK && !cleared) ok = log_table_append(tr, t, s->segs); diff --git a/sql/storage/store.c b/sql/storage/store.c --- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -7032,7 +7032,7 @@ sql_trans_begin(sql_session *s) tr->active = 1; (void) ATOMIC_INC(&store->nr_active); - list_append(store->active, s); + list_append(store->active, tr); TRC_DEBUG(SQL_STORE, "Exit sql_trans_begin for transaction: " ULLFMT "\n", tr->tid); store_unlock(store); @@ -7050,20 +7050,20 @@ sql_trans_end(sql_session *s, int ok) sql_trans_rollback(s->tr, false); } assert(s->tr->active); + sqlstore *store = s->tr->store; + store_lock(store); s->tr->active = 0; s->tr->status = 0; s->auto_commit = s->ac_on_commit; - sqlstore *store = s->tr->store; - store_lock(store); - list_remove_data(store->active, NULL, s); + list_remove_data(store->active, NULL, s->tr); ATOMIC_SET(&store->lastactive, GDKusec()); (void) ATOMIC_DEC(&store->nr_active); ulng oldest = store_get_timestamp(store); if (store->active && store->active->h) { for(node *n = store->active->h; n; n = n->next) { - sql_session *s = n->data; - if (s->tr->ts < oldest) - oldest = s->tr->ts; + sql_trans *tr = n->data; + if (tr->ts < oldest) + oldest = tr->ts; } } _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org