Changeset: 2cfa7590d478 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2cfa7590d478
Modified Files:
	gdk/gdk_logger.c
	sql/storage/store.c
Branch: group-commit
Log Message:
Fix the non-WAL commit. diffs (206 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -2724,26 +2724,21 @@ new_logfile(logger *lg) gdk_return log_tend(logger *lg) { - logformat l; - gdk_return result; + assert(!lg->flushnow); if (lg->debug & 1) fprintf(stderr, "#log_tend %d\n", lg->tid); - l.flag = LOG_END; - l.id = lg->tid; - if (lg->flushnow) { - lg->flushnow = 0; - if ((result = logger_commit(lg)) != GDK_SUCCEED && !LOG_DISABLED(lg)) - (void) ATOMIC_DEC(&lg->refcount); - return result; - } - lg->end++; if (LOG_DISABLED(lg)) { return GDK_SUCCEED; } + gdk_return result; + logformat l; + l.flag = LOG_END; + l.id = lg->tid; + if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) (void) ATOMIC_DEC(&lg->refcount); return result; @@ -2800,6 +2795,11 @@ flush_queue_length(logger *lg) { gdk_return log_tflush(logger* lg, ulng log_file_id) { + if (lg->flushnow) { + lg->flushnow = 0; + return logger_commit(lg); + } + if (LOG_DISABLED(lg)) { return GDK_SUCCEED; } diff --git a/sql/storage/store.c b/sql/storage/store.c --- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -3895,18 +3895,12 @@ transaction_check_dependencies_and_remov return ok; } - -static bool start_debug = 0; - int sql_trans_commit(sql_trans *tr) { int ok = LOG_OK; sqlstore *store = tr->store; - static int thread = 0; - int this_thread = 0; - if (!list_empty(tr->changes)) { int flush = 0; ulng commit_ts = 0, oldest = 0, log_file_id = 0; @@ -3962,11 +3956,6 @@ sql_trans_commit(sql_trans *tr) if (ok == LOG_OK && !flush) ok = store->logger_api.log_tend(store); /* wal end */ if (ok == LOG_OK && !flush) { - - if (start_debug) - { - this_thread = ++thread; - } store_lock(store); commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store); oldest = tr->parent ? 
commit_ts : store_oldest(store); @@ -4017,12 +4006,6 @@ sql_trans_commit(sql_trans *tr) store_unlock(store); MT_lock_unset(&store->commit); /* release the commit log when flushing to disk */ - - if (this_thread == 1) { - bool wait = true; - while (wait) {sleep(1);} - wait = false; - } ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */ } if (ok == LOG_OK && !flush) { /* mark as done */ @@ -4085,62 +4068,60 @@ sql_trans_commit(sql_trans *tr) } /* when directly flushing: flush logger after changes got applied */ if (flush) { + // log_tend not required anymore when flushing directly if (ok == LOG_OK) { - ok = store->logger_api.log_tend(store); /* wal end */ - if (ok == LOG_OK) { - store_lock(store); - commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store); - oldest = tr->parent ? commit_ts : store_oldest(store); - tr->logchanges = 0; - TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts); - /* apply committed changes */ - if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) - oldest = commit_ts; - store_pending_changes(store, oldest); - for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { - sql_change *c = n->data; - - if (c->commit && ok == LOG_OK) - ok = c->commit(tr, c, commit_ts, oldest); - else - c->obj->new = 0; - c->ts = commit_ts; - } - /* propagate transaction dependencies to the storage only if other transactions are running */ - if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) { - if (!list_empty(tr->dependencies)) { - for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts); - } - } - if (!list_empty(tr->depchanges)) { - for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok 
= transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts); - } + store_lock(store); + commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store); + oldest = tr->parent ? commit_ts : store_oldest(store); + tr->logchanges = 0; + TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts); + /* apply committed changes */ + if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) + oldest = commit_ts; + store_pending_changes(store, oldest); + for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { + sql_change *c = n->data; + + if (c->commit && ok == LOG_OK) + ok = c->commit(tr, c, commit_ts, oldest); + else + c->obj->new = 0; + c->ts = commit_ts; + } + /* propagate transaction dependencies to the storage only if other transactions are running */ + if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) { + if (!list_empty(tr->dependencies)) { + for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) { + sql_dependency_change *lchange = (sql_dependency_change*) n->data; + ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts); } } - /* garbage collect */ - for(node *n=tr->changes->h; n && ok == LOG_OK; ) { - node *next = n->next; - sql_change *c = n->data; - - if (!c->cleanup || c->cleanup(store, c, oldest)) { - _DELETE(c); - } else if (tr->parent) { /* need to keep everything */ - tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c); - } else { - store->changes = sa_list_append(tr->sa, store->changes, c); + if (!list_empty(tr->depchanges)) { + for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) { + sql_dependency_change *lchange = (sql_dependency_change*) n->data; + ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts); } - n = next; } - store_unlock(store); - ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */ - if (ok == 
LOG_OK) { - tr->ts = commit_ts; - ok = store->logger_api.log_tdone(store, commit_ts); /* mark as done */ + } + /* garbage collect */ + for(node *n=tr->changes->h; n && ok == LOG_OK; ) { + node *next = n->next; + sql_change *c = n->data; + + if (!c->cleanup || c->cleanup(store, c, oldest)) { + _DELETE(c); + } else if (tr->parent) { /* need to keep everything */ + tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c); + } else { + store->changes = sa_list_append(tr->sa, store->changes, c); } + n = next; + } + store_unlock(store); + ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */ + if (ok == LOG_OK) { + tr->ts = commit_ts; + ok = store->logger_api.log_tdone(store, commit_ts); /* mark as done */ } } MT_lock_unset(&store->flush); _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org