Changeset: fe31b031a6ef for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fe31b031a6ef
Modified Files:
        sql/storage/store.c
Branch: group-commit
Log Message:

Refactor sql_trans_commit.


diffs (268 lines):

diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3926,11 +3926,16 @@ sql_trans_commit(sql_trans *tr)
                }
 
                /* log changes should only be done if there is something to log 
*/
-               if (!tr->parent && tr->logchanges > 0) {
-                       int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 
1000000;
-                       flush = (tr->logchanges > min_changes && 
list_empty(store->changes));
-                       if (flush)
-                               MT_lock_set(&store->flush);
+               const bool log = !tr->parent && tr->logchanges > 0;
+
+               if (log) {
+                       const int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 
1000000;
+                       flush = log && (tr->logchanges > min_changes && 
list_empty(store->changes));
+               }
+
+               if (flush)
+                       MT_lock_set(&store->flush);
+               if (log) {
                        ok = store->logger_api.log_tstart(store, flush, 
&log_file_id); /* wal start */
                        /* log */
                        for(node *n=tr->changes->h; n && ok == LOG_OK; n = 
n->next) {
@@ -3953,178 +3958,75 @@ sql_trans_commit(sql_trans *tr)
                        if (ok == LOG_OK && store->prev_oid != store->obj_id)
                                ok = store->logger_api.log_sequence(store, 
OBJ_SID, store->obj_id);
                        store->prev_oid = store->obj_id;
-                       if (ok == LOG_OK && !flush)
-                               ok = store->logger_api.log_tend(store); /* wal 
end */
-                       if (ok == LOG_OK && !flush) {
-                               store_lock(store);
-                               commit_ts = tr->parent ? tr->parent->tid : 
store_timestamp(store);
-                               oldest = tr->parent ? commit_ts : 
store_oldest(store);
-                               tr->logchanges = 0;
-                               TRC_DEBUG(SQL_STORE, "Forwarding changes (" 
ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
-                               /* apply committed changes */
-                               if (ATOMIC_GET(&store->nr_active) == 1 && 
!tr->parent)
-                                       oldest = commit_ts;
-                               store_pending_changes(store, oldest);
-                               for(node *n=tr->changes->h; n && ok == LOG_OK; 
n = n->next) {
-                                       sql_change *c = n->data;
-
-                                       if (c->commit && ok == LOG_OK)
-                                               ok = c->commit(tr, c, 
commit_ts, oldest);
-                                       else
-                                               c->obj->new = 0;
-                                       c->ts = commit_ts;
-                               }
-                               /* propagate transaction dependencies to the 
storage only if other transactions are running */
-                               if (ok == LOG_OK && !tr->parent && 
ATOMIC_GET(&store->nr_active) > 1) {
-                                       if (!list_empty(tr->dependencies)) {
-                                               for (node *n = 
tr->dependencies->h; n && ok == LOG_OK; n = n->next) {
-                                                       sql_dependency_change 
*lchange = (sql_dependency_change*) n->data;
-                                                       ok = 
transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, 
commit_ts);
-                                               }
-                                       }
-                                       if (!list_empty(tr->depchanges)) {
-                                               for (node *n = 
tr->depchanges->h; n && ok == LOG_OK; n = n->next) {
-                                                       sql_dependency_change 
*lchange = (sql_dependency_change*) n->data;
-                                                       ok = 
transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, 
commit_ts);
-                                               }
-                                       }
-                               }
-                               /* garbage collect */
-                               for(node *n=tr->changes->h; n && ok == LOG_OK; 
) {
-                                       node *next = n->next;
-                                       sql_change *c = n->data;
-
-                                       if (!c->cleanup || c->cleanup(store, c, 
oldest)) {
-                                               _DELETE(c);
-                                       } else if (tr->parent) { /* need to 
keep everything */
-                                               tr->parent->changes = 
sa_list_append(tr->sa, tr->parent->changes, c);
-                                       } else {
-                                               store->changes = 
sa_list_append(tr->sa, store->changes, c);
-                                       }
-                                       n = next;
-                               }
-                               store_unlock(store);
-
-                               MT_lock_unset(&store->commit); /* release the 
commit log when flushing to disk */
-                               ok = store->logger_api.log_tflush(store, 
log_file_id); /* flush/sync */
+
+
+                       if (!flush) {
+                               if (ok == LOG_OK)
+                                       ok = store->logger_api.log_tend(store); 
/* wal end */
                        }
-                       if (ok == LOG_OK && !flush) { /* mark as done */
-                               tr->ts = commit_ts;
-                               ok = store->logger_api.log_tdone(store, 
commit_ts);
-                       }
-
-               } else {
-                       store_lock(store);
-                       commit_ts = tr->parent ? tr->parent->tid : 
store_timestamp(store);
-                       if (tr->parent)
-                               tr->parent->logchanges += tr->logchanges;
-                       oldest = tr->parent ? commit_ts : store_oldest(store);
-                       tr->logchanges = 0;
-                       TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " 
ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
-                       /* apply committed changes */
-                       if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent)
-                               oldest = commit_ts;
-                       store_pending_changes(store, oldest);
-                       for(node *n=tr->changes->h; n && ok == LOG_OK; n = 
n->next) {
-                               sql_change *c = n->data;
-
-                               if (c->commit && ok == LOG_OK)
-                                       ok = c->commit(tr, c, commit_ts, 
oldest);
-                               else
-                                       c->obj->new = 0;
-                               c->ts = commit_ts;
-                       }
-                       /* propagate transaction dependencies to the storage 
only if other transactions are running */
-                       if (ok == LOG_OK && !tr->parent && 
ATOMIC_GET(&store->nr_active) > 1) {
-                               if (!list_empty(tr->dependencies)) {
-                                       for (node *n = tr->dependencies->h; n 
&& ok == LOG_OK; n = n->next) {
-                                               sql_dependency_change *lchange 
= (sql_dependency_change*) n->data;
-                                               ok = 
transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, 
commit_ts);
-                                       }
-                               }
-                               if (!list_empty(tr->depchanges)) {
-                                       for (node *n = tr->depchanges->h; n && 
ok == LOG_OK; n = n->next) {
-                                               sql_dependency_change *lchange 
= (sql_dependency_change*) n->data;
-                                               ok = 
transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, 
commit_ts);
-                                       }
+               }
+               store_lock(store);
+               commit_ts = tr->parent ? tr->parent->tid : 
store_timestamp(store);
+               if (tr->parent)
+                       tr->parent->logchanges += tr->logchanges;
+               oldest = tr->parent ? commit_ts : store_oldest(store);
+               tr->logchanges = 0;
+               TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT 
") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
+               /* apply committed changes */
+               if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent)
+                       oldest = commit_ts;
+               store_pending_changes(store, oldest);
+               for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
+                       sql_change *c = n->data;
+
+                       if (c->commit && ok == LOG_OK)
+                               ok = c->commit(tr, c, commit_ts, oldest);
+                       else
+                               c->obj->new = 0;
+                       c->ts = commit_ts;
+               }
+               /* propagate transaction dependencies to the storage only if 
other transactions are running */
+               if (ok == LOG_OK && !tr->parent && 
ATOMIC_GET(&store->nr_active) > 1) {
+                       if (!list_empty(tr->dependencies)) {
+                               for (node *n = tr->dependencies->h; n && ok == 
LOG_OK; n = n->next) {
+                                       sql_dependency_change *lchange = 
(sql_dependency_change*) n->data;
+                                       ok = 
transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, 
commit_ts);
                                }
                        }
