Changeset: 95db0965e511 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/95db0965e511
Modified Files:
        clients/Tests/exports.stable.out
        gdk/gdk_logger.c
        gdk/gdk_logger.h
        gdk/gdk_logger_internals.h
        sql/server/rel_optimizer.c
        sql/storage/bat/bat_logger.c
        sql/storage/bat/bat_storage.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: default
Log Message:

On large COPY INTO operations we now flush the logs (and tables) if possible.


diffs (278 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -539,7 +539,7 @@ gdk_return log_constant(logger *lg, int 
 gdk_return log_delta(logger *lg, BAT *uid, BAT *uval, log_id id);
 gdk_return log_sequence(logger *lg, int seq, lng id);
 gdk_return log_tend(logger *lg);
-gdk_return log_tstart(logger *lg, ulng commit_ts);
+gdk_return log_tstart(logger *lg, ulng commit_ts, bool flush);
 lng logger_changes(logger *lg);
 logger *logger_create(int debug, const char *fn, const char *logdir, int 
version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void 
*funcdata);
 void logger_destroy(logger *lg);
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -48,7 +48,7 @@ static gdk_return logger_del_bat(logger 
 
 #define BATSIZE 0
 
-#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory)
+#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
 
 static const char *log_commands[] = {
        "LOG_START",
@@ -1386,7 +1386,7 @@ bm_subcommit(logger *lg)
        sizes[i] = 0;
        n[i++] = 0;             /* n[0] is not used */
        bids = (const log_bid *) Tloc(catalog_bid, 0);
-       if (!LOG_DISABLED(lg) && lg->catalog_cnt)
+       if (/*!LOG_DISABLED(lg) && */lg->catalog_cnt)
                cnts = (const lng *) Tloc(lg->catalog_cnt, 0);
        if (lg->catalog_lid)
                lids = (const lng *) Tloc(lg->catalog_lid, 0);
@@ -1402,6 +1402,7 @@ bm_subcommit(logger *lg)
                n[i++] = col;
        }
        /* now commit catalog, so it's also up to date on disk */
+       assert(!LOG_DISABLED(lg) || BATcount(catalog_bid) == lg->cnt);
        sizes[i] = lg->cnt;
        n[i++] = catalog_bid->batCacheid;
        sizes[i] = lg->cnt;
@@ -2212,6 +2213,8 @@ log_constant(logger *lg, int type, ptr v
 
        if (LOG_DISABLED(lg) || !nr) {
                /* logging is switched off */
+               if (nr)
+                       return la_bat_update_count(lg, id, offset+cnt);
                return GDK_SUCCEED;
        }
 
@@ -2260,6 +2263,8 @@ internal_log_bat(logger *lg, BAT *b, log
 
        if (LOG_DISABLED(lg) || !nr) {
                /* logging is switched off */
+               if (nr)
+                       return la_bat_update_count(lg, id, offset+cnt);
                return GDK_SUCCEED;
        }
 
@@ -2343,13 +2348,11 @@ log_bat_persists(logger *lg, BAT *b, int
                        logger_unlock(lg);
                        return GDK_FAIL;
                }
+       } else {
+               lg->cnt++;
        }
        if (lg->debug & 1)
                fprintf(stderr, "#persists id (%d) bat (%d)\n", id, 
b->batCacheid);
-       if (LOG_DISABLED(lg)) {
-               logger_unlock(lg);
-               return GDK_SUCCEED;
-       }
        gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0);
        logger_unlock(lg);
        return r;
@@ -2365,15 +2368,14 @@ log_bat_transient(logger *lg, int id)
        l.flag = LOG_DESTROY;
        l.id = id;
 
-       if (LOG_DISABLED(lg)) {
-               logger_unlock(lg);
-               return GDK_SUCCEED;
-       }
-
-       if (log_write_format(lg, &l) != GDK_SUCCEED) {
-               TRC_CRITICAL(GDK, "write failed\n");
-               logger_unlock(lg);
-               return GDK_FAIL;
+       if (!LOG_DISABLED(lg)) {
+               if (log_write_format(lg, &l) != GDK_SUCCEED) {
+                       TRC_CRITICAL(GDK, "write failed\n");
+                       logger_unlock(lg);
+                       return GDK_FAIL;
+               }
+       } else {
+               lg->cnt--;
        }
        if (lg->debug & 1)
                fprintf(stderr, "#Logged destroyed bat (%d) %d\n", id,
@@ -2466,7 +2468,8 @@ log_bat_clear(logger *lg, int id)
        logformat l;
 
        if (LOG_DISABLED(lg))
-               return GDK_SUCCEED;
+               return la_bat_update_count(lg, id, 0);
+       //      return GDK_SUCCEED;
 
        l.flag = LOG_CLEAR;
        l.id = id;
@@ -2516,14 +2519,18 @@ log_tend(logger *lg)
        logformat l;
        gdk_return res = GDK_SUCCEED;
 
-       if (LOG_DISABLED(lg))
-               return GDK_SUCCEED;
-
        if (lg->debug & 1)
                fprintf(stderr, "#log_tend %d\n", lg->tid);
 
        l.flag = LOG_END;
        l.id = lg->tid;
+       if (lg->flushnow) {
+               lg->flushnow = 0;
+               return logger_commit(lg);
+       }
+
+       if (LOG_DISABLED(lg))
+               return GDK_SUCCEED;
 
        if (res != GDK_SUCCEED ||
            log_write_format(lg, &l) != GDK_SUCCEED ||
@@ -2690,17 +2697,28 @@ logger_find_bat(logger *lg, log_id id)
 
 
 gdk_return
-log_tstart(logger *lg, ulng commit_ts)
+log_tstart(logger *lg, ulng commit_ts, bool flushnow)
 {
        logformat l;
 
+       if (flushnow) {
+               lg->id++;
+               logger_close_output(lg);
+               /* start new file */
+               if (logger_open_output(lg) != GDK_SUCCEED)
+                       return GDK_FAIL;
+               while (lg->saved_id+1 < lg->id)
+                       logger_flush(lg, commit_ts);
+               lg->flushnow = flushnow;
+       }
+       if (lg->current) {
+               lg->current->last_tid = lg->tid+1;
+               lg->current->last_ts = commit_ts;
+       }
+
        if (LOG_DISABLED(lg))
                return GDK_SUCCEED;
 
-       assert(lg->current);
-       lg->current->last_tid = lg->tid+1;
-       lg->current->last_ts = commit_ts;
-
        l.flag = LOG_START;
        l.id = ++lg->tid;
 
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -67,7 +67,7 @@ gdk_export gdk_return log_delta(logger *
 /* mark end of batgroup insert or clear */
 //gdk_export gdk_return log_batgroup_end(logger *lg, oid id);
 
-gdk_export gdk_return log_tstart(logger *lg, ulng commit_ts);
+gdk_export gdk_return log_tstart(logger *lg, ulng commit_ts, bool flush);
 gdk_export gdk_return log_tend(logger *lg);
 
 gdk_export gdk_return log_sequence(logger *lg, int seq, lng id);
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -25,6 +25,7 @@ struct logger {
        int tid;                /* current transaction id */
        int saved_tid;          /* id of transaction which was flushed out 
(into BBP storage)  */
        bool flushing;
+       bool flushnow;
        logged_range *pending;
        logged_range *current;
 
diff --git a/sql/server/rel_optimizer.c b/sql/server/rel_optimizer.c
--- a/sql/server/rel_optimizer.c
+++ b/sql/server/rel_optimizer.c
@@ -4653,7 +4653,7 @@ rel_push_join_exps_down(visitor *v, sql_
                                rel_select_add_exp(v->sql->sa, rel->l, e);
                                list_remove_node(rel->exps, NULL, n);
                                v->changes++;
-                       } else if (right && (rel->op != op_anti || (e->flag != 
mark_notin && e->flag != mark_in)) && 
+                       } else if (right && (rel->op != op_anti || (e->flag != 
mark_notin && e->flag != mark_in)) &&
                                           rel_rebind_exp(v->sql, rel->r, e)) { 
/* select expressions on right */
                                sql_rel *r = rel->r;
                                if (!is_select(r->op)) {
diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c
--- a/sql/storage/bat/bat_logger.c
+++ b/sql/storage/bat/bat_logger.c
@@ -2347,9 +2347,9 @@ bl_log_isnew(sqlstore *store)
 }
 
 static int
-bl_tstart(sqlstore *store, ulng commit_ts)
+bl_tstart(sqlstore *store, ulng commit_ts, bool flush)
 {
-       return log_tstart(store->logger, commit_ts) == GDK_SUCCEED ? LOG_OK : 
LOG_ERR;
+       return log_tstart(store->logger, commit_ts, flush) == GDK_SUCCEED ? 
LOG_OK : LOG_ERR;
 }
 
 static int
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -2918,8 +2918,11 @@ claim_segment(sql_trans *tr, sql_table *
        }
 
        /* hard to only add this once per transaction (probably want to change 
to once per new segment) */
-       if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || 
(!isNew(t) && isLocalTemp(t)))
+       if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || 
(!isNew(t) && isLocalTemp(t))) {
                trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, 
isLocalTemp(t)?NULL:&log_update_del);
+               if (!isLocalTemp(t))
+                       tr->logchanges += cnt;
+       }
        return slot;
 }
 
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -248,7 +248,7 @@ typedef int (*logger_changes_fptr)(struc
 typedef int (*logger_get_sequence_fptr) (struct sqlstore *store, int seq, lng 
*id);
 
 typedef int (*log_isnew_fptr)(struct sqlstore *store);
-typedef int (*log_tstart_fptr) (struct sqlstore *store, ulng commit_ts);
+typedef int (*log_tstart_fptr) (struct sqlstore *store, ulng commit_ts, bool 
flush);
 typedef int (*log_tend_fptr) (struct sqlstore *store);
 typedef lng (*log_save_id_fptr) (struct sqlstore *store);
 typedef int (*log_sequence_fptr) (struct sqlstore *store, int seq, lng id);
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3315,9 +3315,11 @@ sql_trans_commit(sql_trans *tr)
                }
        }
        if (tr->changes) {
+               int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 100000;
+               int flush = (tr->logchanges > min_changes && !store->changes);
                /* log changes should only be done if there is something to log 
*/
                if (tr->logchanges > 0) {
-                       ok = store->logger_api.log_tstart(store, commit_ts);
+                       ok = store->logger_api.log_tstart(store, commit_ts, 
flush);
                        /* log */
                        for(node *n=tr->changes->h; n && ok == LOG_OK; n = 
n->next) {
                                sql_change *c = n->data;
@@ -3329,7 +3331,7 @@ sql_trans_commit(sql_trans *tr)
                        if (ok == LOG_OK && store->prev_oid != store->obj_id)
                                ok = store->logger_api.log_sequence(store, 
OBJ_SID, store->obj_id);
                        store->prev_oid = store->obj_id;
-                       if (ok == LOG_OK)
+                       if (ok == LOG_OK && !flush)
                                ok = store->logger_api.log_tend(store);
                }
                tr->logchanges = 0;
@@ -3342,6 +3344,9 @@ sql_trans_commit(sql_trans *tr)
                        else
                                c->obj->flags = 0;
                }
+               /* flush logger after changes got applied */
+               if (ok == LOG_OK && flush)
+                       ok = store->logger_api.log_tend(store);
                /* garbage collect */
                for(node *n=tr->changes->h; n && ok == LOG_OK; ) {
                        node *next = n->next;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to