Changeset: 5a1dd85be2ae for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/5a1dd85be2ae Modified Files: gdk/gdk_logger.c gdk/gdk_logger.h gdk/gdk_logger_internals.h sql/storage/bat/bat_storage.c Branch: pax-log Log Message:
Introduce segmented internal_log_bat diffs (241 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -2460,7 +2460,7 @@ string_writer(logger *lg, BAT *b, lng of } static gdk_return -internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced) +internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt) { bte tpe = find_type(lg, b->ttype); gdk_return ok = GDK_SUCCEED; @@ -2489,13 +2489,18 @@ internal_log_bat(logger *lg, BAT *b, log if (is_row) l.flag = tpe; - if (log_write_format(lg, &l) != GDK_SUCCEED || - (!is_row && !mnstr_writeLng(lg->output_log, nr)) || - (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) || - (!is_row && !mnstr_writeLng(lg->output_log, offset))) { - ok = GDK_FAIL; - goto bailout; - } + if (lg->total_cnt == 0) // signals single bulk message or first part of bat logged in parts + if (log_write_format(lg, &l) != GDK_SUCCEED || + (!is_row && !mnstr_writeLng(lg->output_log, total_cnt)) || + (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) || + (!is_row && !mnstr_writeLng(lg->output_log, offset))) { + ok = GDK_FAIL; + goto bailout; + } + lg->total_cnt += cnt; + + if (lg->total_cnt == total_cnt) // This is the last to be logged part of this bat, we can already reset the total_cnt + lg->total_cnt = 0; /* if offset is just for the log, but BAT is already sliced, reset offset */ if (sliced) @@ -2581,7 +2586,7 @@ log_bat_persists(logger *lg, BAT *b, log lg->end++; if (lg->debug & 1) fprintf(stderr, "#persists id (%d) bat (%d)\n", id, b->batCacheid); - gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0); + gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, BATcount(b)); logger_unlock(lg); if (r != GDK_SUCCEED) (void) ATOMIC_DEC(&lg->refcount); @@ -2619,10 +2624,10 @@ log_bat_transient(logger *lg, log_id id) } gdk_return -log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt) +log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt) { logger_lock(lg); - gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0); + gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt); logger_unlock(lg); return r; } @@ -2638,7 +2643,7 @@ log_delta(logger *lg, BAT *uid, BAT *uva lng nr; if (BATtdense(uid)) { - ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1); + ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, BATcount(uval)); logger_unlock(lg); if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED) (void) ATOMIC_DEC(&lg->refcount); diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h --- a/gdk/gdk_logger.h +++ b/gdk/gdk_logger.h @@ -57,7 +57,7 @@ gdk_export int logger_sequence(logger *l /* todo pass the transaction id */ gdk_export gdk_return log_constant(logger *lg, int type, ptr val, log_id id, lng offset, lng cnt); -gdk_export gdk_return log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt); /* log slice from b */ +gdk_export gdk_return log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt); /* log slice from b */ gdk_export gdk_return log_bat_clear(logger *lg, log_id id); gdk_export gdk_return log_bat_persists(logger *lg, BAT *b, log_id id); gdk_export gdk_return log_bat_transient(logger *lg, log_id id); diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h --- a/gdk/gdk_logger_internals.h +++ b/gdk/gdk_logger_internals.h @@ -34,6 +34,8 @@ struct logger { int row_insert_nrcols; /* nrcols == 0 no rowinsert, log_update will include the logformat */ + lng total_cnt; /* When logging the content of a bats in multiple runs, total_cnt is used the very first to signal this and keep track in the logging*/ + bool inmemory; char *fn; char *dir; diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -3361,15 +3361,22 @@ log_segment(sql_trans *tr, segment *s, s } static int -log_segments(sql_trans *tr, segments *segs, sqlid id) +log_segments(sql_trans *tr, segments *segs, sqlid id, size_t* nr_appends) { /* log segments */ + size_t _nr_appends = 0; for (segment *seg = segs->h; seg; seg=seg->next) { if (seg->ts == tr->tid && seg->end-seg->start) { + + if (!seg->deleted) + _nr_appends += (seg->end - seg->start); if (log_segment(tr, seg, id) != LOG_OK) return LOG_ERR; } } + + if (nr_appends) + *nr_appends = _nr_appends; return LOG_OK; } @@ -3391,7 +3398,7 @@ log_create_storage(sql_trans *tr, storag if (ok == LOG_OK) ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR; if (ok == LOG_OK) - ok = log_segments(tr, bat->segs, t->base.id); + ok = log_segments(tr, bat->segs, t->base.id, NULL); bat_destroy(b); return ok; } @@ -3849,7 +3856,7 @@ tr_log_cs( sql_trans *tr, sql_table *t, } static int -log_table_append(sql_trans *tr, sql_table *t, segments *segs) +log_table_append(sql_trans *tr, sql_table *t, segments *segs, size_t nr_appends) { sqlstore *store = tr->store; gdk_return ok = GDK_SUCCEED; @@ -3857,46 +3864,53 @@ log_table_append(sql_trans *tr, sql_tabl if (isTempTable(t)) return LOG_OK; size_t end = segs_end(segs, tr, t); - for (segment *cur = segs->h; cur && ok; cur = cur->next) { - if (cur->ts == tr->tid && !cur->deleted && cur->start < end) { - for (node *n = ol_first_node(t->columns); n && ok; n = n->next) { - sql_column *c = n->data; - column_storage *cs = ATOMIC_PTR_GET(&c->data); - - if (cs->cleared) { - ok = (tr_log_cs(tr, t, cs, cur, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL; - continue; - } - + + for (node *n = ol_first_node(t->columns); n && ok; n = n->next) { + sql_column *c = n->data; + column_storage *cs = ATOMIC_PTR_GET(&c->data); + + if (cs->cleared) { + ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL; + continue; + } + + for (segment *cur = segs->h; cur && ok; cur = cur->next) { + if (cur->ts == tr->tid && !cur->deleted && cur->start < end) { /* append col*/ BAT *ins = temp_descriptor(cs->bid); assert(ins); assert(BATcount(ins) >= cur->end); - ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start); + ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends); bat_destroy(ins); - if (ok == GDK_SUCCEED && cs->ebid) { - BAT *ins = temp_descriptor(cs->ebid); - assert(ins); - if (BATcount(ins) > ins->batInserted) - ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted); - BATcommit(ins, BATcount(ins)); - bat_destroy(ins); - } } - if (t->idxs) { - for (node *n = ol_first_node(t->idxs); n && ok; n = n->next) { - sql_idx *i = n->data; - - if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type)) - continue; - column_storage *cs = ATOMIC_PTR_GET(&i->data); - - if (cs) { + } + + if (ok == GDK_SUCCEED && cs->ebid) { + BAT *ins = temp_descriptor(cs->ebid); + assert(ins); + if (BATcount(ins) > ins->batInserted) + ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, BATcount(ins)-ins->batInserted); + BATcommit(ins, BATcount(ins)); + bat_destroy(ins); + } + } + + if (t->idxs) { + for (node *n = ol_first_node(t->idxs); n && ok; n = n->next) { + sql_idx *i = n->data; + + if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type)) + continue; + column_storage *cs = ATOMIC_PTR_GET(&i->data); + + if (cs) { + for (segment *cur = segs->h; cur && ok; cur = cur->next) { + if (cur->ts == tr->tid && !cur->deleted && cur->start < end) { /* append idx */ BAT *ins = temp_descriptor(cs->bid); assert(ins); assert(BATcount(ins) >= cur->end); - ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start); + ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends); bat_destroy(ins); } } @@ -3909,15 +3923,16 @@ log_table_append(sql_trans *tr, sql_tabl static int log_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id) { + size_t nr_appends = 0; int ok = LOG_OK, cleared = s->cs.cleared; if (ok == LOG_OK && cleared) ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id); if (ok == LOG_OK) ok = segments2cs(tr, s->segs, &s->cs); if (ok == LOG_OK) - ok = log_segments(tr, s->segs, id); + ok = log_segments(tr, s->segs, id, &nr_appends); if (ok == LOG_OK && !cleared) - ok = log_table_append(tr, t, s->segs); + ok = log_table_append(tr, t, s->segs, nr_appends); return ok; } _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org