Changeset: be6f3028804b for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/be6f3028804b
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger_internals.h
Branch: pax-log
Log Message:

Factor out log_queue implementation for reuse.


diffs (245 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1845,6 +1845,69 @@ log_cleanup(logger *lg, lng id)
        return GDK_SUCCEED;
 }
 
+static void
+log_queue_destroy(log_queue* q) {
+       MT_sema_destroy(&q->sema);
+       MT_lock_destroy(&q->queue_lock);
+}
+
+static void
+log_queue_initialize(log_queue* q, const char* sema_name, const char* queue_lock_name) {
+       q->begin = 0;
+       q->length = 0;
+       MT_sema_init(&q->sema, LOG_QUEUE_SIZE, sema_name);
+       MT_lock_init(&q->queue_lock, queue_lock_name);
+}
+
+static int
+log_queue_length(log_queue* q) {
+       MT_lock_set(&q->queue_lock);
+       const int fql = q->length;
+       MT_lock_unset(&q->queue_lock);
+       return fql;
+}
+
+static unsigned int
+log_queue_request_number(log_queue* q) {
+       // Semaphore protects ring buffer structure in queue against overflowing
+       static unsigned int _number = 0;
+       unsigned int result;
+       MT_sema_down(&q->sema);
+       MT_lock_set(&q->queue_lock);
+       result = ++_number;
+       const int end = (q->begin + q->length) % LOG_QUEUE_SIZE;
+       q->queue[end] = _number;
+       q->length++;
+       MT_lock_unset(&q->queue_lock);
+
+       return result;
+}
+
+static void
+log_queue_truncate_left(log_queue* q, int limit) {
+       MT_lock_set(&q->queue_lock);
+       q->begin = (q->begin + limit) % LOG_QUEUE_SIZE;
+       q->length -= limit;
+       MT_lock_unset(&q->queue_lock);
+
+       for (int i = 0; i < limit; i++)
+               MT_sema_up(&q->sema);
+}
+
+static int
+log_queue_has_number(log_queue* q, unsigned int number) {
+       MT_lock_set(&q->queue_lock);
+       const int fql = q->length;
+       MT_lock_unset(&q->queue_lock);
+       for (int i = 0; i < fql; i++) {
+               const int idx = (q->begin + i) % LOG_QUEUE_SIZE;
+               if (q->queue[idx] == number) {
+                       return 1;
+               }
+       }
+       return 0;
+}
+
 /* Load data from the logger logdir
  * Initialize new directories and catalog files if none are present,
  * unless running in read-only mode
@@ -2133,9 +2196,8 @@ log_load(int debug, const char *fn, cons
        ATOMIC_DESTROY(&lg->refcount);
        MT_lock_destroy(&lg->lock);
        MT_lock_destroy(&lg->rotation_lock);
-       MT_sema_destroy(&lg->flush_queue_semaphore);
        MT_lock_destroy(&lg->flush_lock);
-       MT_lock_destroy(&lg->flush_queue_lock);
+       log_queue_destroy(&lg->flush_queue);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->local_dir);
@@ -2202,13 +2264,9 @@ log_new(int debug, const char *fn, const
        ATOMIC_INIT(&lg->refcount, 0);
        MT_lock_init(&lg->lock, fn);
        MT_lock_init(&lg->rotation_lock, "rotation_lock");
-       MT_sema_init(&lg->flush_queue_semaphore, FLUSH_QUEUE_SIZE, "flush_queue_semaphore");
        MT_lock_init(&lg->flush_lock, "flush_lock");
-       MT_lock_init(&lg->flush_queue_lock, "flush_queue_lock");
-
-       // flush variables
-       lg->flush_queue_begin = 0;
-       lg->flush_queue_length = 0;
+
+       log_queue_initialize(&lg->flush_queue, "flush_queue_semaphore", "flush_queue_lock");
 
        if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
                return lg;
@@ -2256,9 +2314,8 @@ log_destroy(logger *lg)
        ATOMIC_DESTROY(&lg->refcount);
        MT_lock_destroy(&lg->lock);
        MT_lock_destroy(&lg->rotation_lock);
-       MT_sema_destroy(&lg->flush_queue_semaphore);
        MT_lock_destroy(&lg->flush_lock);
-       MT_lock_destroy(&lg->flush_queue_lock);
+       log_queue_destroy(&lg->flush_queue);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->buf);
@@ -2924,55 +2981,6 @@ log_tcommit(logger *lg, ulng commit_ts)
        return GDK_SUCCEED;
 }
 
-static unsigned int
-request_number_flush_queue(logger *lg) {
-       // Semaphore protects ring buffer structure in queue against overflowing
-       static unsigned int _number = 0;
-       unsigned int result;
-       MT_sema_down(&lg->flush_queue_semaphore);
-       MT_lock_set(&lg->flush_queue_lock);
-       result = ++_number;
-       const int end = (lg->flush_queue_begin + lg->flush_queue_length) % FLUSH_QUEUE_SIZE;
-       lg->flush_queue[end] = _number;
-       lg->flush_queue_length++;
-       MT_lock_unset(&lg->flush_queue_lock);
-
-       return result;
-}
-
-static void
-left_truncate_flush_queue(logger *lg, int limit) {
-       MT_lock_set(&lg->flush_queue_lock);
-       lg->flush_queue_begin = (lg->flush_queue_begin + limit) % FLUSH_QUEUE_SIZE;
-       lg->flush_queue_length -= limit;
-       MT_lock_unset(&lg->flush_queue_lock);
-
-       for (int i = 0; i < limit; i++)
-               MT_sema_up(&lg->flush_queue_semaphore);
-}
-
-static int
-number_in_flush_queue(logger *lg, unsigned int number) {
-       MT_lock_set(&lg->flush_queue_lock);
-       const int fql = lg->flush_queue_length;
-       MT_lock_unset(&lg->flush_queue_lock);
-       for (int i = 0; i < fql; i++) {
-               const int idx = (lg->flush_queue_begin + i) % FLUSH_QUEUE_SIZE;
-               if (lg->flush_queue[idx] == number) {
-                       return 1;
-               }
-       }
-       return 0;
-}
-
-static int
-flush_queue_length(logger *lg) {
-       MT_lock_set(&lg->flush_queue_lock);
-       const int fql = lg->flush_queue_length;
-       MT_lock_unset(&lg->flush_queue_lock);
-       return fql;
-}
-
 gdk_return
 log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {
 
@@ -2991,13 +2999,14 @@ log_tflush(logger* lg, ulng log_file_id,
        MT_lock_set(&lg->rotation_lock);
        if (log_file_id == lg->id) { // TODO: introduce lg->flushed and get rid of log_file_id in signature
                MT_lock_unset(&lg->rotation_lock);
-               unsigned int number = request_number_flush_queue(lg);
+               log_queue* flush_queue = &lg->flush_queue;
+               unsigned int number = log_queue_request_number(flush_queue);
 
                MT_lock_set(&lg->flush_lock);
                /* the transaction is not yet flushed */
