Changeset: be6f3028804b for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/be6f3028804b
Modified Files:
	gdk/gdk_logger.c
	gdk/gdk_logger_internals.h
Branch: pax-log
Log Message:
Factor out log_queue implementation for reuse. diffs (245 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -1845,6 +1845,69 @@ log_cleanup(logger *lg, lng id) return GDK_SUCCEED; } +static void +log_queue_destroy(log_queue* q) { + MT_sema_destroy(&q->sema); + MT_lock_destroy(&q->queue_lock); +} + +static void +log_queue_initialize(log_queue* q, const char* sema_name, const char* queue_lock_name) { + q->begin = 0; + q->length = 0; + MT_sema_init(&q->sema, LOG_QUEUE_SIZE, sema_name); + MT_lock_init(&q->queue_lock, queue_lock_name); +} + +static int +log_queue_length(log_queue* q) { + MT_lock_set(&q->queue_lock); + const int fql = q->length; + MT_lock_unset(&q->queue_lock); + return fql; +} + +static unsigned int +log_queue_request_number(log_queue* q) { + // Semaphore protects ring buffer structure in queue against overflowing + static unsigned int _number = 0; + unsigned int result; + MT_sema_down(&q->sema); + MT_lock_set(&q->queue_lock); + result = ++_number; + const int end = (q->begin + q->length) % LOG_QUEUE_SIZE; + q->queue[end] = _number; + q->length++; + MT_lock_unset(&q->queue_lock); + + return result; +} + +static void +log_queue_truncate_left(log_queue* q, int limit) { + MT_lock_set(&q->queue_lock); + q->begin = (q->begin + limit) % LOG_QUEUE_SIZE; + q->length -= limit; + MT_lock_unset(&q->queue_lock); + + for (int i = 0; i < limit; i++) + MT_sema_up(&q->sema); +} + +static int +log_queue_has_number(log_queue* q, unsigned int number) { + MT_lock_set(&q->queue_lock); + const int fql = q->length; + MT_lock_unset(&q->queue_lock); + for (int i = 0; i < fql; i++) { + const int idx = (q->begin + i) % LOG_QUEUE_SIZE; + if (q->queue[idx] == number) { + return 1; + } + } + return 0; +} + /* Load data from the logger logdir * Initialize new directories and catalog files if none are present, * unless running in read-only mode @@ -2133,9 +2196,8 @@ log_load(int debug, const char *fn, cons 
ATOMIC_DESTROY(&lg->refcount); MT_lock_destroy(&lg->lock); MT_lock_destroy(&lg->rotation_lock); - MT_sema_destroy(&lg->flush_queue_semaphore); MT_lock_destroy(&lg->flush_lock); - MT_lock_destroy(&lg->flush_queue_lock); + log_queue_destroy(&lg->flush_queue); GDKfree(lg->fn); GDKfree(lg->dir); GDKfree(lg->local_dir); @@ -2202,13 +2264,9 @@ log_new(int debug, const char *fn, const ATOMIC_INIT(&lg->refcount, 0); MT_lock_init(&lg->lock, fn); MT_lock_init(&lg->rotation_lock, "rotation_lock"); - MT_sema_init(&lg->flush_queue_semaphore, FLUSH_QUEUE_SIZE, "flush_queue_semaphore"); MT_lock_init(&lg->flush_lock, "flush_lock"); - MT_lock_init(&lg->flush_queue_lock, "flush_queue_lock"); - - // flush variables - lg->flush_queue_begin = 0; - lg->flush_queue_length = 0; + + log_queue_initialize(&lg->flush_queue, "flush_queue_semaphore", "flush_queue_lock"); if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) { return lg; @@ -2256,9 +2314,8 @@ log_destroy(logger *lg) ATOMIC_DESTROY(&lg->refcount); MT_lock_destroy(&lg->lock); MT_lock_destroy(&lg->rotation_lock); - MT_sema_destroy(&lg->flush_queue_semaphore); MT_lock_destroy(&lg->flush_lock); - MT_lock_destroy(&lg->flush_queue_lock); + log_queue_destroy(&lg->flush_queue); GDKfree(lg->fn); GDKfree(lg->dir); GDKfree(lg->buf); @@ -2924,55 +2981,6 @@ log_tcommit(logger *lg, ulng commit_ts) return GDK_SUCCEED; } -static unsigned int -request_number_flush_queue(logger *lg) { - // Semaphore protects ring buffer structure in queue against overflowing - static unsigned int _number = 0; - unsigned int result; - MT_sema_down(&lg->flush_queue_semaphore); - MT_lock_set(&lg->flush_queue_lock); - result = ++_number; - const int end = (lg->flush_queue_begin + lg->flush_queue_length) % FLUSH_QUEUE_SIZE; - lg->flush_queue[end] = _number; - lg->flush_queue_length++; - MT_lock_unset(&lg->flush_queue_lock); - - return result; -} - -static void -left_truncate_flush_queue(logger *lg, int limit) { - MT_lock_set(&lg->flush_queue_lock); - 
lg->flush_queue_begin = (lg->flush_queue_begin + limit) % FLUSH_QUEUE_SIZE; - lg->flush_queue_length -= limit; - MT_lock_unset(&lg->flush_queue_lock); - - for (int i = 0; i < limit; i++) - MT_sema_up(&lg->flush_queue_semaphore); -} - -static int -number_in_flush_queue(logger *lg, unsigned int number) { - MT_lock_set(&lg->flush_queue_lock); - const int fql = lg->flush_queue_length; - MT_lock_unset(&lg->flush_queue_lock); - for (int i = 0; i < fql; i++) { - const int idx = (lg->flush_queue_begin + i) % FLUSH_QUEUE_SIZE; - if (lg->flush_queue[idx] == number) { - return 1; - } - } - return 0; -} - -static int -flush_queue_length(logger *lg) { - MT_lock_set(&lg->flush_queue_lock); - const int fql = lg->flush_queue_length; - MT_lock_unset(&lg->flush_queue_lock); - return fql; -} - gdk_return log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) { @@ -2991,13 +2999,14 @@ log_tflush(logger* lg, ulng log_file_id, MT_lock_set(&lg->rotation_lock); if (log_file_id == lg->id) { // TODO: introduce lg->flushed and get rid of log_file_id in signature MT_lock_unset(&lg->rotation_lock); - unsigned int number = request_number_flush_queue(lg); + log_queue* flush_queue = &lg->flush_queue; + unsigned int number = log_queue_request_number(flush_queue); MT_lock_set(&lg->flush_lock); /* the transaction is not yet flushed */ - if (number_in_flush_queue(lg, number)) { + if (log_queue_has_number(flush_queue, number)) { /* number of transactions in the group commit */ - const int fqueue_length = flush_queue_length(lg); + const int fqueue_length = log_queue_length(flush_queue); /* flush + fsync */ if (mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) || (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log)) || @@ -3008,7 +3017,7 @@ log_tflush(logger* lg, ulng log_file_id, } else { /* flush succeeded */ - left_truncate_flush_queue(lg, fqueue_length); + log_queue_truncate_left(flush_queue, fqueue_length); } } /* else the transaction was already flushed in a group commit. 
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h --- a/gdk/gdk_logger_internals.h +++ b/gdk/gdk_logger_internals.h @@ -9,7 +9,7 @@ #ifndef _LOGGER_INTERNALS_H_ #define _LOGGER_INTERNALS_H_ -#define FLUSH_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum number of transactions committing simultaneously */ +#define LOG_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum number of transactions committing simultaneously */ typedef struct logged_range_t { ulng id; /* log file id */ @@ -19,6 +19,15 @@ typedef struct logged_range_t { struct logged_range_t *next; } logged_range; +typedef struct log_queue { + /* flush variables */ + unsigned int queue[LOG_QUEUE_SIZE]; /* circular array with the current transactions' ids waiting to be flushed */ + int begin; /* start index of the queue */ + int length; /* length of the queue */ + MT_Sema sema; /*to protect the queue against ring buffer overflows */ + MT_Lock queue_lock; /* to protect the queue against concurrent reads and writes */ +} log_queue; + struct logger { int debug; int version; @@ -47,6 +56,7 @@ struct logger { ATOMIC_TYPE refcount; /* Number of active writers and flushers in the logger */ // TODO check refcount in c->log and c->end MT_Lock rotation_lock; MT_Lock lock; + MT_Lock flush_lock; /* so only one transaction can flush to disk at any given time */ /* Store log_bids (int) to circumvent trouble with reference counting */ BAT *catalog_bid; /* int bid column */ BAT *catalog_id; /* object identifier is unique */ @@ -68,13 +78,7 @@ struct logger { void *buf; size_t bufsize; - /* flush variables */ - unsigned int flush_queue[FLUSH_QUEUE_SIZE]; /* circular array with the current transactions' ids waiting to be flushed */ - int flush_queue_begin; /* start index of the queue */ - int flush_queue_length; /* length of the queue */ - MT_Sema flush_queue_semaphore; /*to protect the queue against ring buffer overflows */ - MT_Lock flush_queue_lock; /* to protect the queue 
against concurrent reads and writes */ - MT_Lock flush_lock; /* so only one transaction can flush to disk at any given time */ + log_queue flush_queue; }; struct old_logger { _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org