-                       /* garbage collect */
-                       for(node *n=tr->changes->h; n && ok == LOG_OK; ) {
-                               node *next = n->next;
-                               sql_change *c = n->data;
-
-                               if (!c->cleanup || c->cleanup(store, c, 
oldest)) {
-                                       _DELETE(c);
-                               } else if (tr->parent) { /* need to keep 
everything */
-                                       tr->parent->changes = 
sa_list_append(tr->sa, tr->parent->changes, c);
-                               } else {
-                                       store->changes = sa_list_append(tr->sa, 
store->changes, c);
-                               }
-                               n = next;
-                       }
-                       tr->ts = commit_ts;
-                       store_unlock(store);
-               }
-               /* when directly flushing: flush logger after changes got 
applied */
-               if (flush) {
-                       // log_tend not required anymore when flushing directly
-                       if (ok == LOG_OK) {
-                               store_lock(store);
-                               commit_ts = tr->parent ? tr->parent->tid : 
store_timestamp(store);
-                               oldest = tr->parent ? commit_ts : 
store_oldest(store);
-                               tr->logchanges = 0;
-                               TRC_DEBUG(SQL_STORE, "Forwarding changes (" 
ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
-                               /* apply committed changes */
-                               if (ATOMIC_GET(&store->nr_active) == 1 && 
!tr->parent)
-                                       oldest = commit_ts;
-                               store_pending_changes(store, oldest);
-                               for(node *n=tr->changes->h; n && ok == LOG_OK; 
n = n->next) {
-                                       sql_change *c = n->data;
-
-                                       if (c->commit && ok == LOG_OK)
-                                               ok = c->commit(tr, c, 
commit_ts, oldest);
-                                       else
-                                               c->obj->new = 0;
-                                       c->ts = commit_ts;
-                               }
-                               /* propagate transaction dependencies to the 
storage only if other transactions are running */
-                               if (ok == LOG_OK && !tr->parent && 
ATOMIC_GET(&store->nr_active) > 1) {
-                                       if (!list_empty(tr->dependencies)) {
-                                               for (node *n = 
tr->dependencies->h; n && ok == LOG_OK; n = n->next) {
-                                                       sql_dependency_change 
*lchange = (sql_dependency_change*) n->data;
-                                                       ok = 
transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, 
commit_ts);
-                                               }
-                                       }
-                                       if (!list_empty(tr->depchanges)) {
-                                               for (node *n = 
tr->depchanges->h; n && ok == LOG_OK; n = n->next) {
-                                                       sql_dependency_change 
*lchange = (sql_dependency_change*) n->data;
-                                                       ok = 
transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, 
commit_ts);
-                                               }
-                                       }
-                               }
-                               /* garbage collect */
-                               for(node *n=tr->changes->h; n && ok == LOG_OK; 
) {
-                                       node *next = n->next;
-                                       sql_change *c = n->data;
-
-                                       if (!c->cleanup || c->cleanup(store, c, 
oldest)) {
-                                               _DELETE(c);
-                                       } else if (tr->parent) { /* need to 
keep everything */
-                                               tr->parent->changes = 
sa_list_append(tr->sa, tr->parent->changes, c);
-                                       } else {
-                                               store->changes = 
sa_list_append(tr->sa, store->changes, c);
-                                       }
-                                       n = next;
-                               }
-                               store_unlock(store);
-                               ok = store->logger_api.log_tflush(store, 
log_file_id); /* flush/sync */
-                               if (ok == LOG_OK) {
-                                       tr->ts = commit_ts;
-                                       ok = store->logger_api.log_tdone(store, 
commit_ts); /* mark as done */
+                       if (!list_empty(tr->depchanges)) {
+                               for (node *n = tr->depchanges->h; n && ok == 
LOG_OK; n = n->next) {
+                                       sql_dependency_change *lchange = 
(sql_dependency_change*) n->data;
+                                       ok = 
transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, 
commit_ts);
                                }
                        }
-                       MT_lock_unset(&store->flush);
+               }
+               /* garbage collect */
+               for(node *n=tr->changes->h; n && ok == LOG_OK; ) {
+                       node *next = n->next;
+                       sql_change *c = n->data;
+
+                       if (!c->cleanup || c->cleanup(store, c, oldest)) {
+                               _DELETE(c);
+                       } else if (tr->parent) { /* need to keep everything */
+                               tr->parent->changes = sa_list_append(tr->sa, 
tr->parent->changes, c);
+                       } else {
+                               store->changes = sa_list_append(tr->sa, 
store->changes, c);
+                       }
+                       n = next;
+               }
+               tr->ts = commit_ts;
+               store_unlock(store);
+               /* flush the log structure */
+               if (log) {
+                       if (!flush)
+                               MT_lock_unset(&store->commit); /* release the 
commit lock while flushing to disk */
+                       if (ok == LOG_OK)
+                               ok = store->logger_api.log_tflush(store, 
log_file_id); /* flush/sync */
+                       if (!flush)
+                               MT_lock_set(&store->commit); /* re-acquire the 
commit lock after the flush to disk */
+                       ok = store->logger_api.log_tdone(store, commit_ts);
+                       if (flush)
+                               MT_lock_unset(&store->flush);
                }
                MT_lock_unset(&store->commit);
                list_destroy(tr->changes);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to