-               if (number_in_flush_queue(lg, number)) {
+               if (log_queue_has_number(flush_queue, number)) {
                        /* number of transactions in the group commit */
-                       const int fqueue_length = flush_queue_length(lg);
+                       const int fqueue_length = log_queue_length(flush_queue);
                        /* flush + fsync */
                        if (mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
                                        (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log)) ||
@@ -3008,7 +3017,7 @@ log_tflush(logger* lg, ulng log_file_id,
                        }
                        else {
                                /* flush succeeded */
-                               left_truncate_flush_queue(lg, fqueue_length);
+                               log_queue_truncate_left(flush_queue, fqueue_length);
                        }
                }
                /* else the transaction was already flushed in a group commit.
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -9,7 +9,7 @@
 #ifndef _LOGGER_INTERNALS_H_
 #define _LOGGER_INTERNALS_H_
 
-#define FLUSH_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum number of transactions committing simultaneously */
+#define LOG_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum number of transactions committing simultaneously */
 
 typedef struct logged_range_t {
        ulng id;                        /* log file id */
@@ -19,6 +19,15 @@ typedef struct logged_range_t {
        struct logged_range_t *next;
 } logged_range;
 
+typedef struct log_queue {
+       /* flush variables */
+       unsigned int queue[LOG_QUEUE_SIZE]; /* circular array with the current transactions' ids waiting to be flushed */
+       int begin; /* start index of the queue */
+       int length; /* length of the queue */
+       MT_Sema sema; /*to protect the queue against ring buffer overflows */
+       MT_Lock queue_lock; /* to protect the queue against concurrent reads and writes */
+} log_queue;
+
 struct logger {
        int debug;
        int version;
@@ -47,6 +56,7 @@ struct logger {
        ATOMIC_TYPE refcount; /* Number of active writers and flushers in the logger */ // TODO check refcount in c->log and c->end
        MT_Lock rotation_lock;
        MT_Lock lock;
+       MT_Lock flush_lock; /* so only one transaction can flush to disk at any given time */
        /* Store log_bids (int) to circumvent trouble with reference counting */
        BAT *catalog_bid;       /* int bid column */
        BAT *catalog_id;        /* object identifier is unique */
@@ -68,13 +78,7 @@ struct logger {
        void *buf;
        size_t bufsize;
 
-       /* flush variables */
-       unsigned int flush_queue[FLUSH_QUEUE_SIZE]; /* circular array with the current transactions' ids waiting to be flushed */
-       int flush_queue_begin; /* start index of the queue */
-       int flush_queue_length; /* length of the queue */
-       MT_Sema flush_queue_semaphore; /*to protect the queue against ring buffer overflows */
-       MT_Lock flush_queue_lock; /* to protect the queue against concurrent reads and writes */
-       MT_Lock flush_lock; /* so only one transaction can flush to disk at any given time */
+       log_queue flush_queue;
 };
 
 struct old_logger {
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to