Changeset: 7b584610a8fd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7b584610a8fd
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger.h
        gdk/gdk_logger_internals.h
        geom/monetdb5/geom.c
        sql/backends/monet5/sql.mal
        sql/backends/monet5/sql_upgrades.c
        sql/scripts/25_debug.sql
        sql/storage/bat/bat_storage.c
        sql/storage/store.c
Branch: unlock
Log Message:

amnesia logging
        removed lots of upgrade code (as backwards compatibility will be on the 
previous release only!!)
        initial batgroup logging, one combined log for a full update 
(clear/deletes and inserts)


diffs (truncated from 1573 to 300 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -82,6 +82,7 @@
 #define LOG_USE_ID     15
 #define LOG_CLEAR_ID   16
 #define LOG_UPDATE_PAX 17
+#define LOG_INSERT_OFFSET      18
 
 #ifdef NATIVE_WIN32
 #define getfilepos _ftelli64
@@ -119,6 +120,7 @@ static const char *log_commands[] = {
        "LOG_USE_ID",
        "LOG_CLEAR_ID",
        "LOG_UPDATE_PAX",
+       "LOG_INSERT_OFFSET",
 };
 
 typedef struct logaction {
@@ -127,6 +129,7 @@ typedef struct logaction {
        int ht;                 /* vid(-1),void etc */
        int tt;
        lng id;
+       lng offset;
        char *name;             /* optional */
        char tpe;               /* tpe of column */
        oid cid;                /* id of object */
@@ -153,6 +156,7 @@ typedef struct logformat_t {
 
 typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return;
 
+#include "gdk_geomlogger.h"
 static gdk_return bm_commit(logger *lg);
 static gdk_return tr_grow(trans *tr);
 
@@ -397,13 +401,19 @@ log_read_id(logger *lg, char *tpe, oid *
        return LOG_OK;
 }
 
+static log_return log_read_batgroup(logger *lg, trans *tr, logformat *l, char 
tpe, oid id);
 static log_return
-log_read_updates(logger *lg, trans *tr, logformat *l, char *name, int tpe, oid 
id, int pax)
+log_read_updates(logger *lg, trans *tr, logformat *l, char *name, int tpe, oid 
id, int pax, lng offset)
 {
+       if (tpe == LOG_BATGROUP || tpe == LOG_BATGROUP_ID)
+               return log_read_batgroup(lg, tr, l, tpe, id);
        log_bid bid = logger_find_bat(lg, name, tpe, id);
        BAT *b = BATdescriptor(bid);
        log_return res = LOG_OK;
-       int ht = -1, tt = -1, tseq = 0;
+       int ht = -1, tt = -1, tseq = 0, has_offset = (l->flag == 
LOG_INSERT_OFFSET);
+
+       if (l->flag == LOG_INSERT_OFFSET)
+               l->flag = LOG_INSERT;
 
        assert(!lg->inmemory);
        if (lg->debug & 1)
@@ -486,6 +496,8 @@ log_read_updates(logger *lg, trans *tr, 
 
                if (ht == TYPE_void && l->flag == LOG_INSERT) {
                        lng nr = l->nr;
+                       if (has_offset && mnstr_readLng(lg->input_log, &offset) 
!= 1)
+                               res = LOG_ERR;
                        for (; res == LOG_OK && nr > 0; nr--) {
                                void *t = rt(tv, lg->input_log, 1);
 
@@ -588,6 +600,7 @@ log_read_updates(logger *lg, trans *tr, 
                                tr->changes[tr->nr].tt = tt;
                                tr->changes[tr->nr].tpe = tpe;
                                tr->changes[tr->nr].cid = id;
+                               tr->changes[tr->nr].offset = offset;
                                if (name && (tr->changes[tr->nr].name = 
GDKstrdup(name)) == NULL) {
                                        logbat_destroy(b);
                                        BBPreclaim(uid);
@@ -609,6 +622,48 @@ log_read_updates(logger *lg, trans *tr, 
        return res;
 }
 
+static log_return
+log_read_batgroup(logger *lg, trans *tr, logformat *l, char bg_tpe, oid id)
+{
+       lng offset = 0, nr_deleted = 0, nr_inserted = l->nr;
+       oid rid;
+       log_return res = LOG_OK;
+       char tpe = 0, *name = NULL;
+
+       if (mnstr_readLng(lg->input_log, &offset) != 1 ||
+           mnstr_readLng(lg->input_log, &nr_deleted) != 1) {
+               fprintf(stderr, "!ERROR: log_read_batgroup: read failed\n");
+               return LOG_EOF;
+       }
+       /* read deletes and inserts */
+       l->flag = LOG_INSERT;
+       if (nr_deleted) {
+               l->nr = nr_deleted;
+               if (bg_tpe == LOG_BATGROUP_ID && log_read_id(lg, &tpe, &rid) == 
LOG_OK) {
+                       res = log_read_updates(lg, tr, l, name, tpe, rid, 
bg_tpe == LOG_BATGROUP_ID, 0);
+               } else {
+                       res = LOG_ERR;
+               } 
+       }
+       if (l->nr) {
+               l->nr = nr_inserted;
+               while (res == LOG_OK) {
+                       if (bg_tpe == LOG_BATGROUP_ID && log_read_id(lg, &tpe, 
&rid) == LOG_OK) { 
+                               if (tpe == LOG_BATGROUP_END)
+                                       break;
+                               res = log_read_updates(lg, tr, l, name, tpe, 
rid, bg_tpe == LOG_BATGROUP_ID, offset);
+                       } else {
+                               res = LOG_ERR;
+                       }
+               }
+       }
+       if ((tpe != LOG_BATGROUP_END && log_read_id(lg, &tpe, &rid) != LOG_OK) 
|| (rid != id)) {
+               fprintf(stderr, "!ERROR: log_read_batgroup: read failed\n");
+               return LOG_EOF;
+       }
+       return res;
+}
+
 static gdk_return
 la_bat_updates(logger *lg, logaction *la)
 {
@@ -626,9 +681,46 @@ la_bat_updates(logger *lg, logaction *la
        if (b == NULL)
                return GDK_FAIL;
        if (la->type == LOG_INSERT) {
-               if (BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
-                       logbat_destroy(b);
-                       return GDK_FAIL;
+               BUN cnt = BATcount(b);
+
+               /* handle offset 0 ie clear */
+               if (la->offset == 0 && cnt)
+                       BATclear(b, true);
+               /* handle offset */
+               if (cnt <= (BUN)la->offset) {
+                       if (cnt < (BUN)la->offset) { /* insert nils */
+                               const void *tv = ATOMnilptr(b->ttype);
+                               lng i, d = la->offset - BATcount(b);
+                               for(i=0;i<d;i++) {
+                                       if (BUNappend(b, tv, true) != 
GDK_SUCCEED) {
+                                               logbat_destroy(b);
+                                               return GDK_FAIL;
+                                       }
+                               }
+                       }
+                       if (BATcount(b) == (BUN)la->offset && BATappend(b, 
la->b, NULL, true) != GDK_SUCCEED) {
+                               logbat_destroy(b);
+                               return GDK_FAIL;
+                       }
+               } else {
+                       BATiter vi = bat_iterator(la->b);
+                       BUN p, q;
+
+                       for (p=0, q = la->offset; p<(BUN)la->nr; p++, q++) {
+                               const void *t = BUNtail(vi, p);
+
+                               if (q < cnt) {
+                                       if (BUNreplace(b, q, t, true) != 
GDK_SUCCEED) {
+                                               logbat_destroy(b);
+                                               return GDK_FAIL;
+                                       }
+                               } else {
+                                       if (BUNappend(b, t, true) != 
GDK_SUCCEED) {
+                                               logbat_destroy(b);
+                                               return GDK_FAIL;
+                                       }
+                               }
+                       }
                }
        } else if (la->type == LOG_UPDATE) {
                BATiter vi = bat_iterator(la->b);
@@ -1100,7 +1192,7 @@ logger_read_transaction(logger *lg)
                char tpe;
                oid id;
 
-               if ((l.flag >= LOG_INSERT && l.flag <= LOG_CLEAR) || l.flag == 
LOG_CREATE_ID || l.flag == LOG_USE_ID) {
+               if ((l.flag >= LOG_INSERT && l.flag <= LOG_CLEAR) || l.flag == 
LOG_CREATE_ID || l.flag == LOG_USE_ID || l.flag == LOG_INSERT_OFFSET) {
                        name = log_read_string(lg);
 
                        if (name == NULL) {
@@ -1159,11 +1251,12 @@ logger_read_transaction(logger *lg)
                        err = log_read_seq(lg, tr, &l);
                        break;
                case LOG_INSERT:
+               case LOG_INSERT_OFFSET:
                case LOG_UPDATE:
                        if (name == NULL || tr == NULL)
                                err = LOG_EOF;
                        else
-                               err = log_read_updates(lg, tr, &l, name, 0, 0, 
0);
+                               err = log_read_updates(lg, tr, &l, name, 0, 0, 
0, 0);
                        break;
                case LOG_INSERT_ID:
                case LOG_UPDATE_ID:
@@ -1173,7 +1266,7 @@ logger_read_transaction(logger *lg)
                        if (log_read_id(lg, &tpe, &id) != LOG_OK)
                                err = LOG_ERR;
                        else
-                               err = log_read_updates(lg, tr, &l, name, tpe, 
id, pax);
+                               err = log_read_updates(lg, tr, &l, name, tpe, 
id, pax, 0);
                }       break;
                case LOG_CREATE:
                        if (name == NULL || tr == NULL)
@@ -2139,6 +2232,7 @@ logger_new(int debug, const char *fn, co
        lg->saved_tid = getBBPinfo();
        lg->input_log = NULL;
        lg->flush_id = 0; /* after normal restart set to current id */
+       lg->bg.id = 0;
 
        len = snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, 
fn, DIR_SEP);
        if (len == -1 || len >= FILENAME_MAX) {
@@ -2590,7 +2684,7 @@ log_bat_persists(logger *lg, BAT *b, con
        if (lg->debug & 1)
                fprintf(stderr, "#Logged new bat [%s,%s] %s " BUNFMT " (%d)\n",
                        ha, ta, name, BATcount(b), b->batCacheid);
-       return log_bat(lg, b, name, tpe, id);
+       return log_bat(lg, b, name, tpe, id, 0);
 }
 
 gdk_return
@@ -2720,11 +2814,12 @@ log_delta(logger *lg, BAT *uid, BAT *uva
 }
 
 gdk_return
-log_bat(logger *lg, BAT *b, const char *name, char tpe, oid id)
+log_bat(logger *lg, BAT *b, const char *name, char tpe, oid id, lng offset)
 {
        gdk_return ok = GDK_SUCCEED;
        logformat l;
        BUN p;
+       int is_bg = (lg->bg.id != 0 && lg->bg.with_id);
 
        l.tid = lg->tid;
        l.nr = (BUNlast(b) - b->batInserted);
@@ -2740,10 +2835,16 @@ log_bat(logger *lg, BAT *b, const char *
                gdk_return (*wt) (const void *, stream *, size_t) = 
BATatoms[b->ttype].atomWrite;
 
                l.flag = tpe?LOG_INSERT_ID:LOG_INSERT;
-               if (log_write_format(lg, &l) != GDK_SUCCEED ||
+               assert(!offset || is_bg || name);
+               assert(!is_bg || l.nr == lg->bg.nr_inserted); /* or these are 
the deleted !*/
+               if (name && offset && !tpe)
+                       l.flag = LOG_INSERT_OFFSET;
+               if ((!is_bg && log_write_format(lg, &l) != GDK_SUCCEED) ||
                    (tpe ? log_write_id(lg, tpe, id) : log_write_string(lg, 
name)) != GDK_SUCCEED)
                        return GDK_FAIL;
 
+               if (offset && name && !tpe && !mnstr_writeLng(lg->output_log, 
offset))
+                       return GDK_FAIL;
                if (b->ttype > TYPE_void &&
                    b->ttype < TYPE_str &&
                    !isVIEW(b)) {
@@ -2772,6 +2873,9 @@ log_bat_clear(logger *lg, const char *na
 {
        logformat l;
 
+       if (lg->bg.id != 0) /* logged as part of the batgroup */
+               return GDK_SUCCEED;
+
        l.nr = 1;
        l.tid = lg->tid;
        lg->changes += l.nr;
@@ -2793,27 +2897,61 @@ log_bat_clear(logger *lg, const char *na
 }
 
 gdk_return 
-log_batgroup_insert(logger *lg, oid id, lng nr)
+log_batgroup(logger *lg, char tpe_id, oid id, bool cleared, lng nr_inserted, 
lng offset, lng nr_deleted)
 {
-       (void)lg;
-       (void)id;
-       (void)nr;
-       return GDK_SUCCEED;
-}
-
-gdk_return 
-log_batgroup_clear(logger *lg, oid id)
-{
-       (void)lg;
-       (void)id;
-       return GDK_SUCCEED;
+       logformat l;
+       char tpe = (tpe_id)?LOG_BATGROUP_ID:LOG_BATGROUP; 
+
+       if (!tpe_id) { /* Only optimize for id case */
+               lg->bg.id = id;
+               lg->bg.with_id = 0;
+               return GDK_SUCCEED;
+       }
+
+       l.flag = LOG_INSERT_ID; /* combination LOG_INSERT_ID + tpe == 
LOG_BATGROUP[_ID] */
+       l.nr = nr_inserted;
+       l.tid = lg->tid;
+
+       lg->changes += l.nr + cleared;
+
+       if (LOG_DISABLED(lg) || lg->inmemory) {
+               /* logging is switched off */
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to