Changeset: fe31b031a6ef for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fe31b031a6ef
Modified Files:
	sql/storage/store.c
Branch: group-commit
Log Message:
Refactor sql_trans_commit. diffs (268 lines): diff --git a/sql/storage/store.c b/sql/storage/store.c --- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -3926,11 +3926,16 @@ sql_trans_commit(sql_trans *tr) } /* log changes should only be done if there is something to log */ - if (!tr->parent && tr->logchanges > 0) { - int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 1000000; - flush = (tr->logchanges > min_changes && list_empty(store->changes)); - if (flush) - MT_lock_set(&store->flush); + const bool log = !tr->parent && tr->logchanges > 0; + + if (log) { + const int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 1000000; + flush = log && (tr->logchanges > min_changes && list_empty(store->changes)); + } + + if (flush) + MT_lock_set(&store->flush); + if (log) { ok = store->logger_api.log_tstart(store, flush, &log_file_id); /* wal start */ /* log */ for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { @@ -3953,178 +3958,75 @@ sql_trans_commit(sql_trans *tr) if (ok == LOG_OK && store->prev_oid != store->obj_id) ok = store->logger_api.log_sequence(store, OBJ_SID, store->obj_id); store->prev_oid = store->obj_id; - if (ok == LOG_OK && !flush) - ok = store->logger_api.log_tend(store); /* wal end */ - if (ok == LOG_OK && !flush) { - store_lock(store); - commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store); - oldest = tr->parent ? 
commit_ts : store_oldest(store); - tr->logchanges = 0; - TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts); - /* apply committed changes */ - if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) - oldest = commit_ts; - store_pending_changes(store, oldest); - for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { - sql_change *c = n->data; - - if (c->commit && ok == LOG_OK) - ok = c->commit(tr, c, commit_ts, oldest); - else - c->obj->new = 0; - c->ts = commit_ts; - } - /* propagate transaction dependencies to the storage only if other transactions are running */ - if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) { - if (!list_empty(tr->dependencies)) { - for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts); - } - } - if (!list_empty(tr->depchanges)) { - for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts); - } - } - } - /* garbage collect */ - for(node *n=tr->changes->h; n && ok == LOG_OK; ) { - node *next = n->next; - sql_change *c = n->data; - - if (!c->cleanup || c->cleanup(store, c, oldest)) { - _DELETE(c); - } else if (tr->parent) { /* need to keep everything */ - tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c); - } else { - store->changes = sa_list_append(tr->sa, store->changes, c); - } - n = next; - } - store_unlock(store); - - MT_lock_unset(&store->commit); /* release the commit log when flushing to disk */ - ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */ + + + if (!flush) { + if (ok == LOG_OK) + ok = store->logger_api.log_tend(store); /* wal end */ } - if (ok == 
LOG_OK && !flush) { /* mark as done */ - tr->ts = commit_ts; - ok = store->logger_api.log_tdone(store, commit_ts); - } - - } else { - store_lock(store); - commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store); - if (tr->parent) - tr->parent->logchanges += tr->logchanges; - oldest = tr->parent ? commit_ts : store_oldest(store); - tr->logchanges = 0; - TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts); - /* apply committed changes */ - if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) - oldest = commit_ts; - store_pending_changes(store, oldest); - for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { - sql_change *c = n->data; - - if (c->commit && ok == LOG_OK) - ok = c->commit(tr, c, commit_ts, oldest); - else - c->obj->new = 0; - c->ts = commit_ts; - } - /* propagate transaction dependencies to the storage only if other transactions are running */ - if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) { - if (!list_empty(tr->dependencies)) { - for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts); - } - } - if (!list_empty(tr->depchanges)) { - for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts); - } + } + store_lock(store); + commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store); + if (tr->parent) + tr->parent->logchanges += tr->logchanges; + oldest = tr->parent ? 
commit_ts : store_oldest(store); + tr->logchanges = 0; + TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts); + /* apply committed changes */ + if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) + oldest = commit_ts; + store_pending_changes(store, oldest); + for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { + sql_change *c = n->data; + + if (c->commit && ok == LOG_OK) + ok = c->commit(tr, c, commit_ts, oldest); + else + c->obj->new = 0; + c->ts = commit_ts; + } + /* propagate transaction dependencies to the storage only if other transactions are running */ + if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) { + if (!list_empty(tr->dependencies)) { + for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) { + sql_dependency_change *lchange = (sql_dependency_change*) n->data; + ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts); } } - /* garbage collect */ - for(node *n=tr->changes->h; n && ok == LOG_OK; ) { - node *next = n->next; - sql_change *c = n->data; - - if (!c->cleanup || c->cleanup(store, c, oldest)) { - _DELETE(c); - } else if (tr->parent) { /* need to keep everything */ - tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c); - } else { - store->changes = sa_list_append(tr->sa, store->changes, c); - } - n = next; - } - tr->ts = commit_ts; - store_unlock(store); - } - /* when directly flushing: flush logger after changes got applied */ - if (flush) { - // log_tend not required anymore when flushing directly - if (ok == LOG_OK) { - store_lock(store); - commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store); - oldest = tr->parent ? 
commit_ts : store_oldest(store); - tr->logchanges = 0; - TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts); - /* apply committed changes */ - if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) - oldest = commit_ts; - store_pending_changes(store, oldest); - for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { - sql_change *c = n->data; - - if (c->commit && ok == LOG_OK) - ok = c->commit(tr, c, commit_ts, oldest); - else - c->obj->new = 0; - c->ts = commit_ts; - } - /* propagate transaction dependencies to the storage only if other transactions are running */ - if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) { - if (!list_empty(tr->dependencies)) { - for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts); - } - } - if (!list_empty(tr->depchanges)) { - for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) { - sql_dependency_change *lchange = (sql_dependency_change*) n->data; - ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts); - } - } - } - /* garbage collect */ - for(node *n=tr->changes->h; n && ok == LOG_OK; ) { - node *next = n->next; - sql_change *c = n->data; - - if (!c->cleanup || c->cleanup(store, c, oldest)) { - _DELETE(c); - } else if (tr->parent) { /* need to keep everything */ - tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c); - } else { - store->changes = sa_list_append(tr->sa, store->changes, c); - } - n = next; - } - store_unlock(store); - ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */ - if (ok == LOG_OK) { - tr->ts = commit_ts; - ok = store->logger_api.log_tdone(store, commit_ts); /* mark as done */ + if (!list_empty(tr->depchanges)) { + for (node *n = tr->depchanges->h; n && ok == 
LOG_OK; n = n->next) { + sql_dependency_change *lchange = (sql_dependency_change*) n->data; + ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts); } } - MT_lock_unset(&store->flush); + } + /* garbage collect */ + for(node *n=tr->changes->h; n && ok == LOG_OK; ) { + node *next = n->next; + sql_change *c = n->data; + + if (!c->cleanup || c->cleanup(store, c, oldest)) { + _DELETE(c); + } else if (tr->parent) { /* need to keep everything */ + tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c); + } else { + store->changes = sa_list_append(tr->sa, store->changes, c); + } + n = next; + } + tr->ts = commit_ts; + store_unlock(store); + /* flush the log structure */ + if (log) { + if (!flush) + MT_lock_unset(&store->commit); /* release the commit log when flushing to disk */ + if (ok == LOG_OK) + ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */ + if (!flush) + MT_lock_set(&store->commit); /* release the commit log when flushing to disk */ + ok = store->logger_api.log_tdone(store, commit_ts); + if (flush) + MT_lock_unset(&store->flush); } MT_lock_unset(&store->commit); list_destroy(tr->changes); _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org