Changeset: 5a1dd85be2ae for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/5a1dd85be2ae
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger.h
        gdk/gdk_logger_internals.h
        sql/storage/bat/bat_storage.c
Branch: pax-log
Log Message:

Introduce segmented internal_log_bat


diffs (241 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2460,7 +2460,7 @@ string_writer(logger *lg, BAT *b, lng of
 }
 
 static gdk_return
-internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int 
sliced)
+internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int 
sliced, lng total_cnt)
 {
        bte tpe = find_type(lg, b->ttype);
        gdk_return ok = GDK_SUCCEED;
@@ -2489,13 +2489,18 @@ internal_log_bat(logger *lg, BAT *b, log
 
        if (is_row)
                l.flag = tpe;
-       if (log_write_format(lg, &l) != GDK_SUCCEED ||
-           (!is_row && !mnstr_writeLng(lg->output_log, nr)) ||
-           (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) ||
-           (!is_row && !mnstr_writeLng(lg->output_log, offset))) {
-               ok = GDK_FAIL;
-               goto bailout;
-       }
+       if (lg->total_cnt == 0) // signals single bulk message or first part of 
bat logged in parts
+               if (log_write_format(lg, &l) != GDK_SUCCEED ||
+                       (!is_row && !mnstr_writeLng(lg->output_log, total_cnt)) 
||
+                       (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 
1) ||
+                       (!is_row && !mnstr_writeLng(lg->output_log, offset))) {
+                       ok = GDK_FAIL;
+                       goto bailout;
+               }
+       lg->total_cnt += cnt;
+
+       if (lg->total_cnt == total_cnt) // This is the last to be logged part 
of this bat, we can already reset the total_cnt
+               lg->total_cnt = 0;
 
        /* if offset is just for the log, but BAT is already sliced, reset 
offset */
        if (sliced)
@@ -2581,7 +2586,7 @@ log_bat_persists(logger *lg, BAT *b, log
        lg->end++;
        if (lg->debug & 1)
                fprintf(stderr, "#persists id (%d) bat (%d)\n", id, 
b->batCacheid);
-       gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0);
+       gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 
BATcount(b));
        logger_unlock(lg);
        if (r != GDK_SUCCEED)
                (void) ATOMIC_DEC(&lg->refcount);
@@ -2619,10 +2624,10 @@ log_bat_transient(logger *lg, log_id id)
 }
 
 gdk_return
-log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt)
+log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
 {
        logger_lock(lg);
-       gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0);
+       gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
        logger_unlock(lg);
        return r;
 }
