Changeset: 64ddac7eabcd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/64ddac7eabcd
Modified Files:
	sql/include/sql_catalog.h
	sql/storage/bat/bat_storage.c
	sql/storage/objectset.c
	sql/storage/sql_storage.h
	sql/storage/store.c
Branch: smart-merge
Log Message:
Implement smarter segment merging diffs (truncated from 370 to 300 lines): diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h --- a/sql/include/sql_catalog.h +++ b/sql/include/sql_catalog.h @@ -235,7 +235,7 @@ struct os_iter { /* transaction changes */ typedef int (*tc_valid_fptr) (struct sql_trans *tr, struct sql_change *c/*, ulng commit_ts, ulng oldest*/); typedef int (*tc_log_fptr) (struct sql_trans *tr, struct sql_change *c); /* write changes to the log */ -typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c, ulng commit_ts, ulng oldest);/* commit/rollback changes */ +typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c, ulng commit_ts, ulng oldest, ulng *active, ulng latest);/* commit/rollback changes */ typedef int (*tc_cleanup_fptr) (sql_store store, struct sql_change *c, ulng oldest); /* garbage collection, ie cleanup structures when possible */ typedef void (*destroy_fptr)(sql_store store, sql_base *b); typedef int (*validate_fptr)(struct sql_trans *tr, sql_base *b, int delete); diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -19,15 +19,15 @@ static int log_update_col( sql_trans *tr, sql_change *c); static int log_update_idx( sql_trans *tr, sql_change *c); static int log_update_del( sql_trans *tr, sql_change *c); -static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest); -static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest); -static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest); +static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active, ulng latest); +static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active, ulng latest); +static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng 
oldest, ulng *active, ulng latest); static int log_create_col(sql_trans *tr, sql_change *change); static int log_create_idx(sql_trans *tr, sql_change *change); static int log_create_del(sql_trans *tr, sql_change *change); -static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest); -static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest); -static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest); +static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest); +static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest); +static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest); static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest); static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest); static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest); @@ -363,7 +363,7 @@ segments2cs(sql_trans *tr, segments *seg /* TODO return LOG_OK/ERR */ static void -merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { segment *cur = s->segs->h, *seg = NULL; for (; cur; cur = cur->next) { @@ -372,22 +372,45 @@ merge_segments(storage *s, sql_trans *tr cur->oldts = 0; cur->ts = commit_ts; } - if (cur->ts <= oldest && cur->ts < TRANSACTION_ID_BASE) { /* possibly merge range */ - if (!seg) { /* skip first */ + + if (!seg) { + /* first segment */ + seg = cur; + } + else { + /* possible merge since both deleted flags are equal + and the timestamp is lesser than the latest one + when the active transactions were computed */ + if (seg->deleted == cur->deleted && seg->ts < latest && cur->ts < TRANSACTION_ID_BASE) { + int 
merge = 1; + for (int i = 0; active[i] != 0; i++) { + if ((active[i] >= seg->ts && active[i] <= cur->ts) + || (active[i] <= seg->ts && active[i] >= cur->ts)) { + /* cannot safely merge since there is an active transaction between the segments */ + merge = 0; + break; + } + } + /* merge segments */ + if (merge) { + seg->end = cur->end; + seg->next = cur->next; + if (cur == s->segs->t) + s->segs->t = seg; + if (commit_ts == oldest) + _DELETE(cur); + else + mark4destroy(cur, change, commit_ts); + cur = seg; + } + /* skip merge */ + else { + seg = cur; + } + } + /* skip merge */ + else { seg = cur; - } else if (seg->end == cur->start && seg->deleted == cur->deleted) { - /* merge with previous */ - seg->end = cur->end; - seg->next = cur->next; - if (cur == s->segs->t) - s->segs->t = seg; - if (commit_ts == oldest) - _DELETE(cur); - else - mark4destroy(cur, change, commit_ts); - cur = seg; - } else { - seg = cur; /* begin of new merge */ } } } @@ -3132,10 +3155,12 @@ log_create_col(sql_trans *tr, sql_change } static int -commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest) +commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { int ok = LOG_OK; (void)oldest; + (void)active; + (void)latest; if(!isTempTable(c->t)) { sql_delta *delta = ATOMIC_PTR_GET(&c->data); @@ -3153,12 +3178,12 @@ commit_create_col_( sql_trans *tr, sql_c } static int -commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { sql_column *c = (sql_column*)change->obj; if (!tr->parent) c->base.new = 0; - return commit_create_col_( tr, c, commit_ts, oldest); + return commit_create_col_( tr, c, commit_ts, oldest, active, latest); } /* will be called for new idx's and when new index columns are created */ @@ -3238,10 +3263,12 @@ log_create_idx(sql_trans *tr, sql_change } static int 
-commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest) +commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { int ok = LOG_OK; (void)oldest; + (void)active; + (void)latest; if(!isTempTable(i->t)) { sql_delta *delta = ATOMIC_PTR_GET(&i->data); @@ -3258,12 +3285,12 @@ commit_create_idx_( sql_trans *tr, sql_i } static int -commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { sql_idx *i = (sql_idx*)change->obj; if (!tr->parent) i->base.new = 0; - return commit_create_idx_(tr, i, commit_ts, oldest); + return commit_create_idx_(tr, i, commit_ts, oldest, active, latest); } static int @@ -3475,7 +3502,7 @@ log_create_del(sql_trans *tr, sql_change } static int -commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { int ok = LOG_OK; sql_table *t = (sql_table*)change->obj; @@ -3488,21 +3515,21 @@ commit_create_del( sql_trans *tr, sql_ch assert(ok == LOG_OK); if (ok != LOG_OK) return ok; - merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/); + merge_segments(dbat, tr, change, commit_ts, commit_ts, active, latest/* create is we are alone */ /*oldest*/); assert(dbat->cs.ts == tr->tid); dbat->cs.ts = commit_ts; if (ok == LOG_OK) { for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) { sql_column *c = n->data; - ok = commit_create_col_(tr, c, commit_ts, oldest); + ok = commit_create_col_(tr, c, commit_ts, oldest, active, latest); } if (t->idxs) { for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) { sql_idx *i = n->data; if (ATOMIC_PTR_GET(&i->data)) - ok = commit_create_idx_(tr, i, commit_ts, oldest); + ok = commit_create_idx_(tr, i, commit_ts, 
oldest, active, latest); } } if (!tr->parent) @@ -3640,12 +3667,14 @@ log_destroy_del(sql_trans *tr, sql_chang } static int -commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { (void)tr; (void)change; (void)commit_ts; (void)oldest; + (void)active; + (void)latest; return 0; } @@ -4123,12 +4152,14 @@ log_update_col( sql_trans *tr, sql_chang } static int -commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest) +commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { int ok = LOG_OK; sql_delta *delta = ATOMIC_PTR_GET(&c->data); (void)oldest; + (void)active; + (void)latest; if (isTempTable(c->t)) { if (commit_ts) { /* commit */ if (c->t->commit_action == CA_COMMIT || c->t->commit_action == CA_PRESERVE) { @@ -4182,14 +4213,14 @@ tc_gc_rollbacked_storage( sql_store Stor static int -commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { int ok = LOG_OK; sql_column *c = (sql_column*)change->obj; sql_delta *delta = ATOMIC_PTR_GET(&c->data); if (isTempTable(c->t)) - return commit_update_col_(tr, c, commit_ts, oldest); + return commit_update_col_(tr, c, commit_ts, oldest, active, latest); if (commit_ts) delta->cs.ts = commit_ts; if (!commit_ts) { /* rollback */ @@ -4263,11 +4294,13 @@ commit_update_idx_( sql_trans *tr, sql_i } static int -commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { int ok = LOG_OK; sql_idx *i = (sql_idx*)change->obj; sql_delta *delta = ATOMIC_PTR_GET(&i->data); + (void)active; + (void)latest; if (isTempTable(i->t)) return commit_update_idx_( tr, i, 
commit_ts, oldest); @@ -4347,7 +4380,7 @@ commit_storage(sql_trans *tr, storage *d } static int -commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active, ulng latest) { int ok = LOG_OK; sql_table *t = (sql_table*)change->obj; @@ -4400,11 +4433,11 @@ commit_update_del( sql_trans *tr, sql_ch ok = segments2cs(tr, dbat->segs, &dbat->cs); assert(ok == LOG_OK); if (ok == LOG_OK) - merge_segments(dbat, tr, change, commit_ts, oldest); + merge_segments(dbat, tr, change, commit_ts, oldest, active, latest); if (ok == LOG_OK && dbat == d && oldest == commit_ts) ok = merge_storage(dbat); } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */ - merge_segments(dbat, tr, change, commit_ts, oldest); + merge_segments(dbat, tr, change, commit_ts, oldest, active, latest); ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts)); } unlock_table(tr->store, t->base.id); diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c --- a/sql/storage/objectset.c +++ b/sql/storage/objectset.c @@ -607,7 +607,7 @@ tc_gc_objectversion(sql_store store, sql } static int -tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active_, ulng latest) { objectversion *ov = (objectversion*)change->data; if (commit_ts) { @@ -615,6 +615,8 @@ tc_commit_objectversion(sql_trans *tr, s ov->ts = commit_ts; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org