Changeset: 5941372b456c for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/5941372b456c
Modified Files:
        gdk/gdk_logger.c
        sql/backends/monet5/sql.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: Jul2021
Log Message:

add missing locks
cleanup some more


diffs (222 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2017,7 +2017,9 @@ logger_destroy(logger *lg)
        if (LOG_DISABLED(lg)) {
                lg->saved_id = lg->id;
                lg->saved_tid = lg->tid;
+               logger_lock(lg);
                logger_commit(lg);
+               logger_unlock(lg);
        }
        if (lg->catalog_bid) {
                logger_lock(lg);
@@ -2120,9 +2122,10 @@ logger_flush(logger *lg, ulng ts)
                lg->saved_tid = lg->tid;
                if (lid)
                        logger_cleanup_range(lg);
-               if (logger_commit(lg) != GDK_SUCCEED) {
+               logger_lock(lg);
+               if (logger_commit(lg) != GDK_SUCCEED)
                        TRC_ERROR(GDK, "failed to commit");
-               }
+               logger_unlock(lg);
                return GDK_SUCCEED;
        }
        if (lg->saved_id >= lid)
@@ -2547,7 +2550,10 @@ log_tend(logger *lg, ulng commit_ts)
        l.id = lg->tid;
        if (lg->flushnow) {
                lg->flushnow = 0;
-               return logger_commit(lg);
+               logger_lock(lg);
+               gdk_return res = logger_commit(lg);
+               logger_unlock(lg);
+               return res;
        }
 
        if (lg->current) {
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -1277,7 +1277,7 @@ mvc_bind_wrap(Client cntxt, MalBlkPtr mb
        sql_table *t = mvc_bind_table(m, s, tname);
        sql_column *c = mvc_bind_column(m, t, cname);
        b = mvc_bind(m, sname, tname, cname, access);
-       if (b && b->ttype != coltype) {
+       if (b && b->ttype && b->ttype != coltype) {
                BBPunfix(b->batCacheid);
                throw(SQL,"sql.bind",SQLSTATE(42000) "Column type mismatch");
        }
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -446,6 +446,7 @@ typedef struct sqlstore {
     ATOMIC_TYPE timestamp;     /* timestamp counter */
     ATOMIC_TYPE transaction;/* transaction id counter */
        ulng oldest;
+       ulng oldest_pending;
        int readonly;                   /* store is readonly */
        int singleuser;                 /* store is for a single user only (==1 
enable, ==2 single user session running) */
        int first;                              /* just created the db */
@@ -465,6 +466,7 @@ typedef struct sqlstore {
 
 typedef struct sql_change {
        sql_base *obj;
+       ulng ts;        /* commit/rollback timestamp */
        void *data;     /* data changes */
        bool committed; /* commit or rollback */
        bool handled;   /* handled in commit */
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -58,6 +58,13 @@ store_oldest(sqlstore *store)
        return store_oldest_given_max(store, TRANSACTION_ID_BASE);
 }
 
+static ulng
+store_oldest_pending(sqlstore *store)
+{
+       assert(store->oldest_pending != TRANSACTION_ID_BASE);
+       return store->oldest_pending;
+}
+
 static inline bool
 instore(sqlid id)
 {
@@ -1979,7 +1986,7 @@ store_apply_deltas(sqlstore *store)
        flusher.working = true;
 
        store_lock(store);
-       ulng oldest = store_oldest(store);
+       ulng oldest = store_oldest_pending(store);
        store_unlock(store);
        if (oldest)
            res = store->logger_api.flush(store, oldest-1);
@@ -1988,23 +1995,11 @@ store_apply_deltas(sqlstore *store)
 }
 
 
-/* Call while holding store->flush */
-static void
-wait_until_flusher_idle(sqlstore *store)
-{
-       while (flusher.working) {
-               const int sleeptime = 100;
-               MT_lock_unset(&store->lock);
-               MT_sleep_ms(sleeptime);
-               MT_lock_set(&store->lock);
-       }
-}
 void
 store_suspend_log(sqlstore *store)
 {
        MT_lock_set(&store->lock);
        flusher.enabled = false;
-       wait_until_flusher_idle(store);
        MT_lock_unset(&store->lock);
 }
 
@@ -2019,6 +2014,7 @@ store_resume_log(sqlstore *store)
 static void
 store_pending_changes(sqlstore *store, ulng oldest)
 {
+       ulng oldest_changes = TRANSACTION_ID_BASE;
        if (!list_empty(store->changes)) { /* lets first cleanup old stuff */
                for(node *n=store->changes->h; n; ) {
                        node *next = n->next;
@@ -2029,9 +2025,13 @@ store_pending_changes(sqlstore *store, u
                        } else if (c->cleanup && c->cleanup(store, c, oldest)) {
                                list_remove_node(store->changes, store, n);
                                _DELETE(c);
+                       } else if (c->ts < oldest_changes) {
+                               oldest_changes = c->ts;
                        }
                        n = next;
                }
+               if (oldest_changes < TRANSACTION_ID_BASE)
+                       store->oldest_pending = oldest_changes;
        }
 }
 
@@ -2046,25 +2046,24 @@ store_manager(sqlstore *store)
        for (;;) {
                int res;
 
-               if (store->logger_api.changes(store) <= 0) {
+               if (ATOMIC_GET(&store->nr_active) == 0) {
+                       store_lock(store);
                        if (ATOMIC_GET(&store->nr_active) == 0) {
-                               store_lock(store);
-                               if (ATOMIC_GET(&store->nr_active) == 0) {
-                                       ulng oldest = store_timestamp(store)+1;
-                                       store_pending_changes(store, oldest);
-                               }
-                               store->logger_api.activate(store); /* rotate 
too new log file */
-                               store_unlock(store);
+                               ulng oldest = store_timestamp(store)+1;
+                               store_pending_changes(store, oldest);
                        }
-                       if (GDKexiting())
-                               break;
-                       const int sleeptime = 100;
-                       MT_lock_unset(&store->flush);
-                       MT_sleep_ms(sleeptime);
-                       flusher.countdown_ms -= sleeptime;
-                       MT_lock_set(&store->flush);
+                       store->logger_api.activate(store); /* rotate too new 
log file */
+                       store_unlock(store);
+               }
+               if (GDKexiting())
+                       break;
+               const int sleeptime = 100;
+               MT_lock_unset(&store->flush);
+               MT_sleep_ms(sleeptime);
+               flusher.countdown_ms -= sleeptime;
+               MT_lock_set(&store->flush);
+               if (store->logger_api.changes(store) <= 0)
                        continue;
-               }
                if (GDKexiting())
                        break;
 
@@ -2413,7 +2412,6 @@ store_hot_snapshot_to_stream(sqlstore *s
        MT_lock_set(&store->flush);
        MT_lock_set(&store->lock);
        locked = 1;
-       wait_until_flusher_idle(store);
        if (GDKexiting())
                goto end;
 
@@ -3232,21 +3230,9 @@ sql_trans_rollback(sql_trans *tr)
 
                        if (c->commit)
                                c->commit(tr, c, commit_ts, oldest);
+                       c->ts = commit_ts;
                }
-               if (!list_empty(store->changes)) { /* lets first cleanup old 
stuff */
-                       for(node *n=store->changes->h; n; ) {
-                               node *next = n->next;
-                               sql_change *c = n->data;
-
-                               if (!c->cleanup) {
-                                       _DELETE(c);
-                               } else if (c->cleanup && c->cleanup(store, c, 
oldest)) {
-                                       list_remove_node(store->changes, store, 
n);
-                                       _DELETE(c);
-                               }
-                               n = next;
-                       }
-               }
+               store_pending_changes(store, oldest);
                for(node *n=nl->h; n; n = n->next) {
                        sql_change *c = n->data;
 
@@ -3435,6 +3421,7 @@ sql_trans_commit(sql_trans *tr)
                                ok = c->commit(tr, c, commit_ts, oldest);
                        else
                                c->obj->flags = 0;
+                       c->ts = commit_ts;
                }
                /* flush logger after changes got applied */
                if (ok == LOG_OK && flush)
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to