@@ -2638,7 +2643,7 @@ log_delta(logger *lg, BAT *uid, BAT *uva
        lng nr;
 
        if (BATtdense(uid)) {
-               ok = internal_log_bat(lg, uval, id, uid->tseqbase, 
BATcount(uval), 1);
+               ok = internal_log_bat(lg, uval, id, uid->tseqbase, 
BATcount(uval), 1, BATcount(uval));
                logger_unlock(lg);
                if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
                        (void) ATOMIC_DEC(&lg->refcount);
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -57,7 +57,7 @@ gdk_export int logger_sequence(logger *l
 
 /* todo pass the transaction id */
 gdk_export gdk_return log_constant(logger *lg, int type, ptr val, log_id id, 
lng offset, lng cnt);
-gdk_export gdk_return log_bat(logger *lg, BAT *b, log_id id, lng offset, lng 
cnt); /* log slice from b */
+gdk_export gdk_return log_bat(logger *lg, BAT *b, log_id id, lng offset, lng 
cnt, lng total_cnt); /* log slice from b */
 gdk_export gdk_return log_bat_clear(logger *lg, log_id id);
 gdk_export gdk_return log_bat_persists(logger *lg, BAT *b, log_id id);
 gdk_export gdk_return log_bat_transient(logger *lg, log_id id);
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -34,6 +34,8 @@ struct logger {
 
        int row_insert_nrcols;  /* nrcols == 0 no rowinsert, log_update will 
include the logformat */
 
+       lng total_cnt; /* When logging the content of a bats in multiple runs, 
total_cnt is used the very first to signal this and keep track in the logging*/
+
        bool inmemory;
        char *fn;
        char *dir;
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -3361,15 +3361,22 @@ log_segment(sql_trans *tr, segment *s, s
 }
 
 static int
-log_segments(sql_trans *tr, segments *segs, sqlid id)
+log_segments(sql_trans *tr, segments *segs, sqlid id, size_t* nr_appends)
 {
        /* log segments */
+       size_t _nr_appends = 0;
        for (segment *seg = segs->h; seg; seg=seg->next) {
                if (seg->ts == tr->tid && seg->end-seg->start) {
+
+                       if (!seg->deleted)
+                               _nr_appends += (seg->end - seg->start);
                        if (log_segment(tr, seg, id) != LOG_OK)
                                return LOG_ERR;
                }
        }
+
+       if (nr_appends)
+               *nr_appends = _nr_appends;
        return LOG_OK;
 }
 
@@ -3391,7 +3398,7 @@ log_create_storage(sql_trans *tr, storag
        if (ok == LOG_OK)
                ok = (log_bat_persists(store->logger, b, t->base.id) == 
GDK_SUCCEED)?LOG_OK:LOG_ERR;
        if (ok == LOG_OK)
-               ok = log_segments(tr, bat->segs, t->base.id);
+               ok = log_segments(tr, bat->segs, t->base.id, NULL);
        bat_destroy(b);
        return ok;
 }
@@ -3849,7 +3856,7 @@ tr_log_cs( sql_trans *tr, sql_table *t, 
 }
 
 static int
-log_table_append(sql_trans *tr, sql_table *t, segments *segs)
+log_table_append(sql_trans *tr, sql_table *t, segments *segs, size_t 
nr_appends)
 {
        sqlstore *store = tr->store;
        gdk_return ok = GDK_SUCCEED;
@@ -3857,46 +3864,53 @@ log_table_append(sql_trans *tr, sql_tabl
        if (isTempTable(t))
                return LOG_OK;
        size_t end = segs_end(segs, tr, t);
-       for (segment *cur = segs->h; cur && ok; cur = cur->next) {
-               if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
-                       for (node *n = ol_first_node(t->columns); n && ok; n = 
n->next) {
-                               sql_column *c = n->data;
-                               column_storage *cs = ATOMIC_PTR_GET(&c->data);
-
-                               if (cs->cleared) {
-                                       ok = (tr_log_cs(tr, t, cs, cur, 
c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
-                                       continue;
-                               }
-
+
+       for (node *n = ol_first_node(t->columns); n && ok; n = n->next) {
+               sql_column *c = n->data;
+               column_storage *cs = ATOMIC_PTR_GET(&c->data);
+
+               if (cs->cleared) {
+                       ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == 
LOG_OK)? GDK_SUCCEED : GDK_FAIL;
+                       continue;
+               }
+
+               for (segment *cur = segs->h; cur && ok; cur = cur->next) {
+                       if (cur->ts == tr->tid && !cur->deleted && cur->start < 
end) {
                                /* append col*/
                                BAT *ins = temp_descriptor(cs->bid);
                                assert(ins);
                                assert(BATcount(ins) >= cur->end);
-                               ok = log_bat(store->logger, ins, c->base.id, 
cur->start, cur->end-cur->start);
+                               ok = log_bat(store->logger, ins, c->base.id, 
cur->start, cur->end-cur->start, nr_appends);
                                bat_destroy(ins);
-                               if (ok == GDK_SUCCEED && cs->ebid) {
-                                       BAT *ins = temp_descriptor(cs->ebid);
-                                       assert(ins);
-                                       if (BATcount(ins) > ins->batInserted)
-                                               ok = log_bat(store->logger, 
ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted);
-                                       BATcommit(ins, BATcount(ins));
-                                       bat_destroy(ins);
-                               }
                        }
-                       if (t->idxs) {
-                               for (node *n = ol_first_node(t->idxs); n && ok; 
n = n->next) {
-                                       sql_idx *i = n->data;
-
-                                       if ((hash_index(i->type) && 
list_length(i->columns) <= 1) || !idx_has_column(i->type))
-                                               continue;
-                                       column_storage *cs = 
ATOMIC_PTR_GET(&i->data);
-
-                                       if (cs) {
+               }
+
+               if (ok == GDK_SUCCEED && cs->ebid) {
+                       BAT *ins = temp_descriptor(cs->ebid);
+                       assert(ins);
+                       if (BATcount(ins) > ins->batInserted)
+                               ok = log_bat(store->logger, ins, -c->base.id, 
ins->batInserted, BATcount(ins)-ins->batInserted, 
BATcount(ins)-ins->batInserted);
+                       BATcommit(ins, BATcount(ins));
+                       bat_destroy(ins);
+               }
+       }
+
+       if (t->idxs) {
+               for (node *n = ol_first_node(t->idxs); n && ok; n = n->next) {
+                       sql_idx *i = n->data;
+
+                       if ((hash_index(i->type) && list_length(i->columns) <= 
1) || !idx_has_column(i->type))
+                               continue;
+                       column_storage *cs = ATOMIC_PTR_GET(&i->data);
+
+                       if (cs) {
+                               for (segment *cur = segs->h; cur && ok; cur = 
cur->next) {
+                                       if (cur->ts == tr->tid && !cur->deleted 
&& cur->start < end) {
                                                /* append idx */
                                                BAT *ins = 
temp_descriptor(cs->bid);
                                                assert(ins);
                                                assert(BATcount(ins) >= 
cur->end);
-                                               ok = log_bat(store->logger, 
ins, i->base.id, cur->start, cur->end-cur->start);
+                                               ok = log_bat(store->logger, 
ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
                                                bat_destroy(ins);
                                        }
                                }
@@ -3909,15 +3923,16 @@ log_table_append(sql_trans *tr, sql_tabl
 static int
 log_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
 {
+       size_t nr_appends = 0;
        int ok = LOG_OK, cleared = s->cs.cleared;
        if (ok == LOG_OK && cleared)
                ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
        if (ok == LOG_OK)
                ok = segments2cs(tr, s->segs, &s->cs);
        if (ok == LOG_OK)
-               ok = log_segments(tr, s->segs, id);
+               ok = log_segments(tr, s->segs, id, &nr_appends);
        if (ok == LOG_OK && !cleared)
-               ok = log_table_append(tr, t, s->segs);
+               ok = log_table_append(tr, t, s->segs, nr_appends);
        return ok;
 }
 
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to