Changeset: 2cfa7590d478 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2cfa7590d478
Modified Files:
        gdk/gdk_logger.c
        sql/storage/store.c
Branch: group-commit
Log Message:

Fix the non-WAL commit.


diffs (206 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2724,26 +2724,21 @@ new_logfile(logger *lg)
 gdk_return
 log_tend(logger *lg)
 {
-       logformat l;
-       gdk_return result;
+       assert(!lg->flushnow);
 
        if (lg->debug & 1)
                fprintf(stderr, "#log_tend %d\n", lg->tid);
 
-       l.flag = LOG_END;
-       l.id = lg->tid;
-       if (lg->flushnow) {
-               lg->flushnow = 0;
-               if ((result = logger_commit(lg)) != GDK_SUCCEED && !LOG_DISABLED(lg))
-                       (void) ATOMIC_DEC(&lg->refcount);
-               return result;
-       }
-
        lg->end++;
        if (LOG_DISABLED(lg)) {
                return GDK_SUCCEED;
        }
 
+       gdk_return result;
+       logformat l;
+       l.flag = LOG_END;
+       l.id = lg->tid;
+
        if ((result = log_write_format(lg, &l)) != GDK_SUCCEED)
                (void) ATOMIC_DEC(&lg->refcount);
        return result;
@@ -2800,6 +2795,11 @@ flush_queue_length(logger *lg) {
 gdk_return
 log_tflush(logger* lg, ulng log_file_id) {
 
+       if (lg->flushnow) {
+               lg->flushnow = 0;
+               return logger_commit(lg);
+       }
+
        if (LOG_DISABLED(lg)) {
                return GDK_SUCCEED;
        }
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3895,18 +3895,12 @@ transaction_check_dependencies_and_remov
        return ok;
 }
 
-
-static bool start_debug = 0;
-
 int
 sql_trans_commit(sql_trans *tr)
 {
        int ok = LOG_OK;
        sqlstore *store = tr->store;
 
-       static int thread = 0;
-       int this_thread = 0;
-
        if (!list_empty(tr->changes)) {
                int flush = 0;
                ulng commit_ts = 0, oldest = 0, log_file_id = 0;
@@ -3962,11 +3956,6 @@ sql_trans_commit(sql_trans *tr)
                        if (ok == LOG_OK && !flush)
                                ok = store->logger_api.log_tend(store); /* wal end */
                        if (ok == LOG_OK && !flush) {
-
-                               if (start_debug)
-                               {
-                                       this_thread = ++thread;
-                               }
                                store_lock(store);
                                commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
                                oldest = tr->parent ? commit_ts : store_oldest(store);
@@ -4017,12 +4006,6 @@ sql_trans_commit(sql_trans *tr)
                                store_unlock(store);
 
                                MT_lock_unset(&store->commit); /* release the commit log when flushing to disk */
-
-                               if (this_thread == 1) {
-                                       bool wait = true;
-                                       while (wait) {sleep(1);}
-                                       wait = false;
-                               }
                                ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */
                        }
                        if (ok == LOG_OK && !flush) { /* mark as done */
@@ -4085,62 +4068,60 @@ sql_trans_commit(sql_trans *tr)
                }
                /* when directly flushing: flush logger after changes got applied */
                if (flush) {
+                       // log_tend not required anymore when flushing directly
                        if (ok == LOG_OK) {
-                               ok = store->logger_api.log_tend(store); /* wal end */
-                               if (ok == LOG_OK) {
-                                       store_lock(store);
-                                       commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
-                                       oldest = tr->parent ? commit_ts : store_oldest(store);
-                                       tr->logchanges = 0;
-                                       TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
-                                       /* apply committed changes */
-                                       if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent)
-                                               oldest = commit_ts;
-                                       store_pending_changes(store, oldest);
-                                       for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
-                                               sql_change *c = n->data;
-
-                                               if (c->commit && ok == LOG_OK)
-                                                       ok = c->commit(tr, c, commit_ts, oldest);
-                                               else
-                                                       c->obj->new = 0;
-                                               c->ts = commit_ts;
-                                       }
-                                       /* propagate transaction dependencies to the storage only if other transactions are running */
-                                       if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) {
-                                               if (!list_empty(tr->dependencies)) {
-                                                       for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) {
-                                                               sql_dependency_change *lchange = (sql_dependency_change*) n->data;
-                                                               ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts);
-                                                       }
-                                               }
-                                               if (!list_empty(tr->depchanges)) {
-                                                       for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) {
-                                                               sql_dependency_change *lchange = (sql_dependency_change*) n->data;
-                                                               ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts);
-                                                       }
+                               store_lock(store);
+                               commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
+                               oldest = tr->parent ? commit_ts : store_oldest(store);
+                               tr->logchanges = 0;
+                               TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
+                               /* apply committed changes */
+                               if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent)
+                                       oldest = commit_ts;
+                               store_pending_changes(store, oldest);
+                               for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
+                                       sql_change *c = n->data;
+
+                                       if (c->commit && ok == LOG_OK)
+                                               ok = c->commit(tr, c, commit_ts, oldest);
+                                       else
+                                               c->obj->new = 0;
+                                       c->ts = commit_ts;
+                               }
+                               /* propagate transaction dependencies to the storage only if other transactions are running */
+                               if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) {
+                                       if (!list_empty(tr->dependencies)) {
+                                               for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) {
+                                                       sql_dependency_change *lchange = (sql_dependency_change*) n->data;
+                                                       ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts);
                                                }
                                        }
-                                       /* garbage collect */
-                                       for(node *n=tr->changes->h; n && ok == LOG_OK; ) {
-                                               node *next = n->next;
-                                               sql_change *c = n->data;
-
-                                               if (!c->cleanup || c->cleanup(store, c, oldest)) {
-                                                       _DELETE(c);
-                                               } else if (tr->parent) { /* need to keep everything */
-                                                       tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c);
-                                               } else {
-                                                       store->changes = sa_list_append(tr->sa, store->changes, c);
+                                       if (!list_empty(tr->depchanges)) {
+                                               for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) {
+                                                       sql_dependency_change *lchange = (sql_dependency_change*) n->data;
+                                                       ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts);
                                                }
-                                               n = next;
                                        }
-                                       store_unlock(store);
-                                       ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */
-                                       if (ok == LOG_OK) {
-                                               tr->ts = commit_ts;
-                                               ok = store->logger_api.log_tdone(store, commit_ts); /* mark as done */
+                               }
+                               /* garbage collect */
+                               for(node *n=tr->changes->h; n && ok == LOG_OK; ) {
+                                       node *next = n->next;
+                                       sql_change *c = n->data;
+
+                                       if (!c->cleanup || c->cleanup(store, c, oldest)) {
+                                               _DELETE(c);
+                                       } else if (tr->parent) { /* need to keep everything */
+                                               tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c);
+                                       } else {
+                                               store->changes = sa_list_append(tr->sa, store->changes, c);
                                        }
+                                       n = next;
+                               }
+                               store_unlock(store);
+                               ok = store->logger_api.log_tflush(store, log_file_id); /* flush/sync */
+                               if (ok == LOG_OK) {
+                                       tr->ts = commit_ts;
+                                       ok = store->logger_api.log_tdone(store, commit_ts); /* mark as done */
                                }
                        }
                        MT_lock_unset(&store->flush);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to