Changeset: 95db0965e511 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/95db0965e511 Modified Files: clients/Tests/exports.stable.out gdk/gdk_logger.c gdk/gdk_logger.h gdk/gdk_logger_internals.h sql/server/rel_optimizer.c sql/storage/bat/bat_logger.c sql/storage/bat/bat_storage.c sql/storage/sql_storage.h sql/storage/store.c Branch: default Log Message:
On large COPY INTO operations we now flush the logs (and tables) whenever possible. diffs (278 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -539,7 +539,7 @@ gdk_return log_constant(logger *lg, int gdk_return log_delta(logger *lg, BAT *uid, BAT *uval, log_id id); gdk_return log_sequence(logger *lg, int seq, lng id); gdk_return log_tend(logger *lg); -gdk_return log_tstart(logger *lg, ulng commit_ts); +gdk_return log_tstart(logger *lg, ulng commit_ts, bool flush); lng logger_changes(logger *lg); logger *logger_create(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata); void logger_destroy(logger *lg); diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -48,7 +48,7 @@ static gdk_return logger_del_bat(logger #define BATSIZE 0 -#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory) +#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow) static const char *log_commands[] = { "LOG_START", @@ -1386,7 +1386,7 @@ bm_subcommit(logger *lg) sizes[i] = 0; n[i++] = 0; /* n[0] is not used */ bids = (const log_bid *) Tloc(catalog_bid, 0); - if (!LOG_DISABLED(lg) && lg->catalog_cnt) + if (/*!LOG_DISABLED(lg) && */lg->catalog_cnt) cnts = (const lng *) Tloc(lg->catalog_cnt, 0); if (lg->catalog_lid) lids = (const lng *) Tloc(lg->catalog_lid, 0); @@ -1402,6 +1402,7 @@ bm_subcommit(logger *lg) n[i++] = col; } /* now commit catalog, so it's also up to date on disk */ + assert(!LOG_DISABLED(lg) || BATcount(catalog_bid) == lg->cnt); sizes[i] = lg->cnt; n[i++] = catalog_bid->batCacheid; sizes[i] = lg->cnt; @@ -2212,6 +2213,8 @@ log_constant(logger *lg, int type, ptr v if (LOG_DISABLED(lg) || !nr) { /* logging is switched off */ + if (nr) + return la_bat_update_count(lg, id, offset+cnt); return GDK_SUCCEED; } @@ -2260,6 +2263,8 @@
internal_log_bat(logger *lg, BAT *b, log if (LOG_DISABLED(lg) || !nr) { /* logging is switched off */ + if (nr) + return la_bat_update_count(lg, id, offset+cnt); return GDK_SUCCEED; } @@ -2343,13 +2348,11 @@ log_bat_persists(logger *lg, BAT *b, int logger_unlock(lg); return GDK_FAIL; } + } else { + lg->cnt++; } if (lg->debug & 1) fprintf(stderr, "#persists id (%d) bat (%d)\n", id, b->batCacheid); - if (LOG_DISABLED(lg)) { - logger_unlock(lg); - return GDK_SUCCEED; - } gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0); logger_unlock(lg); return r; @@ -2365,15 +2368,14 @@ log_bat_transient(logger *lg, int id) l.flag = LOG_DESTROY; l.id = id; - if (LOG_DISABLED(lg)) { - logger_unlock(lg); - return GDK_SUCCEED; - } - - if (log_write_format(lg, &l) != GDK_SUCCEED) { - TRC_CRITICAL(GDK, "write failed\n"); - logger_unlock(lg); - return GDK_FAIL; + if (!LOG_DISABLED(lg)) { + if (log_write_format(lg, &l) != GDK_SUCCEED) { + TRC_CRITICAL(GDK, "write failed\n"); + logger_unlock(lg); + return GDK_FAIL; + } + } else { + lg->cnt--; } if (lg->debug & 1) fprintf(stderr, "#Logged destroyed bat (%d) %d\n", id, @@ -2466,7 +2468,8 @@ log_bat_clear(logger *lg, int id) logformat l; if (LOG_DISABLED(lg)) - return GDK_SUCCEED; + return la_bat_update_count(lg, id, 0); + // return GDK_SUCCEED; l.flag = LOG_CLEAR; l.id = id; @@ -2516,14 +2519,18 @@ log_tend(logger *lg) logformat l; gdk_return res = GDK_SUCCEED; - if (LOG_DISABLED(lg)) - return GDK_SUCCEED; - if (lg->debug & 1) fprintf(stderr, "#log_tend %d\n", lg->tid); l.flag = LOG_END; l.id = lg->tid; + if (lg->flushnow) { + lg->flushnow = 0; + return logger_commit(lg); + } + + if (LOG_DISABLED(lg)) + return GDK_SUCCEED; if (res != GDK_SUCCEED || log_write_format(lg, &l) != GDK_SUCCEED || @@ -2690,17 +2697,28 @@ logger_find_bat(logger *lg, log_id id) gdk_return -log_tstart(logger *lg, ulng commit_ts) +log_tstart(logger *lg, ulng commit_ts, bool flushnow) { logformat l; + if (flushnow) { + lg->id++; + logger_close_output(lg); + 
/* start new file */ + if (logger_open_output(lg) != GDK_SUCCEED) + return GDK_FAIL; + while (lg->saved_id+1 < lg->id) + logger_flush(lg, commit_ts); + lg->flushnow = flushnow; + } + if (lg->current) { + lg->current->last_tid = lg->tid+1; + lg->current->last_ts = commit_ts; + } + if (LOG_DISABLED(lg)) return GDK_SUCCEED; - assert(lg->current); - lg->current->last_tid = lg->tid+1; - lg->current->last_ts = commit_ts; - l.flag = LOG_START; l.id = ++lg->tid; diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h --- a/gdk/gdk_logger.h +++ b/gdk/gdk_logger.h @@ -67,7 +67,7 @@ gdk_export gdk_return log_delta(logger * /* mark end of batgroup insert or clear */ //gdk_export gdk_return log_batgroup_end(logger *lg, oid id); -gdk_export gdk_return log_tstart(logger *lg, ulng commit_ts); +gdk_export gdk_return log_tstart(logger *lg, ulng commit_ts, bool flush); gdk_export gdk_return log_tend(logger *lg); gdk_export gdk_return log_sequence(logger *lg, int seq, lng id); diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h --- a/gdk/gdk_logger_internals.h +++ b/gdk/gdk_logger_internals.h @@ -25,6 +25,7 @@ struct logger { int tid; /* current transaction id */ int saved_tid; /* id of transaction which was flushed out (into BBP storage) */ bool flushing; + bool flushnow; logged_range *pending; logged_range *current; diff --git a/sql/server/rel_optimizer.c b/sql/server/rel_optimizer.c --- a/sql/server/rel_optimizer.c +++ b/sql/server/rel_optimizer.c @@ -4653,7 +4653,7 @@ rel_push_join_exps_down(visitor *v, sql_ rel_select_add_exp(v->sql->sa, rel->l, e); list_remove_node(rel->exps, NULL, n); v->changes++; - } else if (right && (rel->op != op_anti || (e->flag != mark_notin && e->flag != mark_in)) && + } else if (right && (rel->op != op_anti || (e->flag != mark_notin && e->flag != mark_in)) && rel_rebind_exp(v->sql, rel->r, e)) { /* select expressions on right */ sql_rel *r = rel->r; if (!is_select(r->op)) { diff --git a/sql/storage/bat/bat_logger.c 
b/sql/storage/bat/bat_logger.c --- a/sql/storage/bat/bat_logger.c +++ b/sql/storage/bat/bat_logger.c @@ -2347,9 +2347,9 @@ bl_log_isnew(sqlstore *store) } static int -bl_tstart(sqlstore *store, ulng commit_ts) +bl_tstart(sqlstore *store, ulng commit_ts, bool flush) { - return log_tstart(store->logger, commit_ts) == GDK_SUCCEED ? LOG_OK : LOG_ERR; + return log_tstart(store->logger, commit_ts, flush) == GDK_SUCCEED ? LOG_OK : LOG_ERR; } static int diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -2918,8 +2918,11 @@ claim_segment(sql_trans *tr, sql_table * } /* hard to only add this once per transaction (probably want to change to once per new segment) */ - if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t))) + if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t))) { trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, isLocalTemp(t)?NULL:&log_update_del); + if (!isLocalTemp(t)) + tr->logchanges += cnt; + } return slot; } diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h --- a/sql/storage/sql_storage.h +++ b/sql/storage/sql_storage.h @@ -248,7 +248,7 @@ typedef int (*logger_changes_fptr)(struc typedef int (*logger_get_sequence_fptr) (struct sqlstore *store, int seq, lng *id); typedef int (*log_isnew_fptr)(struct sqlstore *store); -typedef int (*log_tstart_fptr) (struct sqlstore *store, ulng commit_ts); +typedef int (*log_tstart_fptr) (struct sqlstore *store, ulng commit_ts, bool flush); typedef int (*log_tend_fptr) (struct sqlstore *store); typedef lng (*log_save_id_fptr) (struct sqlstore *store); typedef int (*log_sequence_fptr) (struct sqlstore *store, int seq, lng id); diff --git a/sql/storage/store.c b/sql/storage/store.c --- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -3315,9 +3315,11 @@ sql_trans_commit(sql_trans *tr) } } if (tr->changes) { + int 
min_changes = GDKdebug & FORCEMITOMASK ? 5 : 100000; + int flush = (tr->logchanges > min_changes && !store->changes); /* log changes should only be done if there is something to log */ if (tr->logchanges > 0) { - ok = store->logger_api.log_tstart(store, commit_ts); + ok = store->logger_api.log_tstart(store, commit_ts, flush); /* log */ for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { sql_change *c = n->data; @@ -3329,7 +3331,7 @@ sql_trans_commit(sql_trans *tr) if (ok == LOG_OK && store->prev_oid != store->obj_id) ok = store->logger_api.log_sequence(store, OBJ_SID, store->obj_id); store->prev_oid = store->obj_id; - if (ok == LOG_OK) + if (ok == LOG_OK && !flush) ok = store->logger_api.log_tend(store); } tr->logchanges = 0; @@ -3342,6 +3344,9 @@ sql_trans_commit(sql_trans *tr) else c->obj->flags = 0; } + /* flush logger after changes got applied */ + if (ok == LOG_OK && flush) + ok = store->logger_api.log_tend(store); /* garbage collect */ for(node *n=tr->changes->h; n && ok == LOG_OK; ) { node *next = n->next; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list